import concurrent.futures
|
import datetime
|
import hashlib
|
import json
|
import logging
|
import multiprocessing
|
import queue
|
import random
|
import socket
|
import socketserver
|
import threading
|
import time
|
|
import schedule
|
|
import constant
|
import outside_api_command_manager
|
from cancel_strategy.s_l_h_cancel_strategy import SCancelBigNumComputer
|
from code_attribute import gpcode_manager, code_volumn_manager, global_data_loader, zyltgb_util
|
from code_attribute.code_l1_data_manager import L1DataManager
|
from code_attribute.gpcode_manager import CodePrePriceManager, CodesNameManager
|
from huaxin_client import l2_data_transform_protocol
|
from huaxin_client.trade_transform_protocol import TradeResponse
|
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, transaction_progress, \
|
l2_data_source_util, l2_data_log
|
from l2.cancel_buy_strategy import GCancelBigNumComputer, \
|
DCancelBigNumComputer
|
from l2.code_price_manager import Buy1PriceManager
|
from l2.huaxin import huaxin_target_codes_manager, l2_huaxin_util
|
from l2.huaxin.huaxin_target_codes_manager import HuaXinL1TargetCodesManager
|
from l2.l2_data_manager import TradePointManager, OrderBeginPosInfo
|
from l2.l2_data_manager_new import L2TradeDataProcessor
|
from l2.l2_data_util import L2DataUtil
|
from l2.l2_sell_manager import L2MarketSellManager
|
from l2.l2_transaction_data_processor import HuaXinTransactionDatasProcessor
|
from l2.place_order_single_data_manager import L2TradeSingleCallback, L2TradeSingleDataManager
|
from log_module import async_log_util, log_export
|
from log_module.log import hx_logger_contact_debug, hx_logger_trade_callback, \
|
hx_logger_l2_orderdetail, hx_logger_l2_market_data, logger_l2_g_cancel, logger_debug, \
|
logger_system, logger_trade, logger_local_huaxin_l1_trade_info, logger_l2_codes_subscript, logger_l2_radical_buy
|
from third_data import block_info, kpl_data_manager, history_k_data_manager, huaxin_l1_data_manager
|
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager, RealTimeKplMarketData
|
from third_data.history_k_data_util import JueJinApi
|
from trade import l2_trade_util, \
|
trade_data_manager, trade_constant, buy_open_limit_up_strategy
|
from trade.buy_radical import radical_buy_data_manager, radical_buy_strategy
|
from trade.buy_money_count_setting import BuyMoneyAndCountSetting, BuyMoneyUtil
|
|
from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_data_update, \
|
huaxin_trade_record_manager, huaxin_sell_util
|
from api.outside_api_command_callback import OutsideApiCommandCallback
|
from trade.huaxin.huaxin_trade_record_manager import DelegateRecordManager
|
from trade.order_statistic import DealAndDelegateWithBuyModeDataManager
|
from trade.buy_radical.radical_buy_data_manager import RadicalBuyDataManager, RadicalBuyBlockManager
|
from trade.sell.sell_rule_manager import TradeRuleManager
|
from trade.trade_data_manager import RadicalBuyDealCodesManager
|
from trade.trade_manager import CodesTradeStateManager
|
from utils import socket_util, middle_api_protocol, tool, huaxin_util, global_util, trade_util, init_data_util
|
|
# Module-level FIFO queue. Nothing in the visible part of this file reads or
# writes it - presumably it is consumed by other modules (TODO confirm).
trade_data_request_queue = queue.Queue()
|
|
|
class MyTCPServer(socketserver.TCPServer):
    """TCP server that always binds and activates its socket on construction."""

    def __init__(self, server_address, RequestHandlerClass):
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=True)
|
|
|
# If requests are to be handled asynchronously, ThreadingTCPServer has to be
# re-derived on top of MyTCPServer as well.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """MyTCPServer combined with socketserver.ThreadingMixIn."""
|
|
|
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Per-connection handler for the trade server's JSON-over-TCP protocol.

    Every request is a JSON object whose ``"type"`` field selects the action
    (see :meth:`handle`). Requests may be framed with a 10-byte header
    ``"##" + 8-digit length`` (see :meth:`getRecvData`).

    NOTE(review): this file reached review with its indentation stripped; the
    nesting below was reconstructed from syntax and intent. The placement of
    ``time.sleep(1)`` in the heartbeat loop in particular should be confirmed.
    """

    __inited = False
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
    __KPLCodeJXBlockManager = KPLCodeJXBlockManager()
    __GCancelBigNumComputer = GCancelBigNumComputer()

    def setup(self):
        """Run the one-time class initialisation before handling the request."""
        self.__init()

    @classmethod
    def __init(cls):
        """Initialise class-level state once; return True if already done."""
        if cls.__inited:
            return True
        cls.__inited = True
        cls.__req_socket_dict = {}

    def __is_sign_right(self, data_json):
        """Verify the MD5 signature of a request.

        The ``"sign"`` entry is removed from ``data_json`` (the caller's dict
        is mutated), the remaining ``key=value`` pairs are sorted, joined with
        ``&``, salted and hashed.

        :raises Exception: when the computed digest differs from ``sign``.
        """
        sign = data_json.pop("sign")
        pairs = sorted(f"{k}={v}" for k, v in data_json.items())
        plain = "&".join(pairs) + "JiaBei@!*."
        md5 = hashlib.md5(plain.encode(encoding='utf-8')).hexdigest()
        if md5 != sign:
            raise Exception("签名出错")

    @staticmethod
    def __send_json(sk, payload):
        """Send ``payload`` as JSON wrapped by ``socket_util.load_header``."""
        sk.sendall(socket_util.load_header(json.dumps(payload).encode(encoding='utf-8')))

    @classmethod
    def getRecvData(cls, skk):
        """Read one message from ``skk``.

        A message starting with ``##`` carries an 8-character decimal body
        length in header bytes 2..9; exactly that many body bytes are read.
        Anything else is treated as unframed: whatever a single further
        ``recv`` returns is appended to the first bytes read.

        :param skk: connected socket-like object exposing ``recv``.
        :return: ``(text, header_bytes)``; ``("", b"")`` when the peer has
            closed the connection before sending anything.
        :raises ConnectionResetError: when the peer closes the connection in
            the middle of a framed message (previously this spun forever).
        """
        header_size = 10
        header = skk.recv(header_size)
        if not header:
            return "", header
        if not header.startswith(b"##"):
            # Unframed message: decode everything at once so that a multi-byte
            # character split across the two reads cannot break decoding.
            return (header + skk.recv(1024 * 1024)).decode('utf-8'), header

        # recv may legally return fewer bytes than requested; complete the header.
        while len(header) < header_size:
            more = skk.recv(header_size - len(header))
            if not more:
                raise ConnectionResetError("connection closed while reading the frame header")
            header += more
        remaining = int(header[2:header_size].decode('utf-8'))
        chunks = []
        while remaining > 0:
            # Never ask for more than the frame holds, so the next frame is left untouched.
            chunk = skk.recv(min(remaining, 10240))
            if not chunk:
                raise ConnectionResetError("connection closed while reading the frame body")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks).decode('utf-8'), header

    def handle(self):
        """Serve requests on this connection until it closes or an error occurs.

        Recognised ``type`` values: ``register`` (then the connection becomes a
        heartbeat channel), ``response``, ``trade_callback``, ``l2_order``,
        ``l2_trans``, ``l2_market_data``, ``l2_subscript_codes``,
        ``get_level1_codes``, ``set_target_codes`` and
        ``trading_order_canceled``. Unknown types are ignored without a reply.
        """
        super().handle()
        sk: socket.socket = self.request
        while True:
            try:
                data_str, _header = self.getRecvData(sk)
                if not data_str:
                    # Nothing received: the peer disconnected.
                    break
                try:
                    data_json = json.loads(data_str)
                except json.decoder.JSONDecodeError:
                    # JSON parsing failed: report it and wait for the next request.
                    self.__send_json(sk, {"code": 100, "msg": f"JSON解析失败"})
                    hx_logger_trade_callback.error(
                        f"json解析失败,字符串长度:{len(data_str)},字符串内容:\"{data_str[:30]}......{data_str[-20:]}\"")
                    continue

                type_ = data_json["type"]
                if type_ == 'register':
                    client_type = data_json["data"]["client_type"]
                    rid = data_json["rid"]
                    trade_api.ClientSocketManager.add_client(client_type, rid, sk)
                    # The registration ack is sent raw, without the length header.
                    sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8'))
                    try:
                        while True:
                            result, _header = self.getRecvData(sk)
                            try:
                                resultJSON = json.loads(result)
                                if resultJSON["type"] == 'heart':
                                    # Record the client as alive.
                                    trade_api.ClientSocketManager.heart(resultJSON['client_id'])
                                else:
                                    hx_logger_contact_debug.warning(f"接收到非心跳信息:{result}")
                            except json.decoder.JSONDecodeError:
                                if not result:
                                    sk.close()
                                # NOTE(review): back-off after a parse error; the sleep may
                                # originally have been at loop level - confirm.
                                time.sleep(1)
                    except ConnectionResetError:
                        trade_api.ClientSocketManager.del_client(rid)
                    except Exception as e:
                        logging.exception(e)
                elif type_ == "response":
                    # Response to a request this server initiated.
                    try:
                        my_trade_response.OnTradeResponse(data_json)
                    finally:
                        self.__send_json(sk, {"code": 0})
                elif type_ == "trade_callback":
                    # Trade callback.
                    try:
                        my_trade_response.OnTradeCallback(data_json)
                    finally:
                        self.__send_json(sk, {"code": 0})
                elif type_ == "l2_order":
                    # L2 tick-by-tick orders.
                    try:
                        data = data_json["data"]
                        code = data["code"]
                        timestamp = data.get("time")
                        datas = data["data"]
                        TradeServerProcessor.l2_order(code, datas, timestamp)
                    finally:
                        self.__send_json(sk, {"code": 0})
                elif type_ == "l2_trans":
                    try:
                        data = data_json["data"]
                        code = data["code"]
                        datas = data["data"]
                        TradeServerProcessor.l2_transaction(code, datas)
                    finally:
                        self.__send_json(sk, {"code": 0})
                elif type_ == "l2_market_data":
                    try:
                        data = data_json["data"]
                        code = data["code"]
                        data = data["data"]
                        TradeServerProcessor.l2_market_data(code, data)
                    finally:
                        self.__send_json(sk, {"code": 0})
                elif type_ == "l2_subscript_codes":
                    try:
                        data = data_json["data"]
                        datas = data["data"]
                        # Persist the subscribed codes ...
                        huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.save_subscript_codes(datas)
                        # ... then publish them to the log distributor and the middle API.
                        codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes()
                        l2_log.codeLogQueueDistributeManager.set_l2_subscript_codes(codes)
                        fresults = []
                        if codes:
                            fresults = [(code, gpcode_manager.get_code_name(code)) for code in codes]
                        fdata = middle_api_protocol.load_l2_subscript_codes(fresults)
                        middle_api_protocol.request(fdata)
                    finally:
                        self.__send_json(sk, {"code": 0})
                elif type_ == "get_level1_codes":
                    # Build the list of level-1 codes to watch.
                    list_ = JueJinApi.get_exchanges_codes(["SHSE", "SZSE"])
                    fdatas = []
                    code_name_map = {}
                    for d in list_:
                        if not tool.is_target_code(d["sec_id"]):
                            continue
                        if d["sec_level"] != 1:
                            continue
                        # Skip codes listed less than 100 days ago.
                        if (d["listed_date"] + datetime.timedelta(
                                days=100)).timestamp() > datetime.datetime.now().timestamp():
                            continue
                        fdatas.append(d["sec_id"])
                        code_name_map[d["sec_id"]] = d["sec_name"]
                    # Save the code names.
                    CodesNameManager().add_code_names(code_name_map)
                    self.__send_json(sk, {"code": 0, "data": fdatas})
                elif type_ == "set_target_codes":
                    try:
                        TradeServerProcessor.set_target_codes(data_json)
                    except Exception as e:
                        logging.exception(e)
                    finally:
                        self.__send_json(sk, {"code": 0})
                elif type_ == "trading_order_canceled":
                    try:
                        logger_l2_g_cancel.info(f"正在成交的订单撤单,data:{data_json}")
                        data = data_json["data"]
                        code = data["code"]
                        order_no = data["data"]
                        TradeServerProcessor.trading_order_canceled(code, order_no)
                    finally:
                        self.__send_json(sk, {"code": 0})
            except Exception as e:
                # Any failure (including a truncated frame) ends this connection.
                logging.exception(e)
                break

    def finish(self):
        """Delegate to the base class clean-up."""
        super().finish()
|
|
|
# Trade server processor
class TradeServerProcessor:
    """Class-level entry points that turn incoming L1/L2 market data into trading actions.

    All methods are classmethods; the class is never instantiated in the
    visible code.

    NOTE(review): this file reached review with its indentation stripped; the
    nesting below was reconstructed from syntax and intent and should be
    confirmed where flagged.
    """

    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
    __KPLCodeJXBlockManager = KPLCodeJXBlockManager()
    __GCancelBigNumComputer = GCancelBigNumComputer()
    __sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    __process_l1_data_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    # Codes whose block information is currently being refreshed by a worker thread.
    __updating_jx_blocks_codes = set()

    @classmethod
    def sell(cls, datas):
        """Execute the pending sell rules that are triggered by ``datas``.

        :param datas: iterable of tuples
            ``(code, price, rate, volume, update time, buy-1 price, buy-1 volume)``.

        A rule fires when the buy-1 price is at or below the rule's price and,
        if the rule defines a volume, the buy-1 volume is at or below it. Each
        rule is executed at most once per call.
        """
        rules = TradeRuleManager().list_can_excut_rules_cache(types=[TradeRuleManager.TYPE_SELL])
        if not rules:
            return
        excuted_rule_ids = set()
        for d in datas:
            code = d[0]
            buy1_price = d[5]
            buy1_volume = d[6]
            if not buy1_volume:
                continue
            for r in rules:
                if r.code != code:
                    continue
                # -------- decide whether the rule may execute --------
                can_excute = False
                if round(float(buy1_price), 2) <= round(float(r.buy1_price), 2):
                    # Price condition met.
                    if r.buy1_volume:
                        if r.buy1_volume >= buy1_volume:
                            # Price and volume condition met.
                            can_excute = True
                            async_log_util.info(logger_trade, f"触发卖规则:量触发{buy1_volume}/{r.buy1_volume}")
                    else:
                        # Rule has no volume threshold: price alone triggers it.
                        can_excute = True
                        async_log_util.info(logger_trade, f"触发卖规则:价格触发{buy1_price}/{r.buy1_price}")
                if not can_excute:
                    continue

                # Capture the id up front: the lock must be released with the same
                # id it was acquired with, whatever happens to the rule object below.
                rule_id = r.id_
                TradeRuleManager().require_sell_lock(rule_id)
                try:
                    if rule_id in excuted_rule_ids:
                        continue
                    excuted_rule_ids.add(rule_id)
                    # Re-read the rule to get its latest execution state.
                    latest_rule = TradeRuleManager().get_by_id(rule_id)
                    if latest_rule.excuted:
                        continue
                    # Submit the sell order.
                    limit_down_price = gpcode_manager.get_limit_down_price(code)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    huaxin_sell_util.start_sell(code, latest_rule.sell_volume, latest_rule.sell_price_type,
                                                limit_up_price, limit_down_price, buy1_price)
                    TradeRuleManager().excuted(rule_id)
                except Exception as e:
                    logger_debug.exception(e)
                finally:
                    TradeRuleManager().release_sell_lock(rule_id)

    # Save current prices
    @classmethod
    def __save_l1_current_price(cls, datas):
        """Store the L1 current price and buy-1 data of every item in ``datas``.

        Before 09:29:00 the price is additionally stored as the open price.

        :param datas: tuples
            ``(code, price, rate, volume, update time, buy-1 price, buy-1 volume)``.
        """
        before_trade = int(tool.get_now_time_str().replace(":", "")) < int("092900")
        for d in datas:
            code = d[0]
            price = d[1]
            L1DataManager.set_l1_current_price(code, price)
            if before_trade:
                # Before the open, keep the price as the open price.
                L1DataManager.set_open_price(code, price)
            # NOTE(review): assumed to run for every item, not only before the open - confirm.
            huaxin_l1_data_manager.set_buy1_data(code, d[5], d[6])

    @classmethod
    def __process_buy_open_limit_up_datas(cls, datas):
        """Cancel pre-open buy orders of BuyOpenLimitUpCodeManager codes whose queue is too thin.

        Does nothing after 09:25:00. For a managed code whose update time lies in
        [09:19:50, 09:20:00) and for which ``d[8] * d[5]`` is below 1e8, every
        current delegate of the code is cancelled.

        @param datas: [(code, price, rate, volume, current time, buy-1 price,
            buy-1 volume, buy-2 price, buy-2 volume, update time)]
        @return: None
        """
        if tool.get_now_time_as_int() > int("092500"):
            return
        for d in datas:
            if not gpcode_manager.BuyOpenLimitUpCodeManager().is_in_cache(d[0]):
                continue
            if not int("091950") <= int(d[9].replace(":", "")) < int("092000"):
                continue
            async_log_util.info(logger_debug, f"排1撤单:{d},封单:{d[8] * d[5]}")
            if d[8] * d[5] < 1e8:
                code = d[0]
                current_delegates = DelegateRecordManager().list_current_delegates(code)
                if current_delegates:
                    for c in current_delegates:
                        huaxin_trade_api.cancel_order(huaxin_trade_api.TRADE_DIRECTION_BUY, code,
                                                      c["orderSysID"])

    # Get the L1 current price
    @classmethod
    def get_l1_current_price(cls, code):
        """Return the L1 current price stored for ``code``."""
        return L1DataManager.get_l1_current_price(code)

    # Set the target codes
    @classmethod
    def set_target_codes(cls, data_json):
        """Process an L1 ``set_target_codes`` message.

        :param data_json: dict with ``"request_id"`` and ``"data": {"data": [...]}``.
        """
        data = data_json["data"]
        request_id = data_json["request_id"]
        datas = data["data"]
        cls.__save_l1_current_price(datas)
        cls.__process_buy_open_limit_up_datas(datas)
        # Before 09:30 the data is processed synchronously.
        # TODO: under test - ``or True`` currently forces the synchronous path at
        # all times, so the thread-pool branch below is unreachable.
        if int(tool.get_now_time_str().replace(":", "")) < int("093000") or True:
            HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id)
        else:
            cls.__process_l1_data_thread_pool.submit(
                lambda: HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id))

    @classmethod
    def set_l1_trade_codes_info(cls, data_json):
        """Process L1 data supplied for trading: store prices, then run the sell rules.

        :param data_json: dict with ``"request_id"`` and ``"data": {"data": [...]}``.
        """
        data = data_json["data"]
        # The value is unused, but the lookup is kept so that a message without
        # ``request_id`` is still rejected with KeyError as before.
        request_id = data_json["request_id"]
        datas = data["data"]
        cls.__save_l1_current_price(datas)
        cls.sell(datas)

    @classmethod
    def l2_order(cls, code, _datas, timestamp):
        """Process a batch of L2 tick-by-tick orders for ``code``.

        :param code: security code.
        :param _datas: non-empty list of order records.
        :param timestamp: send time in seconds since the epoch, or ``None``
            when the sender did not supply one.
        """
        if not constant.L2_DATA_IS_LOADED:
            logger_debug.info(f"{code}还未载入L2本地数据")
            return

        now_time = time.time()
        # The socket handler passes ``data.get("time")``, so the timestamp may be missing.
        use_time = int((now_time - timestamp) * 1000) if timestamp is not None else None
        thread_id = random.randint(0, 100000)
        l2_log.threadIds[code] = thread_id
        l2_data_count = len(_datas)
        l2_log.info(code, hx_logger_l2_orderdetail,
                    f"{code}#耗时:{use_time}-{thread_id}#数量:{l2_data_count}#{_datas[-1]}")

        try:
            l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, _datas)
        finally:
            use_time = time.time() - now_time
            # Only log batches that took longer than 8 ms.
            if use_time > 0.008:
                l2_data_log.l2_time_log(code,
                                        f"处理L2逐笔委托结束:处理数据数量: {l2_data_count} 最终处理时间:{round(use_time * 1000, 2)}ms")

    @classmethod
    def l2_transaction(cls, code, datas):
        """Forward non-empty L2 transaction data to HuaXinTransactionDatasProcessor."""
        if datas:
            HuaXinTransactionDatasProcessor().process_huaxin_transaction_datas(code, datas)

    @classmethod
    def l2_market_data(cls, code, data):
        """Process one L2 market-data snapshot for ``code``.

        :param code: security code.
        :param data: dict; the keys read here are ``dataTimeStamp``, ``buy``,
            ``sell``, ``totalValueTrade``, ``totalVolumeTrade``,
            ``totalAskVolume`` and ``avgAskPrice``.
        """

        def update_kpl_jx_block(code_, buy_1_price_, limit_up_price_):
            """Refresh block (plate) information for ``code_``; skipped if already running."""
            # The in-progress check must sit outside the try: returning from
            # inside it would run the finally clause and discard the marker that
            # belongs to the refresh which is still running.
            if code_ in cls.__updating_jx_blocks_codes:
                return
            cls.__updating_jx_blocks_codes.add(code_)
            try:
                cls.__KPLCodeJXBlockManager.load_jx_blocks(code_, buy_1_price_, limit_up_price_,
                                                           kpl_data_manager.KPLLimitUpDataRecordManager.get_current_reasons())
                # Update the buyable-block information.
                latest_current_limit_up_records = kpl_data_manager.get_latest_current_limit_up_records()
                CodePlateKeyBuyManager.update_can_buy_blocks(code_,
                                                             kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas,
                                                             kpl_data_manager.KPLLimitUpDataRecordManager.total_datas,
                                                             latest_current_limit_up_records,
                                                             block_info.get_before_blocks_dict(),
                                                             kpl_data_manager.KPLLimitUpDataRecordManager.get_current_limit_up_reason_codes_dict())
            finally:
                cls.__updating_jx_blocks_codes.discard(code_)

        # Normalise the timestamp to "HH:MM:SS" (a leading "9" means the hour lost its zero).
        time_str = f"{data['dataTimeStamp']}"
        if time_str.startswith("9"):
            time_str = "0" + time_str
        time_str = time_str[:6]
        time_str = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}"
        buy_1_price, buy_1_volume = data["buy"][0]
        sell_1_price, sell_1_volume = data["sell"][0]
        limit_up_price = gpcode_manager.get_limit_up_price(code)

        code_price_manager.Buy1PriceManager().set_latest_buy1_money(code, buy_1_price, buy_1_volume)

        # ----------------------- automatic cancel rules -----------------------
        try:
            if DCancelBigNumComputer().has_auto_cancel_rules(code):
                need_cancel, rule_id = DCancelBigNumComputer().need_cancel(code, buy_1_volume)
                if need_cancel:
                    try:
                        l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, f"盯封单撤:{time_str}-{buy_1_volume}",
                                                                            cancel_type=trade_constant.CANCEL_TYPE_D)
                    finally:
                        # The rule is marked executed whether or not the cancel succeeded.
                        TradeRuleManager().excuted(rule_id)
        except Exception as e:
            logger_debug.exception(e)

        if limit_up_price is not None:
            average_rate = None
            try:
                average_price = data["totalValueTrade"] / data["totalVolumeTrade"]
                pre_close_price = CodePrePriceManager.get_price_pre_cache(code)
                average_rate = round((average_price - pre_close_price) / pre_close_price, 4)
            except Exception:
                # Best effort: missing keys, zero volume or a missing previous
                # close simply leave average_rate as None.
                pass
            # Process the buy-1 / sell-1 information.
            code_price_manager.Buy1PriceManager().process(code, buy_1_price, buy_1_volume, time_str,
                                                          limit_up_price,
                                                          sell_1_price, sell_1_volume // 100, average_rate)
            # The result is currently unused (the L-cancel recomputation that
            # consumed it is disabled); the call is retained as in the original.
            code_price_manager.Buy1PriceManager().get_latest_3m_buy1_money_list(code)

            # NOTE(review): assumed to run only when a limit-up price is known - confirm.
            threading.Thread(target=lambda: update_kpl_jx_block(code, buy_1_price, limit_up_price), daemon=True).start()

        async_log_util.info(hx_logger_l2_market_data, f"{code}#{data}")

        sell_1_info = data["sell"][0] if data.get("sell") else None
        L2MarketSellManager().set_current_total_sell_data(code, time_str,
                                                          data["totalAskVolume"] * data["avgAskPrice"],
                                                          data["totalAskVolume"], sell_1_info, data.get("sell"))

    @classmethod
    def trading_order_canceled(cls, code, order_no):
        """Hook for a cancelled in-progress order; intentionally a no-op."""
        pass

    @classmethod
    def test_sell(cls):
        """Run :meth:`sell` against two hard-coded sample rows (manual test helper)."""
        # (code, price, rate, volume, update time, buy-1 price, buy-1 volume)
        datas = [("600571", 12.14, 9.96, 100000000, tool.get_now_time_str(), 12.14, 10210),
                 ("600571", 12.04, 9.96, 100000000, tool.get_now_time_str(), 12.04, 10210)]
        cls.sell(datas)
|
|
|
def clear_invalid_client():
    """Remove invalid client sockets every two seconds, forever.

    Intended to run in its own thread. A failure of one sweep is logged and
    does not stop the loop.
    """
    logger_system.info(f"trade_server clear_invalid_client 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            huaxin_trade_api.ClientSocketManager.del_invalid_clients()
        except Exception as e:
            # Keep the sweep best-effort, but leave a trace instead of
            # swallowing the failure silently.
            logger_debug.exception(e)
        finally:
            time.sleep(2)
|
|
|
def __recv_pipe_l1(queue_l1_w_strategy_r: multiprocessing.Queue):
    """Consume JSON messages from the L1 queue and dispatch ``set_target_codes``.

    Returns immediately when no queue is given; otherwise loops forever.
    ``set_target_codes`` messages older than 10 seconds are dropped. Every
    exception is logged and the loop continues.
    """
    logger_system.info(f"trade_server __recv_pipe_l1 线程ID:{tool.get_thread_id()}")
    if queue_l1_w_strategy_r is None:
        return
    while True:
        try:
            raw = queue_l1_w_strategy_r.get()
            if not raw:
                continue
            message = json.loads(raw)
            type_ = message["type"]
            timestamp = message.get("time")
            if type_ != "set_target_codes":
                continue
            async_log_util.info(logger_l2_codes_subscript, f"策略接收到数据")
            # Give up on data that is more than 10 s old.
            age_ms = time.time() * 1000 - timestamp
            if age_ms > 10 * 1000:
                continue
            TradeServerProcessor.set_target_codes(message)
        except Exception as e:
            logger_debug.exception(e)
|
|
|
# Cancel buy orders that are queued too far back
def __cancel_buy_for_too_far():
    """Every 3 seconds, find buy delegates whose order sits far back in the limit-up queue.

    For each current buy delegate the volume still queued between the traded
    position and the real order position is valued at the limit-up price and
    compared with the latest buy-1 money; codes whose ratio exceeds 0.5 are
    collected and sorted by ratio. The actual cancel call is currently
    commented out, so this loop has no effect beyond the helpers it calls.

    NOTE(review): the source's indentation was lost; the nesting below was
    reconstructed from syntax and should be confirmed.
    """
    while True:
        try:
            # Available account funds: nothing to do while there is enough
            # money for another purchase (the finally clause still sleeps).
            account_available_money = trade_data_manager.AccountMoneyManager().get_available_money_cache()
            if account_available_money is not None and account_available_money > constant.BUY_MONEY_PER_CODE:
                continue
            can_cancel_codes = []
            current_delegates = huaxin_trade_record_manager.DelegateRecordManager().list_current_delegates()
            for c in current_delegates:
                # Only buy-side delegates are considered.
                if int(c["direction"]) != huaxin_util.TORA_TSTP_D_Buy:
                    continue
                code = c["securityID"]
                # Order placement position information
                order_begin_pos = TradePointManager().get_buy_compute_start_data_cache(code)
                if order_begin_pos is None or order_begin_pos.buy_single_index is None:
                    continue
                total_datas = l2_data_util.local_today_datas.get(code)
                if not total_datas:
                    continue
                if order_begin_pos.buy_exec_index < 0:
                    continue
                # Skip orders whose execution record is less than 60 (presumably
                # seconds of trading time - TODO confirm) old.
                if tool.trade_time_sub(tool.get_now_time_str(),
                                       total_datas[order_begin_pos.buy_exec_index]["val"]["time"]) < 60:
                    continue
                trade_index, is_default = transaction_progress.TradeBuyQueue().get_traded_index(code)
                # Real position of the placed order
                place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code)

                # Volume still queued ahead of the order
                total_left_num = 0
                for i in range(trade_index + 1, place_order_index):
                    data = total_datas[i]
                    val = data["val"]
                    if not L2DataUtil.is_limit_up_price_buy(val):
                        continue
                    # Ignore small orders (num * price below 5000).
                    if val["num"] * float(val["price"]) < 5000:
                        continue
                    left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code,
                                                                                                             i,
                                                                                                             total_datas,
                                                                                                             l2_data_util.local_today_canceled_buyno_map.get(
                                                                                                                 code))
                    if left_count > 0:
                        total_left_num += val["num"] * left_count
                # Latest buy-1 (sealing) money
                limit_up_price = gpcode_manager.get_limit_up_price(code)
                buy1_money = Buy1PriceManager().get_latest_buy1_money(code)
                if buy1_money is None:
                    buy1_money = 0
                if buy1_money > 0:
                    total_left_money = total_left_num * 100 * float(limit_up_price)
                    rate = total_left_money / buy1_money
                    if rate > 0.5:
                        can_cancel_codes.append((code, rate))
            if can_cancel_codes:
                can_cancel_codes.sort(key=lambda x: x[1], reverse=True)
                # Temporarily disabled:
                # l2_data_manager_new.L2TradeDataProcessor.cancel_buy(can_cancel_codes[0][0], "下单距离太远")
        except Exception as e:
            logger_debug.exception(e)
        finally:
            time.sleep(3)
|
|
|
def __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r: multiprocessing.Queue):
    """Consume JSON messages from the L1 trade queue and dispatch ``upload_l1_trade_datas``.

    Returns immediately when no queue is given; otherwise loops forever.
    Every exception is logged (to the L1 trade logger and the root logger)
    and the loop continues.

    NOTE(review): the source's indentation was lost; the second log call is
    assumed to belong to the ``upload_l1_trade_datas`` branch - confirm.
    """
    logger_system.info(f"trade_server __recv_pipe_l1_trade 线程ID:{tool.get_thread_id()}")
    if queue_l1_trade_w_strategy_r is None:
        return
    while True:
        try:
            raw = queue_l1_trade_w_strategy_r.get()
            if not raw:
                continue
            async_log_util.info(logger_local_huaxin_l1_trade_info, f"客户端接收:{raw}")
            message = json.loads(raw)
            if message["type"] != "upload_l1_trade_datas":
                continue
            # L1 data provided specifically for trading.
            TradeServerProcessor.set_l1_trade_codes_info(message)
            async_log_util.info(logger_local_huaxin_l1_trade_info, message)
        except Exception as e:
            logger_local_huaxin_l1_trade_info.exception(e)
            logging.exception(e)
|
|
|
class MyL2DataCallback(l2_data_transform_protocol.L2DataCallBack):
    """L2 data callback that forwards every event to TradeServerProcessor."""

    def OnL2Order(self, code, datas, timestamp):
        """Forward L2 tick-by-tick orders."""
        TradeServerProcessor.l2_order(code, datas, timestamp)

    def OnL2Transaction(self, code, datas):
        """Forward L2 transactions."""
        TradeServerProcessor.l2_transaction(code, datas)

    def OnMarketData(self, code, data):
        """Forward an L2 market-data snapshot; failures are logged, not raised."""
        try:
            TradeServerProcessor.l2_market_data(code, data)
        except Exception as err:
            logger_debug.exception(err)

    def OnTradingOrderCancel(self, code, buy_no):
        """Forward the cancellation of an order that was being traded."""
        TradeServerProcessor.trading_order_canceled(code, buy_no)
|
|
|
class MyTradeResponse(TradeResponse):
    """Handles trade callbacks and responses coming back from the trade client.

    NOTE(review): the source's indentation was lost; the nesting in
    ``OnTradeCallback`` was reconstructed from syntax and should be confirmed.
    """

    def OnTradeCallback(self, data_json):
        """Record an order update and, when it is a deal, refresh deal and money lists.

        :param data_json: dict whose ``"data"`` holds ``{"type": ..., "data": order}``.
        """
        payload = data_json["data"]
        if payload["type"] == 0:
            # Order update: find out whether it was filled.
            order = payload["data"]
            status = order["orderStatus"]
            huaxin_trade_record_manager.DelegateRecordManager.add([order])
            if huaxin_util.is_deal(status):
                if int(str(order["direction"])) == huaxin_util.TORA_TSTP_D_Buy:
                    security_id = order["securityID"]
                    l2_trade_util.forbidden_trade(security_id, msg="已成交", force=True)
                    latest_mode = TradePointManager.get_latest_place_order_mode(security_id)
                    if latest_mode == OrderBeginPosInfo.MODE_RADICAL:
                        RadicalBuyDealCodesManager().add_deal_code(security_id)
                # Filled: refresh the deal list and the money list.
                huaxin_trade_data_update.add_deal_list()
                huaxin_trade_data_update.add_money_list()
        # Log the trade feedback.
        async_log_util.info(hx_logger_trade_callback, payload)

    def OnTradeResponse(self, data_json):
        """Log the response and hand its data to ``trade_api.set_response``."""
        request_id = data_json['request_id']
        hx_logger_trade_callback.info(f"response:request_id-{request_id}")
        # Store the response content for the waiting request.
        trade_api.set_response(request_id, data_json['data'])
|
|
|
class MyL2TradeSingleCallback(L2TradeSingleCallback):
    """Places buy orders in reaction to transaction-driven signals.

    NOTE(review): the source's indentation was lost; the nesting below was
    reconstructed from syntax and intent. Spots where more than one reading
    is syntactically possible are flagged inline and should be confirmed.
    """

    # Cache of radical-buy block results: {code: (expiry time, result)}
    __radical_buy_by_blocks_result_cache = {}

    def OnTradeSingle(self, code, big_buy_order_count, _type, data):
        """Try to place a buy order when a transaction signal arrives.

        Does nothing when the code is already delegated, placed or bought.
        Any exception is logged and swallowed.

        :param code: security code.
        :param big_buy_order_count: only used in log messages here.
        :param _type: trigger mode, compared with L2TradeSingleDataManager.TYPE_PASSIVE.
        :param data: transaction records; ``data[0][6]`` is compared with order numbers.
        """
        # Both order-placing paths below are restricted to Shenzhen codes.
        try:
            # Has an order already been placed?
            state = CodesTradeStateManager().get_trade_state_cache(code)
            if state == trade_constant.TRADE_STATE_BUY_DELEGATED or state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or state == trade_constant.TRADE_STATE_BUY_SUCCESS:
                # Already ordered
                return

            l2_log.debug(code, "成交触发买入计算 触发模式:{} 大单数量:{}", _type, big_buy_order_count)

            total_datas = l2_data_util.local_today_datas.get(code)

            mode_descs = []
            # Disabled:
            # if big_buy_order_count > 0:
            #     mode_descs.append("300w")
            if l2_data_manager_new.L2TradeDataProcessor.get_active_buy_blocks(code):
                mode_descs.append("身位")

            current_total_sell_data = L2MarketSellManager().get_current_total_sell_data(code)
            sell_info = None
            if current_total_sell_data:
                sell_info = (current_total_sell_data[0], current_total_sell_data[1])

            if _type == L2TradeSingleDataManager.TYPE_PASSIVE and mode_descs:
                # Aggressive ordering is allowed, but only for the first order of the code.
                place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code)
                # NOTE(review): current_total_sell_data and the zyltgb_map entry may be
                # None here; the resulting TypeError is caught by the outer except.
                if tool.is_sz_code(code) and place_order_count == 0 and current_total_sell_data[
                    1] > 500 * 10000 and global_util.zyltgb_map.get(
                    code) < 50 * 100000000:
                    # First order, free float below 5 billion, total sell amount above 5 million.
                    mode_descs.insert(0, "成交触发")
                    last_index = total_datas[-1]["index"]
                    volume_rate = code_volumn_manager.CodeVolumeManager().get_volume_rate(code)
                    order_begin_pos = OrderBeginPosInfo(buy_single_index=last_index,
                                                        buy_exec_index=last_index,
                                                        buy_compute_index=last_index,
                                                        num=0, count=1,
                                                        max_num_set=set(),
                                                        buy_volume_rate=volume_rate,
                                                        mode=OrderBeginPosInfo.MODE_ACTIVE,
                                                        mode_desc=",".join(mode_descs),
                                                        sell_info=sell_info,
                                                        threshold_money=0)
                    l2_data_manager_new.L2TradeDataProcessor.save_order_begin_data(code, order_begin_pos)
                    l2_log.debug(code, "积极下单,获取到买入执行位置:{} 成交数据触发模式:{} 大单数量:{}",
                                 order_begin_pos.buy_exec_index,
                                 _type, big_buy_order_count)
                    l2_data_manager_new.L2TradeDataProcessor.start_buy(code, total_datas[-1], total_datas[-1]["index"],
                                                                       True, None)
                else:
                    l2_log.debug(code, "积极下单,不满足扫入下单条件,无法扫入")
            else:
                if not tool.is_sz_code(code):
                    return
                # Find the most recent big buy order, scanning backwards.
                for i in range(len(total_datas) - 1, -1, -1):
                    d = total_datas[i]
                    val = d['val']
                    if not L2DataUtil.is_limit_up_price_buy(val):
                        continue
                    if val['num'] * float(val['price']) < 5000:
                        continue
                    if val['orderNo'] < data[0][6]:
                        continue
                    result = L2TradeSingleDataManager.is_can_place_order(code, d)
                    if result and result[0]:
                        volume_rate = code_volumn_manager.CodeVolumeManager().get_volume_rate(code)
                        order_begin_pos = OrderBeginPosInfo(buy_single_index=i,
                                                            buy_exec_index=i,
                                                            buy_compute_index=i,
                                                            num=0, count=1,
                                                            max_num_set=set(),
                                                            buy_volume_rate=volume_rate,
                                                            mode=OrderBeginPosInfo.MODE_FAST,
                                                            mode_desc="成交触发",
                                                            sell_info=sell_info,
                                                            threshold_money=0)
                        l2_data_manager_new.L2TradeDataProcessor.save_order_begin_data(code, order_begin_pos)
                        l2_log.debug(code, "非激进下单,获取到买入执行位置:{} 成交数据触发模式:{}",
                                     order_begin_pos.buy_exec_index,
                                     _type)
                        l2_data_manager_new.L2TradeDataProcessor.start_buy(code, total_datas[-1],
                                                                           total_datas[-1]["index"],
                                                                           True, None)
                    # NOTE(review): assumed to stop after the first qualifying order
                    # whether or not it could be used - confirm the original nesting.
                    break
        except Exception as e:
            logger_debug.exception(e)

    def OnLimitUpActiveBuy(self, code, transaction_datas):
        """Evaluate a limit-up active-buy transaction and place a radical buy if allowed.

        Any exception is logged and swallowed; handling that takes longer
        than 5 ms is logged.

        :param code: security code.
        :param transaction_datas: transaction records; the last record's
            fields 3 and 6 are read.
        """
        __start_time = time.time()
        try:
            # Is the code in a state that allows ordering?
            state = CodesTradeStateManager().get_trade_state_cache(code)
            if not trade_util.is_can_order_by_state(state):
                # Not in an orderable state
                return

            async_log_util.info(logger_l2_radical_buy, f"涨停主动买:{code}-{transaction_datas[-1]}")
            deal_codes = RadicalBuyDealCodesManager().get_deal_codes()
            # Has today's number of radical-buy deals reached the limit (default 4)?
            radical_buy_setting = BuyMoneyAndCountSetting().get_radical_buy_setting()
            MAX_COUNT = 4 if radical_buy_setting is None else radical_buy_setting[0]
            if len(deal_codes) >= MAX_COUNT:
                async_log_util.info(logger_l2_radical_buy, f"扫入成交代码个数大于{MAX_COUNT}个:{code}-{deal_codes}")
                return
            if code in deal_codes:
                async_log_util.info(logger_l2_radical_buy, f"该代码已经成交:{code}")
                return

            # Can this single code be bought?
            can_buy_result = RadicalBuyDataManager.is_code_can_buy(code)
            if can_buy_result[0]:
                # Blocks eligible for a radical buy, cached for 3 seconds per code.
                result_cache = self.__radical_buy_by_blocks_result_cache.get(code)
                if not result_cache or result_cache[0] < time.time():
                    # Missing or expired
                    yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
                    if yesterday_codes is None:
                        yesterday_codes = set()
                    # Compute whether a radical buy is possible.
                    radical_result = RadicalBuyBlockManager.is_radical_buy(code, yesterday_codes)
                    async_log_util.info(logger_l2_radical_buy, f"计算板块结果:{code}-{radical_result}")
                    result_cache = (time.time() + 3, radical_result)
                    self.__radical_buy_by_blocks_result_cache[code] = result_cache
                    RadicalBuyDealCodesManager.radical_buy_blocks_dict[code] = radical_result[0]
                # Use the cached value.
                result = result_cache[1]
                if result[0]:
                    # Blocks to buy
                    buy_blocks = result[0]
                    try:
                        # --------------- may the blocks still be bought? ---------------
                        f_buy_blocks = radical_buy_data_manager.is_block_can_radical_buy(code, buy_blocks, deal_codes)
                        if not f_buy_blocks:
                            return
                        buy_blocks = f_buy_blocks
                    except Exception as e:
                        # On failure the unfiltered blocks are used.
                        logger_debug.exception(e)

                    # Is buying allowed in the current time window?
                    mode = OrderBeginPosInfo.MODE_RADICAL
                    can_buy, money, msg = BuyMoneyUtil.get_buy_data(tool.get_now_time_str(), mode,
                                                                    DealAndDelegateWithBuyModeDataManager().get_deal_codes_info(
                                                                        mode),
                                                                    DealAndDelegateWithBuyModeDataManager().get_delegates_codes_info(
                                                                        mode))
                    if not can_buy:
                        async_log_util.info(logger_l2_radical_buy, f"当前时间段已不能扫入:{code}-{msg}")
                        return

                    # ----- decide by transaction volume whether to buy -----
                    result_by_volume = radical_buy_strategy.process_limit_up_active_buy_deal(code, transaction_datas)
                    async_log_util.info(logger_l2_radical_buy, f"量买入结果判断:{code}, 结果:{result_by_volume} 板块:{buy_blocks}")
                    in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks()
                    # (block, block money inflow, index in in_blocks or -1)
                    buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b),in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks]
                    if result_by_volume[0] != radical_buy_strategy.BUY_MODE_NONE:
                        if tool.get_now_time_as_int() < 93200:
                            radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code)
                            async_log_util.info(logger_l2_radical_buy,
                                                f"09:32之前不交易:{code}")
                            return
                        # Did the code open too high?
                        open_price = L1DataManager.get_open_price(code)
                        if not radical_buy_strategy.is_can_buy_with_open_price(code, open_price):
                            async_log_util.info(logger_l2_radical_buy,
                                                f"开得太高:{code}")
                            radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code)
                            return
                        radical_buy_data_manager.ExcludeIndexComputeCodesManager.remove_code(code)

                        if result_by_volume[0] == radical_buy_strategy.BUY_MODE_DIRECT and not tool.is_sh_code(code):
                            # Shanghai codes cannot be bought directly on transactions.
                            refer_sell_data = L2MarketSellManager().get_refer_sell_data(code,
                                                                                        l2_huaxin_util.convert_time(
                                                                                            transaction_datas[-1][3]))
                            total_datas = l2_data_util.local_today_datas.get(code)
                            buy_single_index, buy_exec_index = total_datas[-1]["index"], total_datas[-1]["index"]
                            buy_volume_rate = L2TradeDataProcessor.volume_rate_info[code][0]
                            sell_info = (0, 0)
                            if refer_sell_data:
                                sell_info = (refer_sell_data[0], refer_sell_data[1])
                            threshold_money = 0
                            order_begin_pos_info = OrderBeginPosInfo(buy_single_index=buy_single_index,
                                                                     buy_exec_index=buy_exec_index,
                                                                     buy_compute_index=buy_exec_index,
                                                                     num=1, count=1,
                                                                     max_num_set=set(),
                                                                     buy_volume_rate=buy_volume_rate,
                                                                     mode=OrderBeginPosInfo.MODE_RADICAL,
                                                                     mode_desc=f"扫入买入:{buy_blocks}",
                                                                     sell_info=sell_info,
                                                                     threshold_money=threshold_money)
                            L2TradeDataProcessor.save_order_begin_data(code, order_begin_pos_info)
                            buy_result = L2TradeDataProcessor.start_buy(code, total_datas[-1], total_datas[-1]["index"],
                                                                        True, block_info=buy_blocks_with_money)
                            if buy_result:
                                # Order placed successfully
                                # NOTE(review): both calls assumed to be success-only - confirm.
                                radical_buy_data_manager.BlockPlaceOrderRecordManager().add_record(code, buy_blocks)
                                radical_buy_strategy.clear_latest_deal_active_buy_order(code)
                        else:
                            # NOTE(review): assumed to pair with the BUY_MODE_DIRECT check - confirm.
                            # Remember the signal for 30 seconds.
                            RadicalBuyDealCodesManager.buy_by_l2_delegate_expire_time_dict[code] = (
                                time.time() + 30, transaction_datas[-1][6], buy_blocks,
                                l2_huaxin_util.convert_time(transaction_datas[-1][3]), buy_blocks_with_money)
                    else:
                        async_log_util.info(logger_l2_radical_buy, f"不能下单:{code}-{result_by_volume}")
                else:
                    volume_rate = code_volumn_manager.CodeVolumeManager().get_volume_rate(code)
                    async_log_util.info(logger_l2_radical_buy, f"没有可扫入的板块:{code},量比:{volume_rate}")
            else:
                async_log_util.info(logger_l2_radical_buy, f"目前代码不可交易:{code}-{can_buy_result[1]}")
        except Exception as e:
            async_log_util.info(logger_debug, f"激进买计算异常:{str(e)}")
            logger_debug.exception(e)
        finally:
            use_time = time.time() - __start_time
            # Only report handling that took longer than 5 ms.
            if use_time > 0.005:
                async_log_util.info(logger_debug, f"扫入处理时长:{code}-{use_time}")
|
|
|
# Callback singletons: one shared L2 callback, one L2 callback per
# subscribable code slot, and the trade response handler.
my_l2_data_callback = MyL2DataCallback()
my_l2_data_callbacks = [MyL2DataCallback() for _ in range(constant.HUAXIN_L2_MAX_CODES_COUNT)]
my_trade_response = MyTradeResponse()
|
|
|
# Pre-placed orders
def __test_pre_place_order():
    """Place a limit-up buy order for every code in BuyOpenLimitUpCodeManager.

    Codes without a limit-up price (even after re-initialising the previous
    close) are skipped. The whole run stops at the first orderable code when
    ``constant.TRADE_ENABLE`` is false. A failure to place one order is
    logged and does not stop the remaining codes.
    """
    logger_debug.info("进入预埋单测试")
    codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()
    if not codes:
        return
    for code in codes:
        # The limit-up price derives from yesterday's close; reload it once if missing.
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        if not limit_up_price:
            init_data_util.re_set_price_pre(code)
            limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        if not limit_up_price:
            logger_debug.info(f"没有获取到涨停价:{code}")
            continue
        shadow_price = tool.get_shadow_price(limit_up_price)
        if not constant.TRADE_ENABLE:
            return
        try:
            volume = tool.get_buy_volume_by_money(limit_up_price, constant.AVAILABLE_BUY_MONEYS[0])
            result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, volume, limit_up_price,
                                            blocking=False,
                                            shadow_price=shadow_price, shadow_volume=volume)
            async_log_util.info(logger_trade, f"{code}下单结束:{result}")
            buy_open_limit_up_strategy.BuyOpenLimitupDataManager().set_place_order_info(code, volume, volume,
                                                                                        result.get("order_ref"))
        except Exception as e:
            # A failed order must leave a trace; previously it was dropped silently.
            logger_debug.exception(e)
|
|
|
def __subscript_fixed_codes_l2():
    """Subscribe to L2 data for the fixed codes from BuyOpenLimitUpCodeManager.

    For each code a tuple of subscription parameters is built from its
    limit-up price and the whole list is pushed to
    ``HuaXinL2SubscriptCodesManager``.  When the limit-up price is missing,
    ``init_data_util.re_set_price_pre(code)`` is called once and the lookup is
    repeated; a code that still has no price is logged and left out, so that
    one bad code cannot abort the subscription of all the others.

    @return: None
    """
    codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()
    add_datas = []
    for code in codes:
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        if not limit_up_price:
            init_data_util.re_set_price_pre(code)
            limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        if not limit_up_price:
            # The division below would raise on a missing/zero price.
            logger_debug.info(f"没有获取到涨停价:{code}")
            continue
        min_volume = int(round(50 * 10000 / limit_up_price))
        # Pass the cage price along with the subscription
        add_datas.append(
            # (code, min volume, limit-up price, shadow order price, buy volume,
            #  buy volume for each amount in constant.AVAILABLE_BUY_MONEYS)
            (code, min_volume, limit_up_price, round(tool.get_shadow_price(limit_up_price), 2),
             tool.get_buy_volume(limit_up_price),
             [tool.get_buy_volume_by_money(limit_up_price, x) for x in constant.AVAILABLE_BUY_MONEYS]))
    huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.push(add_datas, 0)
|
|
|
# Perform start-up initialisation
def __init():
    """Run start-up initialisation and launch the daily job scheduler.

    In order: refreshes the position list, starts a daemon thread that
    registers the daily jobs and polls ``schedule`` once per second, loads all
    L2 data, installs the L2 trade-single callback and loads the free-float
    volumes from the database.

    @return: None
    """

    def guarded(job):
        """Wrap ``job`` so an exception it raises is logged instead of propagating.

        ``schedule`` does not catch job exceptions; an uncaught one would leave
        ``schedule.run_pending()`` and end the scheduler thread, so no later
        job would ever run.
        """
        job_name = getattr(job, "__name__", repr(job))

        def runner():
            try:
                return job()
            except Exception as e:
                logger_debug.info(f"定时任务执行异常:{job_name}")
                logger_debug.exception(e)
                return None

        runner.__name__ = job_name
        return runner

    def run_pending():
        # (time of day, job) pairs, registered in this order
        daily_jobs = (
            ("15:10:00", zyltgb_util.update_all_zylt_volumes),
            ("01:01:00", __test_pre_place_order),
            ("09:10:00", __subscript_fixed_codes_l2),
            ("08:00:01", history_k_data_manager.update_history_k_bars),
            ("08:30:01", history_k_data_manager.update_history_k_bars),
            ("09:00:01", history_k_data_manager.update_history_k_bars),
            ("09:00:01", huaxin_trade_data_update.add_money_list),
            ("09:15:20", huaxin_trade_data_update.add_money_list),
        )
        for at_time, job in daily_jobs:
            schedule.every().day.at(at_time).do(guarded(job))
        while True:
            schedule.run_pending()
            time.sleep(1)
            # Disabled: stop running after 09:30
            # if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") > 0:
            #     break

    # Refresh positions
    huaxin_trade_data_update.add_position_list()

    threading.Thread(target=run_pending, daemon=True).start()
    l2_data_util.load_l2_data_all(True)
    L2TradeSingleDataManager.set_callback(MyL2TradeSingleCallback())
    # Load free-float volumes
    global_data_loader.load_zyltgb_volume_from_db()
|
|
|
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        queue_l1_trade_w_strategy_r, trade_ipc_addr):
    """Start the strategy-side services and then serve TCP requests on port 10008.

    Initialises data, starts the outside API command listener and the trade
    pipe, launches the background daemon threads and finally blocks in the
    TCP server's ``serve_forever``.  Any exception is logged through
    ``logger_system`` rather than raised.

    @param queue_strategy_r_trade_w: passed to huaxin_trade_api.run_pipe_trade
    @param queue_l1_w_strategy_r: consumed by __recv_pipe_l1
    @param queue_strategy_w_trade_r: passed to huaxin_trade_api.run_pipe_trade
    @param queue_strategy_w_trade_r_for_read: passed to huaxin_trade_api.run_pipe_trade
    @param queue_l1_trade_w_strategy_r: consumed by __recv_pipe_l1_trade
    @param trade_ipc_addr: trade IPC addresses: (place-order ipc address, cancel-order ipc address)
    @return: None
    """

    def spawn(task):
        # Run ``task`` (a zero-argument callable) on a started daemon thread.
        threading.Thread(target=task, daemon=True).start()

    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
    try:
        # Initialise data
        block_info.init()
        __init()

        # Start listening for outside API commands
        api_manager = outside_api_command_manager.ApiCommandManager()
        api_manager.init(middle_api_protocol.SERVER_HOST,
                         middle_api_protocol.SERVER_PORT,
                         OutsideApiCommandCallback(), common_client_count=50)
        api_manager.run(blocking=False)

        # Start the trade service
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r,
                                        queue_strategy_w_trade_r_for_read, trade_ipc_addr)

        # Listen for codes coming from the L1 side
        spawn(lambda: __recv_pipe_l1(queue_l1_w_strategy_r))

        # Listen for codes coming from the L1 trade side
        spawn(lambda: __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r))

        # Cancel orders whose placement is too far away
        spawn(lambda: __cancel_buy_for_too_far())

        # Flush the asynchronous log
        spawn(lambda: async_log_util.run_sync())

        # Flush the asynchronous L2 logs
        l2_log.codeLogQueueDistributeManager.run_async()
        spawn(lambda: async_log_util.l2_data_log.run_sync())

        logger_system.info("create TradeServer")
        spawn(lambda: clear_invalid_client())

        laddr = ("0.0.0.0", 10008)
        try:
            # Note: the handler argument is MyBaseRequestHandle
            MyThreadingTCPServer(laddr, MyBaseRequestHandle).serve_forever()
        except Exception as e:
            logger_system.exception(e)
            logger_system.error(f"端口服务器:{laddr[1]} 启动失败")
    except Exception as e:
        logger_system.exception(e)
|
|
|
if __name__ == "__main__":
    # Debug entry point: load data for a single code, then feed its exported
    # L2 market data records one by one to TradeServerProcessor.l2_market_data.
    code = "002528"
    global_data_loader.init()
    kpl_data_manager.KPLLimitUpDataRecordManager.load_total_datas()
    l2_data_util.load_l2_data(code, False, False)
    datas = log_export.load_l2_market_data()[code]
    for data in datas:
        TradeServerProcessor.l2_market_data(code, data)
|