# -*- coding: utf-8 -*-
|
import contextlib
|
import json
|
import logging
|
import mmap
|
import queue
|
import random
|
import threading
|
import time
|
from huaxin_client import socket_util, l2_data_transform_protocol
|
|
from huaxin_client.client_network import SendResponseSkManager
|
|
# 活动时间
|
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
|
from log_module import log_export, async_log_util
|
from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \
|
logger_local_huaxin_g_cancel, hx_logger_contact_debug, logger_system, logger_local_huaxin_l2_orderdetail
|
from utils import tool
|
|
# code -> time.time() of the most recent iteration of that code's upload loop.
order_detail_upload_active_time_dict: dict = {}
transaction_upload_active_time_dict: dict = {}

# Temporary per-code buffers: code -> queue.Queue of records awaiting upload.
tmep_order_detail_queue_dict: dict = {}
tmep_transaction_queue_dict: dict = {}

# Codes whose upload loops should keep running, and when each one was added.
target_codes: set = set()
target_codes_add_time: dict = {}

# Shared work queues consumed by the background loops in this module.
common_queue: queue.Queue = queue.Queue()
trading_canceled_queue: queue.Queue = queue.Queue()
log_buy_no_queue: queue.Queue = queue.Queue()

# code -> set of buy order numbers seen so far.
buy_order_nos_dict: dict = {}

# code -> list of the most recently traded buy order numbers ("big orders").
latest_big_order_transaction_orders_dict: dict = {}
|
|
|
def add_target_code(code):
    """Mark ``code`` as an upload target and record when it was added."""
    target_codes.add(code)
    # The add-time is read by the upload daemon before it (re)starts threads.
    added_at = time.time()
    target_codes_add_time[code] = added_at
|
|
|
def del_target_code(code):
    """Stop targeting ``code`` and forget its add-time; a no-op if it is unknown."""
    target_codes.discard(code)
    target_codes_add_time.pop(code, None)
|
|
|
def get_latest_transaction_order_nos(code):
    """Return the recently traded buy order numbers recorded for ``code``.

    Returns ``None`` when nothing has been recorded for that code.
    """
    recent_buy_nos = latest_big_order_transaction_orders_dict.get(code)
    return recent_buy_nos
|
|
|
def trading_order_canceled(code_, order_no):
    """Queue a notice that an order which was being traded has been cancelled.

    The ``(code_, order_no)`` pair is consumed by the trading-cancel upload loop.
    """
    notice = (code_, order_no)
    trading_canceled_queue.put(notice)
|
|
|
def add_l2_order_detail(data, istransaction=False):
    """Buffer one order-detail record for its security's upload loop.

    ``data`` is a mapping with the keys ``SecurityID``, ``Price``, ``Volume``,
    ``Side``, ``OrderType``, ``OrderTime``, ``MainSeq``, ``SubSeq``,
    ``OrderNO`` and ``OrderStatus``. The record is queued as a tuple of those
    values, in that order, followed by the local receive time in milliseconds.

    When ``Side`` is ``"1"`` the order number is also remembered in
    ``buy_order_nos_dict`` and written to the buy-number log.

    ``istransaction`` is accepted for interface compatibility and is unused.
    """
    code = data["SecurityID"]
    # Asynchronous logging of the raw record.
    async_log_util.huaxin_l2_log.info(logger_local_huaxin_l2_orderdetail, data)
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()

    if data['Side'] == "1":
        # Remember every buy order number for this security ...
        known_buy_nos = buy_order_nos_dict.setdefault(data['SecurityID'], set())
        known_buy_nos.add(data['OrderNO'])
        # ... and log it as "<code>#<order number>".
        buy_no_line = f"{data['SecurityID']}#{data['OrderNO']}"
        async_log_util.huaxin_l2_log.info(logger_local_huaxin_l2_buy_no, buy_no_line)

    record = (
        data['SecurityID'],
        data['Price'],
        data['Volume'],
        data['Side'],
        data['OrderType'],
        data['OrderTime'],
        data['MainSeq'],
        data['SubSeq'],
        data['OrderNO'],
        data['OrderStatus'],
        int(time.time() * 1000),
    )
    tmep_order_detail_queue_dict[code].put(record)
