Administrator
2025-01-06 6567e8cd1fb11ea10912bb3ac5bf2965c74c0e4b
统计大卖单
5个文件已修改
173 ■■■■ 已修改文件
api/outside_api_command_callback.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_test.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/custom_block_in_money_manager.py 36 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/radical_buy_data_manager.py 63 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/outside_api_command_callback.py
@@ -49,7 +49,7 @@
from trade import trade_manager, l2_trade_util, trade_data_manager, trade_constant
import l2_data_util as l2_data_util_old
from trade.buy_money_count_setting import BuyMoneyAndCountSetting, RadicalBuyBlockCodeCountManager
from trade.buy_radical import block_special_codes_manager
from trade.buy_radical import block_special_codes_manager, radical_buy_data_manager
from trade.huaxin import huaxin_trade_api, huaxin_trade_data_update, \
    huaxin_trade_record_manager, huaxin_trade_order_processor, huaxin_sell_util
@@ -1417,9 +1417,10 @@
            elif ctype == "test_place_order":
                # 获取相同板块的涨停代码数量
                # code = data.get("code")
                code = data.get("code")
                # total_datas = l2_data_util.local_today_datas.get(code)
                # trade_manager.test_order(code, total_datas[-1], total_datas[-1]["index"])
                radical_buy_data_manager.pull_pre_deal_big_orders(code)
                self.send_response({"code": 0, "data": {}},
                                   client_id,
                                   request_id)
huaxin_client/l2_client.py
@@ -17,7 +17,7 @@
from huaxin_client.l2_data_manager import L2DataUploadManager
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug
from utils import tool
###B类###
@@ -51,7 +51,6 @@
                 b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
SZ_Bond_Securities = [b"100303", b"109559", b"112617"]
set_codes_data_queue = queue.Queue(maxsize=1000)
market_code_dict = {}
ENABLE_NGST = True
@@ -106,6 +105,9 @@
            self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
    def subscribe_codes(self, _codes):
        self.__subscribe(_codes)
    def __subscribe(self, _codes):
        sh, sz = self.__split_codes(_codes)
@@ -241,6 +243,8 @@
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
            # 初始化
            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
@@ -262,6 +266,8 @@
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
            # 初始化
            SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID'])
        if bIsLast == 1:
            print("订阅响应结束", self.subscripted_codes)
            l2_data_manager.add_subscript_codes(self.subscripted_codes)
@@ -326,8 +332,8 @@
                 "avgAskPrice": pDepthMarketData["AvgAskPrice"],
                 "buy": buys,
                 "sell": sells}
            market_code_dict[pDepthMarketData['SecurityID']] = time.time()
            self.l2_data_upload_manager.add_market_data(d)
            SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID'])
        except:
            pass
@@ -498,6 +504,45 @@
            print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index]))
class SubscriptDefend:
    """
    订阅守护
    定义:当订阅的代码超过一定时间没有回调数据时重新订阅
    """
    __l2_market_update_time = {}
    @classmethod
    def set_l2_market_update(cls, code):
        cls.__l2_market_update_time[code] = time.time()
    @classmethod
    def run(cls):
        while True:
            try:
                now_time = tool.get_now_time_as_int()
                if now_time < int("093015"):
                    continue
                if int("112945") < now_time < int("130015"):
                    continue
                if int("145645") < now_time:
                    continue
                if spi.subscripted_codes:
                    codes = []
                    for code in spi.subscripted_codes:
                        # 获取上次更新时间
                        update_time = cls.__l2_market_update_time.get(code)
                        if update_time and time.time() - update_time > 15:
                            # 需要重新订阅
                            codes.append(code)
                    if codes:
                        logger_debug.info(f"重新订阅:{codes}")
                        spi.subscribe_codes(codes)
            except:
                pass
            finally:
                time.sleep(15)
class MyL2ActionCallback(L2ActionCallback):
    def OnSetL2Position(self, codes_data):
@@ -621,7 +666,8 @@
        if queue_r is not None:
            t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True)
            t1.start()
        # 订阅守护
        threading.Thread(target=SubscriptDefend.run, daemon=True).start()
        # 初始化
        data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
        l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager)
