Administrator
2023-09-15 1ff185866bcf0796d2367699bc000abb326360d5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
# -*- coding: utf-8 -*-
import contextlib
import json
import logging
import mmap
import queue
import random
import threading
import time
from huaxin_client import socket_util, l2_data_transform_protocol
 
from huaxin_client.client_network import SendResponseSkManager
 
# 活动时间
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module import log_export, async_log_util
from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \
    logger_local_huaxin_g_cancel, hx_logger_contact_debug, logger_system, logger_local_huaxin_l2_orderdetail
from utils import tool
 
order_detail_upload_active_time_dict = {}
transaction_upload_active_time_dict = {}
# 临时数据
tmep_order_detail_queue_dict = {}
tmep_transaction_queue_dict = {}
target_codes = set()
target_codes_add_time = {}
common_queue = queue.Queue()
trading_canceled_queue = queue.Queue()
log_buy_no_queue = queue.Queue()
# 买入订单号的字典
buy_order_nos_dict = {}
# 最近的大单成交单号
latest_big_order_transaction_orders_dict = {}
 
 
def add_target_code(code):
    target_codes.add(code)
    # 记录代码加入时间
    target_codes_add_time[code] = time.time()
 
 
def del_target_code(code):
    target_codes.discard(code)
    if code in target_codes_add_time:
        target_codes_add_time.pop(code)
 
 
# 获取最近的大单成交订单号
def get_latest_transaction_order_nos(code):
    return latest_big_order_transaction_orders_dict.get(code)
 
 
# 正在成交的订单撤单了
def trading_order_canceled(code_, order_no):
    trading_canceled_queue.put((code_, order_no))
 
 
# 添加委托详情
def add_l2_order_detail(data, istransaction=False):
    code = data["SecurityID"]
    # 异步日志记录
    async_log_util.huaxin_l2_log.info(logger_local_huaxin_l2_orderdetail, data)
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    # 原来的格式
    # {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'],
    #                 "Volume": pOrderDetail['Volume'],
    #                 "Side": pOrderDetail['Side'].decode(), "OrderType": pOrderDetail['OrderType'].decode(),
    #                 "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'],
    #                 "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
    #                 "OrderStatus": pOrderDetail['OrderStatus'].decode()}
    if data['Side'] == "1":
        # 记录所有买入的订单号
        if data['SecurityID'] not in buy_order_nos_dict:
            buy_order_nos_dict[data['SecurityID']] = set()
        buy_order_nos_dict[data['SecurityID']].add(data['OrderNO'])
        # 买入订单号需要记录日志
        async_log_util.huaxin_l2_log.info(logger_local_huaxin_l2_buy_no, f"{data['SecurityID']}#{data['OrderNO']}")
 
    tmep_order_detail_queue_dict[code].put(
        (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
         data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], int(time.time() * 1000)))
 
 
# 添加逐笔成交
def add_transaction_detail(data):
    code = data["SecurityID"]
    if code not in tmep_transaction_queue_dict:
        tmep_transaction_queue_dict[code] = queue.Queue()
    # 原来的格式
    #  item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
    #                     "TradeVolume": pTransaction['TradeVolume'],
    #                     "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'],
    #                     "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], "SellNo": pTransaction['SellNo'],
    #                     "ExecType": pTransaction['ExecType'].decode()}
 
    # 判断是否为大单成交
    code = data['SecurityID']
    if code in buy_order_nos_dict:
        if data['BuyNo'] in buy_order_nos_dict[code]:
            try:
                temp_list = latest_big_order_transaction_orders_dict.get(code)
                if not temp_list:
                    temp_list = []
                if temp_list:
                    if temp_list[-1] != data['BuyNo']:
                        # 不加入重复订单号
                        temp_list.append(data['BuyNo'])
                        if len(temp_list) > 10:
                            # 最多加10个订单号
                            temp_list = temp_list[-10:]
                else:
                    temp_list.append(data['BuyNo'])
                latest_big_order_transaction_orders_dict[code] = temp_list
            except:
                pass
    tmep_transaction_queue_dict[code].put((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                                           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                                           data['SellNo'], data['ExecType']))
 
 
