"""
|
外部接口管理
|
"""
|
import json
|
import logging
|
import random
|
import socket
|
import threading
|
import time
|
|
# 心跳信息
|
from huaxin_client import socket_util
|
from huaxin_client.client_network import SendResponseSkManager
|
from log_module.log import logger_system, logger_request_api
|
from utils import middle_api_protocol, tool
|
|
MSG_TYPE_HEART = "heart"
|
# 命令信息
|
MSG_TYPE_CMD = "cmd"
|
|
CLIENT_TYPE_TRADE = "trade_low_suction"
|
|
# 心跳时间间隔
|
HEART_SPACE_TIME = 3
|
|
TRADE_DIRECTION_BUY = 1
|
TRADE_DIRECTION_SELL = 2
|
|
TRADE_TYPE_ORDER = 1
|
TRADE_TYPE_CANCEL_ORDER = 2
|
|
# 数据操作
|
OPERRATE_SET = 1 # 设置
|
OPERRATE_DELETE = 2 # 删除
|
OPERRATE_GET = 3 # 获取
|
OPERRATE_ADD = 4 # 新增
|
|
# 代码名单类型
|
CODE_LIST_WHITE = "white"
|
CODE_LIST_BLACK = "black"
|
CODE_LIST_WANT = "want"
|
CODE_LIST_PAUSE_BUY = "pause_buy"
|
CODE_LIST_MUST_BUY = "must_buy"
|
CODE_LIST_GREEN = "green"
|
|
# 类型
|
API_TYPE_TRADE = "trade" # 交易
|
API_TYPE_TRADE_STATE = "trade_state" # 交易状态
|
API_TYPE_TRADE_MODE = "trade_mode" # 交易模式
|
API_TYPE_SELL_RULE = "sell_rule" # 卖出规则
|
API_TYPE_CODE_LIST = "code_list" # 代码名单
|
API_TYPE_EXPORT_L2 = "export_l2" # 导出L2数据
|
API_TYPE_INIT = "init" # 初始化
|
API_TYPE_REFRESH_TRADE_DATA = "refresh_trade_data" # 交易数据刷新
|
API_TYPE_CODE_ATRRIBUTE = "code_attribute" # 代码属性
|
API_TYPE_CODE_TRADE_STATE = "code_trade_state" # 代码交易状态
|
API_TYPE_GET_ENV = "get_env" # 获取环境信息
|
API_TYPE_SYNC_L1_TARGET_CODES = "sync_l1_subscript_codes" # 同步L1需要订阅的代码
|
API_TYPE_SYSTEM_LOG = "system_log" # 系统日志
|
API_TYPE_GET_FROM_DATA_SERVER = "get_from_data_server" # 从数据服务器拉取数据
|
API_TYPE_CODE_TRADE_INFO = "code_trade_info" # 代码交易信息
|
API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT = "l2_listen_active_count" # L2有效监听数量
|
API_TYPE_SAVE_RUNNING_DATA = "save_running_data" # 保存运行时数据
|
API_TYPE_GET_CODE_POSITION_INFO = "get_code_position_info" # 获取代码持仓信息
|
API_TYPE_COMMON_REQUEST = "common_request" # 通用请求
|
|
|
class ActionCallback(object):
|
def OnCommonRequest(self, client_id, request_id, data):
|
pass
|
|
|
# 交易指令管理
|
# 交易指令管理
|
@tool.singleton
|
class ApiCommandManager:
|
trade_ls_client_dict = {}
|
trade_ls_client_count = 0
|
|
def __init__(self, addr, port, trade_action_callback, trade_ls_client_count=20):
|
self.trade_ls_client_dict.clear()
|
self.trade_ls_client_count = trade_ls_client_count
|
self.action_callback = trade_action_callback
|
self.ip_port = (addr, port)
|
|
for i in range(trade_ls_client_count):
|
result = self.__create_and_run_client(CLIENT_TYPE_TRADE, i)
|
self.trade_ls_client_dict[result[0]] = result[1]
|
|
def __create_client(self, client_type, rid):
|
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 生成socket,连接server
|
# client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
|
# client.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 60 * 1000, 30 * 1000))
|
client.connect(self.ip_port)
|
client.send(SendResponseSkManager.format_response(
|
json.dumps({"type": "register", "data": {"client_type": client_type}, "rid": rid}).encode("utf-8")))
|
client.recv(1024)
|
return client
|
|
def __create_and_run_client(self, type, index=None):
|
key = f"{type}_{round(time.time() * 1000)}_{random.randint(0, 1000)}"
|
if index is not None:
|
key += f"_{index}"
|
sk = self.__create_client(type, key)
|
# 发送心跳
|
self.__heartbeats_thread(type, key, sk)
|
self.__listen_command_thread(type, key, sk)
|
# print("create_and_run_client success", type, key)
|
logger_request_api.info(f"创建本地socket请求客户端:{type}")
|
return key, sk
|
|
# 听取指令
|
def __listen_command(self, _type, client_id, sk):
|
while True:
|
try:
|
result = socket_util.recv_data(sk)[0]
|
if result:
|
start_time = time.time()
|
try:
|
print("接收数据", _type, result)
|
result_json = json.loads(result)
|
if result_json["type"] == MSG_TYPE_HEART:
|
# 返回内容
|
sk.send(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))
|
continue
|
|
data = result_json["data"]
|
content_type = data["type"]
|
# print("接收内容", data)
|
request_id = result_json.get('request_id')
|
if not socket_util.is_client_params_sign_right(result_json):
|
# print("签名错误")
|
# 签名出错
|
SendResponseSkManager.send_error_response(_type, request_id, client_id,
|
{"code": -1, "msg": "签名错误"})
|
continue
|
if content_type == API_TYPE_COMMON_REQUEST:
|
self.action_callback.OnCommonRequest(client_id, request_id, data)
|
except Exception as e:
|
logging.exception(e)
|
finally:
|
use_time = int(time.time() - start_time)
|
if use_time > 5:
|
result_json = json.loads(result)
|
logger_request_api.info(f"超时5s以上:{result_json['data']['type']}")
|
# 发送响应
|
sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8'))
|
else:
|
raise Exception("接收的内容为空")
|
|
except Exception as e:
|
logging.exception(e)
|
if _type == CLIENT_TYPE_TRADE:
|
if client_id in self.trade_ls_client_dict:
|
self.trade_ls_client_dict.pop(client_id)
|
try:
|
sk.close()
|
except:
|
pass
|
# 结束当前的消息循环
|
break
|
|
def __heart_beats(self, _type, client_id, sk):
|
while True:
|
try:
|
sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')))
|
# print("心跳信息发送成功", client_id)
|
except Exception as e:
|
if _type == CLIENT_TYPE_TRADE:
|
if client_id in self.trade_ls_client_dict:
|
self.trade_ls_client_dict.pop(client_id)
|
try:
|
sk.close()
|
except:
|
pass
|
# 结束当前的消息循环
|
break
|
time.sleep(HEART_SPACE_TIME)
|
|
def __listen_command_thread(self, _type, rid, sk):
|
t1 = threading.Thread(target=lambda: self.__listen_command(_type, rid, sk))
|
t1.setDaemon(True)
|
t1.start()
|
|
def __heartbeats_thread(self, _type, rid, sk):
|
t1 = threading.Thread(target=lambda: self.__heart_beats(_type, rid, sk))
|
t1.setDaemon(True)
|
t1.start()
|
|
def __maintain_client(self):
|
logger_system.info(f"outside_api __maintain_client 线程ID:{tool.get_thread_id()}")
|
while True:
|
try:
|
if len(self.trade_ls_client_dict) < self.trade_ls_client_count:
|
for i in range(self.trade_ls_client_count - len(self.trade_ls_client_dict)):
|
result = self.__create_and_run_client(CLIENT_TYPE_TRADE)
|
self.trade_ls_client_dict[result[0]] = result[1]
|
except:
|
pass
|
time.sleep(1)
|
|
# 维护连接数的稳定
|
def run(self, blocking=True):
|
# 维护client
|
if blocking:
|
self.__maintain_client()
|
else:
|
t1 = threading.Thread(target=lambda: self.__maintain_client())
|
t1.setDaemon(True)
|
t1.start()
|
|
|
if __name__ == "__main__":
|
manager = ApiCommandManager(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT, ActionCallback())
|
manager.run()
|
input()
|