|
|
|
def add_transaction_detail(data):
    """Buffer one tick-by-tick transaction for its security's upload loop.

    ``data`` is a mapping with the keys ``SecurityID``, ``TradePrice``,
    ``TradeVolume``, ``OrderTime``, ``MainSeq``, ``SubSeq``, ``BuyNo``,
    ``SellNo`` and ``ExecType``; the record is queued as a tuple of those
    values in that order.

    If ``BuyNo`` is one of the buy order numbers already recorded for the
    security, it is appended to that security's list of recently traded buy
    orders (consecutive duplicates are skipped, at most 10 entries are kept).

    Raises:
        KeyError: if ``data`` lacks one of the keys listed above.
    """
    code = data["SecurityID"]
    if code not in tmep_transaction_queue_dict:
        tmep_transaction_queue_dict[code] = queue.Queue()

    buy_no = data['BuyNo']
    if code in buy_order_nos_dict and buy_no in buy_order_nos_dict[code]:
        try:
            recent = latest_big_order_transaction_orders_dict.get(code) or []
            # Skip a number identical to the newest entry (no consecutive duplicates).
            if not recent or recent[-1] != buy_no:
                recent.append(buy_no)
                if len(recent) > 10:
                    # Keep only the 10 newest order numbers.
                    recent = recent[-10:]
            latest_big_order_transaction_orders_dict[code] = recent
        except Exception as e:
            # Bookkeeping is best-effort and must never block queuing the
            # transaction, but a failure is logged instead of being hidden.
            logger_local_huaxin_l2_error.exception(e)

    tmep_transaction_queue_dict[code].put((
        data['SecurityID'],
        data['TradePrice'],
        data['TradeVolume'],
        data['OrderTime'],
        data['MainSeq'],
        data['SubSeq'],
        buy_no,
        data['SellNo'],
        data['ExecType'],
    ))
|
|
|
def add_market_data(data):
    """Queue a market-data snapshot, keyed by ``data['securityID']``, for upload."""
    entry = (data['securityID'], "l2_market_data", data)
    # Hand over to the common upload loop.
    common_queue.put(entry)
|
|
|
def add_subscript_codes(codes):
    """Queue the subscribed codes (copied into a list) for the common upload loop."""
    print("add_subscript_codes", codes)
    # This message is not tied to a single security, hence the empty code.
    entry = ('', "l2_subscript_codes", list(codes))
    common_queue.put(entry)
|
|
|
def __send_response(sk, msg):
    """Send ``msg`` over ``sk`` and report whether the peer acknowledged it.

    The message is wrapped with ``socket_util.load_header`` before sending.
    Returns ``True`` only when a non-empty reply is received whose JSON body
    has ``"code" == 0``; otherwise ``False``.
    """
    sk.sendall(socket_util.load_header(msg))
    reply, _header = socket_util.recv_data(sk)
    if not reply:
        return False
    return json.loads(reply).get("code") == 0
|
|
|
def send_response(type, msg):
    """Send ``msg`` on the managed socket for ``type``, retrying once.

    If the first attempt is not acknowledged, the message is sent once more on
    the same socket. If the connection was reset or the pipe is broken, the
    managed socket is discarded, a new one is obtained and the message is sent
    on it. Returns the boolean outcome of the last attempt.
    """
    try:
        sk = SendResponseSkManager.get_send_response_sk(type)
        if __send_response(sk, msg):
            return True
        # Not acknowledged: send again.
        print("再次发送")
        return __send_response(sk, msg)
    except (ConnectionResetError, BrokenPipeError):
        # The socket is unusable: drop it and retry on a replacement.
        SendResponseSkManager.del_send_response_sk(type)
        replacement = SendResponseSkManager.get_send_response_sk(type)
        return __send_response(replacement, msg)
|
|
|
def upload_data(code, _type, datas, new_sk=False):
    """Serialize ``datas`` as a JSON message and send it.

    The message has the shape
    ``{"type": _type, "data": {"code": code, "data": datas, "time": <ms>}}``.

    Args:
        code: security code the payload belongs to (may be ``''``).
        _type: message type; also used to build the socket key ``"{_type}_{code}"``.
        datas: JSON-serializable payload.
        new_sk: when true, send on a socket freshly created by
            ``SendResponseSkManager.create_send_response_sk()`` instead of the
            managed socket for the key.

    Returns:
        The boolean result of the send, or ``None`` if sending raised (the
        exception is logged, not propagated). Earlier versions always returned
        ``None``; callers that ignore the result are unaffected.

    Raises:
        TypeError / ValueError: if ``datas`` cannot be serialized to JSON.
    """
    payload = json.dumps(
        {"type": _type, "data": {"code": code, "data": datas, "time": round(time.time() * 1000)}}
    ).encode('utf-8')
    try:
        if new_sk:
            # NOTE(review): this socket is never closed here; confirm whether
            # SendResponseSkManager owns its lifetime.
            sk = SendResponseSkManager.create_send_response_sk()
            return __send_response(sk, payload)
        return send_response(f"{_type}_{code}", payload)
    except Exception as e:
        logging.exception(e)
        return None
