# -*- coding: utf-8 -*-
|
import logging
|
import marshal
|
import queue
|
import threading
|
import time
|
|
from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager
|
from log_module.async_log_util import huaxin_l2_log
|
import collections
|
|
from log_module.log import logger_local_huaxin_contact_debug
|
|
order_detail_upload_active_time_dict = {}
|
transaction_upload_active_time_dict = {}
|
# 临时数据
|
tmep_order_detail_queue_dict = {}
|
tmep_transaction_queue_dict = {}
|
target_codes = set()
|
target_codes_add_time = {}
|
common_queue = queue.Queue()
|
|
|
# L2上传数据管理器
|
class L2DataUploadManager:
|
def __init__(self, data_callback_distribute_manager: CodeDataCallbackDistributeManager):
|
self.data_callback_distribute_manager = data_callback_distribute_manager
|
# 代码分配的对象
|
self.temp_order_queue_dict = {}
|
self.temp_transaction_queue_dict = {}
|
self.temp_log_queue_dict = {}
|
|
self.filter_order_condition_dict = {}
|
self.upload_l2_data_task_dict = {}
|
self.l2_order_codes = set()
|
self.l2_transaction_codes = set()
|
self.__real_time_buy1_data = {}
|
|
# 过滤订单
|
def __filter_order(self, item):
|
if item[1] * item[2] < 500000:
|
return None
|
return item
|
# 过滤订单
|
|
def __filter_transaction(self, item):
|
return item
|
|
# 添加委托详情
|
def add_l2_order_detail(self, data, start_time=0, istransaction=False):
|
code = data["SecurityID"]
|
if code in self.__real_time_buy1_data:
|
if self.__real_time_buy1_data[code][1] == data["Price"]:
|
# 与买的价格一致
|
if data["Side"] == '1':
|
if data["OrderStatus"] == 'D':
|
# 买撤
|
self.__real_time_buy1_data[code][3] -= data["Volume"]
|
else:
|
# 买
|
self.__real_time_buy1_data[code][3] += data["Volume"]
|
q: collections.deque = self.temp_order_queue_dict.get(code)
|
q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
|
data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
|
|
# 添加逐笔成交
|
def add_transaction_detail(self, data):
|
code = data["SecurityID"]
|
if code in self.__real_time_buy1_data:
|
if self.__real_time_buy1_data[code][1] == data["TradePrice"]:
|
# 与买的价格一致
|
self.__real_time_buy1_data[code][3] -= data["TradeVolume"]
|
q: collections.deque = self.temp_transaction_queue_dict.get(code)
|
q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
|
data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
|
data['SellNo'], data['ExecType']))
|
|
def add_market_data(self, data):
|
code = data["securityID"]
|
# [时间,买1价格,原始买1量,计算后的买1量]}
|
self.__real_time_buy1_data[code] = [data["dataTimeStamp"], data["buy"][0][0], data["buy"][0][1],
|
data["buy"][0][1]]
|
|
self.data_callback_distribute_manager.get_distributed_callback(code).OnMarketData(code, [data])
|
|
# 分配上传队列
|
def distribute_upload_queue(self, code):
|
if not self.data_callback_distribute_manager.get_distributed_callback(code):
|
self.data_callback_distribute_manager.distribute_callback(code)
|
|
if code not in self.temp_order_queue_dict:
|
self.temp_order_queue_dict[code] = collections.deque()
|
if code not in self.temp_transaction_queue_dict:
|
self.temp_transaction_queue_dict[code] = collections.deque()
|
if code not in self.temp_log_queue_dict:
|
self.temp_log_queue_dict[code] = queue.Queue()
|
if code not in self.upload_l2_data_task_dict:
|
t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
|
t1.start()
|
t2 = threading.Thread(target=lambda: self.__run_upload_transaction_task(code), daemon=True)
|
t2.start()
|
t3 = threading.Thread(target=lambda: self.__run_upload_real_time_buy1_task(code), daemon=True)
|
t3.start()
|
self.upload_l2_data_task_dict[code] = (t1, t2, t3)
|
# 释放已经分配的队列
|
|
def release_distributed_upload_queue(self, code):
|
self.data_callback_distribute_manager.release_distribute_callback(code)
|
if code in self.temp_order_queue_dict:
|
self.temp_order_queue_dict[code].clear()
|
self.temp_order_queue_dict.pop(code)
|
if code in self.temp_transaction_queue_dict:
|
self.temp_transaction_queue_dict[code].clear()
|
self.temp_transaction_queue_dict.pop(code)
|
if code in self.temp_log_queue_dict:
|
self.temp_log_queue_dict.pop(code)
|
|
if code in self.upload_l2_data_task_dict:
|
self.upload_l2_data_task_dict.pop(code)
|
|
def __upload_l2_data(self, code, _queue, datas):
|
_queue.put_nowait(marshal.dumps([code, datas, time.time()]))
|
|
# 处理订单数据并上传
|
def __run_upload_order_task(self, code):
|
q: collections.deque = self.temp_order_queue_dict.get(code)
|
temp_list = []
|
while True:
|
try:
|
while len(q) > 0:
|
data = q.popleft()
|
# 前置数据处理,过滤掉无用的数据
|
data = self.__filter_order(data)
|
if data:
|
temp_list.append(data)
|
|
if temp_list:
|
# 上传数据
|
# self.__upload_l2_data(code, upload_queue, temp_list)
|
# self.__upload_l2_order_data(code, temp_list)
|
__start_time = time.time()
|
last_data = temp_list[-1]
|
self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list,
|
time.time())
|
use_time = time.time() - __start_time
|
if use_time > 0.01:
|
# 记录10ms以上的数据
|
huaxin_l2_log.info(logger_local_huaxin_contact_debug, f"耗时:{use_time}s 结束数据:{last_data}")
|
temp_list = []
|
else:
|
if code not in self.temp_order_queue_dict:
|
self.l2_order_codes.discard(code)
|
break
|
self.l2_order_codes.add(code)
|
time.sleep(0.001)
|
|
except Exception as e:
|
logging.exception(e)
|
finally:
|
pass
|
|
# 处理成交数据并上传
|
def __run_upload_transaction_task(self, code):
|
q: collections.deque = self.temp_transaction_queue_dict.get(code)
|
temp_list = []
|
while True:
|
try:
|
while len(q) > 0:
|
data = q.popleft()
|
data = self.__filter_transaction(data)
|
if data:
|
temp_list.append(data)
|
if temp_list:
|
# 上传数据
|
# self.__upload_l2_data(code, upload_queue, temp_list)
|
self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code,
|
temp_list)
|
temp_list = []
|
else:
|
if code not in self.temp_transaction_queue_dict:
|
self.l2_transaction_codes.discard(code)
|
break
|
self.l2_transaction_codes.add(code)
|
time.sleep(0.001)
|
except:
|
pass
|
finally:
|
pass
|
|
# 处理实时买1数据
|
def __run_upload_real_time_buy1_task(self, code):
|
while True:
|
try:
|
if code in self.__real_time_buy1_data:
|
data = self.__real_time_buy1_data[code]
|
# 如果最新的买1是原来买1的1/2时开始上传
|
if data[2] > 0 and data[3] / data[2] <= 0.5:
|
self.data_callback_distribute_manager.get_distributed_callback(code).OnRealTimeBuy1Info(code, data)
|
except:
|
pass
|
finally:
|
time.sleep(0.1)
|
|
|
def add_target_code(code):
|
target_codes.add(code)
|
# 记录代码加入时间
|
target_codes_add_time[code] = time.time()
|
|
|
def del_target_code(code):
|
target_codes.discard(code)
|
if code in target_codes_add_time:
|
target_codes_add_time.pop(code)
|
|
|
def add_subscript_codes(codes):
|
pass
|
|
|
if __name__ == "__main__":
|
add_subscript_codes(["000333"])
|