"""
|
命令管理器
|
"""
|
import json
|
import logging
|
import random
|
import socket
|
import threading
|
import time
|
|
# Heartbeat message type (describes MSG_TYPE_HEART, defined after the imports)
|
import crypt
|
import socket_util
|
from client_network import SendResponseSkManager
|
|
# Message types carried in the "type" field of a server message.
MSG_TYPE_HEART = "heart"  # heartbeat
MSG_TYPE_CMD = "cmd"  # command

# Client types sent in the "register" request; one connection per type
# (the trade type is opened several times).
CLIENT_TYPE_TRADE = "trade"
CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
CLIENT_TYPE_DEAL_LIST = "deal_list"
CLIENT_TYPE_POSITION_LIST = "position_list"
CLIENT_TYPE_MONEY = "money"
CLIENT_TYPE_DEAL = "deal"

CLIENT_TYPE_CMD_L2 = "l2_cmd"

# Seconds to sleep between two heartbeats.
HEART_SPACE_TIME = 3
|
|
|
class TradeActionCallback(object):
    """No-op callback interface for trade-side commands.

    Subclass and override the methods of interest. Every method receives the
    id of the client connection the command arrived on and the request id
    taken from the message (``None`` when the message carries none).
    """

    def OnTrade(self, client_id, request_id, type_, data):
        """Handle a trade command.

        ``type_`` is the ``trade_type`` field of the command and ``data`` is
        the full command payload.
        """
        pass

    def OnDelegateList(self, client_id, request_id, can_cancel=None):
        """Handle a delegate (order) list request.

        ``can_cancel`` is the ``can_cancel`` field of the command payload.
        The command dispatcher passes it positionally, so the base method has
        to accept it; it defaults to ``None`` for callers that omit it.
        NOTE(review): presumably a flag restricting the list to cancellable
        orders -- confirm against the server protocol.
        """
        pass

    def OnDealList(self, client_id, request_id):
        """Handle a deal (fill) list request."""
        pass

    def OnPositionList(self, client_id, request_id):
        """Handle a position list request."""
        pass

    def OnMoney(self, client_id, request_id):
        """Handle a request for account money information."""
        pass
|
|
|
class L2ActionCallback(object):
    """Callback interface for L2 commands; the default implementation is a no-op."""

    def OnSetL2Position(self, client_id, request_id, codes_data):
        """Receive the payload of an L2 command (listen to L2 data)."""
        return None
