Administrator
2023-08-21 a019311b0edee6df82a8ec7c0b28b06b22aa4d31
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# -*- coding: utf-8 -*-
"""
Trading API: manages client sockets and sends order, cancel and query
commands over them.
"""
import json
import logging
import random
import threading
import time
 
import crypt
from huaxin_client import socket_util
 
 
class ClientSocketManager:
    """Registry of connected client sockets, grouped by client type.

    A client is stored as a ``(rid, sk)`` tuple. Each ``rid`` has its own
    ``threading.Lock`` that marks the client as busy between
    ``acquire_client`` and ``release_client``. All state is kept in
    class-level dictionaries and is therefore shared by every caller.
    """

    # Client types
    CLIENT_TYPE_TRADE = "trade"
    CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
    CLIENT_TYPE_DEAL_LIST = "deal_list"
    CLIENT_TYPE_POSITION_LIST = "position_list"
    CLIENT_TYPE_MONEY = "money"
    CLIENT_TYPE_DEAL = "deal"
    CLIENT_TYPE_CMD_L2 = "l2_cmd"
    # type -> list of (rid, sk) for trade clients, single (rid, sk) otherwise
    socket_client_dict = {}
    # rid -> threading.Lock guarding that client
    socket_client_lock_dict = {}
    # rid -> time.time() of the last heartbeat
    active_client_dict = {}

    @classmethod
    def add_client(cls, _type, rid, sk):
        """Register client ``(rid, sk)`` under ``_type`` with a fresh lock.

        Trade clients accumulate in a list; any other type keeps only the
        most recently registered client.
        """
        if _type == cls.CLIENT_TYPE_TRADE:
            cls.socket_client_dict.setdefault(_type, []).append((rid, sk))
        else:
            cls.socket_client_dict[_type] = (rid, sk)
        cls.socket_client_lock_dict[rid] = threading.Lock()

    @classmethod
    def acquire_client(cls, _type):
        """Lock and return a free ``(rid, sk)`` client of ``_type``.

        Trade clients are tried from most to least recently active (clients
        without a recorded heartbeat sort last). Returns ``None`` when no
        client of that type is registered or all of them are busy.
        """
        if _type not in cls.socket_client_dict:
            return None
        if _type == cls.CLIENT_TYPE_TRADE:
            candidates = sorted(cls.socket_client_dict[_type],
                                key=lambda c: cls.active_client_dict.get(c[0], 0),
                                reverse=True)
        else:
            candidates = [cls.socket_client_dict[_type]]
        for client in candidates:
            lock = cls.socket_client_lock_dict.get(client[0])
            # A non-blocking acquire never raises; it just reports failure.
            if lock is not None and lock.acquire(blocking=False):
                return client
        return None

    @classmethod
    def release_client(cls, rid):
        """Release the lock of client ``rid``; unknown ids are ignored.

        Releasing a lock that is not held is tolerated (and logged) because
        a request that timed out has already released the lock by the time
        its late response triggers a second release.
        """
        lock = cls.socket_client_lock_dict.get(rid)
        if lock is None:
            return
        try:
            lock.release()
        except RuntimeError:
            logging.warning("client lock already released: rid=%s", rid)

    @classmethod
    def del_client(cls, rid):
        """Forget client ``rid``: drop its lock and its socket registrations."""
        cls.socket_client_lock_dict.pop(rid, None)
        # Iterate over a snapshot so entries can be popped while scanning.
        for client_type, entry in list(cls.socket_client_dict.items()):
            if isinstance(entry, list):
                for client in entry:
                    if client[0] == rid:
                        entry.remove(client)
                        break
            elif isinstance(entry, tuple):
                if entry[0] == rid:
                    cls.socket_client_dict.pop(client_type)

    @classmethod
    def heart(cls, rid):
        """Record a heartbeat for client ``rid`` at the current time."""
        cls.active_client_dict[rid] = time.time()
 
 
# Trade direction codes (1 = buy, 2 = sell).
TRADE_DIRECTION_BUY = 1
TRADE_DIRECTION_SELL = 2

# Default timeout, in seconds, when waiting for a response (2 s).
TIMEOUT = 2.0

