Administrator
2023-08-23 f273791e2337215a2a3bd7e3c46c23c69bcb1c7c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# -*- coding: utf-8 -*-
import json
import logging
import queue
import random
import threading
import time
from huaxin_client import socket_util
 
from huaxin_client.client_network import SendResponseSkManager
 
# Active time (describes the *_upload_active_time_dict variables defined after the next import)
from log_module.log import logger_local_huaxin_l2_error
 
# Heartbeat timestamps (time.time() seconds) keyed by security code. Each uploader
# loop refreshes its entry on every iteration; run_upload_task reads them to decide
# whether a new uploader thread has to be started.
order_detail_upload_active_time_dict = {}
transaction_upload_active_time_dict = {}
# Temporary data: per-code queue.Queue buffers holding records that wait for upload
tmep_order_detail_queue_dict = {}
tmep_transaction_queue_dict = {}
# Codes whose per-code uploader loops should keep running
target_codes = set()
# (code, type, data) tuples consumed by the common uploader loop
common_queue = queue.Queue()
# (code, order_no) tuples consumed by the trading-canceled uploader loop
trading_canceled_queue = queue.Queue()
# Dictionary of buy order numbers: code -> set of OrderNO values
buy_order_nos_dict = {}
# Most recent large-order transaction order number, keyed by code
latest_big_order_transaction_order_dict = {}
 
 
def get_latest_transaction_order_no(code):
    """Return the most recent large-order transaction order number for ``code``.

    The value is whatever was last stored for ``code`` in
    ``latest_big_order_transaction_order_dict``; ``None`` is returned when
    nothing has been stored yet.
    """
    latest_orders = latest_big_order_transaction_order_dict
    return latest_orders.get(code, None)
 
 
def trading_order_canceled(code_, order_no):
    """Report that an order which was being traded has been canceled.

    The ``(code_, order_no)`` pair is placed on ``trading_canceled_queue``.
    """
    canceled = (code_, order_no)
    trading_canceled_queue.put(canceled)
 
 
def add_l2_order_detail(data, istransaction=False):
    """Buffer one order-detail record in the per-code upload queue.

    Args:
        data: Mapping with the keys ``SecurityID``, ``Price``, ``Volume``,
            ``Side``, ``OrderType``, ``OrderTime``, ``MainSeq``, ``SubSeq``,
            ``OrderNO`` and ``OrderStatus``.
        istransaction: Accepted for backward compatibility; it has no effect.

    Raises:
        KeyError: If ``data`` lacks one of the keys listed above.

    The record is stored as a tuple of the fields in the order listed above,
    followed by the current local time in epoch milliseconds.
    """
    code = data["SecurityID"]
    # Create the queue with setdefault so that a queue created concurrently by
    # another thread is never replaced (check-then-assign could drop its data).
    order_queue = tmep_order_detail_queue_dict.get(code)
    if order_queue is None:
        order_queue = tmep_order_detail_queue_dict.setdefault(code, queue.Queue())

    if data['Side'] == "1":
        # Record the order number of every buy order
        buy_order_nos_dict.setdefault(code, set()).add(data['OrderNO'])

    order_queue.put(
        (code, data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
         data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], int(time.time() * 1000)))
 
 
def add_transaction_detail(data):
    """Buffer one tick-by-tick transaction record in the per-code upload queue.

    Args:
        data: Mapping with the keys ``SecurityID``, ``TradePrice``,
            ``TradeVolume``, ``OrderTime``, ``MainSeq``, ``SubSeq``, ``BuyNo``,
            ``SellNo`` and ``ExecType``.

    Raises:
        KeyError: If ``data`` lacks one of the keys listed above.

    When ``BuyNo`` is one of the buy order numbers recorded for the code, it is
    also remembered as that code's latest large-order transaction order number.
    """
    code = data["SecurityID"]
    # Create the queue with setdefault so that a queue created concurrently by
    # another thread is never replaced (check-then-assign could drop its data).
    trans_queue = tmep_transaction_queue_dict.get(code)
    if trans_queue is None:
        trans_queue = tmep_transaction_queue_dict.setdefault(code, queue.Queue())

    # Check whether this trade belongs to a recorded buy order
    buy_nos = buy_order_nos_dict.get(code)
    if buy_nos is not None and data['BuyNo'] in buy_nos:
        latest_big_order_transaction_order_dict[code] = data['BuyNo']

    trans_queue.put((code, data['TradePrice'], data['TradeVolume'],
                     data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                     data['SellNo'], data['ExecType']))
 
 
