import hashlib
|
import json
|
import logging
|
import queue
|
import random
|
import socket
|
import socketserver
|
import threading
|
import time
|
|
import dask
|
|
from code_attribute import gpcode_manager
|
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress
|
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer
|
from l2.huaxin import huaxin_target_codes_manager
|
from l2.l2_data_manager_new import L2TradeDataProcessor
|
from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \
|
hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue
|
from third_data import block_info
|
from trade import deal_big_money_manager, current_price_process_manager
|
|
from trade.huaxin import huaxin_trade_api as trade_api, trade_api_server, huaxin_trade_api
|
from utils import socket_util
|
|
# Module-level FIFO queue. Nothing in the visible part of this file reads or
# writes it (the request handler enqueues into
# trade_api_server.trade_data_request_queue instead) -- presumably consumed by
# another module; TODO confirm it is still needed.
trade_data_request_queue = queue.Queue()
|
|
|
class MyTCPServer(socketserver.TCPServer):
    """TCP server that is bound and activated as soon as it is constructed."""

    def __init__(self, server_address, RequestHandlerClass):
        """Create the server.

        :param server_address: ``(host, port)`` tuple to listen on.
        :param RequestHandlerClass: handler class instantiated per connection.
        """
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=True)
|
|
|
# To serve connections asynchronously, the threading server has to be
# re-declared on top of MyTCPServer as well.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """MyTCPServer combined with ``socketserver.ThreadingMixIn``."""
|
|
|
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Request handler for the trade / L2 data TCP server.

    Each connection carries JSON messages whose ``type`` field selects the
    action: ``register``, ``response``, ``trade_callback``, ``l2_order``,
    ``l2_trans``, ``l2_market_data`` or ``l2_subscript_codes``. Messages of
    any other type are ignored.
    """

    __inited = False
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()

    def setup(self):
        """Run the one-time class initialisation before handling a request."""
        self.__init()

    @classmethod
    def __init(cls):
        """Initialise class-level state once.

        :return: ``True`` when the class was already initialised, else ``None``.
        """
        if cls.__inited:
            return True
        cls.__inited = True
        cls.__req_socket_dict = {}

    def __is_sign_right(self, data_json):
        """Check the MD5 signature carried in ``data_json["sign"]``.

        The signature is the MD5 hex digest of all other ``key=value`` pairs,
        sorted, joined with ``&`` and followed by a fixed salt. The dict passed
        in is left unmodified.

        :param data_json: decoded request; must contain a ``sign`` key.
        :return: ``True`` when the signature matches.
        :raises KeyError: if ``sign`` is missing.
        :raises Exception: if the signature does not match.
        """
        sign = data_json["sign"]
        pairs = sorted(f"{k}={v}" for k, v in data_json.items() if k != "sign")
        # NOTE(review): the salt is hard-coded in source and MD5 is a weak
        # digest -- consider moving the secret to configuration.
        payload = "&".join(pairs) + "JiaBei@!*."
        md5 = hashlib.md5(payload.encode(encoding='utf-8')).hexdigest()
        if md5 != sign:
            raise Exception("签名出错")
        return True

    @classmethod
    def getRecvData(cls, skk):
        """Read one message from the socket.

        A framed message starts with a 10-byte header: ``##`` followed by 8
        characters holding the payload length in bytes. Anything else is
        treated as unframed: the first bytes plus one more ``recv`` of up to
        1 MiB form the message.

        :param skk: connected socket to read from.
        :return: ``(data, header)`` where ``data`` is the decoded text and
            ``header`` the raw header bytes; ``("", b"")`` if the peer closed
            the connection before sending anything.
        :raises ConnectionResetError: if the peer closes the connection in the
            middle of a framed message.
        """
        header_size = 10
        header = skk.recv(header_size)
        if not header:
            return "", header

        start_time = time.time()
        if header.startswith(b"##"):
            # recv() may return fewer bytes than requested; complete the header.
            while len(header) < header_size:
                more = skk.recv(header_size - len(header))
                if not more:
                    raise ConnectionResetError("connection closed while reading message header")
                header += more
            content_length = int(header[2:header_size].decode('utf-8'))
            chunks = []
            remaining = content_length
            while remaining > 0:
                # Never read past the end of this frame, otherwise the start of
                # the next message would be swallowed.
                chunk = skk.recv(min(10240, remaining))
                if not chunk:
                    raise ConnectionResetError("connection closed while reading message body")
                chunks.append(chunk)
                remaining -= len(chunk)
            # Decode once: a multi-byte character may be split across chunks.
            data = b"".join(chunks).decode('utf-8')
        else:
            data = (header + skk.recv(1024 * 1024)).decode('utf-8')
        hx_logger_l2_upload.info(f"读取数据耗时:{round((time.time() - start_time) * 1000, 1)}")
        return data, header

    @staticmethod
    def _send_ack(sk, with_header=False):
        """Send ``{"code": 0}``, optionally wrapped by ``socket_util.load_header``."""
        payload = json.dumps({"code": 0}).encode(encoding='utf-8')
        sk.sendall(socket_util.load_header(payload) if with_header else payload)

    def _serve_registered_client(self, sk, data_json):
        """Register the client socket, then consume its heartbeat messages.

        :return: ``True`` when the connection is gone and the caller should
            stop serving it; ``False`` when the heartbeat loop ended because of
            another (logged) error.
        """
        client_type = data_json["data"]["client_type"]
        rid = data_json["rid"]
        trade_api.ClientSocketManager.add_client(client_type, rid, sk)
        sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8'))
        try:
            while True:
                result, header = self.getRecvData(sk)
                if not result:
                    # Peer closed the connection: release the socket and drop
                    # the registration instead of reading a closed socket.
                    sk.close()
                    trade_api.ClientSocketManager.del_client(rid)
                    return True
                try:
                    resultJSON = json.loads(result)
                    if resultJSON["type"] == 'heart':
                        # Record the client as active
                        trade_api.ClientSocketManager.heart(resultJSON['client_id'])
                    else:
                        hx_logger_contact_debug.warning(f"接收到非心跳信息:{result}")
                except json.decoder.JSONDecodeError:
                    print("JSON解析出错", result, header)
                    time.sleep(1)
        except ConnectionResetError:
            trade_api.ClientSocketManager.del_client(rid)
            return True
        except Exception as e:
            logging.exception(e)
        return False

    def _handle_response(self, sk, data_json):
        """Store the client's reply to a server-initiated request, then ack."""
        try:
            client_id = data_json["client_id"]
            hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}")
            # Set the response content
            trade_api.set_response(client_id, data_json["request_id"], data_json['data'])
        finally:
            self._send_ack(sk, with_header=True)

    def _handle_trade_callback(self, sk, data_json):
        """Log a trade callback and queue refreshes of delegates, money and deals."""
        try:
            callback = data_json["data"]
            # Not used further; the lookup still rejects payloads without a type.
            type_ = callback["type"]
            # Record the trade feedback
            hx_logger_trade_callback.info(callback)
            # Request the delegate list, money and deal list again
            trade_api_server.trade_data_request_queue.put_nowait({"type": "delegate_list"})
            trade_api_server.trade_data_request_queue.put_nowait({"type": "money"})
            trade_api_server.trade_data_request_queue.put_nowait({"type": "deal_list"})
        finally:
            self._send_ack(sk, with_header=True)

    def _handle_l2_order(self, sk, data_json):
        """Forward L2 order-detail records to the L2 trade data processor."""
        data = data_json["data"]
        code = data["code"]
        datas = data["data"]
        hx_logger_l2_orderdetail.info(f"{code}#{datas}")
        l2_log.threadIds[code] = random.randint(0, 100000)
        l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, datas)
        self._send_ack(sk)

    def _handle_l2_trans(self, sk, data_json):
        """Process L2 transaction records: trade price and buy-queue progress."""
        data = data_json["data"]
        code = data["code"]
        datas = data["data"]
        hx_logger_l2_transaction.info(f"{code}#{datas}")
        if datas:
            # Set the latest traded price
            current_price_process_manager.set_trade_price(code, datas[-1][1])
        try:
            self._update_trade_progress(code, datas)
        except Exception as e:
            # Progress tracking is best-effort; the message is still acked.
            hx_logger_l2_transaction.exception(e)
        self._send_ack(sk)

    def _update_trade_progress(self, code, datas):
        """Locate the newest transaction matching a known buy order and feed
        its index to the cancel-strategy computers."""
        buyno_map = l2_data_util.local_today_buyno_map.get(code)
        hx_logger_l2_transaction.info(f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}")
        buy_progress_index = None
        if buyno_map:
            # Newest transaction first: the first hit is the latest progress.
            for d in reversed(datas):
                buy_no = f"{d[6]}"
                if buy_no in buyno_map:
                    hx_logger_l2_transaction.info(f"{code}成交进度:{buyno_map[buy_no]}")
                    buy_progress_index = buyno_map[buy_no]["index"]
                    break

        # Fetch the buy execution position
        (buy_single_index, buy_exec_index, compute_index, num, count, max_num_set,
         volume_rate) = l2_data_manager.TradePointManager.get_buy_compute_start_data(code)
        if buy_progress_index is None:
            return

        self.__TradeBuyQueue.set_traded_index(code, buy_progress_index)
        total_datas = l2_data_util.local_today_datas.get(code)
        num_operate_map = l2_data_util.local_today_num_operate_map.get(code)
        logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}", code, buy_progress_index)
        buy_time = total_datas[buy_progress_index]["val"]["time"]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if buy_exec_index:
            need_cancel, msg = DCancelBigNumComputer.set_trade_progress(
                code,
                buy_progress_index,
                buy_exec_index,
                total_datas,
                num_operate_map,
                num * 100 * float(limit_up_price),
                limit_up_price)
            if need_cancel:
                L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")

        f1 = dask.delayed(HourCancelBigNumComputer.set_trade_progress)(
            code, buy_time, buy_exec_index, buy_progress_index, total_datas, num_operate_map)
        f2 = dask.delayed(LCancelBigNumComputer.set_trade_progress)(
            code, buy_progress_index, total_datas)
        f3 = dask.delayed(deal_big_money_manager.set_trade_progress)(
            code, buy_progress_index, total_datas, num_operate_map)
        dask.compute(f1, f2, f3)

    def _handle_l2_market_data(self, sk, data_json):
        """Process an L2 market snapshot: pass buy-1 / sell-1 to the price manager."""
        data = data_json["data"]
        code = data["code"]
        data = data["data"]
        time_str = f"{data['dataTimeStamp']}"
        if time_str.startswith("9"):
            # Pad the hour to two digits
            time_str = "0" + time_str
        time_str = time_str[:6]
        time_str = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}"
        buy_1_price, buy_1_volume = data["buy"][0]
        sell_1_price, sell_1_volume = data["sell"][0]
        limit_up_price = gpcode_manager.get_limit_up_price(code)

        if limit_up_price is not None:
            # Process buy-1 / sell-1 information
            code_price_manager.Buy1PriceManager.process(code, buy_1_price, time_str, limit_up_price,
                                                        sell_1_price, sell_1_volume // 100)
        hx_logger_l2_market_data.info(f"{code}#{data}")
        self._send_ack(sk)

    def _handle_l2_subscript_codes(self, sk, data_json):
        """Persist the list of codes currently subscribed for L2 data."""
        data = data_json["data"]
        datas = data["data"]
        print("l2_subscript_codes", data_json)
        # Subscribed codes
        huaxin_target_codes_manager.save_subscript_codes(datas)
        self._send_ack(sk)

    def handle(self):
        """Serve one connection until the peer disconnects or an error occurs.

        Reads messages in a loop, answers undecodable JSON with code 100 and
        dispatches every other message on its ``type`` field. Any unexpected
        exception is logged and ends the connection.
        """
        super().handle()
        sk: socket.socket = self.request
        while True:
            try:
                data_str, _header = self.getRecvData(sk)
                if not data_str:
                    # Peer disconnected
                    break
                try:
                    data_json = json.loads(data_str)
                except json.decoder.JSONDecodeError:
                    # JSON parsing failed
                    sk.sendall(socket_util.load_header(json.dumps(
                        {"code": 100, "msg": f"JSON解析失败"}).encode(
                        encoding='utf-8')))
                    hx_logger_trade_callback.error(
                        f"json解析失败,字符串长度:{len(data_str)},字符串内容:\"{data_str[:30]}......{data_str[-20:]}\"")
                    continue

                msg_type = data_json["type"]
                if msg_type == 'register':
                    if self._serve_registered_client(sk, data_json):
                        break
                elif msg_type == "response":
                    # Reply to a request initiated by the server
                    self._handle_response(sk, data_json)
                elif msg_type == "trade_callback":
                    self._handle_trade_callback(sk, data_json)
                elif msg_type == "l2_order":
                    # L2 order-by-order detail
                    self._handle_l2_order(sk, data_json)
                elif msg_type == "l2_trans":
                    self._handle_l2_trans(sk, data_json)
                elif msg_type == "l2_market_data":
                    self._handle_l2_market_data(sk, data_json)
                elif msg_type == "l2_subscript_codes":
                    self._handle_l2_subscript_codes(sk, data_json)
            except Exception as e:
                logging.exception(e)
                break

    def finish(self):
        """Delegate to the base-class cleanup."""
        super().finish()
|
|
|
def clear_invalid_client():
    """Remove invalid client sockets every two seconds, forever.

    Intended to run in a background thread. A failure in one round is logged
    and does not stop the loop.
    """
    while True:
        try:
            huaxin_trade_api.ClientSocketManager.del_invalid_clients()
        except Exception as e:
            # Keep the cleaner alive, but make the failure visible.
            logging.exception(e)
        time.sleep(2)
|
|
|
def run():
    """Initialise block data, start the client cleaner thread and serve TCP
    requests on port 10008 of all interfaces. Blocks in ``serve_forever``."""
    # Perform some data initialisation first
    block_info.init()

    print("create TradeServer")
    cleaner_thread = threading.Thread(target=clear_invalid_client, daemon=True)
    cleaner_thread.start()

    # Note: the handler argument is the MyBaseRequestHandle class itself
    tcpserver = MyThreadingTCPServer(("0.0.0.0", 10008), MyBaseRequestHandle)
    tcpserver.serve_forever()
|
|
|
if __name__ == "__main__":
    # NOTE(review): run() is not called here -- executing this module directly
    # only keeps the process alive. Earlier manual experiments (saving L2 data,
    # registering dummy trade clients, spawning test threads) were commented
    # out and are no longer part of this entry point.
    while True:
        time.sleep(10)
|