l2_test.py
@@ -37,15 +37,18 @@
                response_data = json.dumps({"code": 0, "data": fdatas})
            except Exception as e:
                response_data = json.dumps({"code": 1, "msg": str(e)})
        elif url.path == "/get_big_buy_order_list":
            # 获获取代码的大买单列表
        elif url.path == "/get_big_order_list":
            # 获获取代码的大买/卖单列表
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict.get('code')
            try:
                fdatas = CodeInMoneyManager().get_big_buy_money_list(code)
                if fdatas is None:
                    fdatas = []
                response_data = json.dumps({"code": 0, "data": fdatas})
                buy_datas = CodeInMoneyManager().get_big_buy_money_list(code)
                if buy_datas is None:
                    buy_datas = []
                sell_datas = CodeInMoneyManager().get_big_sell_money_list(code)
                if sell_datas is None:
                    sell_datas = []
                response_data = json.dumps({"code": 0, "data": {"buy": buy_datas, "sell": sell_datas}})
            except Exception as e:
                response_data = json.dumps({"code": 1, "msg": str(e)})
        elif url.path == "/get_code_money_info":
third_data/custom_block_in_money_manager.py
@@ -23,8 +23,10 @@
        # 总卖单信息:{"code":[金额, 数量]}
        self.__code_sell_money_dict = {}
        # 净流入大单金额
        self.__code_big_buy_mmoney_list_dict = {}
        # 净流入大单买金额
        self.__code_big_buy_money_list_dict = {}
        # 净流出大单卖金额
        self.__code_big_sell_money_list_dict = {}
        self.__latest_price = {}
        self.__load_data()
@@ -39,6 +41,11 @@
                    self.add_data(item)
    def add_data(self, item):
        """
        添加数据
        @param item: (代码,类型, 订单数据)  订单数据:(订单号, 量, 金额, 时间, 最新成交价格)
        @return:
        """
        code = item[0]
        if code not in self.__code_money_dict:
            self.__code_money_dict[code] = 0
@@ -59,15 +66,22 @@
            self.__code_buy_money_dict[code][0] += item[2][2]
            self.__code_buy_money_dict[code][1] += 1
            if code not in self.__code_big_buy_mmoney_list_dict:
                self.__code_big_buy_mmoney_list_dict[code] = []
            # 大买单信息:(金额,最后价格)
            if code not in self.__code_big_buy_money_list_dict:
                self.__code_big_buy_money_list_dict[code] = []
            # 大买单信息:(金额, 最新价格, 订单号)
            if len(item[2]) >= 5:
                self.__code_big_buy_mmoney_list_dict[code].append((item[2][2], item[2][4], item[2][0]))
                self.__code_big_buy_money_list_dict[code].append((item[2][2], item[2][4], item[2][0]))
        else:
            self.__code_money_dict[code] -= item[2][2]
            self.__code_sell_money_dict[code][0] += item[2][2]
            self.__code_sell_money_dict[code][1] += 1
            # 大卖单信息
            if code not in self.__code_big_sell_money_list_dict:
                self.__code_big_sell_money_list_dict[code] = []
            if len(item[2]) >= 5:
                # 大卖单信息:(金额, 最新价格, 订单号)
                self.__code_big_sell_money_list_dict[code].append((item[2][2], item[2][4], item[2][0]))
        self.__latest_price[code] = item[2][4]
    def get_code_money_dict(self):
@@ -96,7 +110,15 @@
        @param code:
        @return:[(金额, 价格, 订单号)]
        """
        return self.__code_big_buy_mmoney_list_dict.get(code)
        return self.__code_big_buy_money_list_dict.get(code)
    def get_big_sell_money_list(self, code):
        """
        获取代码的大买单列表
        @param code:
        @return:[(金额, 价格, 订单号)]
        """
        return self.__code_big_sell_money_list_dict.get(code)
    def get_latest_price(self, code):
        return self.__latest_price.get(code)
trade/buy_radical/radical_buy_data_manager.py
@@ -9,12 +9,14 @@
import constant
import l2_data_util
from l2 import l2_data_util as l2_data_util_new
from code_attribute import code_nature_analyse, code_volumn_manager, gpcode_manager
from code_attribute.code_l1_data_manager import L1DataManager
from code_attribute.gpcode_manager import WantBuyCodesManager
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from l2.huaxin import l2_huaxin_util
from l2.l2_data_util import L2DataUtil
from l2.l2_transaction_data_manager import BigOrderDealManager, HuaXinBuyOrderManager
from log_module import async_log_util
from log_module.log import logger_l2_radical_buy, logger_debug, logger_l2_radical_buy_data
@@ -37,8 +39,10 @@
    """
    __db = 3
    __big_order_threshold = {}
    # 已经成交的累计大单金额:用于初次上板尚未订阅的情况
    # 已经成交的累计大买单金额:用于初次上板尚未订阅的情况
    __already_total_deal_big_order_money = {}
    # 已经成交的累计大卖单金额:用于初次上板尚未订阅的情况
    __already_total_sell_deal_big_order_money = {}
    __redis_manager = redis_manager.RedisManager(3)
    def __init__(self):