def add_market_data(data):
    code = data['securityID']
    # 加入上传队列
    common_queue.put((code, "l2_market_data", data))
 
 
def add_subscript_codes(codes):
    print("add_subscript_codes", codes)
    # 加入上传队列
    common_queue.put(('', "l2_subscript_codes", list(codes)))
 
 
def __send_response(sk, msg):
    msg = socket_util.load_header(msg)
    sk.sendall(msg)
    result, header_str = socket_util.recv_data(sk)
    if result:
        result_json = json.loads(result)
        if result_json.get("code") == 0:
            return True
    return False
 
 
# 发送消息
def send_response(type, msg):
    try:
        sk = SendResponseSkManager.get_send_response_sk(type)
        if __send_response(sk, msg):
            return True
        else:
            # 再次发送
            print("再次发送")
            return __send_response(sk, msg)
    except ConnectionResetError as e:
        SendResponseSkManager.del_send_response_sk(type)
        sk = SendResponseSkManager.get_send_response_sk(type)
        return __send_response(sk, msg)
    except BrokenPipeError as e:
        SendResponseSkManager.del_send_response_sk(type)
        sk = SendResponseSkManager.get_send_response_sk(type)
        return __send_response(sk, msg)
 
 
# 上传数据
def upload_data(code, _type, datas, new_sk=False):
    uid = random.randint(0, 100000)
    key = f"{_type}_{code}"
    fdata = json.dumps(
        {"type": _type, "data": {"code": code, "data": datas, "time": round(time.time() * 1000)}})
    result = None
    try:
        if new_sk:
            sk = SendResponseSkManager.create_send_response_sk()
            result = __send_response(sk, fdata.encode('utf-8'))
        else:
            result = send_response(key, fdata.encode('utf-8'))
    except Exception as e:
        logging.exception(e)
    finally:
        pass
        # print("请求结束", uid, result)
        # logger_local_huaxin_l2_upload.info(
        #     f"{code} 上传数据耗时-{_type}: {round((time.time() - start_time) * 1000, 1)} 数据量:{len(datas)}")
    # print("上传结果", result)
 
 
# 循环读取上传数据
def __run_upload_order(code: str, l2_data_callback: L2DataCallBack) -> None:
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    if True:
        while True:
            # print("order task")
            try:
                if code not in target_codes:
                    break
                # 打开共享内存
                order_detail_upload_active_time_dict[code] = time.time()
                udatas = []
                while not tmep_order_detail_queue_dict[code].empty():
                    temp = tmep_order_detail_queue_dict[code].get()
                    udatas.append(temp)
                if udatas:
                    start_time = time.time()
                    # upload_data(code, "l2_order", udatas)
                    l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000))
                    # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas)
                    use_time = int((time.time() - start_time) * 1000)
                    if use_time > 20:
                        async_log_util.info(logger_local_huaxin_l2_upload, f"{code}-上传代码耗时:{use_time}ms")
                time.sleep(0.01)
 
            except Exception as e:
                hx_logger_contact_debug.exception(e)
                logger_local_huaxin_l2_error.error(f"上传订单数据出错:{str(e)}")
                pass
 
 
def __run_upload_trans(code, l2_data_callback: L2DataCallBack):
    if code not in tmep_transaction_queue_dict:
        tmep_transaction_queue_dict[code] = queue.Queue()
    while True:
        # print("trans task")
        try:
            if code not in target_codes:
                break
            transaction_upload_active_time_dict[code] = time.time()
            udatas = []
            while not tmep_transaction_queue_dict[code].empty():
                temp = tmep_transaction_queue_dict[code].get()
                udatas.append(temp)
            if udatas:
                # upload_data(code, "l2_trans", udatas)
                l2_data_callback.OnL2Transaction(code, udatas)
            time.sleep(0.01)
        except Exception as e:
            logger_local_huaxin_l2_error.error(f"上传成交数据出错:{str(e)}")
 
 