|
|
|
def __run_upload_order(code: str, l2_data_callback: L2DataCallBack) -> None:
    """Drain ``code``'s order-detail queue in a loop and pass batches to the callback.

    Runs until ``code`` is no longer in ``target_codes``. Each pass records the
    current time in ``order_detail_upload_active_time_dict``, empties the queue
    into one batch, hands a non-empty batch to ``l2_data_callback.OnL2Order``
    and then sleeps 10 ms. Callback time above 20 ms is logged. Exceptions are
    logged and the loop continues.
    """
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()

    while code in target_codes:
        try:
            # Heartbeat read by run_upload_task.
            order_detail_upload_active_time_dict[code] = time.time()

            pending = tmep_order_detail_queue_dict[code]
            batch = []
            while not pending.empty():
                batch.append(pending.get())

            if batch:
                started_at = time.time()
                l2_data_callback.OnL2Order(code, batch, int(time.time() * 1000))
                elapsed_ms = int((time.time() - started_at) * 1000)
                if elapsed_ms > 20:
                    async_log_util.info(logger_local_huaxin_l2_upload, f"{code}-上传代码耗时:{elapsed_ms}ms")
            time.sleep(0.01)
        except Exception as e:
            hx_logger_contact_debug.exception(e)
            logger_local_huaxin_l2_error.error(f"上传订单数据出错:{str(e)}")
|
|
|
def __run_upload_trans(code, l2_data_callback: L2DataCallBack):
    """Drain ``code``'s transaction queue in a loop and pass batches to the callback.

    Runs until ``code`` is no longer in ``target_codes``. Each pass records the
    current time in ``transaction_upload_active_time_dict``, empties the queue
    into one batch, hands a non-empty batch to
    ``l2_data_callback.OnL2Transaction`` and then sleeps 10 ms. Exceptions are
    logged and the loop continues.
    """
    if code not in tmep_transaction_queue_dict:
        tmep_transaction_queue_dict[code] = queue.Queue()

    while code in target_codes:
        try:
            # Heartbeat read by run_upload_task.
            transaction_upload_active_time_dict[code] = time.time()

            pending = tmep_transaction_queue_dict[code]
            batch = []
            while not pending.empty():
                batch.append(pending.get())

            if batch:
                l2_data_callback.OnL2Transaction(code, batch)
            time.sleep(0.01)
        except Exception as e:
            logger_local_huaxin_l2_error.error(f"上传成交数据出错:{str(e)}")
|
|
|
def __run_upload_common(l2_data_callback: L2DataCallBack):
    """Forever dispatch ``(code, type, payload)`` entries from ``common_queue``.

    ``"l2_market_data"`` entries go to ``l2_data_callback.OnMarketData``; every
    other type is sent with ``upload_data``. Errors are logged and the loop
    continues; it sleeps 10 ms after every pass, successful or not.
    """
    print("__run_upload_common")
    logger_system.info(f"l2_client __run_upload_common 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            while not common_queue.empty():
                entry = common_queue.get()
                kind = entry[1]
                if kind != "l2_market_data":
                    upload_data(entry[0], kind, entry[2])
                    continue
                l2_data_callback.OnMarketData(entry[0], entry[2])
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
            logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}")
        finally:
            time.sleep(0.01)
