import hashlib
|
import json
|
import logging
|
import socket
|
import socketserver
|
import threading
|
import time
|
|
import socket_manager
|
import trade_manager
|
from db import mysql_data, redis_manager
|
from db.redis_manager import RedisUtils
|
from utils import socket_util, hosting_api_util, huaxin_trade_record_manager, huaxin_util, tool, global_data_cache_util
|
from utils.history_k_data_util import HistoryKDatasUtils, JueJinApi
|
|
|
class MyTCPServer(socketserver.TCPServer):
    """TCP server that always binds and activates its socket on construction."""

    def __init__(self, server_address, RequestHandlerClass):
        """Create the server.

        Args:
            server_address: ``(host, port)`` tuple to listen on.
            RequestHandlerClass: handler class instantiated for each request.
        """
        # bind_and_activate is spelled out so the listening socket is bound
        # and activated as part of construction.
        super().__init__(
            server_address,
            RequestHandlerClass,
            bind_and_activate=True,
        )
|
|
|
# To serve requests asynchronously, the threading behaviour has to be mixed
# into the custom TCP server again.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """``MyTCPServer`` combined with ``socketserver.ThreadingMixIn``."""
|
|
|
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Request handler of the middle API server.

    For each connection the handler reads one message with
    ``socket_util.recv_data``, parses it as JSON, dispatches on its ``type``
    field and sends the reply wrapped by ``socket_util.load_header``.

    * Integer ``type`` values are handled by :meth:`process_num_type`.
    * String ``type`` values are handled by :meth:`_process_str_type`.
    """

    __inited = False

    def setup(self):
        """Run the one-time class initialisation before handling the request."""
        self.__init()

    @classmethod
    def __init(cls):
        """Initialise class-level state once.

        Returns:
            True when the class was already initialised, otherwise None.
        """
        if cls.__inited:
            return True
        cls.__inited = True
        cls.__req_socket_dict = {}

    def __is_sign_right(self, data_json):
        """Verify the MD5 signature carried in ``data_json["sign"]``.

        The signature is the MD5 hex digest of every other ``key=value`` pair,
        sorted, joined with ``&`` and followed by a fixed suffix.
        The input dict is not modified.

        Args:
            data_json: request dict containing a ``"sign"`` entry.

        Returns:
            True when the signature matches.

        Raises:
            KeyError: if ``"sign"`` is missing.
            Exception: if the signature does not match.
        """
        sign = data_json["sign"]
        pairs = sorted(f"{k}={v}" for k, v in data_json.items() if k != "sign")
        signed_text = "&".join(pairs) + "JiaBei@!*."
        digest = hashlib.md5(signed_text.encode(encoding='utf-8')).hexdigest()
        if digest != sign:
            raise Exception("签名出错")
        return True

    def handle(self):
        """Read one request, process it and always send a reply.

        Any exception raised while receiving or processing is logged and
        reported to the client as ``{"code": 401, "msg": ...}``. When nothing
        produces a reply (no data, unknown type) an empty payload is sent.
        """
        super().handle()
        sk: socket.socket = self.request
        return_str = ""
        # NOTE(review): written as a single pass; the previous ``while True``
        # loop appeared to leave after the first iteration on every path.
        try:
            data, _header = socket_util.recv_data(sk)
            if data:
                data_json = json.loads(data)
                type_ = data_json['type']
                # JSON booleans are excluded so only real integers count as
                # numeric request types.
                if isinstance(type_, int) and not isinstance(type_, bool):
                    # NOTE(review): numeric types are not signature-checked.
                    return_str = self.process_num_type(sk, type_, data)
                else:
                    return_str = self._process_str_type(type_, data_json)
        except Exception as e:
            logging.exception(e)
            return_str = json.dumps({"code": 401, "msg": str(e)})
        finally:
            sk.sendall(socket_util.load_header(return_str.encode(encoding='utf-8')))

    @staticmethod
    def _require_valid_sign(is_sign_right):
        """Raise when the client signature check failed."""
        if not is_sign_right:
            raise Exception("签名错误")

    def _process_str_type(self, type_, data_json):
        """Dispatch a request whose ``type`` is not an integer.

        Args:
            type_: value of the request's ``type`` field.
            data_json: the parsed request.

        Returns:
            The JSON reply as a string, or ``""`` for an unrecognised type.

        Raises:
            Exception: on signature or business errors; ``handle`` converts
                these into a code-401 reply.
        """
        # The signature is evaluated for every request, but only the trading
        # operations (buy / cancel_order / sell) insist on it being valid.
        is_sign_right = socket_util.is_client_params_sign_right(data_json)
        today_format = "%Y%m%d"

        # ------ client request interfaces ------
        if type_ == 'buy':
            self._require_valid_sign(is_sign_right)
            return self._handle_buy(data_json["data"])
        if type_ == 'cancel_order':
            self._require_valid_sign(is_sign_right)
            return self._handle_cancel_order(data_json["data"])
        if type_ == 'sell':
            self._require_valid_sign(is_sign_right)
            return self._handle_sell(data_json["data"])
        if type_ == 'delegate_list':
            return self._handle_delegate_list(data_json["data"])
        if type_ == 'deal_list':
            # Deal (fill) list of today.
            results = huaxin_trade_record_manager.DealRecordManager.list_by_day(
                tool.get_now_date_str(today_format))
            return json.dumps({"code": 0, "data": {"list": results}, "msg": ""})
        if type_ == 'position_list':
            # Position list of today; the second returned value is not used.
            results, _update_time = huaxin_trade_record_manager.PositionManager.list_by_day(
                tool.get_now_date_str(today_format))
            return json.dumps({"code": 0, "data": {"list": results}, "msg": ""})
        if type_ == 'money_list':
            # Account money details.
            money_data = huaxin_trade_record_manager.MoneyManager.get_data()
            return json.dumps({"code": 0, "data": money_data, "msg": ""})
        if type_ == 'sync_trade_data':
            # Ask the hosting API to refresh trade data of the given kind.
            hosting_api_util.refresh_trade_data(data_json["data"]["type"])
            return json.dumps({"code": 0, "data": {}, "msg": ""})
        if type_ == "get_huaxin_subscript_codes":
            # Codes currently subscribed on Huaxin, read from the global cache.
            fresults = global_data_cache_util.huaxin_subscript_codes
            update_time = global_data_cache_util.huaxin_subscript_codes_update_time
            if update_time is None:
                update_time = ''
            return json.dumps(
                {"code": 0,
                 "data": {"count": len(fresults), "list": fresults, "update_time": update_time},
                 "msg": ""})
        if type_ == "export_l2_data":
            # Export L2 data for one code.
            hosting_api_util.export_l2_data(data_json["data"]["code"])
            return json.dumps({"code": 0, "data": {}, "msg": ""})
        if type_ == 'everyday_init':
            # Daily initialisation.
            hosting_api_util.everyday_init()
            return json.dumps({"code": 0, "data": {}, "msg": ""})
        if type_ == 'huaxin_channel_state':
            # Huaxin channel state; currently always replies with empty data.
            return json.dumps({"code": 0, "data": {}, "msg": ""})
        if type_ == 'juejin_is_valid':
            # Probe whether the JueJin API is usable.
            # NOTE(review): failures are reported with code 0 as well, and a
            # falsy date yields an empty reply - confirm this is intended.
            try:
                date = JueJinApi.get_previous_trading_date(tool.get_now_date_str())
                if date:
                    return json.dumps({"code": 0, "msg": ""})
            except Exception as e:
                return json.dumps({"code": 0, "msg": str(e)})
            return ""
        if type_ == 'get_env_info':
            # Environment information.
            return json.dumps(hosting_api_util.get_env_info())
        if type_ == 'sync_l1_subscript_codes':
            # Synchronise the L1 subscription codes.
            return json.dumps(hosting_api_util.sync_l1_subscript_codes())
        if type_ == 'get_system_logs':
            # A window of system logs.
            start_index = data_json["data"]["start_index"]
            count = data_json["data"]["count"]
            return json.dumps(hosting_api_util.get_system_logs(start_index, count))
        if type_ == 'test_redis':
            return self._handle_test_redis()
        if type_ == "trade_server_channels":
            channels = socket_manager.ClientSocketManager.list_client()
            return json.dumps({"code": 0, "data": channels})
        return ""

    @staticmethod
    def _handle_buy(codes_data):
        """Place a buy order; a non-positive price is replaced by the current price."""
        code = codes_data["code"]
        volume = codes_data["volume"]
        price = codes_data["price"]
        if not code:
            raise Exception("请上传code")
        if not volume:
            raise Exception("请上传volume")
        if round(float(price), 2) <= 0:
            prices = HistoryKDatasUtils.get_now_price([code])
            if not prices:
                raise Exception("现价获取失败")
            price = prices[0][1]
        # Place the order.
        result = hosting_api_util.trade_order(hosting_api_util.TRADE_DIRECTION_BUY, code, volume,
                                              round(float(price), 2))
        if not result:
            # NOTE(review): a falsy result produces an empty reply - confirm.
            return ""
        print("下单结果:", result)
        if result['code'] != 0:
            raise Exception(result['msg'])
        return json.dumps({"code": 0})

    @staticmethod
    def _handle_cancel_order(codes_data):
        """Cancel an order for ``code``; replies with code 1 when no code is given."""
        code = codes_data["code"]
        order_sys_id = codes_data.get("orderSysID")
        account_id = codes_data.get("accountId")
        if not code:
            return json.dumps({"code": 1, "msg": "请上传代码"})
        result = hosting_api_util.trade_cancel_order(hosting_api_util.TRADE_DIRECTION_BUY, code,
                                                     account_id, order_sys_id, True)
        print("---撤单结果----")
        print(result)
        if result["code"] != 0:
            raise Exception(result["msg"])
        return json.dumps({"code": 0})

    @staticmethod
    def _handle_sell(codes_data):
        """Place a sell order; without a price, sell 0.04 below the current price."""
        code = codes_data["code"]
        volume = codes_data["volume"]
        price = codes_data["price"]
        # "force" (force sell, 0/1) must be present in the request; its value
        # is not used yet.
        codes_data["force"]
        if not price:
            # Fetch the current price.
            prices = HistoryKDatasUtils.get_now_price([code])
            if not prices:
                raise Exception("现价获取失败")
            # Offset from the current price (described in the code base as the
            # 5th price level). Rounded to 2 decimals, like the buy path, so
            # no float noise is sent as the order price.
            price = round(prices[0][1] - 0.04, 2)
        result = hosting_api_util.trade_order(hosting_api_util.TRADE_DIRECTION_SELL, code, volume,
                                              price)
        if result["code"] != 0:
            raise Exception(result["msg"])
        print("---卖出结果----")
        print(result)
        return json.dumps({"code": 0, "msg": ""})

    @staticmethod
    def _handle_delegate_list(req):
        """List today's delegate (order) records.

        When ``can_cancel`` is truthy only records in the Accepted or
        PartTraded state are requested; otherwise the client's
        ``update_time`` is passed through to ``list_by_day``.
        """
        client_update_time = req["update_time"]
        # Whether only cancellable records are wanted (0/1).
        can_cancel = req["can_cancel"]
        manager = huaxin_trade_record_manager.DelegateRecordManager
        today = tool.get_now_date_str("%Y%m%d")
        if can_cancel:
            results, latest_update_time = manager.list_by_day(
                today, None,
                [huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded])
        else:
            # The client's value is kept in its own variable so it reaches
            # list_by_day instead of being overwritten beforehand.
            results, latest_update_time = manager.list_by_day(today, client_update_time)
        return json.dumps(
            {"code": 0, "data": {"list": results, "updateTime": latest_update_time}, "msg": ""})

    @staticmethod
    def _handle_test_redis():
        """Time 100 ``sadd`` calls and 20 ``smembers`` calls on a non-pooled connection."""
        redis = redis_manager.RedisManager(5).getRedisNoPool()
        try:
            times = []
            started = time.time()
            for i in range(0, 100):
                RedisUtils.sadd(redis, "test_set", f"000000:{i}", auto_free=False)
            times.append(time.time() - started)
            started = time.time()
            for _ in range(0, 20):
                RedisUtils.smembers(redis, "test_set", auto_free=False)
            times.append(time.time() - started)
            return json.dumps({"code": 0, "data": times, "msg": ""})
        finally:
            redis.close()

    @staticmethod
    def _edit_code_list(_str, edit, list_type):
        """Apply ``edit(code, list_type)`` to every code in ``data.codes`` of the request."""
        for code in json.loads(_str)["data"]["codes"]:
            edit(code, list_type)
        return json.dumps({"code": 0})

    @staticmethod
    def _dump_code_list(list_type):
        """Return the hosting API's code list of ``list_type`` as JSON."""
        return json.dumps(hosting_api_util.get_code_list(list_type))

    @classmethod
    def process_num_type(cls, sk, type, _str):
        """Handle a request whose ``type`` is an integer.

        Supported types:
            201/203/301: add to / remove from / list the black list
            202/204/302: add to / remove from / list the white list
            401/402/403: add to / remove from / list the want-to-buy list
            411/412/413: add to / remove from / list the pause-buy list
            420: whether the first code in ``data.codes`` can be cancelled
            430: attributes of ``data.code``
            501/502: set / get the system trade state
            503/504: set / get the trade mode

        Args:
            sk: the client socket (unused here).
            type: the integer request type.
            _str: the raw JSON request text.

        Returns:
            The JSON reply as a string; ``""`` for an unsupported type and
            ``{"code": 1, "msg": ...}`` when processing raises.
        """
        add = "add_code_list"
        remove = "remove_code_list"
        try:
            if type == 201:
                # Add to the black list.
                return cls._edit_code_list(_str, getattr(hosting_api_util, add),
                                           hosting_api_util.CODE_LIST_BLACK)
            if type == 203:
                # Remove from the black list.
                return cls._edit_code_list(_str, getattr(hosting_api_util, remove),
                                           hosting_api_util.CODE_LIST_BLACK)
            if type == 301:
                # Black list contents.
                return cls._dump_code_list(hosting_api_util.CODE_LIST_BLACK)
            if type == 202:
                # Add to the white list.
                return cls._edit_code_list(_str, getattr(hosting_api_util, add),
                                           hosting_api_util.CODE_LIST_WHITE)
            if type == 204:
                # Remove from the white list.
                return cls._edit_code_list(_str, getattr(hosting_api_util, remove),
                                           hosting_api_util.CODE_LIST_WHITE)
            if type == 302:
                # White list contents.
                return cls._dump_code_list(hosting_api_util.CODE_LIST_WHITE)
            if type == 401:
                # Add to the want-to-buy list.
                return cls._edit_code_list(_str, getattr(hosting_api_util, add),
                                           hosting_api_util.CODE_LIST_WANT)
            if type == 402:
                # Remove from the want-to-buy list.
                return cls._edit_code_list(_str, getattr(hosting_api_util, remove),
                                           hosting_api_util.CODE_LIST_WANT)
            if type == 403:
                # Want-to-buy list contents.
                return cls._dump_code_list(hosting_api_util.CODE_LIST_WANT)
            if type == 411:
                # Add to the pause-buy list.
                return cls._edit_code_list(_str, getattr(hosting_api_util, add),
                                           hosting_api_util.CODE_LIST_PAUSE_BUY)
            if type == 412:
                # Remove from the pause-buy list.
                return cls._edit_code_list(_str, getattr(hosting_api_util, remove),
                                           hosting_api_util.CODE_LIST_PAUSE_BUY)
            if type == 413:
                # Pause-buy list contents.
                return cls._dump_code_list(hosting_api_util.CODE_LIST_PAUSE_BUY)
            if type == 420:
                # Whether the order of the first code can be cancelled.
                code = json.loads(_str)["data"]["codes"][0]
                result = hosting_api_util.get_code_trade_state(code)
                state = result["data"]["state"]
                finished_states = (trade_manager.TRADE_STATE_BUY_CANCEL_SUCCESS,
                                   trade_manager.TRADE_STATE_BUY_SUCCESS)
                if state not in finished_states:
                    return json.dumps({"code": 0, "msg": "可以取消"})
                return json.dumps({"code": 1, "msg": "不可以取消"})
            if type == 430:
                # Code attributes: want-to-buy / white list / black list / pause-buy.
                code = json.loads(_str)["data"]["code"]
                return json.dumps(hosting_api_util.get_code_attribute(code))
            if type == 501:
                # Set the system trade state.
                is_open = json.loads(_str)["data"]["open"]
                hosting_api_util.set_trade_state(bool(is_open))
                return json.dumps({"code": 0, "msg": ("开启成功" if is_open else "关闭成功")})
            if type == 502:
                # Get the system trade state.
                return json.dumps(hosting_api_util.get_trade_state())
            if type == 503:
                # Set the trade mode for target codes.
                mode = json.loads(_str)["data"]["mode"]
                hosting_api_util.set_trade_mode(mode)
                return json.dumps({"code": 0, "data": {"mode": mode}})
            if type == 504:
                # Get the trade mode for target codes.
                return json.dumps(hosting_api_util.get_trade_mode())
        except Exception as e:
            return json.dumps({"code": 1, "msg": str(e)})
        return ""

    def finish(self):
        """Delegate to the base class clean-up."""
        super().finish()
|
|
|
def run():
    """Start the middle API server on 0.0.0.0:10009 and serve until stopped.

    The server is used as a context manager so its listening socket is closed
    when ``serve_forever`` exits, including on an exception such as
    ``KeyboardInterrupt``.
    """
    print("create middle_api_server")
    laddr = "0.0.0.0", 10009
    print("middle_api_server is at: http://%s:%d/" % (laddr))
    # Note: the handler argument is the MyBaseRequestHandle class itself.
    with MyThreadingTCPServer(laddr, MyBaseRequestHandle) as tcpserver:
        tcpserver.serve_forever()


if __name__ == "__main__":
    # Nothing is started when the module is executed directly; call run()
    # to start the server.
    pass
|