|
|
|
# Trade command management
class TradeCommandManager:
    """Singleton keeping a set of command connections to the trade server.

    ``init()`` opens ``trade_client_count`` connections of type "trade" plus
    one connection each for the delegate list, deal list, position list and
    money client types. Every connection gets a heartbeat thread and a
    listener thread; the listener forwards commands to ``action_callback``.
    ``run()`` keeps replacing connections that were dropped.

    All state lives on the class. ``trade_client_dict`` maps client id to
    socket; the single-connection slots hold ``(client_id, socket)`` tuples
    or ``None``.
    """

    trade_client_dict = {}
    _instance = None

    # Defaults so that the maintenance loop can run before init() is called.
    trade_client_count = 0
    action_callback = None
    ip_port = None
    delegate_client = None
    deal_list_client = None
    position_list_client = None
    money_client = None
    # Set once init() has finished (successfully or not); the maintenance
    # loop idles until then so it does not compete with init().
    _ready = False

    def __new__(cls, *args, **kwargs):
        # Constructor arguments are accepted for compatibility but not
        # forwarded: object.__new__ raises TypeError on extra arguments.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @staticmethod
    def __slot_name(_type):
        """Return the class attribute holding the single client of ``_type``, or None."""
        return {
            CLIENT_TYPE_DELEGATE_LIST: "delegate_client",
            CLIENT_TYPE_DEAL_LIST: "deal_list_client",
            CLIENT_TYPE_POSITION_LIST: "position_list_client",
            CLIENT_TYPE_MONEY: "money_client",
        }.get(_type)

    @classmethod
    def __register_client(cls, _type, client_id, sk):
        """Record a freshly connected client in the dict or slot for its type."""
        if _type == CLIENT_TYPE_TRADE:
            cls.trade_client_dict[client_id] = sk
            return
        slot = cls.__slot_name(_type)
        if slot is not None:
            setattr(cls, slot, (client_id, sk))

    @classmethod
    def __release_client(cls, _type, client_id, sk):
        """Forget a failed client and close its socket.

        Both worker threads of a connection end up here. A slot is only
        cleared while it still holds ``client_id``; otherwise the slower
        thread of a dead connection would wipe the replacement that the
        maintenance loop has already installed.
        """
        if _type == CLIENT_TYPE_TRADE:
            cls.trade_client_dict.pop(client_id, None)
        else:
            slot = cls.__slot_name(_type)
            current = getattr(cls, slot, None) if slot is not None else None
            if current is not None and current[0] == client_id:
                setattr(cls, slot, None)
        try:
            sk.close()
        except OSError:
            # Best effort: the socket is being discarded anyway.
            pass

    @classmethod
    def __create_client(cls, client_type, rid):
        """Connect to ``cls.ip_port`` and register as ``client_type``.

        Returns the connected socket. Any error propagates to the caller
        after the socket has been closed.
        """
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
            # Keep-alive tuning, currently disabled:
            # client.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 60 * 1000, 30 * 1000))
            client.connect(cls.ip_port)
            register_msg = json.dumps(
                {"type": "register", "data": {"client_type": client_type}, "rid": rid}
            ).encode("utf-8")
            client.send(SendResponseSkManager.format_response(register_msg))
            # The reply to the registration is read and discarded.
            client.recv(1024)
        except Exception:
            client.close()
            raise
        return client

    @classmethod
    def __create_and_run_client(cls, client_type, index=None):
        """Open one client, register it and start its worker threads.

        Returns ``(client_id, socket)``. The client is registered before the
        threads start so that a thread failing immediately still finds the
        entry it has to remove.
        """
        key = f"{client_type}_{round(time.time() * 1000)}_{random.randint(0, 1000)}"
        if index is not None:
            key += f"_{index}"
        sk = cls.__create_client(client_type, key)
        cls.__register_client(client_type, key, sk)
        # Send heartbeats
        cls.__heartbeats_thread(client_type, key, sk)
        cls.__listen_command_thread(client_type, key, sk)
        return key, sk

    @classmethod
    def init(cls, addr, port, trade_action_callback, trade_client_count=10):
        """Configure the manager and open all connections.

        ``trade_action_callback`` receives the commands (see
        ``TradeActionCallback``). Connection errors propagate; connections
        that could not be opened are retried by the maintenance loop once
        ``run()`` is active.
        """
        cls._ready = False
        cls.trade_client_dict = {}
        cls.trade_client_count = trade_client_count
        cls.action_callback = trade_action_callback
        cls.ip_port = (addr, port)
        try:
            for i in range(trade_client_count):
                cls.__create_and_run_client(CLIENT_TYPE_TRADE, i)
            # Delegate list, deal list, position list and money clients
            cls.__create_and_run_client(CLIENT_TYPE_DELEGATE_LIST)
            cls.__create_and_run_client(CLIENT_TYPE_DEAL_LIST)
            cls.__create_and_run_client(CLIENT_TYPE_POSITION_LIST)
            cls.__create_and_run_client(CLIENT_TYPE_MONEY)
        finally:
            cls._ready = True

    @classmethod
    def __handle_message(cls, _type, client_id, sk, result):
        """Decode one received message and dispatch it to the callback."""
        print("接收数据", _type, result)
        result_json = json.loads(result)

        if result_json["type"] == MSG_TYPE_HEART:
            # Answer the server heartbeat
            sk.send(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))
            return

        data = result_json["data"]
        print("接收内容", data)
        request_id = result_json.get('request_id')
        if not socket_util.is_client_params_sign_right(result_json):
            print("签名错误")
            # Signature check failed
            SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                      {"code": -1, "msg": "签名错误"})
            return

        if _type == CLIENT_TYPE_TRADE:
            cls.action_callback.OnTrade(client_id, request_id, data["trade_type"], data)
        elif _type == CLIENT_TYPE_MONEY:
            cls.action_callback.OnMoney(client_id, request_id)
        elif _type == CLIENT_TYPE_DEAL_LIST:
            cls.action_callback.OnDealList(client_id, request_id)
        elif _type == CLIENT_TYPE_DELEGATE_LIST:
            cls.action_callback.OnDelegateList(client_id, request_id, data["can_cancel"])
        elif _type == CLIENT_TYPE_POSITION_LIST:
            cls.action_callback.OnPositionList(client_id, request_id)

    # Listen for commands
    @classmethod
    def __listen_command(cls, _type, client_id, sk):
        """Receive messages on ``sk`` until the connection fails.

        Errors while handling a single message are logged and the loop goes
        on; a receive/send error or an empty read ends the loop and releases
        the client.
        """
        while True:
            try:
                result = socket_util.recv_data(sk)[0]
                if not result:
                    raise Exception("接收的内容为空")
                try:
                    cls.__handle_message(_type, client_id, sk, result)
                except Exception as e:
                    logging.exception(e)
                finally:
                    # Acknowledge every message, including heartbeats and
                    # messages whose handling failed.
                    sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8'))
            except Exception as e:
                logging.exception(e)
                cls.__release_client(_type, client_id, sk)
                # End this message loop
                break

    @classmethod
    def __heart_beats(cls, _type, client_id, sk):
        """Send a heartbeat every ``HEART_SPACE_TIME`` seconds until sending fails."""
        while True:
            try:
                sk.send(SendResponseSkManager.format_response(
                    json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')))
                print("心跳信息发送成功", client_id)
            except Exception as e:
                print("心跳信息发送失败")
                logging.exception(e)
                cls.__release_client(_type, client_id, sk)
                # End this heartbeat loop
                break
            time.sleep(HEART_SPACE_TIME)

    @classmethod
    def __listen_command_thread(cls, _type, rid, sk):
        """Start the listener loop for ``sk`` in a daemon thread."""
        threading.Thread(target=cls.__listen_command, args=(_type, rid, sk), daemon=True).start()

    @classmethod
    def __heartbeats_thread(cls, _type, rid, sk):
        """Start the heartbeat loop for ``sk`` in a daemon thread."""
        threading.Thread(target=cls.__heart_beats, args=(_type, rid, sk), daemon=True).start()

    @classmethod
    def __refill_clients(cls):
        """Open a replacement for every missing connection."""
        missing = cls.trade_client_count - len(cls.trade_client_dict)
        if missing > 0:
            print("__maintain_client", CLIENT_TYPE_TRADE)
            for _ in range(missing):
                cls.__create_and_run_client(CLIENT_TYPE_TRADE)
        single_types = (CLIENT_TYPE_DELEGATE_LIST, CLIENT_TYPE_DEAL_LIST,
                        CLIENT_TYPE_POSITION_LIST, CLIENT_TYPE_MONEY)
        for client_type in single_types:
            if getattr(cls, cls.__slot_name(client_type)) is None:
                print("__maintain_client", client_type)
                cls.__create_and_run_client(client_type)

    @classmethod
    def __maintain_client(cls):
        """Check once per second that all connections exist; never returns."""
        while True:
            try:
                if cls._ready:
                    cls.__refill_clients()
            except Exception as e:
                # Log instead of swallowing so that reconnect failures are visible.
                logging.exception(e)
            time.sleep(1)

    # Keep the number of connections stable
    def run(self, blocking=True):
        """Run the maintenance loop, in this thread or in a daemon thread.

        With ``blocking=True`` this call does not return.
        """
        if blocking:
            self.__maintain_client()
        else:
            threading.Thread(target=self.__maintain_client, daemon=True).start()
|
|
|
# L2 command management
class L2CommandManager:
    """Keeps one "l2_cmd" command connection to the server alive.

    ``init()`` connects and starts a heartbeat thread and a listener thread;
    the listener forwards L2 commands to ``action_callback``. ``run()``
    reconnects whenever the connection has been dropped. All state lives on
    the class; ``l2_cmd_client`` is the current socket or ``None``.
    """

    # Defaults so that the maintenance loop can run before init() is called.
    action_callback = None
    ip_port = None
    l2_cmd_client = None
    # Set once init() has finished (successfully or not); the maintenance
    # loop idles until then so it does not compete with init().
    _ready = False

    @classmethod
    def __create_client(cls, client_type, rid):
        """Connect to ``cls.ip_port`` and register as ``client_type``.

        Returns the connected socket. Any error propagates to the caller
        after the socket has been closed.
        """
        client = socket.socket()
        try:
            # client.settimeout(20)
            client.connect(cls.ip_port)
            register_msg = json.dumps(
                {"type": "register", "data": {"client_type": client_type}, "rid": rid}
            ).encode("utf-8")
            client.send(SendResponseSkManager.format_response(register_msg))
            # The reply to the registration is read and discarded.
            client.recv(10240)
        except Exception:
            client.close()
            raise
        return client

    @classmethod
    def __release_client(cls, sk):
        """Forget ``sk`` if it is still the current client and close it.

        Both worker threads of a connection end up here. The identity check
        keeps the slower thread of a dead connection from wiping the
        replacement installed by the maintenance loop.
        """
        if cls.l2_cmd_client is sk:
            cls.l2_cmd_client = None
        try:
            sk.close()
        except OSError:
            # Best effort: the socket is being discarded anyway.
            pass

    @classmethod
    def __connect(cls):
        """Open the L2 command connection and start its worker threads."""
        key = f"{CLIENT_TYPE_CMD_L2}_{round(time.time() * 1000)}_{random.randint(0, 1000)}"
        sk = cls.__create_client(CLIENT_TYPE_CMD_L2, key)
        # Publish before the threads start so a thread failing immediately
        # still finds the socket it has to clear.
        cls.l2_cmd_client = sk
        cls.__heartbeats_thread(CLIENT_TYPE_CMD_L2, key, sk)
        cls.__listen_command_thread(CLIENT_TYPE_CMD_L2, key, sk)

    @classmethod
    def init(cls, addr, port, l2_action_callback):
        """Configure the manager and open the connection.

        ``l2_action_callback`` receives the commands (see
        ``L2ActionCallback``). Connection errors propagate; the maintenance
        loop retries once ``run()`` is active.
        """
        cls._ready = False
        cls.action_callback = l2_action_callback
        cls.ip_port = (addr, port)
        try:
            cls.__connect()
        finally:
            cls._ready = True

    @classmethod
    def __handle_message(cls, _type, client_id, sk, result):
        """Decode one received message and dispatch it to the callback."""
        result_json = json.loads(result)
        if result_json["type"] == MSG_TYPE_HEART:
            # Answer the server heartbeat
            sk.send(json.dumps({"type": "heart", "rid": client_id}).encode('utf-8'))
            return
        data = result_json["data"]
        request_id = result_json["request_id"]
        cmd_type = data["type"]
        if not socket_util.is_client_params_sign_right(result_json):
            # Signature check failed
            SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                      {"code": -1, "msg": "签名错误"})
            return
        codes_data = data["data"]
        if cmd_type == CLIENT_TYPE_CMD_L2:
            cls.action_callback.OnSetL2Position(client_id, request_id, codes_data)

    # Listen for commands
    @classmethod
    def __listen_command(cls, _type, client_id, sk):
        """Receive messages on ``sk`` until the connection fails.

        Errors while handling a single message are logged and the loop goes
        on; a receive/send error or an empty read ends the loop and releases
        the client.
        """
        while True:
            try:
                result = socket_util.recv_data(sk)[0]
                print("接收L2_CMD数据")
                if not result:
                    raise Exception("接收L2_CMD数据为空")
                try:
                    cls.__handle_message(_type, client_id, sk, result)
                except Exception as e:
                    # Log instead of swallowing so callback errors are visible.
                    logging.exception(e)
                finally:
                    # Acknowledge every message, including heartbeats and
                    # messages whose handling failed.
                    sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8'))
            except Exception as e:
                logging.exception(e)
                cls.__release_client(sk)
                # End this message loop
                break

    @classmethod
    def __heart_beats(cls, _type, client_id, sk):
        """Send a heartbeat every ``HEART_SPACE_TIME`` seconds until sending fails."""
        while True:
            try:
                sk.send(SendResponseSkManager.format_response(
                    json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')))
                print("心跳信息发送成功", client_id)
            except Exception as e:
                print("心跳信息发送失败")
                logging.exception(e)
                cls.__release_client(sk)
                break
            time.sleep(HEART_SPACE_TIME)

    @classmethod
    def __heartbeats_thread(cls, _type, rid, sk):
        """Start the heartbeat loop for ``sk`` in a daemon thread."""
        threading.Thread(target=cls.__heart_beats, args=(_type, rid, sk), daemon=True).start()

    @classmethod
    def __listen_command_thread(cls, _type, rid, sk):
        """Start the listener loop for ``sk`` in a daemon thread."""
        threading.Thread(target=cls.__listen_command, args=(_type, rid, sk), daemon=True).start()

    @classmethod
    def __maintain_client(cls):
        """Check once per second that the connection exists; never returns."""
        while True:
            try:
                if cls._ready and cls.l2_cmd_client is None:
                    print("__maintain_client")
                    cls.__connect()
            except Exception as e:
                logging.exception(e)
            time.sleep(1)

    # Keep the connection alive
    def run(self, blocking=True):
        """Run the maintenance loop, in this thread or in a daemon thread.

        With ``blocking=True`` this call does not return.
        """
        if blocking:
            self.__maintain_client()
        else:
            threading.Thread(target=self.__maintain_client, daemon=True).start()
|
|
|
if __name__ == "__main__":
|
manager = TradeCommandManager("127.0.0.1", 10008, None)
|
manager.run()
|
input()
|