def add_market_data(data):
    """Queue a market-data record for upload under the type ``l2_market_data``.

    ``data`` must contain the key ``securityID`` (lower-case first letter, as
    read here); the whole mapping is uploaded as the payload.
    """
    upload_item = (data['securityID'], "l2_market_data", data)
    # Hand the item to the common upload queue
    common_queue.put(upload_item)
 
 
def add_subscript_codes(codes):
    """Queue the subscribed codes for upload under the type ``l2_subscript_codes``.

    ``codes`` is copied into a list; the item is queued with an empty string
    in the code position.
    """
    print("add_subscript_codes", codes)
    code_list = list(codes)
    # Hand the item to the common upload queue
    common_queue.put(('', "l2_subscript_codes", code_list))
 
 
def __send_response(sk, msg):
    """Send ``msg`` over ``sk`` and report whether the peer acknowledged it.

    The message is passed through ``socket_util.load_header`` before sending,
    and the reply is read with ``socket_util.recv_data``.

    Returns:
        True when the reply is non-empty JSON whose ``code`` field equals 0,
        otherwise False.
    """
    framed = socket_util.load_header(msg)
    sk.sendall(framed)
    result, _header = socket_util.recv_data(sk)
    if not result:
        return False
    reply = json.loads(result)
    return True if reply.get("code") == 0 else False
 
 
def send_response(type, msg):
    """Send a message on the socket registered for ``type``, retrying once.

    Args:
        type: Key used to obtain the socket from ``SendResponseSkManager``
            (the name shadows the builtin but is kept for existing callers).
        msg: Message passed to ``__send_response``.

    Returns:
        The result of the last ``__send_response`` attempt.

    If the first attempt is not acknowledged, the message is sent again on the
    same socket. If the connection was reset or the pipe is broken, the socket
    is dropped, a new one is obtained and the message is sent once more; an
    error raised by that final attempt propagates to the caller.
    """
    try:
        sk = SendResponseSkManager.get_send_response_sk(type)
        if __send_response(sk, msg):
            return True
        # Send again
        print("再次发送")
        return __send_response(sk, msg)
    except (ConnectionResetError, BrokenPipeError):
        SendResponseSkManager.del_send_response_sk(type)
        fresh_sk = SendResponseSkManager.get_send_response_sk(type)
        return __send_response(fresh_sk, msg)
 
 
def upload_data(code, _type, datas):
    """Serialize one batch and send it through ``send_response``.

    Args:
        code: Security code placed in the payload and in the socket key.
        _type: Payload type; the socket key is ``f"{_type}_{code}"``.
        datas: JSON-serializable data to upload.

    Returns:
        None. The outcome of the send is not reported to the caller.

    Raises:
        TypeError: From ``json.dumps`` when ``datas`` is not JSON-serializable
            (serialization happens outside the guarded send).

    The payload has the form
    ``{"type": _type, "data": {"code": code, "data": datas, "time": <epoch ms>}}``.
    """
    key = f"{_type}_{code}"
    fdata = json.dumps(
        {"type": _type, "data": {"code": code, "data": datas, "time": round(time.time() * 1000)}})
    try:
        send_response(key, fdata.encode('utf-8'))
    except Exception as e:
        # Best effort: a failed upload is logged and must not stop the caller's loop
        logging.exception(e)
 
 
def __run_upload_order(code):
    """Loop that uploads buffered order details for ``code``.

    Runs until ``code`` is no longer in ``target_codes``. Each iteration
    refreshes the heartbeat in ``order_detail_upload_active_time_dict``, drains
    the code's queue and uploads the drained batch as ``l2_order``, then sleeps
    for 10 ms.
    """
    # Atomic creation: never replace a queue another thread has already created
    tmep_order_detail_queue_dict.setdefault(code, queue.Queue())
    while code in target_codes:
        try:
            order_detail_upload_active_time_dict[code] = time.time()
            pending = tmep_order_detail_queue_dict[code]
            udatas = []
            # Drain without blocking: empty() followed by get() can block forever
            # when another consumer takes the last item in between, which would
            # stall the heartbeat and the exit check above.
            while True:
                try:
                    udatas.append(pending.get_nowait())
                except queue.Empty:
                    break
            if udatas:
                upload_data(code, "l2_order", udatas)
        except Exception as e:
            logger_local_huaxin_l2_error.error(f"上传订单数据出错:{str(e)}")
        # Sleep on the error path as well so a repeating failure cannot busy-spin
        time.sleep(0.01)
 
 