|
|
|
def __run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
    """Forever report cancelled in-trade orders to the callback.

    Blocks on ``trading_canceled_queue``; each ``(code, order_no)`` entry is
    passed to ``l2_data_callback.OnTradingOrderCancel`` and logged before and
    after the call. Errors are logged and the loop continues.
    """
    print("__run_upload_trading_canceled")
    logger_system.info(f"l2_client __run_upload_trading_canceled 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            notice = trading_canceled_queue.get()
            if not notice:
                continue
            logger_local_huaxin_g_cancel.info(f"准备上报:{notice}")
            l2_data_callback.OnTradingOrderCancel(notice[0], notice[1])
            logger_local_huaxin_g_cancel.info(f"上报成功:{notice}")
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
            # The message used to be copied from the common-data loop, which
            # made cancel-report failures look like common-upload failures.
            logger_local_huaxin_l2_error.error(f"上报撤单数据出错:{str(e)}")
|
|
|
def __run_log():
    """Thread body: announce itself, then run the async L2 logger's ``run_sync``."""
    print("__run_log")
    thread_id = tool.get_thread_id()
    logger_system.info(f"l2_client __run_log 线程ID:{thread_id}")
    async_log_util.huaxin_l2_log.run_sync()
|
|
|
# code -> most recently started upload thread, one registry per data kind.
__upload_order_threads: dict = {}
__upload_trans_threads: dict = {}
|
|
|
def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None:
    """Start the order and transaction upload threads for ``code`` if needed.

    Does nothing when ``code`` is not a target code. A thread is started for a
    data kind only when its loop has never recorded an active time for ``code``
    or that time is more than 2 seconds old. Started threads are daemon threads
    and are stored in the per-kind thread registry.
    """
    if code not in target_codes:
        return

    def _needs_start(active_times):
        # True when no loop has reported in for this code within the last 2 s.
        return code not in active_times or time.time() - active_times[code] > 2

    if _needs_start(order_detail_upload_active_time_dict):
        order_thread = threading.Thread(
            target=__run_upload_order, args=(code, l2_data_callback), daemon=True)
        order_thread.start()
        __upload_order_threads[code] = order_thread

    if _needs_start(transaction_upload_active_time_dict):
        trans_thread = threading.Thread(
            target=__run_upload_trans, args=(code, l2_data_callback), daemon=True)
        trans_thread.start()
        __upload_trans_threads[code] = trans_thread
|
|
|
def run_upload_common(l2_data_callback: L2DataCallBack):
    """Start the common upload loop in a daemon thread."""
    worker = threading.Thread(target=__run_upload_common, args=(l2_data_callback,), daemon=True)
    worker.start()
|
|
|
def run_upload_trading_canceled(l2_data_callback: L2DataCallBack):
    """Start the trading-cancel report loop in a daemon thread."""
    worker = threading.Thread(
        target=__run_upload_trading_canceled, args=(l2_data_callback,), daemon=True)
    worker.start()
|
|
|
def run_log():
    """Reload the recorded buy order numbers and start the logging thread.

    ``buy_order_nos_dict`` is replaced by whatever
    ``log_export.load_huaxin_local_buy_no()`` returns, then ``__run_log`` is
    started in a daemon thread.
    """
    global buy_order_nos_dict
    buy_order_nos_dict = log_export.load_huaxin_local_buy_no()
    log_thread = threading.Thread(target=__run_log, daemon=True)
    log_thread.start()
|
|
|
def run_upload_daemon(_l2_data_callback):
    """Start a daemon thread that keeps the per-code upload threads alive.

    Every 3 seconds the watchdog looks at each code in
    ``target_codes_add_time``. For codes added more than 2 seconds ago it
    starts a new order and/or transaction upload thread when none is
    registered for the code or the registered one is no longer alive, and logs
    the restart.

    Args:
        _l2_data_callback: callback object handed to the upload loops.
    """

    def ensure_alive(registry, code, worker, log_label):
        # Start (and register) a worker thread for ``code`` unless one is running.
        current = registry.get(code)
        if current is not None and current.is_alive():
            return
        # ``args=`` binds ``code`` now. A ``lambda`` over the loop variable
        # would read it only when the thread runs, by which time the loop may
        # already have moved on to another code.
        replacement = threading.Thread(target=worker, args=(code, _l2_data_callback), daemon=True)
        replacement.start()
        registry[code] = replacement
        logger_local_huaxin_l2_upload.info(f"{log_label}:{code}")

    def upload_daemon():
        logger_system.info(f"l2_client upload_daemon 线程ID:{tool.get_thread_id()}")
        while True:
            try:
                # Iterate over a snapshot: add_target_code/del_target_code may
                # change the dict from other threads while we walk it.
                for code, added_at in list(target_codes_add_time.items()):
                    # Only guard codes that were added more than 2 s ago.
                    if time.time() - added_at <= 2:
                        continue
                    ensure_alive(__upload_order_threads, code, __run_upload_order,
                                 "重新创建L2订单上传线程")
                    ensure_alive(__upload_trans_threads, code, __run_upload_trans,
                                 "重新创建L2成交上传线程")
            except Exception as e:
                # Keep the watchdog running, but leave a trace of the failure.
                logger_local_huaxin_l2_error.exception(e)
            finally:
                time.sleep(3)

    watchdog = threading.Thread(target=upload_daemon, daemon=True)
    watchdog.start()
|
|
|
def __test(_l2_data_callback):
    """Manual test driver: feed a fixed order record for "002073" every 5 seconds.

    Registers the code as a target, starts its order upload loop in a daemon
    thread and then loops forever, ignoring any error raised while queuing.
    """
    code = "002073"
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    target_codes.add(code)

    uploader = threading.Thread(target=__run_upload_order, args=(code, _l2_data_callback), daemon=True)
    uploader.start()

    sample = ('002073', 0.0, 88100, '1', '2', 103831240, 2011, 18190761, 18069131, 'D', 1693276711224)
    while True:
        try:
            # A fresh list is queued on every round.
            tmep_order_detail_queue_dict[code].put_nowait(list(sample))
            time.sleep(5)
        except BaseException:
            # Same reach as a bare ``except``: nothing stops the feed loop.
            pass
|
|
|
def run_test(_l2_data_callback):
    """Start the manual test driver ``__test`` in a daemon thread."""
    driver = threading.Thread(target=__test, args=(_l2_data_callback,), daemon=True)
    driver.start()
|
|
|
def test():
    """Placeholder that does nothing and returns ``None``."""
    return None
|