import json import logging import queue import random import threading import time import socket_util from client_network import SendResponseSkManager from mylog import logger_l2_error, logger_l2_upload # 活动时间 order_detail_upload_active_time_dict = {} transaction_upload_active_time_dict = {} # 临时数据 tmep_order_detail_queue_dict = {} tmep_transaction_queue_dict = {} target_codes = set() common_queue = queue.Queue() # 添加委托详情 def add_l2_order_detail(data, istransaction=False): code = data["SecurityID"] if code not in tmep_order_detail_queue_dict: tmep_order_detail_queue_dict[code] = queue.Queue() if istransaction: pass else: pass # 原来的格式 # {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'], # "Volume": pOrderDetail['Volume'], # "Side": pOrderDetail['Side'].decode(), "OrderType": pOrderDetail['OrderType'].decode(), # "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'], # "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'], # "OrderStatus": pOrderDetail['OrderStatus'].decode()} tmep_order_detail_queue_dict[code].put( (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'])) # 添加逐笔成交 def add_transaction_detail(data): code = data["SecurityID"] if code not in tmep_transaction_queue_dict: tmep_transaction_queue_dict[code] = queue.Queue() # 原来的格式 # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], # "TradeVolume": pTransaction['TradeVolume'], # "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], # "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], "SellNo": pTransaction['SellNo'], # "ExecType": pTransaction['ExecType'].decode()} tmep_transaction_queue_dict[code].put((data['SecurityID'], data['TradePrice'], data['TradeVolume'], data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], data['SellNo'], data['ExecType'])) def add_market_data(data): code = data['securityID'] # 加入上传队列 common_queue.put((code, "l2_market_data", data)) def add_subscript_codes(codes): print("add_subscript_codes", codes) # 加入上传队列 common_queue.put(('', "l2_subscript_codes", list(codes))) def __send_response(sk, msg): msg = socket_util.load_header(msg) sk.sendall(msg) result, header_str = socket_util.recv_data(sk) if result: result_json = json.loads(result) if result_json.get("code") == 0: return True return False # 发送消息 def send_response(type, msg): try: sk = SendResponseSkManager.get_send_response_sk(type) if __send_response(sk, msg): return True else: # 再次发送 print("再次发送") return __send_response(sk, msg) except ConnectionResetError as e: SendResponseSkManager.del_send_response_sk(type) sk = SendResponseSkManager.get_send_response_sk(type) return __send_response(sk, msg) except BrokenPipeError as e: SendResponseSkManager.del_send_response_sk(type) sk = SendResponseSkManager.get_send_response_sk(type) return __send_response(sk, msg) # 上传数据 def upload_data(code, _type, datas): uid = random.randint(0, 100000) key = f"{_type}_{code}" fdata = json.dumps( {"type": _type, "data": {"code": code, "data": datas}}) # print("数据长度:", len(datas), len(fdata), f"{fdata[:20]}...{fdata[-20:]}", ) # print("请求开始", uid, len(datas), len(fdata), f"{fdata[:20]}...{fdata[-20:]}") result = None start_time = time.time() logger_l2_upload.info(f"{code} 上传数据开始-{_type}") try: result = send_response(key, fdata.encode('utf-8')) except Exception as e: logging.exception(e) finally: # print("请求结束", uid, result) logger_l2_upload.info(f"{code} 上传数据耗时-{_type}: {round((time.time() - start_time) * 1000, 1)} 数据量:{len(datas)}") # print("上传结果", result) # 循环读取上传数据 def __run_upload_order(code): if code not in tmep_order_detail_queue_dict: tmep_order_detail_queue_dict[code] = queue.Queue() while True: # print("order task") try: if code not in target_codes: break order_detail_upload_active_time_dict[code] = time.time() udatas = [] while not tmep_order_detail_queue_dict[code].empty(): temp = tmep_order_detail_queue_dict[code].get() udatas.append(temp) if udatas: upload_data(code, "l2_order", udatas) time.sleep(0.01) except Exception as e: logger_l2_error.error(f"上传订单数据出错:{str(e)}") pass def __run_upload_trans(code): if code not in tmep_transaction_queue_dict: tmep_transaction_queue_dict[code] = queue.Queue() while True: # print("trans task") try: if code not in target_codes: break transaction_upload_active_time_dict[code] = time.time() udatas = [] while not tmep_transaction_queue_dict[code].empty(): temp = tmep_transaction_queue_dict[code].get() udatas.append(temp) if udatas: upload_data(code, "l2_trans", udatas) time.sleep(0.01) except Exception as e: logger_l2_error.error(f"上传成交数据出错:{str(e)}") def __run_upload_common(): print("__run_upload_common") while True: try: while not common_queue.empty(): temp = common_queue.get() upload_data(temp[0], temp[1], temp[2]) time.sleep(0.01) except Exception as e: logger_l2_error.exception(e) logger_l2_error.error(f"上传普通数据出错:{str(e)}") # 运行上传任务 def run_upload_task(code): # 如果代码没有在目标代码中就不需要运行 if code not in target_codes: return # 如果最近的活动时间小于2s就不需要运行 if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2: t = threading.Thread(target=lambda: __run_upload_order(code), daemon=True) t.start() if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2: t = threading.Thread(target=lambda: __run_upload_trans(code), daemon=True) t.start() def run_upload_common(): t = threading.Thread(target=lambda: __run_upload_common(), daemon=True) t.start() if __name__ == "__main__": code = "603809" target_codes.add(code) run_upload_task(code) while True: for i in range(0, 5): add_l2_order_detail({"SecurityID": code, "Price": 11.28, "Volume": 500, "Side": "2", "OrderType": "\u0000", "OrderTime": 14591555, "MainSeq": 1, "SubSeq": 11050942, "OrderNO": 10692868, "OrderStatus": "A"}, False) time.sleep(0.001)