import hashlib
|
import json
|
import logging
|
import socket
|
import socketserver
|
import threading
|
import time
|
|
import psutil
|
|
import inited_data
|
from code_attribute import gpcode_manager
|
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
|
from db.redis_manager_delegate import RedisUtils
|
from l2 import l2_data_manager_new
|
from l2.huaxin import huaxin_target_codes_manager
|
from log_module.log import logger_system, logger_l2_codes_subscript
|
from third_data import block_info
|
from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi
|
from third_data.kpl_data_manager import KPLDataManager
|
from third_data.kpl_util import KPLDataType
|
from trade import trade_manager, trade_huaxin, l2_trade_util
|
|
from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager, \
|
huaxin_trade_data_update
|
from trade.huaxin.huaxin_trade_api import ClientSocketManager
|
from utils import socket_util, tool, huaxin_util, data_export_util
|
|
|
class MyTCPServer(socketserver.TCPServer):
    """TCP server that is bound and activated as soon as it is constructed."""

    def __init__(self, server_address, RequestHandlerClass):
        """Create the server on ``server_address`` using ``RequestHandlerClass``.

        ``bind_and_activate`` is always passed as True, so the listening
        socket is bound and activated inside the constructor.
        """
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=True)
|
|
|
# To serve requests asynchronously, ThreadingTCPServer has to be re-derived on top of MyTCPServer.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """MyTCPServer combined with ``socketserver.ThreadingMixIn``; adds no members of its own."""
|
|
|
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Request handler implementing the JSON trade API over a framed socket.

    Each message is read with ``socket_util.recv_data``, parsed as JSON and
    dispatched on its ``"type"`` field.  Exactly one reply (a JSON string
    framed by ``socket_util.load_header``) is sent per loop iteration.
    Only the ``buy``, ``cancel_order`` and ``sell`` requests check the
    request signature; the other request types are served without it.
    """

    # Set once the one-time class initialisation has run (shared by all instances).
    __inited = False

    def setup(self):
        """Make sure the one-time class initialisation has run before handling."""
        self.__init()

    @classmethod
    def __init(cls):
        """Initialise class-level state once.

        Returns:
            True when the class was already initialised, otherwise None.
        """
        if cls.__inited:
            return True
        cls.__inited = True
        # Created here but not read anywhere in this class.
        cls.__req_socket_dict = {}

    def __is_sign_right(self, data_json):
        """Verify the MD5 signature carried in ``data_json["sign"]``.

        The signature is the MD5 hex digest of all other ``key=value`` pairs,
        sorted, joined with ``&`` and followed by a fixed salt.  The method is
        not called by ``handle`` (which relies on
        ``socket_util.is_client_params_sign_right``).

        Unlike the previous implementation the caller's dict is left
        untouched: the ``"sign"`` entry is skipped instead of being popped.

        Raises:
            KeyError: ``data_json`` has no ``"sign"`` entry.
            Exception: the computed digest differs from the supplied signature.
        """
        sign = data_json["sign"]
        pairs = sorted(f"{k}={v}" for k, v in data_json.items() if k != "sign")
        __str = "&".join(pairs) + "JiaBei@!*."
        md5 = hashlib.md5(__str.encode(encoding='utf-8')).hexdigest()
        if md5 != sign:
            raise Exception("签名出错")

    def handle(self):
        """Serve requests arriving on ``self.request``.

        Any exception raised while processing a request is logged and turned
        into a ``{"code": 401, "msg": ...}`` reply.  Whatever ``return_str``
        holds (possibly the empty string) is sent back in the ``finally``
        clause of every iteration.
        """
        super().handle()
        sk: socket.socket = self.request
        while True:
            return_str = ""
            try:
                data, _ = socket_util.recv_data(sk)
                if data:
                    data_json = json.loads(data)
                    type_ = data_json['type']

                    is_sign_right = socket_util.is_client_params_sign_right(data_json)
                    # ------ client request endpoints ------
                    if type_ == 'buy':
                        # Verify the signature
                        if not is_sign_right:
                            raise Exception("签名错误")
                        codes_data = data_json["data"]
                        code = codes_data["code"]
                        volume = codes_data["volume"]
                        price = codes_data["price"]
                        if not code:
                            raise Exception("请上传code")
                        if not volume:
                            raise Exception("请上传volume")

                        # A non-positive price means "use the current price".
                        if round(float(price), 2) <= 0:
                            prices = HistoryKDatasUtils.get_now_price([code])
                            if not prices:
                                raise Exception("现价获取失败")
                            price = prices[0][1]
                        # Place the order
                        result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, volume,
                                                        round(float(price), 2))
                        if result:
                            print("下单结果:", result)
                            # Sample of a rejected order reply:
                            # {'code': 0, 'data': {'sinfo': 'b_600480_1689060343812', 'securityId': '600480',
                            #  'orderLocalId': '0190000809', 'orderStatus': '7',
                            #  'statusMsg': '10932:<reject reason>', 'orderSysID': '110010190000809',
                            #  'accountId': '38800001334901'}}
                            if result['code'] == 0:
                                try:
                                    order_data = result['data']
                                    status_code = order_data['orderStatus']

                                    if status_code == huaxin_util.TORA_TSTP_OST_Rejected:
                                        # Rejected by the exchange
                                        raise Exception(order_data['statusMsg'])
                                    # NOTE(review): the sample reply above spells the key 'accountId'
                                    # while 'accountID' is read here - confirm against a successful reply.
                                    trade_huaxin.order_success(order_data['securityId'],
                                                               order_data['accountID'],
                                                               order_data['orderSysID'],
                                                               order_data['insertTime'])
                                    return_str = json.dumps({"code": 0})
                                finally:
                                    # Refresh the delegate list and the money list
                                    huaxin_trade_data_update.add_delegate_list("接口")
                                    huaxin_trade_data_update.add_money_list()
                            else:
                                raise Exception(result['msg'])
                        break
                    elif type_ == 'cancel_order':
                        # Verify the signature
                        if not is_sign_right:
                            raise Exception("签名错误")
                        codes_data = data_json["data"]
                        code = codes_data["code"]
                        orderSysID = codes_data.get("orderSysID")
                        accountId = codes_data.get("accountId")
                        if code and orderSysID and accountId:
                            # Cancel one specific order through the trade API
                            result = huaxin_trade_api.cancel_order(huaxin_trade_api.TRADE_DIRECTION_BUY, code,
                                                                   orderSysID, True)
                            print("---撤单结果----")
                            print(result)
                            if result["code"] == 0:
                                if result["data"]["cancel"] == 1:
                                    # Cancel succeeded
                                    trade_huaxin.cancel_order_success(code, accountId, orderSysID)
                                    return_str = json.dumps({"code": 0})
                                else:
                                    # Cancel failed
                                    raise Exception(result["data"]["errorMsg"])
                            else:
                                raise Exception(result["msg"])

                        elif code:
                            # Only the code is known: cancel through the L2 processor when the
                            # cached trade state is one of the cancellable buy states.
                            state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
                            if state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_CANCEL_ING:
                                try:
                                    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销")
                                    return_str = json.dumps({"code": 0})
                                except Exception as e:
                                    logging.exception(e)
                                    return_str = json.dumps({"code": 2, "msg": str(e)})
                            else:
                                return_str = json.dumps({"code": 1, "msg": "未处于可撤单状态"})
                        else:
                            return_str = json.dumps({"code": 1, "msg": "请上传代码"})
                        break

                    elif type_ == 'sell':
                        # Verify the signature
                        if not is_sign_right:
                            raise Exception("签名错误")
                        codes_data = data_json["data"]
                        code = codes_data["code"]
                        volume = codes_data["volume"]
                        price = codes_data["price"]
                        # Force-sell flag (0/1); the key is required but not used yet
                        force_sell = codes_data["force"]
                        # TODO force-sell strategy
                        if volume == 0:
                            # Volume 0 means "sell the whole position": look it up
                            volume = huaxin_trade_record_manager.PositionManager.get_code_volume(code)
                            if volume <= 0:
                                raise Exception("代码无持仓")

                        if not price:
                            # Fetch the current price
                            prices = HistoryKDatasUtils.get_now_price([code])
                            if not prices:
                                raise Exception("现价获取失败")
                            # Sell 0.04 below the current price.  Rounded to 2 decimals (as the
                            # buy branch does) so float noise such as 10.110000000000001 never
                            # reaches the order API.
                            price = round(float(prices[0][1]) - 0.04, 2)

                        result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_SELL, code, volume, price)
                        if result["code"] == 0:
                            order_status = result["data"]["orderStatus"]
                            if order_status == huaxin_util.TORA_TSTP_OST_Rejected or (
                                    isinstance(order_status, int) and order_status < 0):
                                raise Exception(result["data"]["statusMsg"])
                            else:
                                return_str = json.dumps({"code": 0, "msg": ""})
                        else:
                            raise Exception(result["msg"])

                        print("---卖出结果----")
                        print(result)
                        break
                    elif type_ == 'delegate_list':
                        # Delegate (order) list
                        update_time = data_json["data"]["update_time"]
                        # Cancellable only? 0/1
                        can_cancel = data_json["data"]["can_cancel"]
                        # The client's update_time used to be overwritten with None before it
                        # was used; it is now really passed on for the non-cancellable query.
                        if can_cancel:
                            results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                                tool.get_now_date_str("%Y%m%d"), None,
                                [huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded])
                        else:
                            results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                                tool.get_now_date_str("%Y%m%d"), update_time)
                        # Success replies carry an empty msg like every other endpoint
                        # (the old text was a copy-pasted "please upload the code" error).
                        return_str = json.dumps(
                            {"code": 0, "data": {"list": results, "updateTime": update_time}, "msg": ""})
                        break
                    elif type_ == 'deal_list':
                        # Deal list
                        results = huaxin_trade_record_manager.DealRecordManager.list_by_day(
                            tool.get_now_date_str("%Y%m%d"))
                        return_str = json.dumps(
                            {"code": 0, "data": {"list": results}, "msg": ""})
                    elif type_ == 'position_list':
                        # Position list (the returned update time is not sent back)
                        results, _ = huaxin_trade_record_manager.PositionManager.list_by_day(
                            tool.get_now_date_str("%Y%m%d"))
                        return_str = json.dumps(
                            {"code": 0, "data": {"list": results}, "msg": ""})
                    elif type_ == 'money_list':
                        # Money details
                        money_data = huaxin_trade_record_manager.MoneyManager.get_data()
                        return_str = json.dumps(
                            {"code": 0, "data": money_data, "msg": ""})
                    elif type_ == 'sync_trade_data':
                        # Trigger a refresh of one kind of trade data
                        sync_type = data_json["data"]["type"]
                        if sync_type == "delegate_list":
                            huaxin_trade_data_update.add_delegate_list("接口")
                        elif sync_type == "deal_list":
                            huaxin_trade_data_update.add_deal_list()
                        elif sync_type == "money":
                            huaxin_trade_data_update.add_money_list()
                        elif sync_type == "position_list":
                            huaxin_trade_data_update.add_position_list()
                        return_str = json.dumps(
                            {"code": 0, "data": {}, "msg": ""})
                    elif type_ == "get_huaxin_subscript_codes":
                        # Codes currently subscribed on Huaxin, paired with their names
                        codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes()
                        fresults = []
                        if codes:
                            fresults = [(code, gpcode_manager.get_code_name(code)) for code in codes]
                        return_str = json.dumps(
                            {"code": 0, "data": {"count": len(fresults), "list": fresults}, "msg": ""})
                    elif type_ == "export_l2_data":
                        # Export the L2 data of one code
                        code = data_json["data"]["code"]
                        try:
                            data_export_util.export_l2_excel(code)
                            return_str = json.dumps(
                                {"code": 0, "data": {}, "msg": ""})
                        except Exception as e:
                            logging.exception(e)
                            return_str = json.dumps(
                                {"code": 1, "msg": str(e)})
                    elif type_ == 'everyday_init':
                        # Daily initialisation
                        inited_data.everyday_init()
                        return_str = json.dumps(
                            {"code": 0, "data": {}, "msg": ""})
                    elif type_ == 'huaxin_channel_state':
                        # State of the Huaxin channels: per client type, (client id, lock state)
                        types = [huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_TRADE,
                                 huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                 huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_DEAL_LIST,
                                 huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST,
                                 huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_MONEY,
                                 huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_POSITION_LIST]
                        fdata = {}
                        for t in types:
                            client_list = huaxin_trade_api.ClientSocketManager.list_client(t)
                            client_state_list = []
                            for client in client_list:
                                # Is the client currently locked?  None means unknown.
                                lock_state = huaxin_trade_api.ClientSocketManager.is_client_locked(client[0])
                                if lock_state is None:
                                    lock_state_desc = "未知"
                                elif lock_state:
                                    lock_state_desc = "已锁"
                                else:
                                    lock_state_desc = "未锁"
                                client_state_list.append((client[0], lock_state_desc))
                            fdata[t] = client_state_list
                        return_str = json.dumps(
                            {"code": 0, "data": fdata, "msg": ""})
                    elif type_ == 'juejin_is_valid':
                        # Is JueJin usable?  Failures are now reported with a non-zero code;
                        # previously an exception still answered code 0 and a falsy date
                        # produced an empty reply, so the caller could not tell them apart.
                        try:
                            date = JueJinApi.get_previous_trading_date(tool.get_now_date_str())
                            return_str = json.dumps(
                                {"code": 0 if date else 1, "msg": ""})
                        except Exception as e:
                            return_str = json.dumps(
                                {"code": 1, "msg": str(e)})
                    elif type_ == 'get_env_info':
                        # Environment information
                        fdata = {}
                        try:
                            date = JueJinApi.get_previous_trading_date(tool.get_now_date_str())
                            if date:
                                fdata["juejin"] = 1
                        except Exception:
                            fdata["juejin"] = 0
                        fdata["kpl"] = {}
                        # Update info of the KPL data sets
                        kpl_types = [KPLDataType.LIMIT_UP.value, KPLDataType.JINGXUAN_RANK.value,
                                     KPLDataType.INDUSTRY_RANK.value]
                        for kpl_type in kpl_types:
                            if kpl_type in KPLDataManager.kpl_data_update_info:
                                fdata["kpl"][kpl_type] = KPLDataManager.kpl_data_update_info.get(kpl_type)

                        # The probes below catch Exception rather than using a bare except, so
                        # SystemExit / KeyboardInterrupt are no longer swallowed.
                        try:
                            # Probe redis
                            RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test")
                            fdata["redis"] = 1
                        except Exception:
                            fdata["redis"] = 0

                        try:
                            # Probe mysql
                            mysql_data.Mysqldb().select_one("select 1")
                            fdata["mysql"] = 1
                        except Exception:
                            fdata["mysql"] = 0

                        try:
                            # Number of pending redis async tasks (best effort: key omitted on failure)
                            fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count()
                        except Exception:
                            logging.debug("redis async task count unavailable", exc_info=True)

                        # CPU and memory usage.  The "memery" spelling is part of the reply format.
                        memory_info = psutil.virtual_memory()
                        cpu_percent = psutil.cpu_percent(interval=1)
                        fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent}
                        return_str = json.dumps(
                            {"code": 0, "data": fdata, "msg": ""})
                    elif type_ == 'test_redis':
                        # Time 100 SADD calls and 20 SMEMBERS calls on a non-pooled connection
                        redis = redis_manager.RedisManager(5).getRedisNoPool()
                        try:
                            _start_time = time.time()
                            times = []
                            for i in range(0, 100):
                                RedisUtils.sadd(redis, "test_set", f"000000:{i}", auto_free=False)
                            times.append(time.time() - _start_time)
                            _start_time = time.time()
                            for i in range(0, 20):
                                RedisUtils.smembers(redis, "test_set", auto_free=False)
                            times.append(time.time() - _start_time)
                            return_str = json.dumps(
                                {"code": 0, "data": times, "msg": ""})
                        finally:
                            redis.close()

                    # Debug endpoint: print every trade list to stdout
                    elif type_ == 'test':
                        result = huaxin_trade_api.get_deal_list()
                        print("\n\n---成交列表----")
                        for d in result["data"]:
                            print(d)

                        result = huaxin_trade_api.get_delegate_list(True)
                        print("\n\n---可撤委托----")
                        for d in result["data"]:
                            print(d)
                        result = huaxin_trade_api.get_delegate_list(False)
                        print("\n\n---全部委托----")
                        for d in result["data"]:
                            print(d)

                        result = huaxin_trade_api.get_position_list()
                        print("\n\n---持仓列表----")
                        for d in result["data"]:
                            print(d)

                        result = huaxin_trade_api.get_money()
                        print("\n\n---账户列表----")
                        for d in result["data"]:
                            print(d)
                    elif type_ == 'test_l2':
                        codes_data = data_json["data"]
                        result = huaxin_trade_api.set_l2_codes_data(codes_data)
                        print("\n\n---L2设置结果----")
                        print(result)
                # NOTE(review): the source this was restored from had lost its indentation;
                # this break is taken to end the loop after every request (one reply per
                # connection) - confirm against the original layout.
                break
            except Exception as e:
                logging.exception(e)
                return_str = json.dumps({"code": 401, "msg": str(e)})
                break
            finally:
                # Runs on every exit path, including the breaks above.
                sk.sendall(socket_util.load_header(return_str.encode(encoding='utf-8')))

    def finish(self):
        """Delegate to the base class; no extra cleanup is performed."""
        super().finish()
|
|
|
def __set_target_codes(pipe_l2):
    """Forward queued L2 subscription requests to ``pipe_l2`` forever.

    Every iteration pops one entry from ``HuaXinL2SubscriptCodesManager``.
    An entry is indexed as ``(timestamp, rows, request_id)`` and the first
    element of each row is used as the code.  Entries that are 20 seconds
    old or more, or whose rows are empty, are dropped after being logged.
    Otherwise each code is initialised through ``block_info.init_code`` and
    the rows are sent, signed and JSON-encoded, over ``pipe_l2``.
    Exceptions are logged and the loop goes on; it sleeps 10 ms per pass.
    """
    logger_system.info("启动读取L2订阅代码队列")
    while True:
        try:
            queued = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.pop()
            if not queued:
                continue
            enqueue_ts = queued[0]
            code_rows = queued[1]
            request_id = queued[2]
            logger_l2_codes_subscript.info("({})读取L2代码处理队列:数量-{}", request_id, len(code_rows))
            print("时间戳:", enqueue_ts)
            print("内容:", code_rows)
            # Only data queued within the last 20s is processed
            if not (time.time() - enqueue_ts < 20 and code_rows):
                continue
            target_codes = [row[0] for row in code_rows]
            for target in target_codes:
                block_info.init_code(target)
            payload = {
                "data": {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": code_rows},
                "request_id": f"{ClientSocketManager.CLIENT_TYPE_CMD_L2}_{round(time.time() * 1000)}",
            }
            signed_payload = socket_util.encryp_client_params_sign(payload)
            pipe_l2.send(json.dumps(signed_payload))
            print("设置L2代码结束")
            logger_l2_codes_subscript.info("({})发送到华鑫L2代码处理队列:数量-{}", request_id, len(code_rows))
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
        finally:
            # Reached on every path, including the ``continue`` statements above.
            time.sleep(0.01)
|
|
|
def __read_sync_task(pipe):
    """Apply cache-sync tasks received over ``pipe`` forever.

    Each received value is a JSON object whose ``"type"`` selects which
    manager to re-sync.  The manager's cached value is printed before and
    after its ``sync()`` call.  Unknown types are ignored.  Exceptions are
    logged and the loop goes on; it sleeps one second per pass.
    """
    logger_system.info("启动读取数据同步服务")
    # (task type, factory building the manager, reader of the cached value to print).
    # Lambdas keep every lookup lazy: nothing is resolved until a task of that type arrives.
    sync_targets = (
        ("want_list", lambda: gpcode_manager.WantBuyCodesManager(), lambda m: m.list_code_cache()),
        ("white_list", lambda: l2_trade_util.WhiteListCodeManager(), lambda m: m.list_codes_cache()),
        ("black_list", lambda: l2_trade_util.BlackListCodeManager(), lambda m: m.list_codes_cache()),
        ("pause_buy_list", lambda: gpcode_manager.PauseBuyCodesManager(), lambda m: m.list_code_cache()),
        ("trade_state", lambda: trade_manager.TradeStateManager(), lambda m: m.is_can_buy_cache()),
        ("trade_mode", lambda: trade_manager.TradeTargetCodeModeManager(), lambda m: m.get_mode_cache()),
    )
    while True:
        try:
            if not pipe:
                continue
            raw_task = pipe.recv()
            if not raw_task:
                continue
            print("接收到更新任务:", raw_task)
            task = json.loads(raw_task)
            type_ = task["type"]
            for task_name, make_manager, read_cache in sync_targets:
                if type_ == task_name:
                    # The manager is constructed anew for each of the three steps.
                    print(f"{task_name} before", read_cache(make_manager()))
                    make_manager().sync()
                    print(f"{task_name} after", read_cache(make_manager()))
                    break
        except Exception as e:
            logging.exception(e)
        finally:
            # Reached on every path, including the ``continue`` statements above.
            time.sleep(1)
|
|
|
def run(pipe_server, pipe_l2):
    """Start the background services of the trade API server and block forever.

    Starts ``huaxin_trade_data_update``, then two daemon threads: one running
    ``__set_target_codes`` with ``pipe_l2`` and one running
    ``__read_sync_task`` with ``pipe_server``.  The calling thread then
    sleeps in an endless loop, so this function never returns.
    """
    logger_system.info("create TradeApiServer")
    # Start pulling trade information
    huaxin_trade_data_update.run()

    background_jobs = (
        (__set_target_codes, pipe_l2),
        (__read_sync_task, pipe_server),
    )
    for job, job_pipe in background_jobs:
        worker = threading.Thread(target=job, args=(job_pipe,), daemon=True)
        worker.start()

    while True:
        time.sleep(5)

    # The socket interface is not opened in the hosted environment
    # laddr = "0.0.0.0", 10009
    # tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)  # note: the argument is MyBaseRequestHandle
    # tcpserver.serve_forever()
|
|
|
# Executing this module directly does nothing.
if __name__ == "__main__":
    pass
|