api/outside_api_command_callback.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
huaxin_client/l2_client.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
l2_test.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
third_data/custom_block_in_money_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
trade/buy_radical/radical_buy_data_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
api/outside_api_command_callback.py
@@ -49,7 +49,7 @@ from trade import trade_manager, l2_trade_util, trade_data_manager, trade_constant import l2_data_util as l2_data_util_old from trade.buy_money_count_setting import BuyMoneyAndCountSetting, RadicalBuyBlockCodeCountManager from trade.buy_radical import block_special_codes_manager from trade.buy_radical import block_special_codes_manager, radical_buy_data_manager from trade.huaxin import huaxin_trade_api, huaxin_trade_data_update, \ huaxin_trade_record_manager, huaxin_trade_order_processor, huaxin_sell_util @@ -1417,9 +1417,10 @@ elif ctype == "test_place_order": # 获取相同板块的涨停代码数量 # code = data.get("code") code = data.get("code") # total_datas = l2_data_util.local_today_datas.get(code) # trade_manager.test_order(code, total_datas[-1], total_datas[-1]["index"]) radical_buy_data_manager.pull_pre_deal_big_orders(code) self.send_response({"code": 0, "data": {}}, client_id, request_id) huaxin_client/l2_client.py
@@ -17,7 +17,7 @@ from huaxin_client.l2_data_manager import L2DataUploadManager from log_module import log, async_log_util from log_module.async_log_util import huaxin_l2_log from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug from utils import tool ###B类### @@ -51,7 +51,6 @@ b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"] SZ_Bond_Securities = [b"100303", b"109559", b"112617"] set_codes_data_queue = queue.Queue(maxsize=1000) market_code_dict = {} ENABLE_NGST = True @@ -106,6 +105,9 @@ self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) def subscribe_codes(self, _codes): self.__subscribe(_codes) def __subscribe(self, _codes): sh, sz = self.__split_codes(_codes) @@ -241,6 +243,8 @@ if pRspInfo["ErrorID"] == 0: print("订阅成功") self.subscripted_codes.add(pSpecificSecurity['SecurityID']) # 初始化 SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID']) if bIsLast == 1: print("订阅响应结束", self.subscripted_codes) l2_data_manager.add_subscript_codes(self.subscripted_codes) @@ -262,6 +266,8 @@ if pRspInfo["ErrorID"] == 0: print("订阅成功") self.subscripted_codes.add(pSpecificSecurity['SecurityID']) # 初始化 SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID']) if bIsLast == 1: print("订阅响应结束", self.subscripted_codes) l2_data_manager.add_subscript_codes(self.subscripted_codes) @@ -326,8 +332,8 @@ "avgAskPrice": pDepthMarketData["AvgAskPrice"], "buy": buys, "sell": sells} market_code_dict[pDepthMarketData['SecurityID']] = time.time() self.l2_data_upload_manager.add_market_data(d) SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID']) except: pass @@ -498,6 +504,45 @@ print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index])) class SubscriptDefend: """ 订阅守护 定义:当订阅的代码超过一定时间没有回调数据时重新订阅 """ __l2_market_update_time = {} @classmethod def set_l2_market_update(cls, code): cls.__l2_market_update_time[code] = time.time() @classmethod def run(cls): while True: try: now_time = tool.get_now_time_as_int() if now_time < int("093015"): continue if int("112945") < now_time < int("130015"): continue if int("145645") < now_time: continue if spi.subscripted_codes: codes = [] for code in spi.subscripted_codes: # 获取上次更新时间 update_time = cls.__l2_market_update_time.get(code) if update_time and time.time() - update_time > 15: # 需要重新订阅 codes.append(code) if codes: logger_debug.info(f"重新订阅:{codes}") spi.subscribe_codes(codes) except: pass finally: time.sleep(15) class MyL2ActionCallback(L2ActionCallback): def OnSetL2Position(self, codes_data): @@ -621,7 +666,8 @@ if queue_r is not None: t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True) t1.start() # 订阅守护 threading.Thread(target=SubscriptDefend.run, daemon=True).start() # 初始化 data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks) l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager) l2_test.py
@@ -37,15 +37,18 @@ response_data = json.dumps({"code": 0, "data": fdatas}) except Exception as e: response_data = json.dumps({"code": 1, "msg": str(e)}) elif url.path == "/get_big_buy_order_list": # 获获取代码的大买单列表 elif url.path == "/get_big_order_list": # 获获取代码的大买/卖单列表 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict.get('code') try: fdatas = CodeInMoneyManager().get_big_buy_money_list(code) if fdatas is None: fdatas = [] response_data = json.dumps({"code": 0, "data": fdatas}) buy_datas = CodeInMoneyManager().get_big_buy_money_list(code) if buy_datas is None: buy_datas = [] sell_datas = CodeInMoneyManager().get_big_sell_money_list(code) if sell_datas is None: sell_datas = [] response_data = json.dumps({"code": 0, "data": {"buy": buy_datas, "sell": sell_datas}}) except Exception as e: response_data = json.dumps({"code": 1, "msg": str(e)}) elif url.path == "/get_code_money_info": third_data/custom_block_in_money_manager.py
@@ -23,8 +23,10 @@ # 总卖单信息:{"code":[金额, 数量]} self.__code_sell_money_dict = {} # 净流入大单金额 self.__code_big_buy_mmoney_list_dict = {} # 净流入大单买金额 self.__code_big_buy_money_list_dict = {} # 净流出大单卖金额 self.__code_big_sell_money_list_dict = {} self.__latest_price = {} self.__load_data() @@ -39,6 +41,11 @@ self.add_data(item) def add_data(self, item): """ 添加数据 @param item: (代码,类型, 订单数据) 订单数据:(订单号, 量, 金额, 时间, 最新成交价格) @return: """ code = item[0] if code not in self.__code_money_dict: self.__code_money_dict[code] = 0 @@ -59,15 +66,22 @@ self.__code_buy_money_dict[code][0] += item[2][2] self.__code_buy_money_dict[code][1] += 1 if code not in self.__code_big_buy_mmoney_list_dict: self.__code_big_buy_mmoney_list_dict[code] = [] # 大买单信息:(金额,最后价格) if code not in self.__code_big_buy_money_list_dict: self.__code_big_buy_money_list_dict[code] = [] # 大买单信息:(金额, 最新价格, 订单号) if len(item[2]) >= 5: self.__code_big_buy_mmoney_list_dict[code].append((item[2][2], item[2][4], item[2][0])) self.__code_big_buy_money_list_dict[code].append((item[2][2], item[2][4], item[2][0])) else: self.__code_money_dict[code] -= item[2][2] self.__code_sell_money_dict[code][0] += item[2][2] self.__code_sell_money_dict[code][1] += 1 # 大卖单信息 if code not in self.__code_big_sell_money_list_dict: self.__code_big_sell_money_list_dict[code] = [] if len(item[2]) >= 5: # 大卖单信息:(金额, 最新价格, 订单号) self.__code_big_sell_money_list_dict[code].append((item[2][2], item[2][4], item[2][0])) self.__latest_price[code] = item[2][4] def get_code_money_dict(self): @@ -96,7 +110,15 @@ @param code: @return:[(金额, 价格, 订单号)] """ return self.__code_big_buy_mmoney_list_dict.get(code) return self.__code_big_buy_money_list_dict.get(code) def get_big_sell_money_list(self, code): """ 获取代码的大买单列表 @param code: @return:[(金额, 价格, 订单号)] """ return self.__code_big_sell_money_list_dict.get(code) def get_latest_price(self, code): return self.__latest_price.get(code) trade/buy_radical/radical_buy_data_manager.py
@@ -9,12 +9,14 @@ import constant import l2_data_util from l2 import l2_data_util as l2_data_util_new from code_attribute import code_nature_analyse, code_volumn_manager, gpcode_manager from code_attribute.code_l1_data_manager import L1DataManager from code_attribute.gpcode_manager import WantBuyCodesManager from db import redis_manager_delegate as redis_manager from db.redis_manager_delegate import RedisUtils from l2.huaxin import l2_huaxin_util from l2.l2_data_util import L2DataUtil from l2.l2_transaction_data_manager import BigOrderDealManager, HuaXinBuyOrderManager from log_module import async_log_util from log_module.log import logger_l2_radical_buy, logger_debug, logger_l2_radical_buy_data @@ -37,8 +39,10 @@ """ __db = 3 __big_order_threshold = {} # 已经成交的累计大单金额:用于初次上板尚未订阅的情况 # 已经成交的累计大买单金额:用于初次上板尚未订阅的情况 __already_total_deal_big_order_money = {} # 已经成交的累计大卖单金额:用于初次上板尚未订阅的情况 __already_total_sell_deal_big_order_money = {} __redis_manager = redis_manager.RedisManager(3) def __init__(self): @@ -72,29 +76,31 @@ redis_manager.RedisUtils.setex_async(self.__db, f"radical_big_order_threshold-{code}", tool.get_expire(), f"{threshold_money}") def set_big_deal_order_list(self, code, money_list, limit_up_price): def set_big_deal_order_list(self, code, buy_money_list, sell_money_list, limit_up_price, min_order_no): """ 设置大单成交数据 @param code: @param money_list:[(金额,价格,订单号)] @param buy_money_list:[(金额,价格,订单号)] @param sell_money_list:[(金额,价格,订单号)] @param limit_up_price: @param min_order_no: 最小的订单号 @return: """ # 涨停价成交的大单(策略进程尚未统计到的) total_deal_money = 0 total_deal_money_info_list = [] total_deal_buy_money = 0 total_deal_buy_money_info_list = [] limit_up_price_money_list = [] pre_limit_up_price_money_list = [] deal_order_ids = BigOrderDealManager().get_total_buy_order_ids(code) for info in money_list: for info in buy_money_list: if info[1] != limit_up_price: continue limit_up_price_money_list.append(info[0]) if info[2] in deal_order_ids: continue pre_limit_up_price_money_list.append((info[0], info[2])) total_deal_money += info[0] total_deal_money_info_list.append(info) total_deal_buy_money += info[0] total_deal_buy_money_info_list.append(info) if limit_up_price_money_list: # 计算大单的阈值 average_money = sum(limit_up_price_money_list) // len(limit_up_price_money_list) @@ -102,8 +108,25 @@ # 取max((平均值+最大单一半)/2, 平均值) threshold_money = max((max_money // 2 + average_money) // 2, average_money) self.set_big_order_threshold(code, threshold_money) self.__already_total_deal_big_order_money[code] = (total_deal_money, pre_limit_up_price_money_list) async_log_util.info(logger_l2_radical_buy_data, f"之前的大单:{code}-{total_deal_money}-{total_deal_money_info_list}") self.__already_total_deal_big_order_money[code] = (total_deal_buy_money, pre_limit_up_price_money_list) async_log_util.info(logger_l2_radical_buy_data, f"之前的大买单:{code}-{total_deal_buy_money}-{total_deal_buy_money_info_list}") # 处理大卖单 pre_limit_up_price_money_sell_list = [] if min_order_no: total_deal_sell_money_info_list = [] total_deal_sell_money = 0 for info in sell_money_list: if info[1] != limit_up_price: continue if info[2] >= min_order_no: continue pre_limit_up_price_money_sell_list.append((info[0], info[2])) total_deal_sell_money += info[0] total_deal_sell_money_info_list.append(info) async_log_util.info(logger_l2_radical_buy_data, f"之前的大卖单:{code}-{total_deal_sell_money}-{total_deal_sell_money_info_list}") self.__already_total_sell_deal_big_order_money[code] = (total_deal_sell_money, pre_limit_up_price_money_sell_list) def get_big_order_threshold(self, code): """ @@ -124,6 +147,12 @@ if code in self.__already_total_deal_big_order_money: return self.__already_total_deal_big_order_money[code][0] return 0 def get_sell_deal_big_order_money(self, code): if code in self.__already_total_sell_deal_big_order_money: return self.__already_total_sell_deal_big_order_money[code][0] return 0 def get_deal_big_order_money_list(self, code): """ @@ -1508,14 +1537,22 @@ @return: """ response_data = requests.get( "http://127.0.0.1:9005/get_big_buy_order_list?code=" + code_) "http://127.0.0.1:9005/get_big_order_list?code=" + code_) r_str = response_data.text response_data = json.loads(r_str) if response_data["code"] == 0: datas = response_data["data"] if datas: BeforeSubDealBigOrderManager().set_big_deal_order_list(code_, datas, gpcode_manager.get_limit_up_price_as_num(code_)) buy_datas = datas["buy"] sell_datas = datas["sell"] total_datas = l2_data_util_new.local_today_datas.get(code_) min_order_no = None for data in total_datas: if L2DataUtil.is_buy(data["val"]) or L2DataUtil.is_sell(data["val"]): min_order_no = data["val"]["orderNo"] break BeforeSubDealBigOrderManager().set_big_deal_order_list(code_, buy_datas, sell_datas, gpcode_manager.get_limit_up_price_as_num(code_), min_order_no) def get_l2_big_order_deal_info(code_):