def __run_upload_trans(code):
    """Loop that uploads buffered transaction records for ``code``.

    Runs until ``code`` is no longer in ``target_codes``. Each iteration
    refreshes the heartbeat in ``transaction_upload_active_time_dict``, drains
    the code's queue and uploads the drained batch as ``l2_trans``, then sleeps
    for 10 ms.
    """
    # Atomic creation: never replace a queue another thread has already created
    tmep_transaction_queue_dict.setdefault(code, queue.Queue())
    while code in target_codes:
        try:
            transaction_upload_active_time_dict[code] = time.time()
            pending = tmep_transaction_queue_dict[code]
            udatas = []
            # Drain without blocking: empty() followed by get() can block forever
            # when another consumer takes the last item in between, which would
            # stall the heartbeat and the exit check above.
            while True:
                try:
                    udatas.append(pending.get_nowait())
                except queue.Empty:
                    break
            if udatas:
                upload_data(code, "l2_trans", udatas)
        except Exception as e:
            logger_local_huaxin_l2_error.error(f"上传成交数据出错:{str(e)}")
        # Sleep on the error path as well so a repeating failure cannot busy-spin
        time.sleep(0.01)
 
 
def __run_upload_common():
    """Endless loop that uploads every item placed on ``common_queue``.

    Each item is indexed as ``(code, type, data)`` and handed to
    ``upload_data``. When the queue is empty the loop sleeps for 10 ms.
    Errors are logged and the loop continues.
    """
    print("__run_upload_common")
    while True:
        try:
            while True:
                if common_queue.empty():
                    break
                item = common_queue.get()
                upload_data(item[0], item[1], item[2])
            time.sleep(0.01)
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
            logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}")
 
 
def __run_upload_trading_canceled():
    """Endless loop that uploads items taken from ``trading_canceled_queue``.

    Blocks until an item is available, then uploads its second element under
    the type ``trading_order_canceled`` for the code in its first element.
    Errors are logged and the loop continues.
    """
    print("__run_upload_trading_canceled")
    while True:
        try:
            canceled = trading_canceled_queue.get()
            code_, order_no = canceled[0], canceled[1]
            upload_data(code_, "trading_order_canceled", order_no)
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
            # NOTE(review): this message text is identical to the one used by
            # __run_upload_common - confirm whether that is intended.
            logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}")
 
 
def run_upload_task(code):
    """Start the order and transaction uploader threads for ``code`` if needed.

    Nothing happens when ``code`` is not in ``target_codes``. For each of the
    two uploaders, a daemon thread is started only when no heartbeat has been
    recorded for ``code`` or the last heartbeat is more than 2 seconds old.
    """
    # Codes that are not targets do not need an uploader
    if code not in target_codes:
        return
    workers = (
        (order_detail_upload_active_time_dict, __run_upload_order),
        (transaction_upload_active_time_dict, __run_upload_trans),
    )
    for active_times, worker in workers:
        last_active = active_times.get(code)
        # A heartbeat within the last 2 seconds means the uploader is running
        if last_active is not None and time.time() - last_active <= 2:
            continue
        # Stamp the heartbeat before the thread runs: otherwise a second call
        # arriving before the worker's first iteration starts a duplicate thread.
        active_times[code] = time.time()
        threading.Thread(target=worker, args=(code,), daemon=True).start()
 
 
def run_upload_common():
    """Start the common uploader loop in a daemon thread."""
    # Pass the function directly; wrapping it in a lambda adds nothing
    t = threading.Thread(target=__run_upload_common, daemon=True)
    t.start()
 
 
def run_upload_trading_canceled():
    """Start the trading-canceled uploader loop in a daemon thread."""
    # Pass the function directly; wrapping it in a lambda adds nothing
    t = threading.Thread(target=__run_upload_trading_canceled, daemon=True)
    t.start()
 
 
if __name__ == "__main__":
    # Manual smoke run: register one code, start its uploaders and keep feeding
    # the same sample order detail (5 records roughly every millisecond). Never exits.
    code = "603809"
    target_codes.add(code)
    run_upload_task(code)
    sample_order = dict(SecurityID=code, Price=11.28, Volume=500, Side="2", OrderType="\u0000",
                        OrderTime=14591555, MainSeq=1, SubSeq=11050942, OrderNO=10692868,
                        OrderStatus="A")
    while True:
        for _ in range(5):
            # A fresh dict per call, as a literal would produce
            add_l2_order_detail(dict(sample_order), False)
        time.sleep(0.001)