# Responses keyed by request_id, held until the waiting reader pops them.
__request_response_dict = {}
 
 
def __get_request_id(type):
    """Build a request id of the form ``<type>_<timestamp>_<random>``.

    The timestamp is the current time in units of 0.1 ms and the random part
    is an integer in ``[0, 100000]``. The parameter name shadows the builtin
    ``type``; it is kept unchanged for compatibility.
    """
    stamp = round(time.time() * 10000)
    nonce = random.randint(0, 100000)
    return "_".join((str(type), str(stamp), str(nonce)))
 
 
# 网络请求
def __request(_type, data):
    """Send a signed ``cmd`` request through a free client of ``_type``.

    The payload ``{"type": "cmd", "data": data, "request_id": ...}`` is signed
    by sorting its ``key=value`` strings, appending the shared secret and
    taking the MD5 of the ``&``-joined result. The request is then written to
    the client socket and up to 1024 bytes are read back as the send
    acknowledgement.

    Returns:
        ``(request_id, client)`` where ``client`` is the ``(rid, sk)`` tuple
        that was used. On success the client's lock stays held; it is released
        later by ``set_response`` or by the timeout in ``__read_response``.

    Raises:
        Exception: if no client of ``_type`` is available.
        BrokenPipeError: if the socket is broken; the client is deleted first.
        Exception: any other failure is logged and re-raised, after the
            client's lock is released so the client can be used again.
    """
    client = ClientSocketManager.acquire_client(_type)
    if not client:
        raise Exception("无可用的交易client")
    try:
        request_id = __get_request_id(_type)
        root_data = {"type": "cmd",
                     "data": data,
                     "request_id": request_id}
        str_list = []
        for k, v in root_data.items():
            if isinstance(v, dict):
                str_list.append(f"{k}={json.dumps(v)}")
            else:
                str_list.append(f"{k}={v}")
        str_list.sort()
        # NOTE(review): the signing secret is hard-coded in source; consider
        # loading it from configuration instead.
        str_list.append("%Yeshi2014@#.")
        root_data["sign"] = crypt.md5_encrypt("&".join(str_list))
        print("请求前对象", root_data)
        # Prepend the request header before sending
        client[1].sendall(socket_util.load_header(json.dumps(root_data).encode(encoding='utf-8')))
        result = client[1].recv(1024)
        print("请求发送成功", result.decode(encoding='utf-8'))
    except BrokenPipeError:
        # The connection is dead: drop the client together with its lock.
        ClientSocketManager.del_client(client[0])
        raise
    except Exception as e:
        logging.exception(e)
        # Without this release the client would stay locked forever after a
        # failed request, since no response will arrive to release it.
        ClientSocketManager.release_client(client[0])
        raise
    return request_id, client
 
 
def __read_response(client, request_id, blocking, timeout=TIMEOUT):
    """Wait for the response to ``request_id`` and return it.

    Args:
        client: the ``(rid, sk)`` tuple the request was sent through.
        request_id: id of the request whose response is awaited.
        blocking: when false, return ``None`` immediately without waiting.
        timeout: maximum number of seconds to wait.

    Returns:
        The stored response (removed from the pending-response dict), or
        ``None`` when ``blocking`` is false.

    Raises:
        Exception: if no response arrives within ``timeout`` seconds; the
            client's lock is released before raising.
    """
    if not blocking:
        return None
    # Use the monotonic clock so system clock adjustments cannot shorten or
    # extend the wait.
    deadline = time.monotonic() + timeout
    while True:
        time.sleep(0.01)
        if request_id in __request_response_dict:
            # The response has arrived
            return __request_response_dict.pop(request_id)
        if time.monotonic() > deadline:
            ClientSocketManager.release_client(client[0])
            raise Exception(f"读取内容超时: request_id={request_id}")
 
 
def set_response(client_id, request_id, response):
    """Publish ``response`` for ``request_id`` and free the client.

    When both ``client_id`` and ``request_id`` are truthy (a response to a
    request this side initiated), the client's lock is released and the
    response is stored under ``request_id``. Otherwise (a passively received
    message) nothing is done.
    """
    if not (client_id and request_id):
        # Passive trigger: nothing to record
        return
    # Active trigger: free the client first, then publish the response
    ClientSocketManager.release_client(client_id)
    __request_response_dict[request_id] = response
 
 
