import concurrent.futures import datetime import hashlib import json import logging import queue import random import socket import socketserver import threading import time import schedule import constant import outside_api_command_manager from cancel_strategy.s_l_h_cancel_strategy import SCancelBigNumComputer from code_attribute import gpcode_manager, code_volumn_manager, global_data_loader, zyltgb_util from code_attribute.code_l1_data_manager import L1DataManager from code_attribute.gpcode_manager import CodePrePriceManager, CodesNameManager, \ WantBuyCodesManager from huaxin_client import l2_data_transform_protocol from huaxin_client.trade_transform_protocol import TradeResponse from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, transaction_progress, \ l2_data_source_util, l2_data_log, data_callback from l2.cancel_buy_strategy import GCancelBigNumComputer, \ DCancelBigNumComputer, RDCancelBigNumComputer from l2.code_price_manager import Buy1PriceManager from l2.huaxin import huaxin_target_codes_manager, l2_huaxin_util from l2.huaxin.huaxin_target_codes_manager import HuaXinL1TargetCodesManager from l2.l2_data_manager import TradePointManager, OrderBeginPosInfo from l2.l2_data_manager_new import L2TradeDataProcessor from l2.l2_data_util import L2DataUtil from l2.l2_sell_manager import L2MarketSellManager from l2.l2_transaction_data_processor import HuaXinTransactionDatasProcessor from l2.place_order_single_data_manager import L2TradeSingleCallback, L2TradeSingleDataManager from log_module import async_log_util, log_export from log_module.log import hx_logger_contact_debug, hx_logger_trade_callback, \ hx_logger_l2_orderdetail, hx_logger_l2_market_data, logger_l2_g_cancel, logger_debug, \ logger_system, logger_trade, logger_l2_radical_buy, logger_l2_trade from third_data import block_info, kpl_data_manager, history_k_data_manager, huaxin_l1_data_manager, kpl_api, kpl_util from third_data.code_plate_key_manager import KPLCodeJXBlockManager, RealTimeKplMarketData, \ KPLPlateForbiddenManager from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils from trade import l2_trade_util, \ trade_data_manager, trade_constant, buy_open_limit_up_strategy from trade.buy_radical import radical_buy_data_manager, radical_buy_strategy from trade.buy_money_count_setting import BuyMoneyAndCountSetting, BuyMoneyUtil from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_data_update, \ huaxin_trade_record_manager from api.outside_api_command_callback import OutsideApiCommandCallback from trade.huaxin.huaxin_trade_record_manager import DelegateRecordManager from trade.order_statistic import DealAndDelegateWithBuyModeDataManager from trade.buy_radical.radical_buy_data_manager import RadicalBuyDataManager, \ EveryLimitupBigDealOrderManager, RadicalCodeMarketInfoManager, BeforeSubDealBigOrderManager, \ EveryLimitupBigDelegateOrderManager from trade.sell.sell_rule_manager import TradeRuleManager from trade.trade_data_manager import RadicalBuyDealCodesManager from trade.trade_manager import CodesTradeStateManager from utils import socket_util, middle_api_protocol, tool, huaxin_util, global_util, trade_util, init_data_util, \ output_util trade_data_request_queue = queue.Queue(maxsize=1000) class MyTCPServer(socketserver.TCPServer): def __init__(self, server_address, RequestHandlerClass): socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=True) # 如果使用异步的形式则需要再重写ThreadingTCPServer class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass class MyBaseRequestHandle(socketserver.BaseRequestHandler): __inited = False __TradeBuyQueue = transaction_progress.TradeBuyQueue() __KPLCodeJXBlockManager = KPLCodeJXBlockManager() __GCancelBigNumComputer = GCancelBigNumComputer() # L2进程对应订阅的代码: {"进程ID": 代码列表} __pid_l2_subscript_codes = {} def setup(self): self.__init() @classmethod def __init(cls): if cls.__inited: return True cls.__inited = True cls.__req_socket_dict = {} def __is_sign_right(self, data_json): list_str = [] sign = data_json["sign"] data_json.pop("sign") for k in data_json: list_str.append(f"{k}={data_json[k]}") list_str.sort() __str = "&".join(list_str) + "JiaBei@!*." md5 = hashlib.md5(__str.encode(encoding='utf-8')).hexdigest() if md5 != sign: raise Exception("签名出错") @classmethod def getRecvData(cls, skk): data = "" header_size = 10 buf = skk.recv(header_size) header_str = buf if buf: start_time = time.time() buf = buf.decode('utf-8') if buf.startswith("##"): content_length = int(buf[2:10]) received_size = 0 while not received_size == content_length: r_data = skk.recv(10240) received_size += len(r_data) data += r_data.decode('utf-8') else: data = skk.recv(1024 * 1024) data = buf + data.decode('utf-8') # hx_logger_l2_upload.info(f"读取数据耗时:{round((time.time() - start_time) * 1000, 1)}") return data, header_str def handle(self): host = self.client_address[0] super().handle() sk: socket.socket = self.request while True: try: # data = sk.recv(1024*1024, socket.MSG_WAITALL) data, header = self.getRecvData(sk) if data: data_str = data # print("收到数据------", f"{data_str[:20]}......{data_str[-20:]}") data_json = None try: data_json = json.loads(data_str) except json.decoder.JSONDecodeError as e: # JSON解析失败 sk.sendall(socket_util.load_header(json.dumps( {"code": 100, "msg": f"JSON解析失败"}).encode( encoding='utf-8'))) hx_logger_trade_callback.error( f"json解析失败,字符串长度:{len(data_str)},字符串内容:\"{data_str[:30]}......{data_str[-20:]}\"") continue if data_json["type"] == 'register': client_type = data_json["data"]["client_type"] rid = data_json["rid"] trade_api.ClientSocketManager.add_client(client_type, rid, sk) sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8')) try: # print("客户端", ClientSocketManager.socket_client_dict) while True: result, header = self.getRecvData(sk) try: resultJSON = json.loads(result) if resultJSON["type"] == 'heart': # 记录活跃客户端 trade_api.ClientSocketManager.heart(resultJSON['client_id']) else: hx_logger_contact_debug.warning(f"接收到非心跳信息:{result}") except json.decoder.JSONDecodeError as e: if not result: sk.close() # print("JSON解析出错", result, header) time.sleep(1) except ConnectionResetError as ee: trade_api.ClientSocketManager.del_client(rid) except Exception as e: logging.exception(e) elif data_json["type"] == "response": # 主动触发的响应 try: my_trade_response.OnTradeResponse(data_json) finally: sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) elif data_json["type"] == "trade_callback": try: # 交易回调 my_trade_response.OnTradeCallback(data_json) finally: sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) elif data_json["type"] == "l2_order": try: # L2逐笔委托 data = data_json["data"] code = data["code"] timestamp = data.get("time") datas = data["data"] TradeServerProcessor.l2_order(code, datas, timestamp) finally: sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) elif data_json["type"] == "l2_trans": try: data = data_json["data"] code = data["code"] datas = data["data"] TradeServerProcessor.l2_transaction(code, datas) finally: sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) elif data_json["type"] == "l2_market_data": try: data = data_json["data"] code = data["code"] data = data["data"] TradeServerProcessor.l2_market_data(code, data) finally: sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) elif data_json["type"] == "l2_subscript_codes": try: data = data_json["data"] datas = data["data"] # print("l2_subscript_codes", data_json) # 订阅的代码 huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.save_subscript_codes(datas) # 上传数据 codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes() l2_log.codeLogQueueDistributeManager.set_l2_subscript_codes(codes) fresults = [] if codes: for code in codes: try: # 获取成交大单:(参考大单金额,已成交大单金额,大单要求金额) th, is_temp = BeforeSubDealBigOrderManager().get_big_order_threshold_info(code) deal_big_money_info = radical_buy_data_manager.get_total_deal_big_order_info( code, gpcode_manager.get_limit_up_price_as_num(code)) deal_big_order_info = ( output_util.money_desc(th), output_util.money_desc(deal_big_money_info[1]), output_util.money_desc(deal_big_money_info[2])) except: deal_big_order_info = None code_name = gpcode_manager.get_code_name(code) fresults.append((code, code_name, deal_big_order_info)) fdata = middle_api_protocol.load_l2_subscript_codes(fresults) middle_api_protocol.request(fdata) finally: sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) elif data_json["type"] == "l2_subscript_codes_v2": try: data = data_json["data"] datas = data["data"] pid, datas = datas[0], datas[1] self.__pid_l2_subscript_codes[pid] = datas # print("l2_subscript_codes", data_json) fcodes = [] for pid in self.__pid_l2_subscript_codes: codes = self.__pid_l2_subscript_codes[pid] fcodes.extend(codes) # 订阅的代码 huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.save_subscript_codes(fcodes) # 上传数据 codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes() l2_log.codeLogQueueDistributeManager.set_l2_subscript_codes(codes) fresults = [] if codes: for code in codes: try: # 获取成交大单:(参考大单金额,已成交大单金额,大单要求金额) th, is_temp = BeforeSubDealBigOrderManager().get_big_order_threshold_info(code) deal_big_money_info = radical_buy_data_manager.get_total_deal_big_order_info( code, gpcode_manager.get_limit_up_price_as_num(code)) deal_big_order_info = ( output_util.money_desc(th), output_util.money_desc(deal_big_money_info[1]), output_util.money_desc(deal_big_money_info[2])) except: deal_big_order_info = None code_name = gpcode_manager.get_code_name(code) fresults.append((code, code_name, deal_big_order_info)) fdata = middle_api_protocol.load_l2_subscript_codes(fresults) middle_api_protocol.request(fdata) finally: sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) elif data_json["type"] == "get_level1_codes": # print("get_level1_codes") # 获取level1的代码 list_ = JueJinApi.get_exchanges_codes(["SHSE", "SZSE"]) fdatas = [] code_name_map = {} for d in list_: if not tool.is_target_code(d["sec_id"]): continue if d["sec_level"] != 1: continue # if d["pre_close"] * tool.get_limit_up_rate(d["sec_id"]) > constant.MAX_SUBSCRIPT_CODE_PRICE: # continue if (d["listed_date"] + datetime.timedelta( days=100)).timestamp() > datetime.datetime.now().timestamp(): continue fdatas.append(d["sec_id"]) code_name_map[d["sec_id"]] = d["sec_name"] # 保存代码名称 CodesNameManager().add_code_names(code_name_map) sk.sendall( socket_util.load_header(json.dumps({"code": 0, "data": fdatas}).encode(encoding='utf-8'))) elif data_json["type"] == "set_target_codes": try: TradeServerProcessor.set_target_codes(data_json) except Exception as e: logging.exception(e) finally: sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) elif data_json["type"] == "trading_order_canceled": try: logger_l2_g_cancel.info(f"正在成交的订单撤单,data:{data_json}") data = data_json["data"] code = data["code"] order_no = data["data"] TradeServerProcessor.trading_order_canceled(code, order_no) finally: sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) else: # 断开连接 break # sk.close() except Exception as e: logging.exception(e) break def finish(self): super().finish() # 交易服务处理器 class TradeServerProcessor: __TradeBuyQueue = transaction_progress.TradeBuyQueue() __KPLCodeJXBlockManager = KPLCodeJXBlockManager() __GCancelBigNumComputer = GCancelBigNumComputer() __sell_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) __process_l1_data_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) __updating_jx_blocks_codes = set() # 保存现价 @classmethod def __save_l1_current_price(cls, datas): before_trade = int(tool.get_now_time_str().replace(":", "")) < int("092900") for d in datas: code = d[0] # 格式 (代码,现价,涨幅,量,更新时间,买1价格,买1量) price = d[1] L1DataManager.set_l1_current_price(code, price) if before_trade: # 开盘前保存开盘价 L1DataManager.set_open_price(code, price) huaxin_l1_data_manager.set_buy1_data(code, d[5], d[6]) @classmethod def __process_buy_open_limit_up_datas(cls, datas): """ 处理排1的数据 @param datas: [(代码, 现价, 涨幅, 量, 当前时间, 买1价, 买1量, 买2价, 买2量, 更新时间, 昨日收盘价(集合竞价才有))] @return: """ # 9:25之后不再处理 if tool.get_now_time_as_int() > int("092500"): return for d in datas: # 计算当前是否是涨停状态 if len(d) == 11: async_log_util.info(logger_debug, f"开1数据:{d}") if gpcode_manager.BuyOpenLimitUpCodeManager().is_in_cache(d[0]): # 09:19:50 到 09:20:00判断是否要撤单 if int("091950") <= int(d[9].replace(":", "")) < int("092000"): async_log_util.info(logger_debug, f"排1撤单:{d},封单:{d[8] * d[5]}") if d[8] * d[5] < 1e8: code = d[0] current_delegates = DelegateRecordManager().list_current_delegates(code) if current_delegates: for c in current_delegates: huaxin_trade_api.cancel_order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, c["orderSysID"]) # 获取L1现价 @classmethod def get_l1_current_price(cls, code): return L1DataManager.get_l1_current_price(code) # 设置目标代码 @classmethod def set_target_codes(cls, data_json): data = data_json["data"] request_id = data_json["request_id"] datas = data["data"] cls.__save_l1_current_price(datas) cls.__process_buy_open_limit_up_datas(datas) # 根据高标的实时涨幅计算拉黑板块 rate_dict = {d[0]: d[2] for d in datas} cls.__process_l1_data_thread_pool.submit( lambda: KPLPlateForbiddenManager().compute(rate_dict)) # 9:30之前采用非线程 if int(tool.get_now_time_str().replace(":", "")) < int("093000") or True: HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id) else: cls.__process_l1_data_thread_pool.submit( lambda: HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id)) @classmethod def l2_order(cls, code, _datas, timestamp): if not constant.L2_DATA_IS_LOADED: logger_debug.info(f"{code}还未载入L2本地数据") return now_time = time.time() use_time = int((now_time - timestamp) * 1000) thread_id = random.randint(0, 100000) l2_log.threadIds[code] = thread_id l2_data_count = len(_datas) # l2_log.info(code, hx_logger_l2_orderdetail, # f"{code}#耗时:{use_time}-{thread_id}#数量:{l2_data_count}#{_datas[-1]}") # l2_data_log.l2_time_log(code, "开始处理L2逐笔委托") try: l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, _datas) finally: use_time = time.time() - now_time if use_time > 0.008: l2_data_log.l2_time_log(code, f"处理L2逐笔委托结束:处理数据数量: {l2_data_count} 最终处理时间:{round(use_time * 1000, 2)}ms") @classmethod def l2_transaction(cls, code, datas): # async_log_util.info(hx_logger_l2_transaction, f"{code}#{datas}") if datas: HuaXinTransactionDatasProcessor().process_huaxin_transaction_datas(code, datas) @classmethod def l2_market_data(cls, code, data): time_str = f"{data['dataTimeStamp']}" time_str = l2_huaxin_util.convert_time(time_str) buy_1_price, buy_1_volume = data["buy"][0] sell_1_price, sell_1_volume = data["sell"][0] limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) # 涨幅 price = data['lastPrice'] code_price_manager.CurrentPriceManager.set_current_price(code, price) code_price_manager.Buy1PriceManager().set_latest_buy1_money(code, buy_1_price, buy_1_volume) # -----------------------判断是是否有自动撤单规则----------------------- try: if DCancelBigNumComputer().has_auto_cancel_rules(code): need_cancel, rule_id = DCancelBigNumComputer().need_cancel(code, buy_1_volume) if need_cancel: try: l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, f"盯封单撤:{time_str}-{buy_1_volume}", cancel_type=trade_constant.CANCEL_TYPE_D) finally: TradeRuleManager().excuted(rule_id) except Exception as e: logger_debug.exception(e) pre_close_price = CodePrePriceManager.get_price_pre_cache(code) if pre_close_price is not None: average_rate = None try: average_price = data["totalValueTrade"] / data["totalVolumeTrade"] average_rate = round((average_price - pre_close_price) / pre_close_price, 4) except: pass # 处理买1,卖1信息 code_price_manager.Buy1PriceManager().process(code, buy_1_price, buy_1_volume, time_str, limit_up_price, sell_1_price, sell_1_volume // 100, average_rate) latest_3m_buy1_money_list = code_price_manager.Buy1PriceManager().get_latest_3m_buy1_money_list(code) # 拉取总大单成交 threading.Thread( target=lambda: radical_buy_data_manager.TotalDealBigOrderInfoManager.update_big_order_info(code, data[ "totalValueTrade"]), daemon=True).start() async_log_util.info(hx_logger_l2_market_data, f"{code}#{data}") sell_1_info = data["sell"][0] if data.get("sell") else None L2MarketSellManager().set_current_total_sell_data(code, time_str, data["totalAskVolume"] * data["avgAskPrice"], data["totalAskVolume"], sell_1_info, data.get("sell")) # 炸板 if sell_1_info and sell_1_info[1] > 0: if BeforeSubDealBigOrderManager().is_need_update(code): # 炸板更新数据 cls.__sell_thread_pool.submit( lambda: radical_buy_data_manager.pull_pre_deal_big_orders(code)) if data["sell"] and len(data["sell"]) > 1 and data["sell"][1][1] > 0: # 出现卖二 radical_buy_strategy.clear_data(code, force=True, market_time_str=time_str) # 设置扫入数据 RadicalCodeMarketInfoManager().set_market_info(code, time_str, limit_up_price, data["buy"][0], sell_1_info) # 判断是否下单 state = CodesTradeStateManager().get_trade_state_cache(code) if not trade_util.is_can_order_by_state(state): # 不处于可下单状态 RadicalBuyDataManager().market_info_change(code) # 是否即将炸开 if RadicalCodeMarketInfoManager().is_almost_open_limit_up(code): # 即将炸开 total_deal_big_order_info = radical_buy_data_manager.get_total_deal_big_order_info(code, limit_up_price) if total_deal_big_order_info and total_deal_big_order_info[0] <= 0: EveryLimitupBigDealOrderManager.clear(code, f"板上放量:{time_str}") # 大单足够 # l2_trade_single_callback.process_limit_up_active_buy(code, [], is_almost_open_limit_up=True, # l2_market_time_str=time_str) @classmethod def trading_order_canceled(cls, code, order_no): pass def clear_invalid_client(): logger_system.info(f"trade_server clear_invalid_client 线程ID:{tool.get_thread_id()}") while True: try: huaxin_trade_api.ClientSocketManager.del_invalid_clients() except: pass finally: time.sleep(2) # 排得太远撤单 def __cancel_buy_for_too_far(): while True: try: # 获取账户可用资金 account_available_money = trade_data_manager.AccountMoneyManager().get_available_money_cache() if account_available_money is not None and account_available_money > constant.BUY_MONEY_PER_CODE: continue can_cancel_codes = [] current_delegates = huaxin_trade_record_manager.DelegateRecordManager().list_current_delegates() for c in current_delegates: if int(c["direction"]) != huaxin_util.TORA_TSTP_D_Buy: continue code = c["securityID"] # 获取下单位置信息 order_begin_pos = TradePointManager().get_buy_compute_start_data_cache(code) if order_begin_pos is None or order_begin_pos.buy_single_index is None: continue total_datas = l2_data_util.local_today_datas.get(code) if not total_datas: continue if order_begin_pos.buy_exec_index < 0: continue if tool.trade_time_sub(tool.get_now_time_str(), total_datas[order_begin_pos.buy_exec_index]["val"]["time"]) < 60: continue trade_index, is_default = transaction_progress.TradeBuyQueue().get_traded_index(code) # 下单位置 place_order_index = SCancelBigNumComputer().get_real_place_order_index_cache(code) # 获取剩下的笔数 total_left_num = 0 for i in range(trade_index + 1, place_order_index): data = total_datas[i] val = data["val"] if not L2DataUtil.is_limit_up_price_buy(val): continue if val["num"] * float(val["price"]) < 5000: continue left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i, total_datas, l2_data_util.local_today_canceled_buyno_map.get( code)) if left_count > 0: total_left_num += val["num"] * left_count # 获取封单额 limit_up_price = gpcode_manager.get_limit_up_price(code) buy1_money = Buy1PriceManager().get_latest_buy1_money(code) if buy1_money is None: buy1_money = 0 if buy1_money > 0: total_left_money = total_left_num * 100 * float(limit_up_price) rate = total_left_money / buy1_money if rate > 0.5: can_cancel_codes.append((code, rate)) if can_cancel_codes: can_cancel_codes.sort(key=lambda x: x[1], reverse=True) # 暂时取消 # l2_data_manager_new.L2TradeDataProcessor.cancel_buy(can_cancel_codes[0][0], "下单距离太远") except Exception as e: logger_debug.exception(e) finally: time.sleep(3) class MyL2DataCallback(l2_data_transform_protocol.L2DataCallBack): def OnL2Order(self, code, datas, timestamp): TradeServerProcessor.l2_order(code, datas, timestamp) def OnL2Transaction(self, code, datas): TradeServerProcessor.l2_transaction(code, datas) def OnMarketData(self, code, data): try: TradeServerProcessor.l2_market_data(code, data) except Exception as e: logger_debug.exception(e) def OnTradingOrderCancel(self, code, buy_no): TradeServerProcessor.trading_order_canceled(code, buy_no) class MyTradeResponse(TradeResponse): def OnTradeCallback(self, data_json): data_json = data_json["data"] type_ = data_json["type"] if type_ == 0: # 获取是否交易成功 data = data_json["data"] order_status = data["orderStatus"] huaxin_trade_record_manager.DelegateRecordManager.add([data]) if huaxin_util.is_deal(order_status): if int(str(data["direction"])) == huaxin_util.TORA_TSTP_D_Buy: l2_trade_util.forbidden_trade(data["securityID"], msg="已成交", force=True) if TradePointManager.get_latest_place_order_mode( data["securityID"]) == OrderBeginPosInfo.MODE_RADICAL: RadicalBuyDealCodesManager().add_deal_code(data["securityID"]) # 成交,更新成交列表与资金列表 huaxin_trade_data_update.add_deal_list() huaxin_trade_data_update.add_money_list() # 记录交易反馈日志 async_log_util.info(hx_logger_trade_callback, data_json) def OnTradeResponse(self, data_json): hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}") # 设置响应内容 trade_api.set_response(data_json["request_id"], data_json['data']) class MyL2TradeSingleCallback(L2TradeSingleCallback): # 积极买板块计算结果缓存:{"code",(有效时间, 结果)} __radical_buy_by_blocks_result_cache = {} def OnTradeSingle(self, code, big_buy_order_count, _type, data): # 暂时不处理 if True: return # 只处理深证的票 try: # 判断是否下单 state = CodesTradeStateManager().get_trade_state_cache(code) if state == trade_constant.TRADE_STATE_BUY_DELEGATED or state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or state == trade_constant.TRADE_STATE_BUY_SUCCESS: # 已经下单了 return l2_log.debug(code, "成交触发买入计算 触发模式:{} 大单数量:{}", _type, big_buy_order_count) total_datas = l2_data_util.local_today_datas.get(code) mode_descs = [] # if big_buy_order_count > 0: # mode_descs.append("300w") if l2_data_manager_new.L2TradeDataProcessor.get_active_buy_blocks(code): mode_descs.append("身位") current_total_sell_data = L2MarketSellManager().get_current_total_sell_data(code) sell_info = None if current_total_sell_data: sell_info = (current_total_sell_data[0], current_total_sell_data[1]) if _type == L2TradeSingleDataManager.TYPE_PASSIVE and mode_descs: # 可以激进下单且必须是首次下单才能激进 place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code) if tool.is_sz_code(code) and place_order_count == 0 and current_total_sell_data[ 1] > 500 * 10000 and global_util.zyltgb_map.get( code) < 50 * 100000000: # 首次下单,自由流通50亿以下,总卖额500w才能激进下单 mode_descs.insert(0, "成交触发") last_index = total_datas[-1]["index"] volume_rate = code_volumn_manager.CodeVolumeManager().get_volume_rate(code) order_begin_pos = OrderBeginPosInfo(buy_single_index=last_index, buy_exec_index=last_index, buy_compute_index=last_index, num=0, count=1, max_num_set=set(), buy_volume_rate=volume_rate, mode=OrderBeginPosInfo.MODE_ACTIVE, mode_desc=",".join(mode_descs), sell_info=sell_info, threshold_money=0) l2_data_manager_new.L2TradeDataProcessor.save_order_begin_data(code, order_begin_pos) l2_log.debug(code, "积极下单,获取到买入执行位置:{} 成交数据触发模式:{} 大单数量:{}", order_begin_pos.buy_exec_index, _type, big_buy_order_count) l2_data_manager_new.L2TradeDataProcessor.start_buy(code, total_datas[-1], total_datas[-1]["index"], True, None) else: l2_log.debug(code, "积极下单,不满足扫入下单条件,无法扫入") else: if not tool.is_sz_code(code): return # 找到最近的大买单 for i in range(len(total_datas) - 1, -1, -1): d = total_datas[i] val = d['val'] if not L2DataUtil.is_limit_up_price_buy(val): continue if val['num'] * float(val['price']) < 5000: continue if val['orderNo'] < data[0][6]: continue result = L2TradeSingleDataManager.is_can_place_order(code, d) if result and result[0]: volume_rate = code_volumn_manager.CodeVolumeManager().get_volume_rate(code) order_begin_pos = OrderBeginPosInfo(buy_single_index=i, buy_exec_index=i, buy_compute_index=i, num=0, count=1, max_num_set=set(), buy_volume_rate=volume_rate, mode=OrderBeginPosInfo.MODE_FAST, mode_desc="成交触发", sell_info=sell_info, threshold_money=0) l2_data_manager_new.L2TradeDataProcessor.save_order_begin_data(code, order_begin_pos) l2_log.debug(code, "非激进下单,获取到买入执行位置:{} 成交数据触发模式:{}", order_begin_pos.buy_exec_index, _type) l2_data_manager_new.L2TradeDataProcessor.start_buy(code, total_datas[-1], total_datas[-1]["index"], True, None) break except Exception as e: logger_debug.exception(e) def process_limit_up_active_buy(self, code, transaction_datas, is_almost_open_limit_up=False, l2_market_time_str='', no_left_limit_up_sell=False): """ 处理涨停主动买 @param no_left_limit_up_sell: 是否还有剩余涨停卖尚未成交 @param code: @param transaction_datas: @param is_almost_open_limit_up: 是否即将炸板 @param l2_market_time_str: 时间 @return: 是否清除本次上板数据 """ __start_time = time.time() try: # 判断是否处于可下单状态 state = CodesTradeStateManager().get_trade_state_cache(code) if not trade_util.is_can_order_by_state(state): # 不处于可下单状态 return True if transaction_datas: l2_log.info(code, logger_l2_radical_buy, f"涨停主动买:{code}-{transaction_datas[-1]}") else: l2_log.info(code, logger_l2_radical_buy, f"即将炸板:{code}-{is_almost_open_limit_up}-{l2_market_time_str}") deal_codes = RadicalBuyDealCodesManager().get_deal_codes() # 判断今日扫入的代码数量是否大于阈值 radical_buy_setting = BuyMoneyAndCountSetting().get_radical_buy_setting() MAX_COUNT = 4 if radical_buy_setting is None else radical_buy_setting[0] if not WantBuyCodesManager().is_in_cache(code): # 加绿不判断板块是否成交 if len(deal_codes) >= MAX_COUNT: l2_log.info(code, logger_l2_radical_buy, f"扫入成交代码个数大于{MAX_COUNT}个:{code}-{deal_codes}") return True if code in deal_codes: l2_log.info(code, logger_l2_radical_buy, f"该代码已经成交:{code}") return True # 单票是否可买 can_buy_result = RadicalBuyDataManager.is_code_can_buy(code) if can_buy_result[0]: # 获取激进买的板块 f_buy_blocks, orgin_buy_blocks = radical_buy_strategy.compute_can_radical_buy_blocks(code, deal_codes) if orgin_buy_blocks: if not f_buy_blocks: return True # 买入的板块 buy_blocks = f_buy_blocks # 判断当前时间段是否可以买入 mode = OrderBeginPosInfo.MODE_RADICAL can_buy, money, msg = BuyMoneyUtil.get_buy_data(tool.get_now_time_str(), mode, DealAndDelegateWithBuyModeDataManager().get_deal_codes_info( mode), DealAndDelegateWithBuyModeDataManager().get_delegates_codes_info( mode)) if not can_buy: l2_log.info(code, logger_l2_radical_buy, f"当前时间段已不能扫入:{code}-{msg}") return True # -----根据成交比例判断是否可买------ result_by_volume = radical_buy_strategy.process_limit_up_active_buy_deal(code, transaction_datas, is_almost_open_limit_up, no_left_limit_up_sell=no_left_limit_up_sell) l2_log.info(code, logger_l2_radical_buy, f"量买入结果判断:{code}, 结果:{result_by_volume} 板块:{buy_blocks}") in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks() buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b), in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks] if result_by_volume[0] != radical_buy_strategy.BUY_MODE_NONE: if not WantBuyCodesManager().is_in_cache(code): # 加绿的不需要判断如下问题 # if tool.get_now_time_as_int() < 93100: # radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code) # async_log_util.info(logger_l2_radical_buy, # f"09:31之前不交易:{code}") # return True # 判断是否开得太高 open_price = L1DataManager.get_open_price(code) if not radical_buy_strategy.is_can_buy_with_open_price(code, open_price): l2_log.info(code, logger_l2_radical_buy, f"开得太高:{code}") radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code) return True # if not RadicalCodeMarketInfoManager().is_opened_limit_up(code): # # 辨识度的票首封可买 # # async_log_util.info(logger_l2_radical_buy, # f"没有炸过板:{code}") # return True radical_buy_data_manager.ExcludeIndexComputeCodesManager.remove_code(code) if result_by_volume[0] == radical_buy_strategy.BUY_MODE_DIRECT and not tool.is_sh_code(code): # 上证不能根据成交买入 latest_deal_time = l2_huaxin_util.convert_time(transaction_datas[-1][3]) refer_sell_data = L2MarketSellManager().get_refer_sell_data(code, latest_deal_time) total_datas = l2_data_util.local_today_datas.get(code) buy_single_index, buy_exec_index = total_datas[-1]["index"], total_datas[-1]["index"] buy_volume_rate = L2TradeDataProcessor.volume_rate_info[code][0] sell_info = (0, 0) if refer_sell_data: sell_info = (refer_sell_data[0], refer_sell_data[1]) threshold_money = 0 every_deal_orders = EveryLimitupBigDealOrderManager.list_big_buy_deal_orders(code) if every_deal_orders: min_order_no = min(every_deal_orders, lambda x: x[0])[0] else: min_order_no = transaction_datas[-1][6] order_begin_pos_info = OrderBeginPosInfo(buy_single_index=buy_single_index, buy_exec_index=buy_exec_index, buy_compute_index=buy_exec_index, num=1, count=1, max_num_set=set(), buy_volume_rate=buy_volume_rate, mode=OrderBeginPosInfo.MODE_RADICAL, mode_desc=f"扫入买入:{buy_blocks}, 大单成交最小订单号:{min_order_no}", sell_info=sell_info, threshold_money=threshold_money, min_order_no= min_order_no ) L2TradeDataProcessor.save_order_begin_data(code, order_begin_pos_info) buy_result = L2TradeDataProcessor.start_buy(code, total_datas[-1], total_datas[-1]["index"], True, block_info=buy_blocks_with_money) if buy_result: # 下单成功 radical_buy_data_manager.BlockPlaceOrderRecordManager().add_record(code, buy_blocks) radical_buy_strategy.clear_data(code, force=True) RDCancelBigNumComputer().clear_data(code) # 大单成交足够 RadicalBuyDataManager().big_order_deal_enough(code) return True else: if transaction_datas: latest_buy_no = transaction_datas[-1][6] latest_deal_time = l2_huaxin_util.convert_time(transaction_datas[-1][3]) else: # 如果没有成交数据,就取最近的买单号 total_datas = l2_data_util.local_today_datas.get(code) latest_buy_no = 0 for index in range(total_datas[-1]["index"], -1, -1): if L2DataUtil.is_buy(total_datas[index]["val"]): latest_buy_no = int(total_datas[index]["val"]["orderNo"]) break latest_deal_time = l2_market_time_str RadicalBuyDealCodesManager.buy_by_l2_delegate_expire_time_dict[code] = ( time.time() + 60, latest_buy_no, buy_blocks, latest_deal_time, buy_blocks_with_money, is_almost_open_limit_up) return False else: l2_log.info(code, logger_l2_radical_buy, f"不能下单:{code}-{result_by_volume}") return False else: volume_rate = code_volumn_manager.CodeVolumeManager().get_volume_rate(code) l2_log.info(code, logger_l2_radical_buy, f"没有可扫入的板块:{code},量比:{volume_rate}") return True else: l2_log.info(code, logger_l2_radical_buy, f"目前代码不可交易:{code}-{can_buy_result[1]}") return True except Exception as e: l2_log.info(code, logger_debug, f"激进买计算异常:{str(e)}") logger_debug.exception(e) finally: use_time = time.time() - __start_time if use_time > 0.005: l2_log.info(code, logger_debug, f"扫入处理时长:{code}-{use_time}") def OnLimitUpActiveBuy(self, code, transaction_datas, no_left_limit_up_sell): can_clear_before_data = self.process_limit_up_active_buy(code, transaction_datas, no_left_limit_up_sell=no_left_limit_up_sell) if can_clear_before_data: # 清除 EveryLimitupBigDealOrderManager.clear(code, "处理涨停成交数据") pass def OnLastLimitUpSellDeal(self, code, data): """ 最后一笔涨停卖数据成交 @param code: @param data: (data['SecurityID'], data['TradePrice'], data['TradeVolume'], data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], data['SellNo'], data['ExecType']) @return: """ if True: return if data[6] < data[7]: # 非主动买 return # 根据板块判断是否可买 state = CodesTradeStateManager().get_trade_state_cache(code) if not trade_util.is_can_order_by_state(state): # 不处于可下单状态 return l2_log.info(code, logger_l2_radical_buy, f"最后一笔涨停卖被吃:{code}-{data}") deal_codes = RadicalBuyDealCodesManager().get_deal_codes() # 判断今日扫入的代码数量是否大于阈值 radical_buy_setting = BuyMoneyAndCountSetting().get_radical_buy_setting() MAX_COUNT = 4 if radical_buy_setting is None else radical_buy_setting[0] if not WantBuyCodesManager().is_in_cache(code): # 加绿不判断板块是否成交 if len(deal_codes) >= MAX_COUNT: l2_log.info(code, logger_l2_radical_buy, f"扫入成交代码个数大于{MAX_COUNT}个:{code}-{deal_codes}") return if code in deal_codes: l2_log.info(code, logger_l2_radical_buy, f"该代码已经成交:{code}") return # 单票是否可买 can_buy_result = RadicalBuyDataManager.is_code_can_buy(code) if not can_buy_result[0]: return # 获取激进买的板块 f_buy_blocks, orgin_buy_blocks = radical_buy_strategy.compute_can_radical_buy_blocks(code, deal_codes) if not orgin_buy_blocks: l2_log.info(code, logger_l2_radical_buy, f"没有可扫入的板块:{code}") return if not f_buy_blocks: return # 买入的板块 buy_blocks = f_buy_blocks # 判断当前时间段是否可以买入 mode = OrderBeginPosInfo.MODE_RADICAL can_buy, money, msg = BuyMoneyUtil.get_buy_data(tool.get_now_time_str(), mode, DealAndDelegateWithBuyModeDataManager().get_deal_codes_info( mode), DealAndDelegateWithBuyModeDataManager().get_delegates_codes_info( mode)) if not can_buy: l2_log.info(code, logger_l2_radical_buy, f"当前时间段已不能扫入:{code}-{msg}") return in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks() buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b), in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks] if not WantBuyCodesManager().is_in_cache(code): # 判断是否开得太高 open_price = L1DataManager.get_open_price(code) if not radical_buy_strategy.is_can_buy_with_open_price(code, open_price): l2_log.info(code, logger_l2_radical_buy, f"开得太高:{code}") radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code) return radical_buy_data_manager.ExcludeIndexComputeCodesManager.remove_code(code) # 根据L2下单 latest_buy_no = data[6] latest_deal_time = l2_huaxin_util.convert_time(data[3]) # 清除大单委托数据 EveryLimitupBigDelegateOrderManager.clear(code, '') l2_log.info(code, logger_l2_trade, f"计算完板块与大单,准备下单:{data}") RadicalBuyDealCodesManager.buy_by_l2_delegate_expire_time_dict[code] = ( time.time() + 1, latest_buy_no, buy_blocks, latest_deal_time, buy_blocks_with_money, False) # 回调 my_l2_data_callback = MyL2DataCallback() my_l2_data_callbacks = [MyL2DataCallback() for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT)] my_trade_response = MyTradeResponse() def run_l2_market_info_reciever(queues: list): """ 接收L2 market数据 @param queues: @return: """ def recieve_data(queue): while True: try: d = queue.get() # {"type": "l2_market", "data": (code, data)} if d["type"] == "l2_market": code, market_data = d["data"] my_l2_data_callback.OnMarketData(code, market_data) except: pass for q in queues: threading.Thread(target=recieve_data, args=(q,), daemon=True).start() # 预埋单 def __test_pre_place_order(): codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes() logger_debug.info(f"进入预埋单测试:{codes}") try: if codes: for code in codes: # 获取昨日收盘价格 limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) if not limit_up_price: init_data_util.re_set_price_pre(code, force=True) limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) if not limit_up_price: logger_debug.info(f"没有获取到涨停价:{code}") continue shadow_price = tool.get_shadow_price(limit_up_price) if not constant.TRADE_ENABLE: return try: volume = tool.get_buy_volume_by_money(limit_up_price, constant.AVAILABLE_BUY_MONEYS[0]) result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, volume, limit_up_price, blocking=False, shadow_price=shadow_price, shadow_volume=volume) l2_log.info(code, logger_trade, f"{code}下单结束:{result}") buy_open_limit_up_strategy.BuyOpenLimitupDataManager().set_place_order_info(code, volume, volume, result.get("order_ref")) except Exception as e: logger_debug.exception(e) except Exception as e: logger_debug.exception(e) def __subscript_fixed_codes_l2(): """ 订阅固定代码的L2数据 @return: """ codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes() add_datas = [] for code in codes: limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) if not limit_up_price: init_data_util.re_set_price_pre(code) limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) min_volume = int(round(50 * 10000 / limit_up_price)) special_volumes = BuyMoneyUtil.get_possible_buy_volumes(limit_up_price) special_volumes |= set([tool.get_buy_volume_by_money(limit_up_price, x) for x in constant.AVAILABLE_BUY_MONEYS]) # 传递笼子价 add_datas.append( # (代码, 最小量, 涨停价,影子订单价格,买量, 特殊价格) (code, min_volume, limit_up_price, round(tool.get_shadow_price(limit_up_price), 2), tool.get_buy_volume(limit_up_price), list(special_volumes))) huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.push(add_datas, 0) def __update_yesterday_kpl_limit_up_datas(): day = tool.get_now_date_str() day = HistoryKDatasUtils.get_previous_trading_date(day) results = kpl_api.getHistoryLimitUpInfo(day) result_list = kpl_util.parseDaBanData(json.dumps({"list": results, "errcode": 0}), kpl_util.DABAN_TYPE_LIMIT_UP) kpl_data_manager.KPLLimitUpDataRecordManager.save_record(day, result_list, set_not_open=True) logger_debug.info("更新昨日开盘啦实时涨停数据") # 做一些初始化的操作 def __init(): def run_pending(): # 更新自由流通市值 schedule.every().day.at("15:10:00").do(zyltgb_util.update_all_zylt_volumes) # 测试下单 schedule.every().day.at("01:02:00").do(__test_pre_place_order) # 订阅固定的代码 schedule.every().day.at("09:10:00").do(__subscript_fixed_codes_l2) # 更新K线 schedule.every().day.at("08:00:01").do(history_k_data_manager.update_history_k_bars) schedule.every().day.at("08:30:01").do(history_k_data_manager.update_history_k_bars) schedule.every().day.at("09:00:01").do(history_k_data_manager.update_history_k_bars) # 更新账户信息 schedule.every().day.at("09:00:01").do(huaxin_trade_data_update.add_money_list) schedule.every().day.at("09:15:20").do(huaxin_trade_data_update.add_money_list) schedule.every().day.at("09:15:20").do(huaxin_trade_data_update.add_money_list) # 更新昨日实时涨停数据 schedule.every().day.at("07:58:00").do(__update_yesterday_kpl_limit_up_datas) while True: try: schedule.run_pending() except: pass finally: time.sleep(1) # 9点半后终止运行 # if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") > 0: # break # 持仓刷新 huaxin_trade_data_update.add_position_list() threading.Thread(target=run_pending, daemon=True).start() l2_data_util.load_l2_data_all(True) # L2成交信号回调 global l2_trade_single_callback l2_trade_single_callback = MyL2TradeSingleCallback() data_callback.l2_trade_single_callback = l2_trade_single_callback L2TradeSingleDataManager.set_callback(l2_trade_single_callback) # 加载自由流通量 global_data_loader.load_zyltgb_volume_from_db() # 获取最近7天涨停数最多的板块 # try: # if not KPLPlateForbiddenManager().list_all_cache() and tool.get_now_time_as_int() > int("070000"): # # 没有添加过的时候需要重新添加 # datas_ = LatestLimitUpBlockManager().statistics_limit_up_block_infos() # if datas_: # for data_ in datas_: # # 连续2天的板块就不买 # if data_[2] >= 2: # KPLPlateForbiddenManager().save_plate(data_[0]) # except: # pass # 初始化数据 BuyMoneyAndCountSetting() gpcode_manager.WantBuyCodesManager() def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr): """ @param queue_strategy_r_trade_w: @param queue_strategy_w_trade_r: @param queue_strategy_w_trade_r_for_read: @param trade_ipc_addr: 交易IPC地址:(下单ipc地址,撤单ipc地址) @return: """ logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}") try: # 执行一些初始化数据 block_info.init() __init() # 启动外部接口监听 manager = outside_api_command_manager.ApiCommandManager() manager.init(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT, OutsideApiCommandCallback(), common_client_count=50) manager.run(blocking=False) # 启动交易服务 huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr) # 下单距离太远取消订单 t1 = threading.Thread(target=lambda: __cancel_buy_for_too_far(), daemon=True) t1.start() # 清理无用的客户端 t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True) t1.start() logger_system.info("create TradeServer") laddr = "0.0.0.0", 10008 try: tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 注意:参数是MyBaseRequestHandle tcpserver.serve_forever() except Exception as e: logger_system.exception(e) logger_system.error(f"端口服务器:{laddr[1]} 启动失败") except Exception as e: logger_system.exception(e) if __name__ == "__main__": code = "002528" global_data_loader.init() kpl_data_manager.KPLLimitUpDataRecordManager.load_total_datas() l2_data_util.load_l2_data(code, False, False) datas = log_export.load_l2_market_data() datas = datas[code] for data in datas: TradeServerProcessor.l2_market_data(code, data)