# -*- coding: utf-8 -*-
|
import json
|
import logging
|
import marshal
|
import queue
|
import threading
|
import time
|
|
import constant
|
from huaxin_client import socket_util
|
|
from huaxin_client.client_network import SendResponseSkManager
|
|
# 活动时间
|
from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager
|
from log_module import async_log_util
|
from log_module.async_log_util import huaxin_l2_log
|
from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \
|
logger_local_huaxin_l2_special_volume
|
from utils import tool
|
import collections
|
import zmq
|
|
order_detail_upload_active_time_dict = {}
|
transaction_upload_active_time_dict = {}
|
# 临时数据
|
tmep_order_detail_queue_dict = {}
|
tmep_transaction_queue_dict = {}
|
target_codes = set()
|
target_codes_add_time = {}
|
common_queue = queue.Queue()
|
|
|
# L2上传数据管理器
|
class L2DataUploadManager:
|
def __init__(self, data_callback_distribute_manager: CodeDataCallbackDistributeManager):
|
self.data_callback_distribute_manager = data_callback_distribute_manager
|
# 代码分配的对象
|
self.temp_order_queue_dict = {}
|
self.temp_transaction_queue_dict = {}
|
self.temp_log_queue_dict = {}
|
|
self.filter_order_condition_dict = {}
|
self.upload_l2_data_task_dict = {}
|
self.l2_order_codes = set()
|
self.l2_transaction_codes = set()
|
|
# 设置订单过滤条件
|
# special_price:过滤的1手的价格
|
def set_order_fileter_condition(self, code, min_volume, limit_up_price, shadow_price, buy_volume):
|
if code not in self.filter_order_condition_dict:
|
self.filter_order_condition_dict[code] = [(min_volume, limit_up_price, shadow_price, buy_volume,
|
min_volume // 50)]
|
huaxin_l2_log.info(logger_local_huaxin_l2_subscript,
|
f"({code})常规过滤条件设置:{self.filter_order_condition_dict[code]}")
|
|
# 过滤订单
|
def __filter_order(self, item):
|
filter_condition = self.filter_order_condition_dict.get(item[0])
|
if filter_condition:
|
# item[2]为量
|
if item[2] >= filter_condition[0][0]:
|
return item
|
# 1手的买单满足价格
|
if item[2] == 100 and abs(filter_condition[0][2] - item[1]) < 0.001:
|
return item
|
# 买量
|
if item[2] == filter_condition[0][3]:
|
return item
|
|
# 卖大于2w且是涨停卖
|
if item[3] != '1' and item[2] > filter_condition[0][4] and item[1] == filter_condition[0][1]:
|
return item
|
|
return None
|
return item
|
# 过滤订单
|
|
def __filter_transaction(self, item):
|
filter_condition = self.filter_order_condition_dict.get(item[0])
|
if filter_condition:
|
# item[2]为量
|
if abs(item[1] - filter_condition[0][1]) < 0.201:
|
return item
|
return None
|
return item
|
|
# 添加委托详情
|
def add_l2_order_detail(self, data, start_time=0, istransaction=False):
|
code = data["SecurityID"]
|
# 不直接加入
|
# queue_info = self.order_queue_distribute_manager.get_distributed_queue(code)
|
# if not queue_info:
|
# return
|
# queue_info[1].put_nowait(
|
# (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
|
# data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
|
# if data['Volume'] == 100:
|
# log_queue = self.temp_log_queue_dict.get(code)
|
# if log_queue:
|
# log_queue.put_nowait(data)
|
|
q: collections.deque = self.temp_order_queue_dict.get(code)
|
q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
|
data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))
|
|
# 添加逐笔成交
|
def add_transaction_detail(self, data):
|
code = data["SecurityID"]
|
# 不直接加入
|
# queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code)
|
# if not queue_info:
|
# return
|
# # 判断是否为大单成交
|
# queue_info[1].put_nowait((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
|
# data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
|
# data['SellNo'], data['ExecType']))
|
|
q: collections.deque = self.temp_transaction_queue_dict.get(code)
|
q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
|
data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
|
data['SellNo'], data['ExecType']))
|
|
def add_market_data(self, data):
|
# 加入上传队列
|
# self.market_data_queue.put_nowait(data)
|
code = data['securityID']
|
callback = self.data_callback_distribute_manager.get_distributed_callback(code)
|
if callback:
|
callback.OnMarketData(code, data)
|
|
# 分配上传队列
|
def distribute_upload_queue(self, code):
|
if not self.data_callback_distribute_manager.get_distributed_callback(code):
|
self.data_callback_distribute_manager.distribute_callback(code)
|
|
if code not in self.temp_order_queue_dict:
|
self.temp_order_queue_dict[code] = collections.deque()
|
if code not in self.temp_transaction_queue_dict:
|
self.temp_transaction_queue_dict[code] = collections.deque()
|
if code not in self.temp_log_queue_dict:
|
self.temp_log_queue_dict[code] = queue.Queue()
|
if code not in self.upload_l2_data_task_dict:
|
t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True)
|
t1.start()
|
t2 = threading.Thread(target=lambda: self.__run_upload_transaction_task(code), daemon=True)
|
t2.start()
|
# t3 = threading.Thread(target=lambda: self.__run_log_task(code), daemon=True)
|
# t3.start()
|
self.upload_l2_data_task_dict[code] = (t1, t2)
|
# 释放已经分配的队列
|
|
def release_distributed_upload_queue(self, code):
|
self.data_callback_distribute_manager.release_distribute_callback(code)
|
if code in self.temp_order_queue_dict:
|
self.temp_order_queue_dict[code].clear()
|
self.temp_order_queue_dict.pop(code)
|
if code in self.temp_transaction_queue_dict:
|
self.temp_transaction_queue_dict[code].clear()
|
self.temp_transaction_queue_dict.pop(code)
|
if code in self.temp_log_queue_dict:
|
self.temp_log_queue_dict.pop(code)
|
|
if code in self.upload_l2_data_task_dict:
|
self.upload_l2_data_task_dict.pop(code)
|
|
def __upload_l2_data(self, code, _queue, datas):
|
_queue.put_nowait(marshal.dumps([code, datas, time.time()]))
|
|
# 处理订单数据并上传
|
def __run_upload_order_task(self, code):
|
q: collections.deque = self.temp_order_queue_dict.get(code)
|
temp_list = []
|
while True:
|
try:
|
while len(q) > 0:
|
data = q.popleft()
|
# 前置数据处理,过滤掉无用的数据
|
data = self.__filter_order(data)
|
if data:
|
temp_list.append(data)
|
|
if temp_list:
|
# 上传数据
|
# self.__upload_l2_data(code, upload_queue, temp_list)
|
# self.__upload_l2_order_data(code, temp_list)
|
__start_time = time.time()
|
last_data = temp_list[-1]
|
self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list,
|
time.time())
|
use_time = time.time() - __start_time
|
if use_time > 0.01:
|
# 记录10ms以上的数据
|
huaxin_l2_log.info(logger_local_huaxin_l2_error, f"耗时:{use_time}s 结束数据:{last_data}")
|
temp_list = []
|
else:
|
if code not in self.temp_order_queue_dict:
|
self.l2_order_codes.discard(code)
|
break
|
self.l2_order_codes.add(code)
|
time.sleep(0.001)
|
|
except Exception as e:
|
logging.exception(e)
|
finally:
|
temp_list.clear()
|
|
# 处理成交数据并上传
|
def __run_upload_transaction_task(self, code):
|
q: collections.deque = self.temp_transaction_queue_dict.get(code)
|
temp_list = []
|
while True:
|
try:
|
while len(q) > 0:
|
data = q.popleft()
|
data = self.__filter_transaction(data)
|
if data:
|
temp_list.append(data)
|
if temp_list:
|
# 上传数据
|
# self.__upload_l2_data(code, upload_queue, temp_list)
|
self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code,
|
temp_list)
|
temp_list = []
|
else:
|
if code not in self.temp_transaction_queue_dict:
|
self.l2_transaction_codes.discard(code)
|
break
|
self.l2_transaction_codes.add(code)
|
time.sleep(0.001)
|
except:
|
pass
|
finally:
|
temp_list.clear()
|
|
def __run_log_task(self, code):
|
q: queue.Queue = self.temp_log_queue_dict.get(code)
|
while True:
|
try:
|
temp = q.get(timeout=10)
|
huaxin_l2_log.info(logger_local_huaxin_l2_special_volume,
|
f"{temp}")
|
except:
|
time.sleep(0.02)
|
finally:
|
if code not in self.temp_log_queue_dict:
|
break
|
|
|
class L2DataUploadProtocolManager:
|
|
# ipchosts IPC协议
|
def __init__(self, ipchosts):
|
self.ipchosts = ipchosts
|
# 所有的client
|
self.socket_client_dict = {}
|
# 保存代码分配的client 格式:{code:(host, socket)}
|
self.code_socket_client_dict = {}
|
self.rlock = threading.RLock()
|
context = zmq.Context()
|
if constant.is_windows():
|
return
|
for host in self.ipchosts:
|
socket = context.socket(zmq.REQ)
|
socket.connect(host)
|
self.socket_client_dict[host] = socket
|
|
# 获取
|
def __get_available_ipchost(self):
|
if len(self.code_socket_client_dict) >= len(self.socket_client_dict):
|
raise Exception("无可用host")
|
used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict])
|
for host in self.socket_client_dict:
|
if host not in used_hosts:
|
return host, self.socket_client_dict[host]
|
raise Exception("无可用host")
|
|
# 分配HOST
|
def distribute_upload_host(self, code):
|
if code in self.code_socket_client_dict:
|
return
|
self.rlock.acquire()
|
try:
|
host_info = self.__get_available_ipchost()
|
if host_info:
|
self.code_socket_client_dict[code] = host_info
|
finally:
|
self.rlock.release()
|
|
def release_distributed_upload_host(self, code):
|
if code not in self.code_socket_client_dict:
|
return
|
self.rlock.acquire()
|
try:
|
if code in self.code_socket_client_dict:
|
self.code_socket_client_dict.pop(code)
|
finally:
|
self.rlock.release()
|
|
def upload_data_as_json(self, code, data):
|
if code not in self.code_socket_client_dict:
|
raise Exception("尚未分配host")
|
host, socket = self.code_socket_client_dict[code]
|
socket.send(marshal.dumps(data))
|
socket.recv_string()
|
|
|
def add_target_code(code):
|
target_codes.add(code)
|
# 记录代码加入时间
|
target_codes_add_time[code] = time.time()
|
|
|
def del_target_code(code):
|
target_codes.discard(code)
|
if code in target_codes_add_time:
|
target_codes_add_time.pop(code)
|
|
|
def add_subscript_codes(codes):
|
print("add_subscript_codes", codes)
|
# 加入上传队列
|
common_queue.put(('', "l2_subscript_codes", list(codes)))
|
|
|
def __send_response(sk, msg):
|
msg = socket_util.load_header(msg)
|
sk.sendall(msg)
|
result, header_str = socket_util.recv_data(sk)
|
if result:
|
result_json = json.loads(result)
|
if result_json.get("code") == 0:
|
return True
|
return False
|
|
|
# 发送消息
|
def send_response(type, msg):
|
try:
|
sk = SendResponseSkManager.get_send_response_sk(type)
|
if __send_response(sk, msg):
|
return True
|
else:
|
# 再次发送
|
print("再次发送")
|
return __send_response(sk, msg)
|
except ConnectionResetError as e:
|
SendResponseSkManager.del_send_response_sk(type)
|
sk = SendResponseSkManager.get_send_response_sk(type)
|
return __send_response(sk, msg)
|
except BrokenPipeError as e:
|
SendResponseSkManager.del_send_response_sk(type)
|
sk = SendResponseSkManager.get_send_response_sk(type)
|
return __send_response(sk, msg)
|
|
|
# 上传数据
|
def upload_data(code, _type, datas, new_sk=False):
|
key = f"{_type}_{code}"
|
fdata = json.dumps(
|
{"type": _type, "data": {"code": code, "data": datas, "time": round(time.time() * 1000)}})
|
result = None
|
try:
|
if new_sk:
|
sk = SendResponseSkManager.create_send_response_sk()
|
result = __send_response(sk, fdata.encode('utf-8'))
|
else:
|
result = send_response(key, fdata.encode('utf-8'))
|
except Exception as e:
|
logging.exception(e)
|
finally:
|
pass
|
|
|
def __run_upload_common():
|
print("__run_upload_common")
|
logger_system.info(f"l2_client __run_upload_common 线程ID:{tool.get_thread_id()}")
|
while True:
|
try:
|
while not common_queue.empty():
|
temp = common_queue.get()
|
upload_data(temp[0], temp[1], temp[2])
|
|
except Exception as e:
|
logger_local_huaxin_l2_error.exception(e)
|
logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}")
|
finally:
|
time.sleep(0.01)
|
|
|
def __run_log():
|
print("__run_log")
|
logger_system.info(f"l2_client __run_log 线程ID:{tool.get_thread_id()}")
|
async_log_util.huaxin_l2_log.run_sync()
|
|
|
# 采用socket传输数据
|
def run_upload_common():
|
t = threading.Thread(target=lambda: __run_upload_common(), daemon=True)
|
t.start()
|
|
|
def run_log():
|
t = threading.Thread(target=lambda: __run_log(), daemon=True)
|
t.start()
|
|
|
def __test():
|
# 分配数据
|
pass
|
|
|
def run_test():
|
t = threading.Thread(target=lambda: __test(), daemon=True)
|
t.start()
|
|
|
def test():
|
ipclist = []
|
for i in range(0, 70):
|
ipclist.append(f"ipc://l2order{i}.ipc")
|
manager = L2DataUploadProtocolManager(ipclist)
|
code = "000333"
|
manager.distribute_upload_host(code)
|
manager.upload_data_as_json(code, {"test": "test"})
|