def __run_upload_common(l2_data_callback: L2DataCallBack):
    print("__run_upload_common")
    logger_system.info(f"l2_client __run_upload_common 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            while not common_queue.empty():
                temp = common_queue.get()
                if temp[1] == "l2_market_data":
                    l2_data_callback.OnMarketData(temp[0], temp[2])
                else:
                    upload_data(temp[0], temp[1], temp[2])
 
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
            logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}")
        finally:
            time.sleep(0.01)
 
 
def __run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
    print("__run_upload_trading_canceled")
    logger_system.info(f"l2_client __run_upload_trading_canceled 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            temp = trading_canceled_queue.get()
            if temp:
                logger_local_huaxin_g_cancel.info(f"准备上报:{temp}")
                # upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True)
                l2_data_callback.OnTradingOrderCancel(temp[0], temp[1])
                logger_local_huaxin_g_cancel.info(f"上报成功:{temp}")
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
            logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}")
 
 
def __run_log():
    print("__run_log")
    logger_system.info(f"l2_client __run_log 线程ID:{tool.get_thread_id()}")
    async_log_util.huaxin_l2_log.run_sync()
 
 
__upload_order_threads = {}
__upload_trans_threads = {}
 
 
# 运行上传任务
def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None:
    try:
        # 如果代码没有在目标代码中就不需要运行
        if code not in target_codes:
            return
        # 如果最近的活动时间小于2s就不需要运行
        if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[
            code] > 2:
            t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True)
            t.start()
            __upload_order_threads[code] = t
 
        if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[
            code] > 2:
            t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True)
            t.start()
            __upload_trans_threads[code] = t
    finally:
        pass
 
 
def run_upload_common(l2_data_callback: L2DataCallBack):
    t = threading.Thread(target=lambda: __run_upload_common(l2_data_callback), daemon=True)
    t.start()
 
 
def run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
    t = threading.Thread(target=lambda: __run_upload_trading_canceled(l2_data_callback), daemon=True)
    t.start()
 
 
def run_log():
    fdatas = log_export.load_huaxin_local_buy_no()
    global buy_order_nos_dict
    buy_order_nos_dict = fdatas
    t = threading.Thread(target=lambda: __run_log(), daemon=True)
    t.start()
 
 
# 运行守护线程
def run_upload_daemon(_l2_data_callback):
    def upload_daemon():
        logger_system.info(f"l2_client upload_daemon 线程ID:{tool.get_thread_id()}")
        while True:
            try:
                for code in target_codes_add_time:
                    # 目标代码加入2s之后启动守护
                    if time.time() - target_codes_add_time[code] > 2:
                        if code not in __upload_order_threads or not __upload_order_threads[code].is_alive():
                            t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback),
                                                 daemon=True)
                            t.start()
                            __upload_order_threads[code] = t
                            logger_local_huaxin_l2_upload.info(f"重新创建L2订单上传线程:{code}")
                        if code not in __upload_trans_threads or not __upload_trans_threads[code].is_alive():
                            t = threading.Thread(target=lambda: __run_upload_trans(code, _l2_data_callback),
                                                 daemon=True)
                            t.start()
                            __upload_trans_threads[code] = t
                            logger_local_huaxin_l2_upload.info(f"重新创建L2成交上传线程:{code}")
            except:
                pass
            finally:
                time.sleep(3)
 
    t = threading.Thread(target=lambda: upload_daemon(), daemon=True)
    t.start()
 
 
def __test(_l2_data_callback):
    code = "002073"
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    target_codes.add(code)
    t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True)
    t.start()
    while True:
        try:
            tmep_order_detail_queue_dict[code].put_nowait(
                ['002073', 0.0, 88100, '1', '2', 103831240, 2011, 18190761, 18069131, 'D', 1693276711224])
            time.sleep(5)
        except:
            pass
 
 
def run_test(_l2_data_callback):
    t = threading.Thread(target=lambda: __test(_l2_data_callback), daemon=True)
    t.start()
 
 
def test():
    pass