From 6bbfbbb16d792f7737ec86cabdba5c0e98dcf4b4 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 29 八月 2025 17:41:29 +0800 Subject: [PATCH] 有涨停买撤单要触发撤单计算 --- huaxin_client/l1_client.py | 231 +++++++++++++++++++++++++++++++++++++++++++-------------- 1 files changed, 174 insertions(+), 57 deletions(-) diff --git a/huaxin_client/l1_client.py b/huaxin_client/l1_client.py index 2a62460..0ee112f 100644 --- a/huaxin_client/l1_client.py +++ b/huaxin_client/l1_client.py @@ -1,15 +1,24 @@ # -*- coding: utf-8 -*- import json import logging +import multiprocessing import os import threading import time from huaxin_client import socket_util, l1_subscript_codes_manager import xmdapi -from huaxin_client import tool -from huaxin_client.client_network import SendResponseSkManager -from log_module.log import logger_system, logger_local_huaxin_l1 +from huaxin_client import tool, constant +from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript, logger_debug +from third_data import custom_block_in_money_manager +from utils import tool as out_tool + +################B绫�################## +ADDRESS = "udp://224.224.1.19:7880" + +################A绫�################## +if constant.IS_A: + ADDRESS = "udp://224.224.1.9:7880" level1_data_dict = { @@ -45,9 +54,29 @@ login_req = xmdapi.CTORATstpReqUserLoginField() self.__api.ReqUserLogin(login_req, 1) + def subscribe_codes(self, codes_sh, codes_sz): + # 閲嶆柊璁㈤槄浠g爜 + if codes_sh: + ret = self.__api.SubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE) + if ret != 0: + # print('SubscribeMarketData fail, ret[%d]' % ret) + pass + else: + # print('SubscribeMarketData success, ret[%d]' % ret) + pass + + if codes_sz: + ret = self.__api.SubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE) + if ret != 0: + # print('SubscribeMarketData fail, ret[%d]' % ret) + pass + else: + # print('SubscribeMarketData success, ret[%d]' % ret) + pass + def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID): if pRspInfoField.ErrorID == 0: - print('Login success! [%d]' % nRequestID) + # print('Login success! [%d]' % nRequestID) ''' 璁㈤槄琛屾儏 @@ -56,21 +85,7 @@ 鍏跺畠鎯呭喌锛岃闃卻ub_arr闆嗗悎涓殑鍚堢害琛屾儏 ''' - print(f"璁㈤槄鏁伴噺锛歴h-{len(self.codes_sh)} sz-{len(self.codes_sz)}") - if self.codes_sh: - ret = self.__api.SubscribeMarketData(self.codes_sh, xmdapi.TORA_TSTP_EXD_SSE) - if ret != 0: - print('SubscribeMarketData fail, ret[%d]' % ret) - else: - print('SubscribeMarketData success, ret[%d]' % ret) - - if self.codes_sz: - ret = self.__api.SubscribeMarketData(self.codes_sz, xmdapi.TORA_TSTP_EXD_SZSE) - if ret != 0: - print('SubscribeMarketData fail, ret[%d]' % ret) - else: - print('SubscribeMarketData success, ret[%d]' % ret) - + self.subscribe_codes(self.codes_sh, self.codes_sz) # sub_arr = [b'600004'] # ret = self.__api.UnSubscribeMarketData(sub_arr, xmdapi.TORA_TSTP_EXD_SSE) # if ret != 0: @@ -80,77 +95,147 @@ else: - print('Login fail!!! [%d] [%d] [%s]' - % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) + pass + # print('Login fail!!! [%d] [%d] [%s]' + # % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField): if pRspInfoField.ErrorID == 0: - print('OnRspSubMarketData: OK!') + # print('OnRspSubMarketData: OK!') + pass else: - print('OnRspSubMarketData: Error! [%d] [%s]' - % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) + # print('OnRspSubMarketData: Error! [%d] [%s]' + # % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) + pass def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField): if pRspInfoField.ErrorID == 0: - print('OnRspUnSubMarketData: OK!') + # print('OnRspUnSubMarketData: OK!') + pass else: - print('OnRspUnSubMarketData: Error! [%d] [%s]' - % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) + pass + # print('OnRspUnSubMarketData: Error! [%d] [%s]' + # % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg)) def OnRtnMarketData(self, pMarketDataField): if pMarketDataField.SecurityName.find("S") == 0: return if pMarketDataField.SecurityName.find("ST") >= 0: return - close_price = round(pMarketDataField.UpperLimitPrice / 1.1, 2) - rate = round((pMarketDataField.LastPrice - close_price) * 100 / close_price, 2) - # print(pMarketDataField.SecurityID, pMarketDataField.SecurityName, rate, pMarketDataField.Volume) - + close_price = pMarketDataField.PreClosePrice + lastPrice = pMarketDataField.LastPrice + if pMarketDataField.BidPrice1: + lastPrice = pMarketDataField.BidPrice1 + rate = round((lastPrice - close_price) * 100 / close_price, 2) + if out_tool.get_limit_up_rate(pMarketDataField.SecurityID) > 1.1001: + # 娑ㄥ仠鏉�20%浠ヤ笂鐨勬墦鎶� + rate = rate / 2 + # (浠g爜, 鐜颁环, 娑ㄥ箙, 閲�, 褰撳墠鏃堕棿, 涔�1浠�, 涔�1閲�, 涔�2浠�, 涔�2閲�, 鏇存柊鏃堕棿, 鍗�1浠�, 鍗�1閲�) level1_data_dict[pMarketDataField.SecurityID] = ( - pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time()) - # print( - # "SecurityID[%s] SecurityName[%s] LastPrice[%.2f] Volume[%d] Turnover[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d] UpperLimitPrice[%.2f] LowerLimitPrice[%.2f]" - # % (pMarketDataField.SecurityID, pMarketDataField.SecurityName, pMarketDataField.LastPrice, - # pMarketDataField.Volume, - # pMarketDataField.Turnover, pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, - # pMarketDataField.AskPrice1, - # pMarketDataField.AskVolume1, pMarketDataField.UpperLimitPrice, pMarketDataField.LowerLimitPrice)) + pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time(), + pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2, + pMarketDataField.BidVolume2, pMarketDataField.UpdateTime, pMarketDataField.AskPrice1, + pMarketDataField.AskVolume1) __latest_subscript_codes = set() -def __upload_codes_info(pipe_l2, datas): - if not tool.is_trade_time(): +def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas): + if not tool.is_trade_time() and not tool.is_pre_trade_time(): return # 涓婁紶鏁版嵁 type_ = "set_target_codes" + request_id = f"sb_{int(time.time() * 1000)}" fdata = json.dumps( - {"type": type_, "data": {"data": datas}}) - if pipe_l2 is not None: - pipe_l2.send(fdata) + {"type": type_, "data": {"data": datas}, "request_id": request_id, "time": round(time.time() * 1000, 0)}) + if queue_l1_w_strategy_r is not None: + queue_l1_w_strategy_r.put_nowait(fdata) # 璁板綍鏂板鍔犵殑浠g爜 codes = set([x[0] for x in datas]) add_codes = codes - __latest_subscript_codes __latest_subscript_codes.clear() for c in codes: __latest_subscript_codes.add(c) - logger_local_huaxin_l1.info(f"鏂板鍔犺闃呯殑浠g爜锛歿add_codes}") + if add_codes: + logger_local_huaxin_l1.info(f"({request_id})鏂板鍔犺闃呯殑浠g爜锛歿add_codes}") -def run(pipe_l2): - logger_local_huaxin_l1.info("杩愯l1璁㈤槄鏈嶅姟") +# 閲嶆柊璁㈤槄浠g爜 +def re_subscript(spi: MdSpi): + try: + codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes() + if len(codes_sh) > 100 and len(codes_sz) > 100: + logger_local_huaxin_l1.info(f"閲嶆柊璁㈤槄 sh-{len(codes_sh)} sz-{len(codes_sz)}") + spi.subscribe_codes(codes_sh, codes_sz) + except: + pass + + +__position_codes = set() + + +def __read_from_strategy(queue_l1_r_strategy_w: multiprocessing.Queue): + while True: + try: + data = queue_l1_r_strategy_w.get() + if type(data) == str: + data = json.loads(data) + if data["type"] == "set_position_codes": + codes = set(data["data"]) + global __position_codes + __position_codes = codes + logger_local_huaxin_l1.info(f"鏀跺埌绛栫暐娑堟伅锛歿data}", ) + except: + pass + finally: + time.sleep(1) + + +def __run_subscript_task(spi): + """ + 杩愯璁㈤槄浠诲姟锛屽湪9:19鍒�9:29涔嬮棿寮�濮嬭闃� + @return: + """ + is_re_subscript = False + while True: + try: + # 鍒ゆ柇鏄惁闇�瑕侀噸鏂拌闃� + if tool.is_pre_trade_time(): + re_subscript(spi) + is_re_subscript = True + if is_re_subscript: + break + except: + pass + finally: + time.sleep(3) + + +def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w, fixed_codes=None): + """ + 杩愯l1璁㈤槄浠诲姟 + + @param queue_l1_w_strategy_r: L1鏂瑰啓锛岀瓥鐣ユ柟璇� + @param queue_l1_r_strategy_w: L1鏂硅锛岀瓥鐣ユ柟鍐� + @param fixed_codes: 鍥哄畾瑕佽繑鍥炴暟鎹殑浠g爜 + @return: + """ + if fixed_codes is None: + fixed_codes = set() + logger_local_huaxin_l1.info(f"杩愯l1璁㈤槄鏈嶅姟锛屽浐瀹氫唬鐮侊細{fixed_codes}") codes_sh = [] codes_sz = [] for i in range(15): try: + logger_local_huaxin_l1.info("寮�濮嬭幏鍙栫洰鏍囦唬鐮�") codes_sh, codes_sz = l1_subscript_codes_manager.get_codes() logger_local_huaxin_l1.info(f"鑾峰彇涓婅瘉锛屾繁璇佷唬鐮佹暟閲忥細sh-{len(codes_sh)} sz-{len(codes_sz)}") break except Exception as e: logger_local_huaxin_l1.exception(e) time.sleep(4) - + logger_system.info(f"鑾峰彇L1璁㈤槄鐩爣绁ㄦ暟閲忥細sh-{len(codes_sh)} sz-{len(codes_sz)}") # 鎵撳嵃鎺ュ彛鐗堟湰鍙� print(xmdapi.CTORATstpXMdApi_GetApiVersion()) @@ -164,20 +249,36 @@ api.RegisterSpi(spi) # 娉ㄥ唽鍗曚釜琛屾儏鍓嶇疆鏈嶅姟鍦板潃 - # api.RegisterFront("tcp://224.224.1.19:7880") + # api.RegisterFront("tcp://210.14.72.16:9402") # 娉ㄥ唽澶氫釜琛屾儏鍓嶇疆鏈嶅姟鍦板潃锛岀敤閫楀彿闅斿紑 # api.RegisterFront("tcp://10.0.1.101:6402,tcp://10.0.1.101:16402") # 娉ㄥ唽鍚嶅瓧鏈嶅姟鍣ㄥ湴鍧�锛屾敮鎸佸鏈嶅姟鍦板潃閫楀彿闅斿紑 # api.RegisterNameServer('tcp://224.224.3.19:7888') # api.RegisterNameServer('tcp://10.0.1.101:52370,tcp://10.0.1.101:62370') - api.RegisterMulticast("udp://224.224.1.19:7880", None, "") + + # -------------------------姝e紡鍦板潃B绫�------------------------------- + api.RegisterMulticast(ADDRESS, None, "") + + # -------------------------姝e紡鍦板潃A绫�------------------------------- + # api.RegisterMulticast("udp://224.224.1.9:7880", None, "") # 鍚姩鎺ュ彛 api.Init() + logger_system.info("L1璁㈤槄鏈嶅姟鍚姩鎴愬姛") + # 娴嬭瘯閾捐矾 + # level1_data_dict["000969"] = ( + # "000969", 9.46, 9.11, 771000*100, time.time()) + # level1_data_dict["002292"] = ( + # "002292", 8.06, 9.96, 969500 * 100, time.time()) + + threading.Thread(target=__read_from_strategy, args=(queue_l1_r_strategy_w,), daemon=True).start() + + threading.Thread(target=__run_subscript_task, args=(spi,), daemon=True).start() + # 绛夊緟绋嬪簭缁撴潫 while True: - print("鏁伴噺", len(level1_data_dict)) + # print("鏁伴噺", len(level1_data_dict)) try: if len(level1_data_dict) < 1: continue @@ -186,19 +287,35 @@ # (浠g爜,鐜颁环,娑ㄥ箙,閲�,鏃堕棿) list_ = [level1_data_dict[k] for k in level1_data_dict] flist = [] + now_time_int = int(tool.get_now_time_str().replace(":", "")) + threshold_rate = constant.L1_MIN_RATE for d in list_: - if d[2] >= 5: - # 娑ㄥ箙灏忎簬5%鐨勯渶瑕佸垹闄� + if d[2] >= threshold_rate or d[0] in fixed_codes: + # 娑ㄥ箙灏忎簬3%鐨勯渶瑕佸垹闄� flist.append(d) flist.sort(key=lambda x: x[2], reverse=True) - datas = flist[:100] - codes = [x[0] for x in datas] - print("浠g爜鏁伴噺:", len(datas)) - __upload_codes_info(pipe_l2, datas) + # 灏嗗浐瀹氫唬鐮佺殑鎺掑湪鏈�鍓� + for code in fixed_codes: + if code in level1_data_dict: + flist.insert(0, level1_data_dict[code]) + # 姝e紡浜ゆ槗涔嬪墠鍏堝鐞嗘瘮杈冨皯鐨勬暟鎹紝涓嶇劧澶勭悊鏃堕棿涔呴�犳垚鏁版嵁鎷ュ牭 + MAX_COUNT = 500 + if now_time_int < int("092600"): + MAX_COUNT = 200 + elif now_time_int < int("092800"): + MAX_COUNT = 300 + elif now_time_int < int("092900"): + MAX_COUNT = 400 + datas = flist[:MAX_COUNT] + if len(datas) > 0: + logger_l2_codes_subscript.info("寮�濮�#鍗庨懌L1涓婁紶浠g爜锛氭暟閲�-{}", len(datas)) + __upload_codes_info(queue_l1_w_strategy_r, datas) except Exception as e: logging.exception(e) + logger_debug.exception(e) finally: time.sleep(3) + # 閲婃斁鎺ュ彛瀵硅薄 api.Release() -- Gitblit v1.8.0