@@ -72,29 +76,31 @@
        redis_manager.RedisUtils.setex_async(self.__db, f"radical_big_order_threshold-{code}", tool.get_expire(),
                                             f"{threshold_money}")
    def set_big_deal_order_list(self, code, money_list, limit_up_price):
    def set_big_deal_order_list(self, code, buy_money_list, sell_money_list, limit_up_price, min_order_no):
        """
        设置大单成交数据
        @param code:
        @param money_list:[(金额,价格,订单号)]
        @param buy_money_list:[(金额,价格,订单号)]
        @param sell_money_list:[(金额,价格,订单号)]
        @param limit_up_price:
        @param min_order_no: 最小的订单号
        @return:
        """
        # 涨停价成交的大单(策略进程尚未统计到的)
        total_deal_money = 0
        total_deal_money_info_list = []
        total_deal_buy_money = 0
        total_deal_buy_money_info_list = []
        limit_up_price_money_list = []
        pre_limit_up_price_money_list = []
        deal_order_ids = BigOrderDealManager().get_total_buy_order_ids(code)
        for info in money_list:
        for info in buy_money_list:
            if info[1] != limit_up_price:
                continue
            limit_up_price_money_list.append(info[0])
            if info[2] in deal_order_ids:
                continue
            pre_limit_up_price_money_list.append((info[0], info[2]))
            total_deal_money += info[0]
            total_deal_money_info_list.append(info)
            total_deal_buy_money += info[0]
            total_deal_buy_money_info_list.append(info)
        if limit_up_price_money_list:
            # 计算大单的阈值
            average_money = sum(limit_up_price_money_list) // len(limit_up_price_money_list)
@@ -102,8 +108,25 @@
            # 取max((平均值+最大单一半)/2, 平均值)
            threshold_money = max((max_money // 2 + average_money) // 2, average_money)
            self.set_big_order_threshold(code, threshold_money)
        self.__already_total_deal_big_order_money[code] = (total_deal_money, pre_limit_up_price_money_list)
        async_log_util.info(logger_l2_radical_buy_data, f"之前的大单:{code}-{total_deal_money}-{total_deal_money_info_list}")
        self.__already_total_deal_big_order_money[code] = (total_deal_buy_money, pre_limit_up_price_money_list)
        async_log_util.info(logger_l2_radical_buy_data,
                            f"之前的大买单:{code}-{total_deal_buy_money}-{total_deal_buy_money_info_list}")
        # 处理大卖单
        pre_limit_up_price_money_sell_list = []
        if min_order_no:
            total_deal_sell_money_info_list = []
            total_deal_sell_money = 0
            for info in sell_money_list:
                if info[1] != limit_up_price:
                    continue
                if info[2] >= min_order_no:
                    continue
                pre_limit_up_price_money_sell_list.append((info[0], info[2]))
                total_deal_sell_money += info[0]
                total_deal_sell_money_info_list.append(info)
            async_log_util.info(logger_l2_radical_buy_data,
                                f"之前的大卖单:{code}-{total_deal_sell_money}-{total_deal_sell_money_info_list}")
            self.__already_total_sell_deal_big_order_money[code] = (total_deal_sell_money, pre_limit_up_price_money_sell_list)
    def get_big_order_threshold(self, code):
        """
@@ -124,6 +147,12 @@
        if code in self.__already_total_deal_big_order_money:
            return self.__already_total_deal_big_order_money[code][0]
        return 0
    def get_sell_deal_big_order_money(self, code):
        if code in self.__already_total_sell_deal_big_order_money:
            return self.__already_total_sell_deal_big_order_money[code][0]
        return 0
    def get_deal_big_order_money_list(self, code):
        """
@@ -1508,14 +1537,22 @@
    @return:
    """
    response_data = requests.get(
        "http://127.0.0.1:9005/get_big_buy_order_list?code=" + code_)
        "http://127.0.0.1:9005/get_big_order_list?code=" + code_)
    r_str = response_data.text
    response_data = json.loads(r_str)
    if response_data["code"] == 0:
        datas = response_data["data"]
        if datas:
            BeforeSubDealBigOrderManager().set_big_deal_order_list(code_, datas,
                                                                   gpcode_manager.get_limit_up_price_as_num(code_))
            buy_datas = datas["buy"]
            sell_datas = datas["sell"]
            total_datas = l2_data_util_new.local_today_datas.get(code_)
            min_order_no = None
            for data in total_datas:
                if L2DataUtil.is_buy(data["val"]) or L2DataUtil.is_sell(data["val"]):
                    min_order_no = data["val"]["orderNo"]
                    break
            BeforeSubDealBigOrderManager().set_big_deal_order_list(code_, buy_datas, sell_datas,
                                                                   gpcode_manager.get_limit_up_price_as_num(code_), min_order_no)
def get_l2_big_order_deal_info(code_):