import hashlib
|
import json
|
import logging
|
import queue
|
import random
|
import socket
|
import socketserver
|
import threading
|
import time
|
|
import constant
|
from l2 import l2_data_manager_new
|
from l2.huaxin import huaxin_target_codes_manager
|
from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \
|
hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_trade_debug
|
from third_data.history_k_data_util import HistoryKDatasUtils
|
from trade import trade_manager, trade_huaxin
|
|
from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_record_manager
|
from utils import socket_util, tool, huaxin_util
|
|
trade_data_request_queue = queue.Queue()
|
|
|
class MyTCPServer(socketserver.TCPServer):
|
def __init__(self, server_address, RequestHandlerClass):
|
socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=True)
|
|
|
# 如果使用异步的形式则需要再重写ThreadingTCPServer
|
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass
|
|
|
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
|
__inited = False
|
|
def setup(self):
|
self.__init()
|
|
@classmethod
|
def __init(cls):
|
if cls.__inited:
|
return True
|
cls.__inited = True
|
cls.__req_socket_dict = {}
|
|
def __is_sign_right(self, data_json):
|
list_str = []
|
sign = data_json["sign"]
|
data_json.pop("sign")
|
for k in data_json:
|
list_str.append(f"{k}={data_json[k]}")
|
list_str.sort()
|
__str = "&".join(list_str) + "JiaBei@!*."
|
md5 = hashlib.md5(__str.encode(encoding='utf-8')).hexdigest()
|
if md5 != sign:
|
raise Exception("签名出错")
|
|
def handle(self):
|
host = self.client_address[0]
|
super().handle()
|
sk: socket.socket = self.request
|
while True:
|
return_str = ""
|
try:
|
data, header = socket_util.recv_data(sk)
|
if data:
|
data_str = data
|
# print("收到数据------", f"{data_str[:20]}......{data_str[-20:]}")
|
data_json = json.loads(data_str)
|
type_ = data_json['type']
|
|
is_sign_right = socket_util.is_client_params_sign_right(data_json)
|
# ------客户端请求接口-------
|
if type_ == 'buy':
|
# 验证签名
|
if not is_sign_right:
|
raise Exception("签名错误")
|
codes_data = data_json["data"]
|
code = codes_data["code"]
|
volume = codes_data["volume"]
|
price = codes_data["price"]
|
try:
|
if not code:
|
raise Exception("请上传code")
|
if not volume:
|
raise Exception("请上传volume")
|
|
if round(float(price), 2) <= 0:
|
prices = HistoryKDatasUtils.get_now_price([code])
|
if not prices:
|
raise Exception("现价获取失败")
|
price = prices[0][1]
|
# 下单
|
result = trade_api.order(trade_api.TRADE_DIRECTION_BUY, code, volume,
|
round(float(price), 2))
|
if result:
|
resultJSON = result
|
print("下单结果:", resultJSON)
|
#
|
# {'code': 0, 'data': {'sinfo': 'b_600480_1689060343812', 'securityId': '600480',
|
# 'orderLocalId': '0190000809', 'orderStatus': '7', 'statusMsg':
|
# '10932:产品状态资源访问授权不足', 'orderSysID': '110010190000809', 'accountId':
|
# '38800001334901'}}
|
if resultJSON['code'] == 0:
|
try:
|
resultJSON = resultJSON['data']
|
statusCode = resultJSON['orderStatus']
|
|
if statusCode == huaxin_util.TORA_TSTP_OST_Rejected:
|
# 交易所拒绝
|
raise Exception(resultJSON['statusMsg'])
|
else:
|
trade_huaxin.order_success(resultJSON['securityId'],
|
resultJSON['accountID'],
|
resultJSON['orderSysID'])
|
return_str = json.dumps({"code": 0})
|
finally:
|
# 更新委托列表
|
trade_data_request_queue.put_nowait({"type": "delegate_list"})
|
# 更新资金
|
trade_data_request_queue.put_nowait({"type": "money"})
|
else:
|
raise Exception(resultJSON['msg'])
|
break
|
except Exception as e:
|
raise e
|
elif type_ == 'cancel_order':
|
# 验证签名
|
if not is_sign_right:
|
raise Exception("签名错误")
|
codes_data = data_json["data"]
|
code = codes_data["code"]
|
orderSysID = codes_data.get("orderSysID")
|
accountId = codes_data.get("accountId")
|
if code and orderSysID and accountId:
|
result = trade_api.cancel_order(trade_api.TRADE_DIRECTION_BUY, code, orderSysID, True)
|
print("---撤单结果----")
|
print(result)
|
if result["code"] == 0:
|
if result["data"]["cancel"] == 1:
|
# 撤单成功
|
trade_huaxin.cancel_order_success(code, accountId, orderSysID)
|
return_str = json.dumps({"code": 0})
|
else:
|
# 撤单失败
|
raise Exception(result["data"]["errorMsg"])
|
else:
|
raise Exception(result["msg"])
|
|
elif code:
|
state = trade_manager.get_trade_state(code)
|
if state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_CANCEL_ING:
|
try:
|
l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销")
|
return_str = json.dumps({"code": 0})
|
except Exception as e:
|
return_str = json.dumps({"code": 2, "msg": str(e)})
|
else:
|
return_str = json.dumps({"code": 1, "msg": "未处于可撤单状态"})
|
else:
|
return_str = json.dumps({"code": 1, "msg": "请上传代码"})
|
# 更新委托列表
|
trade_data_request_queue.put_nowait({"type": "delegate_list"})
|
# 更新资金
|
trade_data_request_queue.put_nowait({"type": "money"})
|
break
|
|
elif type_ == 'sell':
|
# 验证签名
|
if not is_sign_right:
|
raise Exception("签名错误")
|
codes_data = data_json["data"]
|
code = codes_data["code"]
|
volume = codes_data["volume"]
|
price = codes_data["price"]
|
# 是否强制卖0/1
|
force_sell = codes_data["force"]
|
# TODO 强制卖策略
|
if volume == 0:
|
# 查询持仓量
|
volume = huaxin_trade_record_manager.PositionManager.get_code_volume(code)
|
if volume <= 0:
|
raise Exception("代码无持仓")
|
|
if not price:
|
# 获取现价
|
prices = HistoryKDatasUtils.get_now_price([code])
|
if not prices:
|
raise Exception("现价获取失败")
|
# 已现价的5档价卖
|
price = prices[0][1] - 0.04
|
|
result = trade_api.order(trade_api.TRADE_DIRECTION_SELL, code, volume, price)
|
if result["code"] == 0:
|
if result["data"]["orderStatus"] == huaxin_util.TORA_TSTP_OST_Rejected:
|
raise Exception(result["data"]["statusMsg"])
|
else:
|
return_str = json.dumps({"code": 0, "msg": ""})
|
trade_data_request_queue.put_nowait({"type": "delegate_list"})
|
else:
|
raise Exception(result["msg"])
|
|
print("---卖出结果----")
|
print(result)
|
break
|
elif type_ == 'delegate_list':
|
# 委托列表
|
update_time = data_json["data"]["update_time"]
|
# 是否可撤 0/1
|
can_cancel = data_json["data"]["can_cancel"]
|
results, update_time = None, None
|
if can_cancel:
|
results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
|
tool.get_now_date_str("%Y%m%d"), None,
|
[huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded])
|
else:
|
results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
|
tool.get_now_date_str("%Y%m%d"), update_time)
|
return_str = json.dumps(
|
{"code": 0, "data": {"list": results, "updateTime": update_time}, "msg": "请上传代码"})
|
break
|
elif type_ == 'deal_list':
|
# 成交列表
|
results, update_time = huaxin_trade_record_manager.DealRecordManager.list_by_day(
|
tool.get_now_date_str("%Y%m%d"))
|
return_str = json.dumps(
|
{"code": 0, "data": {"list": results}, "msg": ""})
|
elif type_ == 'position_list':
|
# 持仓股列表
|
results, update_time = huaxin_trade_record_manager.PositionManager.list_by_day(
|
tool.get_now_date_str("%Y%m%d"))
|
return_str = json.dumps(
|
{"code": 0, "data": {"list": results}, "msg": ""})
|
elif type_ == 'money_list':
|
# 资金详情
|
money_data = huaxin_trade_record_manager.MoneyManager.get_data()
|
return_str = json.dumps(
|
{"code": 0, "data": money_data, "msg": ""})
|
elif type_ == 'sync_trade_data':
|
# 同步交易数据
|
sync_type = data_json["data"]["type"]
|
if sync_type == "delegate_list":
|
trade_data_request_queue.put_nowait({"type": "delegate_list"})
|
elif sync_type == "deal_list":
|
trade_data_request_queue.put_nowait({"type": "deal_list"})
|
elif sync_type == "money":
|
trade_data_request_queue.put_nowait({"type": "money"})
|
elif sync_type == "position_list":
|
trade_data_request_queue.put_nowait({"type": "position_list"})
|
return_str = json.dumps(
|
{"code": 0, "data": {}, "msg": ""})
|
# 查询委托列表
|
elif type_ == 'test':
|
# 卖出
|
# trade_api.order(trade_api.TRADE_DIRECTION_SELL, "600854", 100, 5.45)
|
result = trade_api.get_deal_list()
|
print("\n\n---成交列表----")
|
for d in result["data"]:
|
print(d)
|
|
result = trade_api.get_delegate_list(True)
|
print("\n\n---可撤委托----")
|
for d in result["data"]:
|
print(d)
|
result = trade_api.get_delegate_list(False)
|
print("\n\n---全部委托----")
|
for d in result["data"]:
|
print(d)
|
|
result = trade_api.get_position_list()
|
print("\n\n---持仓列表----")
|
for d in result["data"]:
|
print(d)
|
|
result = trade_api.get_money()
|
print("\n\n---账户列表----")
|
for d in result["data"]:
|
print(d)
|
elif type_ == 'test_l2':
|
codes_data = data_json["data"]
|
result = trade_api.set_l2_codes_data(codes_data)
|
print("\n\n---L2设置结果----")
|
print(result)
|
break
|
# sk.close()
|
except Exception as e:
|
logging.exception(e)
|
return_str = json.dumps({"code": 401, "msg": str(e)})
|
break
|
finally:
|
sk.sendall(socket_util.load_header(return_str.encode(encoding='utf-8')))
|
|
def finish(self):
|
super().finish()
|
|
|
def __read_trade_data_queue():
|
while True:
|
try:
|
data = trade_data_request_queue.get()
|
if data:
|
type_ = data["type"]
|
hx_logger_trade_debug.info(f"获取交易数据开始:{type_}")
|
|
if type_ == "delegate_list":
|
dataJSON = huaxin_trade_api.get_delegate_list(can_cancel=False)
|
print("获取委托列表", dataJSON)
|
if dataJSON["code"] == 0:
|
data = dataJSON["data"]
|
huaxin_trade_record_manager.DelegateRecordManager.add(data)
|
elif type_ == "money":
|
dataJSON = huaxin_trade_api.get_money()
|
if dataJSON["code"] == 0:
|
data = dataJSON["data"]
|
huaxin_trade_record_manager.MoneyManager.save_data(data)
|
if data:
|
usefulMoney = data[0]["usefulMoney"]
|
# 设置可用资金
|
trade_manager.set_available_money(0, usefulMoney)
|
# 设置可用资金
|
elif type_ == "deal_list":
|
dataJSON = huaxin_trade_api.get_deal_list()
|
if dataJSON["code"] == 0:
|
data = dataJSON["data"]
|
huaxin_trade_record_manager.DealRecordManager.add(data)
|
# 持仓股
|
elif type_ == "position_list":
|
dataJSON = huaxin_trade_api.get_position_list()
|
if dataJSON["code"] == 0:
|
data = dataJSON["data"]
|
huaxin_trade_record_manager.PositionManager.add(data)
|
|
hx_logger_trade_debug.info(f"获取交易数据成功:{type_}")
|
except Exception as e:
|
hx_logger_trade_debug.exception(e)
|
finally:
|
# 有1s的间隔
|
time.sleep(1)
|
|
|
def __set_target_codes():
|
while True:
|
try:
|
datas = huaxin_target_codes_manager.pop()
|
if datas:
|
result = huaxin_trade_api.set_l2_codes_data(datas)
|
print("设置L2代码结果:", result)
|
except Exception as e:
|
logging.exception(e)
|
finally:
|
time.sleep(1)
|
|
|
def run():
|
print("create TradeApiServer")
|
# 拉取交易信息
|
t1 = threading.Thread(target=lambda: __read_trade_data_queue(), daemon=True)
|
t1.start()
|
|
t1 = threading.Thread(target=lambda: __set_target_codes(), daemon=True)
|
t1.start()
|
|
laddr = "0.0.0.0", 10009
|
tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 注意:参数是MyBaseRequestHandle
|
tcpserver.serve_forever()
|
|
|
if __name__ == "__main__":
|
pass
|