def order(direction, code, volume, price, price_type=2, blocking=True):
    """Place an order through a trade client.

    Args:
        direction: 1 = buy, 2 = sell.
        code: security code.
        volume: quantity to trade.
        price: order price. NOTE(review): the original note says a sell
            without a price is sold at the level-5 quote; that is not
            handled in this function — confirm on the receiving side.
        price_type: forwarded unchanged in the request.
        blocking: whether to block until the response arrives.

    Returns:
        The response when ``blocking`` is true, otherwise ``None``.
    """
    print("客户端", ClientSocketManager.socket_client_dict)
    trade_type = ClientSocketManager.CLIENT_TYPE_TRADE
    # Key order is significant: the payload is serialized and signed as-is.
    payload = {
        "type": trade_type,
        "trade_type": 1,
        "direction": direction,
        "code": code,
        "volume": volume,
        "price_type": price_type,
        "price": price,
        "sinfo": f"b_{code}_{round(time.time() * 1000)}",
    }
    request_id, client = __request(trade_type, payload)
    return __read_response(client, request_id, blocking)
 
 
def cancel_order(direction, code, orderSysID, blocking=False):
    """Request cancellation of an order through a trade client.

    Args:
        direction: 1 = buy, 2 = sell.
        code: security code.
        orderSysID: identifier of the order to cancel, forwarded unchanged.
        blocking: whether to block until the response arrives.

    Returns:
        The response when ``blocking`` is true, otherwise ``None``.
    """
    trade_type = ClientSocketManager.CLIENT_TYPE_TRADE
    # Key order is significant: the payload is serialized and signed as-is.
    payload = {
        "type": trade_type,
        "trade_type": 2,
        "direction": direction,
        "code": code,
        "orderSysID": orderSysID,
        "sinfo": f"cb_{code}_{round(time.time() * 1000)}",
    }
    request_id, client = __request(trade_type, payload)
    return __read_response(client, request_id, blocking)
 
 
# CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
# CLIENT_TYPE_DEAL_LIST = "deal_list"
# CLIENT_TYPE_POSITION_LIST = "position_list"
# CLIENT_TYPE_MONEY = "money"
# CLIENT_TYPE_DEAL = "deal"
 
def get_delegate_list(can_cancel=True, blocking=True):
    """Fetch the list of delegated (submitted) orders.

    Args:
        can_cancel: sent as ``1`` when truthy and ``0`` otherwise; per the
            original note it selects orders that can still be cancelled.
        blocking: whether to block until the response arrives.

    Returns:
        The response when ``blocking`` is true, otherwise ``None``.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST
    cancel_flag = 1 if can_cancel else 0
    payload = {"type": client_type, "can_cancel": cancel_flag}
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
 
 
def get_deal_list(blocking=True):
    """Fetch the list of executed deals.

    Returns the response when ``blocking`` is true, otherwise ``None``.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_DEAL_LIST
    payload = {"type": client_type}
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
 
 
def get_position_list(blocking=True):
    """Fetch the list of held positions.

    Returns the response when ``blocking`` is true, otherwise ``None``.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_POSITION_LIST
    payload = {"type": client_type}
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
 
 
def get_money(blocking=True):
    """Fetch the account's funds status.

    Returns the response when ``blocking`` is true, otherwise ``None``.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_MONEY
    payload = {"type": client_type}
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
 
 
def set_l2_codes_data(codes_data, blocking=True):
    """Send the L2 subscription data through the L2 command client.

    Args:
        codes_data: forwarded unchanged as the ``data`` field of the command.
        blocking: whether to block until the response arrives.

    Returns:
        The response when ``blocking`` is true, otherwise ``None``.
    """
    client_type = ClientSocketManager.CLIENT_TYPE_CMD_L2
    payload = {"type": client_type, "data": codes_data}
    request_id, client = __request(client_type, payload)
    return __read_response(client, request_id, blocking)
 
 
if __name__ == "__main__":
    # Manual smoke check: dict.pop returns the removed value.
    print({"id": "123123"}.pop("id"))