import datetime
|
import hashlib
|
import io
|
import json
|
import logging
|
import queue
|
import random
|
import socket
|
import socketserver
|
import threading
|
import time
|
|
import dask
|
import psutil
|
import huaxin_client.constant
|
from line_profiler import LineProfiler
|
|
import constant
|
import inited_data
|
import outside_api_command_manager
|
from code_attribute import gpcode_manager
|
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
|
from db.redis_manager_delegate import RedisUtils
|
from huaxin_client.client_network import SendResponseSkManager
|
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress
|
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer
|
from l2.huaxin import huaxin_target_codes_manager
|
from l2.huaxin.huaxin_target_codes_manager import HuaXinL1TargetCodesManager
|
from l2.l2_data_manager_new import L2TradeDataProcessor
|
from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \
|
hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue
|
from third_data import block_info, kpl_api, kpl_data_manager
|
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager
|
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
|
from third_data.kpl_data_manager import KPLDataManager
|
from third_data.kpl_util import KPLDataType
|
from trade import deal_big_money_manager, current_price_process_manager, trade_huaxin, trade_manager, l2_trade_util
|
|
from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_data_update
|
from trade.trade_manager import TradeTargetCodeModeManager
|
from utils import socket_util, data_export_util, middle_api_protocol, tool
|
|
trade_data_request_queue = queue.Queue()
|
|
|
class MyTCPServer(socketserver.TCPServer):
|
def __init__(self, server_address, RequestHandlerClass):
|
socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=True)
|
|
|
# 如果使用异步的形式则需要再重写ThreadingTCPServer
|
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass
|
|
|
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
|
__inited = False
|
__TradeBuyQueue = transaction_progress.TradeBuyQueue()
|
__KPLCodeJXBlockManager = KPLCodeJXBlockManager()
|
|
def setup(self):
|
self.__init()
|
|
@classmethod
|
def __init(cls):
|
if cls.__inited:
|
return True
|
cls.__inited = True
|
cls.__req_socket_dict = {}
|
|
def __is_sign_right(self, data_json):
|
list_str = []
|
sign = data_json["sign"]
|
data_json.pop("sign")
|
for k in data_json:
|
list_str.append(f"{k}={data_json[k]}")
|
list_str.sort()
|
__str = "&".join(list_str) + "JiaBei@!*."
|
md5 = hashlib.md5(__str.encode(encoding='utf-8')).hexdigest()
|
if md5 != sign:
|
raise Exception("签名出错")
|
|
@classmethod
|
def getRecvData(cls, skk):
|
data = ""
|
header_size = 10
|
buf = skk.recv(header_size)
|
header_str = buf
|
if buf:
|
start_time = time.time()
|
buf = buf.decode('utf-8')
|
if buf.startswith("##"):
|
content_length = int(buf[2:10])
|
received_size = 0
|
while not received_size == content_length:
|
r_data = skk.recv(10240)
|
received_size += len(r_data)
|
data += r_data.decode('utf-8')
|
else:
|
data = skk.recv(1024 * 1024)
|
data = buf + data.decode('utf-8')
|
# hx_logger_l2_upload.info(f"读取数据耗时:{round((time.time() - start_time) * 1000, 1)}")
|
return data, header_str
|
|
def handle(self):
|
host = self.client_address[0]
|
super().handle()
|
sk: socket.socket = self.request
|
while True:
|
try:
|
# data = sk.recv(1024*1024, socket.MSG_WAITALL)
|
data, header = self.getRecvData(sk)
|
if data:
|
data_str = data
|
# print("收到数据------", f"{data_str[:20]}......{data_str[-20:]}")
|
data_json = None
|
try:
|
data_json = json.loads(data_str)
|
except json.decoder.JSONDecodeError as e:
|
# JSON解析失败
|
sk.sendall(socket_util.load_header(json.dumps(
|
{"code": 100, "msg": f"JSON解析失败"}).encode(
|
encoding='utf-8')))
|
hx_logger_trade_callback.error(
|
f"json解析失败,字符串长度:{len(data_str)},字符串内容:\"{data_str[:30]}......{data_str[-20:]}\"")
|
continue
|
if data_json["type"] == 'register':
|
client_type = data_json["data"]["client_type"]
|
rid = data_json["rid"]
|
trade_api.ClientSocketManager.add_client(client_type, rid, sk)
|
sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8'))
|
try:
|
# print("客户端", ClientSocketManager.socket_client_dict)
|
while True:
|
result, header = self.getRecvData(sk)
|
try:
|
resultJSON = json.loads(result)
|
if resultJSON["type"] == 'heart':
|
# 记录活跃客户端
|
trade_api.ClientSocketManager.heart(resultJSON['client_id'])
|
else:
|
hx_logger_contact_debug.warning(f"接收到非心跳信息:{result}")
|
except json.decoder.JSONDecodeError as e:
|
if not result:
|
sk.close()
|
print("JSON解析出错", result, header)
|
time.sleep(1)
|
except ConnectionResetError as ee:
|
trade_api.ClientSocketManager.del_client(rid)
|
except Exception as e:
|
logging.exception(e)
|
|
elif data_json["type"] == "response":
|
# 主动触发的响应
|
try:
|
client_id = data_json.get("client_id")
|
hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}")
|
# 设置响应内容
|
trade_api.set_response(data_json["request_id"], data_json['data'])
|
finally:
|
sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
|
elif data_json["type"] == "trade_callback":
|
try:
|
# 交易回调
|
data_json = data_json["data"]
|
type_ = data_json["type"]
|
# 记录交易反馈日志
|
hx_logger_trade_callback.info(data_json)
|
# 重新请求委托列表与资金
|
huaxin_trade_data_update.add_delegate_list()
|
huaxin_trade_data_update.add_deal_list()
|
huaxin_trade_data_update.add_money_list()
|
# print("响应结果:", data_json['data'])
|
finally:
|
sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
|
elif data_json["type"] == "l2_order":
|
try:
|
# L2逐笔委托
|
data = data_json["data"]
|
code = data["code"]
|
datas = data["data"]
|
hx_logger_l2_orderdetail.info(f"{code}#{datas}")
|
l2_log.threadIds[code] = random.randint(0, 100000)
|
l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, datas)
|
finally:
|
sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
|
|
elif data_json["type"] == "l2_trans":
|
try:
|
data = data_json["data"]
|
code = data["code"]
|
datas = data["data"]
|
hx_logger_l2_transaction.info(f"{code}#{datas}")
|
if datas:
|
# 设置成交价
|
current_price_process_manager.set_trade_price(code, datas[-1][1])
|
try:
|
buyno_map = l2_data_util.local_today_buyno_map.get(code)
|
hx_logger_l2_transaction.info(
|
f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}")
|
buy_progress_index = None
|
for i in range(len(datas) - 1, -1, -1):
|
d = datas[i]
|
buy_no = f"{d[6]}"
|
if buyno_map and buy_no in buyno_map:
|
hx_logger_l2_transaction.info(f"{code}成交进度:{buyno_map[buy_no]}")
|
buy_progress_index = buyno_map[buy_no]["index"]
|
break
|
|
# 获取执行位时间
|
buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
|
code)
|
if True:
|
if buy_progress_index is not None:
|
self.__TradeBuyQueue.set_traded_index(code, buy_progress_index)
|
total_datas = l2_data_util.local_today_datas.get(code)
|
num_operate_map = l2_data_util.local_today_num_operate_map.get(
|
code)
|
logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}", code,
|
buy_progress_index)
|
buy_time = total_datas[buy_progress_index]["val"][
|
"time"]
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if buy_exec_index:
|
need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
|
buy_progress_index,
|
buy_exec_index,
|
total_datas,
|
num_operate_map,
|
num * 100 * float(
|
limit_up_price),
|
limit_up_price)
|
if need_cancel:
|
L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")
|
|
f1 = dask.delayed(HourCancelBigNumComputer().set_trade_progress)(code, buy_time,
|
buy_exec_index,
|
buy_progress_index,
|
total_datas,
|
num_operate_map)
|
f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code,
|
buy_progress_index,
|
total_datas)
|
f3 = dask.delayed(
|
deal_big_money_manager.DealComputeProgressManager().set_trade_progress)(
|
code,
|
buy_progress_index,
|
total_datas,
|
num_operate_map)
|
dask.compute(f1, f2, f3)
|
except Exception as e:
|
hx_logger_l2_transaction.exception(e)
|
finally:
|
sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
|
elif data_json["type"] == "l2_market_data":
|
try:
|
data = data_json["data"]
|
code = data["code"]
|
data = data["data"]
|
time_str = f"{data['dataTimeStamp']}"
|
if time_str.startswith("9"):
|
time_str = "0" + time_str
|
time_str = time_str[:6]
|
time_str = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}"
|
buy_1_price, buy_1_volume = data["buy"][0]
|
sell_1_price, sell_1_volume = data["sell"][0]
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
|
if limit_up_price is not None:
|
# 处理买1,卖1信息
|
code_price_manager.Buy1PriceManager().process(code, buy_1_price, time_str,
|
limit_up_price,
|
sell_1_price, sell_1_volume // 100)
|
pre_close_price = round(float(limit_up_price) / 1.1, 2)
|
# 如果涨幅大于7%就读取板块
|
price_rate = (buy_1_price - pre_close_price) / pre_close_price
|
if price_rate > 0.07:
|
if not self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code):
|
blocks = kpl_api.getCodeJingXuanBlocks(code)
|
self.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks)
|
elif price_rate > 0.03:
|
# 添加备用板块
|
if not self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True):
|
blocks = kpl_api.getCodeJingXuanBlocks(code)
|
self.__KPLCodeJXBlockManager.save_jx_blocks(code, blocks, by=True)
|
|
# 更新板块信息
|
yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
|
CodePlateKeyBuyManager.update_can_buy_blocks(code,
|
kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas,
|
kpl_data_manager.KPLLimitUpDataRecordManager.total_datas,
|
yesterday_codes,
|
block_info.get_before_blocks_dict())
|
|
hx_logger_l2_market_data.info(f"{code}#{data}")
|
finally:
|
sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
|
elif data_json["type"] == "l2_subscript_codes":
|
try:
|
data = data_json["data"]
|
datas = data["data"]
|
print("l2_subscript_codes", data_json)
|
# 订阅的代码
|
huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.save_subscript_codes(datas)
|
# 上传数据
|
codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes()
|
fresults = []
|
if codes:
|
for code in codes:
|
code_name = gpcode_manager.get_code_name(code)
|
fresults.append((code, code_name))
|
|
fdata = middle_api_protocol.load_l2_subscript_codes(fresults)
|
middle_api_protocol.request(fdata)
|
finally:
|
sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
|
elif data_json["type"] == "get_level1_codes":
|
print("get_level1_codes")
|
# 获取level1的代码
|
list_ = JueJinApi.get_exchanges_codes(["SHSE", "SZSE"])
|
fdatas = []
|
for d in list_:
|
if d["sec_id"].find("60") != 0 and d["sec_id"].find("00") != 0:
|
continue
|
if d["sec_level"] != 1:
|
continue
|
if d["pre_close"] * 1.1 > constant.MAX_SUBSCRIPT_CODE_PRICE:
|
continue
|
if (d["listed_date"] + datetime.timedelta(
|
days=100)).timestamp() > datetime.datetime.now().timestamp():
|
continue
|
fdatas.append(d["sec_id"])
|
sk.sendall(
|
socket_util.load_header(json.dumps({"code": 0, "data": fdatas}).encode(encoding='utf-8')))
|
|
elif data_json["type"] == "set_target_codes":
|
try:
|
TradeServerProcessor.set_target_codes(data_json)
|
except Exception as e:
|
logging.exception(e)
|
finally:
|
sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
|
elif data_json["type"] == "trading_order_canceled":
|
data = data_json["data"]
|
code = data["code"]
|
order_no = data["data"]
|
hx_logger_l2_upload.info(f"{code}-正在成交的订单撤单,order_no:{order_no}")
|
# buyno_map = l2_data_util.local_today_buyno_map.get(code)
|
# if buyno_map:
|
# l2_data = buyno_map.get(order_no)
|
# buyno_map.get(order_no)
|
# 执行撤单
|
l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "G撤撤单", "G撤")
|
else:
|
# 断开连接
|
break
|
# sk.close()
|
except Exception as e:
|
logging.exception(e)
|
break
|
|
def finish(self):
|
super().finish()
|
|
|
# 交易服务处理器
|
class TradeServerProcessor:
|
# 设置目标代码
|
@classmethod
|
def set_target_codes(cls, data_json):
|
data = data_json["data"]
|
datas = data["data"]
|
HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas)
|
|
|
def clear_invalid_client():
|
while True:
|
try:
|
huaxin_trade_api.ClientSocketManager.del_invalid_clients()
|
except:
|
pass
|
finally:
|
time.sleep(2)
|
|
|
def __recv_pipe_l1(pipe_l1):
|
if pipe_l1 is not None:
|
while True:
|
try:
|
val = pipe_l1.recv()
|
if val:
|
val = json.loads(val)
|
print("收到来自L1的数据:", val["type"])
|
# 处理数据
|
type_ = val["type"]
|
if type_ == "set_target_codes":
|
TradeServerProcessor.set_target_codes(val)
|
except Exception as e:
|
logging.exception(e)
|
|
|
class OutsideApiCommandCallback(outside_api_command_manager.ActionCallback):
|
@classmethod
|
def __send_response(cls, data_bytes):
|
sk = SendResponseSkManager.create_send_response_sk(addr=huaxin_client.constant.SERVER_IP,
|
port=huaxin_client.constant.SERVER_PORT)
|
try:
|
data_bytes = socket_util.load_header(data_bytes)
|
sk.sendall(data_bytes)
|
result, header_str = socket_util.recv_data(sk)
|
result = json.loads(result)
|
if result["code"] != 0:
|
raise Exception(result['msg'])
|
finally:
|
sk.close()
|
|
def send_response(self, data, _client_id, _request_id):
|
data_bytes = json.dumps({"type": "response", "data": data, "client_id": _client_id,
|
"request_id": _request_id}).encode('utf-8')
|
for i in range(3):
|
try:
|
self.__send_response(data_bytes)
|
print("发送数据成功")
|
break
|
except Exception as e1:
|
pass
|
|
# 交易
|
def OnTrade(self, client_id, request_id, data):
|
try:
|
trade_type = data["trade_type"]
|
if trade_type == outside_api_command_manager.TRADE_TYPE_ORDER:
|
code = data["code"]
|
direction = data["direction"]
|
volume = data["volume"]
|
price_type = data["price_type"]
|
price = data["price"]
|
sinfo = data["sinfo"]
|
result = huaxin_trade_api.order(direction, code, volume, price, price_type=price_type, sinfo=sinfo,
|
blocking=True, request_id=request_id)
|
self.send_response({"code": 0, "data": result}, client_id, request_id)
|
elif trade_type == outside_api_command_manager.TRADE_TYPE_CANCEL_ORDER:
|
code = data["code"]
|
direction = data["direction"]
|
accountID = data["accountID"]
|
orderSysID = data["orderSysID"]
|
sinfo = data["sinfo"]
|
result = huaxin_trade_api.cancel_order(direction, accountID, orderSysID, sinfo=sinfo,
|
blocking=True, request_id=request_id)
|
self.send_response({"code": 0, "data": result}, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
# 交易状态
|
def OnTradeState(self, client_id, request_id, data):
|
try:
|
operate = data["operate"]
|
if operate == outside_api_command_manager.OPERRATE_SET:
|
state = data["state"]
|
if state:
|
trade_manager.TradeStateManager().open_buy()
|
else:
|
trade_manager.TradeStateManager().close_buy()
|
self.send_response({"code": 0, "msg": ("开启成功" if state else "关闭成功")}, client_id, request_id)
|
|
elif operate == outside_api_command_manager.OPERRATE_GET:
|
can_buy = trade_manager.TradeStateManager().is_can_buy_cache()
|
self.send_response({"code": 0, "data": {"can_buy": can_buy}}, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
# 交易模式
|
def OnTradeMode(self, client_id, request_id, data):
|
try:
|
operate = data["operate"]
|
if operate == outside_api_command_manager.OPERRATE_SET:
|
mode = data["mode"]
|
TradeTargetCodeModeManager().set_mode(mode)
|
self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id)
|
elif operate == outside_api_command_manager.OPERRATE_GET:
|
mode = TradeTargetCodeModeManager().get_mode_cache()
|
self.send_response({"code": 0, "data": {"mode": mode}}, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
# 代码名单
|
def OnCodeList(self, client_id, request_id, data):
|
try:
|
code_list_type = data["code_list_type"]
|
operate = data["operate"]
|
code = data.get("code")
|
fresult = {"code": 0}
|
if code_list_type == outside_api_command_manager.CODE_LIST_WANT:
|
if operate == outside_api_command_manager.OPERRATE_SET:
|
gpcode_manager.WantBuyCodesManager().add_code(code)
|
name = gpcode_manager.get_code_name(code)
|
if not name:
|
results = HistoryKDatasUtils.get_gp_codes_names([code])
|
if results:
|
gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
|
elif operate == outside_api_command_manager.OPERRATE_DELETE:
|
gpcode_manager.WantBuyCodesManager().remove_code(code)
|
elif operate == outside_api_command_manager.OPERRATE_GET:
|
codes = gpcode_manager.WantBuyCodesManager().list_code_cache()
|
datas = []
|
for code in codes:
|
name = gpcode_manager.get_code_name(code)
|
datas.append(f"{name}:{code}")
|
fresult = {"code": 0, "data": datas}
|
elif code_list_type == outside_api_command_manager.CODE_LIST_BLACK:
|
if operate == outside_api_command_manager.OPERRATE_SET:
|
l2_trade_util.forbidden_trade(code)
|
name = gpcode_manager.get_code_name(code)
|
if not name:
|
results = HistoryKDatasUtils.get_gp_codes_names([code])
|
if results:
|
gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
|
elif operate == outside_api_command_manager.OPERRATE_DELETE:
|
l2_trade_util.remove_from_forbidden_trade_codes(code)
|
elif operate == outside_api_command_manager.OPERRATE_GET:
|
codes = l2_trade_util.BlackListCodeManager().list_codes_cache()
|
datas = []
|
for code in codes:
|
name = gpcode_manager.get_code_name(code)
|
datas.append(f"{name}:{code}")
|
fresult = {"code": 0, "data": datas}
|
elif code_list_type == outside_api_command_manager.CODE_LIST_WHITE:
|
if operate == outside_api_command_manager.OPERRATE_SET:
|
l2_trade_util.WhiteListCodeManager().add_code(code)
|
name = gpcode_manager.get_code_name(code)
|
if not name:
|
results = HistoryKDatasUtils.get_gp_codes_names([code])
|
if results:
|
gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
|
elif operate == outside_api_command_manager.OPERRATE_DELETE:
|
l2_trade_util.WhiteListCodeManager().remove_code(code)
|
elif operate == outside_api_command_manager.OPERRATE_GET:
|
codes = l2_trade_util.WhiteListCodeManager().list_codes_cache()
|
datas = []
|
for code in codes:
|
name = gpcode_manager.get_code_name(code)
|
datas.append(f"{name}:{code}")
|
fresult = {"code": 0, "data": datas}
|
|
elif code_list_type == outside_api_command_manager.CODE_LIST_PAUSE_BUY:
|
if operate == outside_api_command_manager.OPERRATE_SET:
|
gpcode_manager.PauseBuyCodesManager().add_code(code)
|
name = gpcode_manager.get_code_name(code)
|
if not name:
|
results = HistoryKDatasUtils.get_gp_codes_names([code])
|
if results:
|
gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
|
elif operate == outside_api_command_manager.OPERRATE_DELETE:
|
gpcode_manager.PauseBuyCodesManager().remove_code(code)
|
elif operate == outside_api_command_manager.OPERRATE_GET:
|
codes = gpcode_manager.PauseBuyCodesManager().list_code_cache()
|
datas = []
|
for code in codes:
|
name = gpcode_manager.get_code_name(code)
|
datas.append(f"{name}:{code}")
|
fresult = {"code": 0, "data": datas}
|
self.send_response(fresult, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
def OnExportL2(self, client_id, request_id, data):
|
try:
|
code = data["code"]
|
excel_file_name = data_export_util.export_l2_excel(code)
|
print("导出L2数据目录:",excel_file_name)
|
self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
def OnEveryDayInit(self, client_id, request_id, data):
|
try:
|
inited_data.everyday_init()
|
self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
def OnRefreshTradeData(self, client_id, request_id, data):
|
try:
|
sync_type = data["ctype"]
|
if sync_type == "delegate_list":
|
huaxin_trade_data_update.add_delegate_list()
|
elif sync_type == "deal_list":
|
huaxin_trade_data_update.add_deal_list()
|
elif sync_type == "money":
|
huaxin_trade_data_update.add_money_list()
|
elif sync_type == "position_list":
|
huaxin_trade_data_update.add_position_list()
|
self.send_response({"code": 0, "data": {}, "msg": ""}, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
def OnGetCodeAttribute(self, client_id, request_id, data):
|
try:
|
code = data["code"]
|
# 查询是否想买单/白名单/黑名单/暂不买
|
code_name = gpcode_manager.get_code_name(code)
|
want = gpcode_manager.WantBuyCodesManager().is_in_cache(code)
|
white = l2_trade_util.WhiteListCodeManager().is_in_cache(code)
|
black = l2_trade_util.is_in_forbidden_trade_codes(code)
|
pause_buy = gpcode_manager.PauseBuyCodesManager().is_in_cache(code)
|
|
desc_list = []
|
if want:
|
desc_list.append("【想买单】")
|
if white:
|
desc_list.append("【白名单】")
|
if black:
|
desc_list.append("【黑名单】")
|
if pause_buy:
|
desc_list.append("【暂不买】")
|
result = {"code": 0, "data": {"code_info": (code, code_name), "desc": "".join(desc_list)}}
|
self.send_response(result, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
def OnGetCodeTradeState(self, client_id, request_id, data):
|
try:
|
code = data["code"]
|
state = trade_manager.CodesTradeStateManager().get_trade_state(code)
|
result = {"code": 0, "data": {"state": state}}
|
self.send_response(result, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
def OnGetEnvInfo(self, client_id, request_id, data):
|
try:
|
fdata = {}
|
try:
|
date = JueJinApi.get_previous_trading_date(tool.get_now_date_str())
|
if date:
|
fdata["juejin"] = 1
|
except Exception as e:
|
fdata["juejin"] = 0
|
fdata["kpl"] = {}
|
# 获取开盘啦数据
|
kpl_types = [KPLDataType.LIMIT_UP.value, KPLDataType.JINGXUAN_RANK.value,
|
KPLDataType.INDUSTRY_RANK.value]
|
for kpl_type in kpl_types:
|
if kpl_type in KPLDataManager.kpl_data_update_info:
|
fdata["kpl"][kpl_type] = KPLDataManager.kpl_data_update_info.get(kpl_type)
|
|
try:
|
# 验证redis
|
RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test")
|
fdata["redis"] = 1
|
except:
|
fdata["redis"] = 0
|
|
try:
|
# 验证mysql
|
mysql_data.Mysqldb().select_one("select 1")
|
fdata["mysql"] = 1
|
except:
|
fdata["mysql"] = 0
|
|
try:
|
# redis异步任务数量
|
fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count()
|
except:
|
pass
|
# 获取CPU与内存适用情况
|
memory_info = psutil.virtual_memory()
|
cpu_percent = psutil.cpu_percent(interval=1)
|
fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent}
|
# 获取交易通道
|
try:
|
can_access = huaxin_trade_api.test_trade_channel()
|
fdata["trade_channel_access"] = 1 if can_access else 0
|
except:
|
fdata["trade_channel_access"] = 0
|
|
result = {"code": 0, "data": fdata, "msg": ""}
|
print("OnGetEnvInfo 成功")
|
self.send_response(result, client_id, request_id)
|
except Exception as e:
|
self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
|
|
|
def run(pipe_trade, pipe_l1):
|
# 执行一些初始化数据
|
block_info.init()
|
|
# 启动外部接口监听
|
manager = outside_api_command_manager.ApiCommandManager()
|
manager.init(middle_api_protocol.SERVER_HOST,
|
middle_api_protocol.SERVER_PORT,
|
OutsideApiCommandCallback())
|
manager.run(blocking=False)
|
|
# 启动交易服务
|
huaxin_trade_api.run_pipe_trade(pipe_trade)
|
|
# 监听l1那边传过来的代码
|
t1 = threading.Thread(target=lambda: __recv_pipe_l1(pipe_l1), daemon=True)
|
t1.start()
|
|
print("create TradeServer")
|
t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
|
t1.start()
|
|
laddr = "0.0.0.0", 10008
|
tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 注意:参数是MyBaseRequestHandle
|
tcpserver.serve_forever()
|
|
|
if __name__ == "__main__":
|
|
# l2.l2_data_util.save_l2_data(code, None, datas)
|
|
# ClientSocketManager.add_client(ClientSocketManager.CLIENT_TYPE_TRADE, "1", None)
|
# ClientSocketManager.add_client(ClientSocketManager.CLIENT_TYPE_TRADE, "2", None)
|
# ClientSocketManager.add_client(ClientSocketManager.CLIENT_TYPE_TRADE, "3", None)
|
#
|
# for i in range(0, 3):
|
# t1 = threading.Thread(target=lambda: test1())
|
# t1.setDaemon(True)
|
# t1.start()
|
#
|
# for i in range(0, 3):
|
# t1 = threading.Thread(target=lambda: test2())
|
# t1.setDaemon(True)
|
# t1.start()
|
#
|
|
while True:
|
time.sleep(10)
|