From f51466f1d4563f97b1ec620b70a1c94f01a6a2e1 Mon Sep 17 00:00:00 2001 From: admin <weikou2014> Date: 星期五, 21 七月 2023 16:23:51 +0800 Subject: [PATCH] 交易优化 --- l2_client.py | 193 ++++++++++++++++++++++++------------------------ 1 files changed, 96 insertions(+), 97 deletions(-) diff --git a/l2_client.py b/l2_client.py index e89cb5c..30239db 100644 --- a/l2_client.py +++ b/l2_client.py @@ -2,6 +2,7 @@ # -*- coding: UTF-8 -*- import json import logging +import queue import threading import time @@ -36,6 +37,9 @@ SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725", b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"] SZ_Bond_Securities = [b"100303", b"109559", b"112617"] +spi = None +set_codes_data_queue = queue.Queue() +market_code_dict = {} class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi): @@ -50,58 +54,57 @@ self.__api = api self.is_login = False - # 璁㈤槄浠g爜,[(浠g爜,鏈�浣庢墜鏁�,娑ㄥ仠浠�)] - def set_codes_data(self, codes_data): - print("set_codes_data", codes_data) + def __split_codes(self, codes): + szse_codes = [] + sse_codes = [] + for code in codes: + if code.find("00") == 0: + szse_codes.append(code.encode()) + elif code.find("60") == 0: + sse_codes.append(code.encode()) + return sse_codes, szse_codes - def split_codes(codes): - szse_codes = [] - sse_codes = [] - for code in codes: - if code.find("00") == 0: - szse_codes.append(code.encode()) - elif code.find("60") == 0: - sse_codes.append(code.encode()) - return sse_codes, szse_codes + # 鏂板璁㈤槄 - # 鏂板璁㈤槄 - def subscribe(_codes): - sh, sz = split_codes(_codes) - logger_l2_subscript.info(f"璁㈤槄涓婅瘉锛歿sh}") - logger_l2_subscript.info(f"璁㈤槄娣辫瘉锛歿sz}") - if sh: - # 璁㈤槄閫愮瑪濮旀墭 - result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - logger_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh锛歿result}") - # 璁㈤槄閫愮瑪鎴愪氦 - result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - logger_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh锛歿result}") + # 鍙栨秷璁㈤槄 + def __unsubscribe(self, _codes): + sh, sz = self.__split_codes(_codes) + logger_l2_subscript.info(f"鍙栨秷璁㈤槄涓婅瘉锛歿sh}") + logger_l2_subscript.info(f"鍙栨秷璁㈤槄娣辫瘉锛歿sz}") + if sh: + # 鍙栨秷璁㈤槄閫愮瑪濮旀墭 + self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + # 鍙栨秷璁㈤槄閫愮瑪鎴愪氦 + self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + if sz: + self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) + self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) + self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) - result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - logger_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sh锛歿result}") - if sz: - result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) - logger_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sz锛歿result}") - result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) - logger_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sz锛歿result}") - result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) - logger_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sz锛歿result}") + def __subscribe(self, _codes): + sh, sz = self.__split_codes(_codes) + logger_l2_subscript.info(f"璁㈤槄涓婅瘉锛歿sh}") + logger_l2_subscript.info(f"璁㈤槄娣辫瘉锛歿sz}") + if sh: + # 璁㈤槄閫愮瑪濮旀墭 + result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + logger_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh锛歿result}") + # 璁㈤槄閫愮瑪鎴愪氦 + result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + logger_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh锛歿result}") - # 鍙栨秷璁㈤槄 - def unsubscribe(_codes): - sh, sz = split_codes(_codes) - logger_l2_subscript.info(f"鍙栨秷璁㈤槄涓婅瘉锛歿sh}") - logger_l2_subscript.info(f"鍙栨秷璁㈤槄娣辫瘉锛歿sz}") - if sh: - # 鍙栨秷璁㈤槄閫愮瑪濮旀墭 - self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - # 鍙栨秷璁㈤槄閫愮瑪鎴愪氦 - self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - if sz: - self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) - self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) - self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) + result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + logger_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sh锛歿result}") + if sz: + result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) + logger_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sz锛歿result}") + result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) + logger_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sz锛歿result}") + result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) + logger_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sz锛歿result}") + + def __process_codes_data(self, codes_data): if not self.is_login and not constant.TEST: raise Exception("L2灏氭湭鐧诲綍") @@ -120,13 +123,15 @@ l2_data_manager.target_codes.discard(c) for c in add_codes: l2_data_manager.run_upload_task(c) - if len(add_codes) > 10: - unsubscribe(add_codes) - subscribe(add_codes) - unsubscribe(del_codes) + self.__subscribe(add_codes) + self.__unsubscribe(del_codes) # 璁剧疆鏈�杩戠殑浠g爜鍒楄〃 self.latest_codes_set = codes + + # 璁㈤槄浠g爜,[(浠g爜,鏈�浣庢墜鏁�,娑ㄥ仠浠�)] + def set_codes_data(self, codes_data): + self.__process_codes_data(codes_data) def set_code_special_watch_volume(self, code, volume): # 鏈夋晥鏈熶负3s @@ -134,44 +139,23 @@ def OnFrontConnected(self): print("OnFrontConnected") + + logout_req = lev2mdapi.CTORATstpUserLogoutField() + self.__api.ReqUserLogout(logout_req, 1) + time.sleep(1) # 璇锋眰鐧诲綍 login_req = lev2mdapi.CTORATstpReqUserLoginField() - self.__api.ReqUserLogin(login_req, 1) + self.__api.ReqUserLogin(login_req, 2) def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast): print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % ( pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast)) if pRspInfo['ErrorID'] == 0: + print("----L2琛屾儏鐧诲綍鎴愬姛----") self.is_login = True - if g_SubMarketData: - self.__api.SubscribeMarketData(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE); - self.__api.SubscribeMarketData(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE); - - if g_SubTransaction: - self.__api.SubscribeTransaction(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE); - self.__api.SubscribeTransaction(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE); - - if g_SubOrderDetail: - self.__api.SubscribeOrderDetail(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE); - self.__api.SubscribeOrderDetail(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE); - - if g_SubXTSTick: - self.__api.SubscribeXTSTick(SH_XTS_Securities, lev2mdapi.TORA_TSTP_EXD_SSE); - - if g_SubXTSMarketData: - self.__api.SubscribeXTSMarketData(SH_XTS_Securities, lev2mdapi.TORA_TSTP_EXD_SSE); - - if g_SubBondMarketData: - self.__api.SubscribeBondMarketData(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE); - - if g_SubBondTransaction: - self.__api.SubscribeBondTransaction(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE); - - if g_SubBondOrderDetail: - self.__api.SubscribeBondOrderDetail(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE); - # 4.0.5鐗堟湰鎺ュ彛 - if g_SubNGTSTick: - self.__api.SubscribeNGTSTick(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE); + # t1 = threading.Thread(target=lambda: self.__set_codes_data(), daemon=True) + # # 鍚庡彴杩愯 + # t1.start() def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubMarketData") @@ -184,22 +168,32 @@ def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubOrderDetail") + # try: print("璁㈤槄缁撴灉锛�", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"], pRspInfo["ErrorMsg"]) - logger_l2_subscript.info(f"璁㈤槄缁撴灉锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") + logger_l2_subscript.info( + f"璁㈤槄缁撴灉锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") if pRspInfo["ErrorID"] == 0: print("璁㈤槄鎴愬姛") self.subscripted_codes.add(pSpecificSecurity['SecurityID']) if bIsLast == 1: - l2_data_manager.add_subscript_codes(self.subscripted_codes) + t1 = threading.Thread(target=lambda: l2_data_manager.add_subscript_codes(self.subscripted_codes), + daemon=True) + # 鍚庡彴杩愯 + t1.start() def OnRspUnSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspUnSubOrderDetail") - code = pSpecificSecurity['SecurityID'] - if code in self.subscripted_codes: - self.subscripted_codes.remove(code) - if bIsLast == 1: - l2_data_manager.add_subscript_codes(self.subscripted_codes) + try: + code = pSpecificSecurity['SecurityID'] + self.subscripted_codes.discard(code) + if bIsLast == 1: + t1 = threading.Thread(target=lambda: l2_data_manager.add_subscript_codes(self.subscripted_codes), + daemon=True) + # 鍚庡彴杩愯 + t1.start() + except Exception as e: + logging.exception(e) def OnRspSubBondMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubBondMarketData") @@ -238,7 +232,10 @@ (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']), (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5']) ]} - l2_data_manager.add_market_data(d) + market_code_dict[pDepthMarketData['SecurityID']] = time.time() + + t1 = threading.Thread(target=lambda: l2_data_manager.add_market_data(d), daemon=True) + t1.start() # 杈撳嚭琛屾儏蹇収鏁版嵁 # print( @@ -273,14 +270,14 @@ def OnRtnTransaction(self, pTransaction): code = str(pTransaction['SecurityID']) min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) - if min_volume is None: - # 榛樿绛涢��50w - if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000: - return - elif pTransaction['TradeVolume'] < min_volume: - return # 杈撳嚭閫愮瑪鎴愪氦鏁版嵁 if pTransaction['ExecType'] == b"2": + if min_volume is None: + # 榛樿绛涢��50w + if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000: + return + elif pTransaction['TradeVolume'] < min_volume: + return # 鎾ゅ崟 item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'], "Volume": pTransaction['TradeVolume'], @@ -302,7 +299,7 @@ print("閫愮瑪濮旀墭", item) l2_data_manager.add_l2_order_detail(item, True) else: - if abs(pTransaction['TradePrice'] - limit_up_price) < 0.001: + if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201: # 娑ㄥ仠浠� # 鎴愪氦 item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], @@ -483,6 +480,8 @@ SendResponseSkManager.send_normal_response("l2_cmd", SendResponseSkManager.load_response(client_id, request_id, {"code": 0, "msg": "璁剧疆鎴愬姛"})) + + except Exception as e: logging.exception(e) SendResponseSkManager.send_error_response("common", request_id, client_id, str(e)) -- Gitblit v1.8.0