Administrator
2025-06-03 c4ed4da4ac8b8bc24e0a3ed0e782e9248b4a511c
servers/huaxin_trade_server.py
@@ -3,7 +3,6 @@
import hashlib
import json
import logging
import multiprocessing
import queue
import random
import socket
@@ -18,11 +17,12 @@
from cancel_strategy.s_l_h_cancel_strategy import SCancelBigNumComputer
from code_attribute import gpcode_manager, code_volumn_manager, global_data_loader, zyltgb_util
from code_attribute.code_l1_data_manager import L1DataManager
from code_attribute.gpcode_manager import CodePrePriceManager, CodesNameManager, GreenListCodeManager
from code_attribute.gpcode_manager import CodePrePriceManager, CodesNameManager, \
    WantBuyCodesManager
from huaxin_client import l2_data_transform_protocol
from huaxin_client.trade_transform_protocol import TradeResponse
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, transaction_progress, \
    l2_data_source_util, l2_data_log
    l2_data_source_util, l2_data_log, data_callback
from l2.cancel_buy_strategy import GCancelBigNumComputer, \
    DCancelBigNumComputer, RDCancelBigNumComputer
from l2.code_price_manager import Buy1PriceManager
@@ -37,26 +37,30 @@
from log_module import async_log_util, log_export
from log_module.log import hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_market_data, logger_l2_g_cancel, logger_debug, \
    logger_system, logger_trade, logger_local_huaxin_l1_trade_info, logger_l2_codes_subscript, logger_l2_radical_buy
from third_data import block_info, kpl_data_manager, history_k_data_manager, huaxin_l1_data_manager
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager, RealTimeKplMarketData
from third_data.history_k_data_util import JueJinApi
    logger_system, logger_trade, logger_l2_radical_buy, logger_l2_trade
from third_data import block_info, kpl_data_manager, history_k_data_manager, huaxin_l1_data_manager, kpl_api, kpl_util
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, RealTimeKplMarketData, \
    KPLPlateForbiddenManager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
from trade import l2_trade_util, \
    trade_data_manager, trade_constant, buy_open_limit_up_strategy
from trade.buy_radical import radical_buy_data_manager, radical_buy_strategy, block_special_codes_manager
from trade.buy_radical import radical_buy_data_manager, radical_buy_strategy
from trade.buy_money_count_setting import BuyMoneyAndCountSetting, BuyMoneyUtil
from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_data_update, \
    huaxin_trade_record_manager, huaxin_sell_util
    huaxin_trade_record_manager
from api.outside_api_command_callback import OutsideApiCommandCallback
from trade.huaxin.huaxin_trade_record_manager import DelegateRecordManager
from trade.order_statistic import DealAndDelegateWithBuyModeDataManager
from trade.buy_radical.radical_buy_data_manager import RadicalBuyDataManager, RadicalBuyBlockManager, \
    EveryLimitupBigDealOrderManager, RadicalCodeMarketInfoManager
from trade.buy_radical.radical_buy_data_manager import RadicalBuyDataManager, \
    EveryLimitupBigDealOrderManager, RadicalCodeMarketInfoManager, BeforeSubDealBigOrderManager, \
    EveryLimitupBigDelegateOrderManager
from trade.sell.sell_rule_manager import TradeRuleManager
from trade.trade_data_manager import RadicalBuyDealCodesManager
from trade.trade_manager import CodesTradeStateManager
from utils import socket_util, middle_api_protocol, tool, huaxin_util, global_util, trade_util, init_data_util
from utils import socket_util, middle_api_protocol, tool, huaxin_util, global_util, trade_util, init_data_util, \
    output_util
trade_data_request_queue = queue.Queue(maxsize=1000)
@@ -75,6 +79,8 @@
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
    __KPLCodeJXBlockManager = KPLCodeJXBlockManager()
    __GCancelBigNumComputer = GCancelBigNumComputer()
    # L2进程对应订阅的代码: {"进程ID": 代码列表}
    __pid_l2_subscript_codes = {}
    def setup(self):
        self.__init()
@@ -221,13 +227,61 @@
                            fresults = []
                            if codes:
                                for code in codes:
                                    try:
                                        # 获取成交大单:(参考大单金额,已成交大单金额,大单要求金额)
                                        th, is_temp = BeforeSubDealBigOrderManager().get_big_order_threshold_info(code)
                                        deal_big_money_info = radical_buy_data_manager.get_total_deal_big_order_info(
                                            code, gpcode_manager.get_limit_up_price_as_num(code))
                                        deal_big_order_info = (
                                            output_util.money_desc(th), output_util.money_desc(deal_big_money_info[1]),
                                            output_util.money_desc(deal_big_money_info[2]))
                                    except:
                                        deal_big_order_info = None
                                    code_name = gpcode_manager.get_code_name(code)
                                    fresults.append((code, code_name))
                                    fresults.append((code, code_name, deal_big_order_info))
                            fdata = middle_api_protocol.load_l2_subscript_codes(fresults)
                            middle_api_protocol.request(fdata)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "l2_subscript_codes_v2":
                        try:
                            data = data_json["data"]
                            datas = data["data"]
                            pid, datas = datas[0], datas[1]
                            self.__pid_l2_subscript_codes[pid] = datas
                            # print("l2_subscript_codes", data_json)
                            fcodes = []
                            for pid in self.__pid_l2_subscript_codes:
                                codes = self.__pid_l2_subscript_codes[pid]
                                fcodes.extend(codes)
                            # 订阅的代码
                            huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.save_subscript_codes(fcodes)
                            # 上传数据
                            codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes()
                            l2_log.codeLogQueueDistributeManager.set_l2_subscript_codes(codes)
                            fresults = []
                            if codes:
                                for code in codes:
                                    try:
                                        # 获取成交大单:(参考大单金额,已成交大单金额,大单要求金额)
                                        th, is_temp = BeforeSubDealBigOrderManager().get_big_order_threshold_info(code)
                                        deal_big_money_info = radical_buy_data_manager.get_total_deal_big_order_info(
                                            code, gpcode_manager.get_limit_up_price_as_num(code))
                                        deal_big_order_info = (
                                            output_util.money_desc(th), output_util.money_desc(deal_big_money_info[1]),
                                            output_util.money_desc(deal_big_money_info[2]))
                                    except:
                                        deal_big_order_info = None
                                    code_name = gpcode_manager.get_code_name(code)
                                    fresults.append((code, code_name, deal_big_order_info))
                            fdata = middle_api_protocol.load_l2_subscript_codes(fresults)
                            middle_api_protocol.request(fdata)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "get_level1_codes":
                        # print("get_level1_codes")
                        # 获取level1的代码
@@ -249,11 +303,6 @@
                            code_name_map[d["sec_id"]] = d["sec_name"]
                        # 保存代码名称
                        CodesNameManager().add_code_names(code_name_map)
                        # 更新辨识度代码
                        threading.Thread(target=block_special_codes_manager.update_block_special_codes,
                                         daemon=True).start()
                        sk.sendall(
                            socket_util.load_header(json.dumps({"code": 0, "data": fdatas}).encode(encoding='utf-8')))
@@ -314,13 +363,16 @@
    def __process_buy_open_limit_up_datas(cls, datas):
        """
        处理排1的数据
        @param datas: [(代码, 现价, 涨幅, 量, 当前时间, 买1价, 买1量, 买2价, 买2量, 更新时间)]
        @param datas: [(代码, 现价, 涨幅, 量, 当前时间, 买1价, 买1量, 买2价, 买2量, 更新时间, 昨日收盘价(集合竞价才有))]
        @return:
        """
        # 9:25之后不再处理
        if tool.get_now_time_as_int() > int("092500"):
            return
        for d in datas:
            # 计算当前是否是涨停状态
            if len(d) == 11:
                async_log_util.info(logger_debug, f"开1数据:{d}")
            if gpcode_manager.BuyOpenLimitUpCodeManager().is_in_cache(d[0]):
                # 09:19:50 到 09:20:00判断是否要撤单
                if int("091950") <= int(d[9].replace(":", "")) < int("092000"):
@@ -346,6 +398,10 @@
        datas = data["data"]
        cls.__save_l1_current_price(datas)
        cls.__process_buy_open_limit_up_datas(datas)
        # 根据高标的实时涨幅计算拉黑板块
        rate_dict = {d[0]: d[2] for d in datas}
        cls.__process_l1_data_thread_pool.submit(
            lambda: KPLPlateForbiddenManager().compute(rate_dict))
        # 9:30之前采用非线程
        if int(tool.get_now_time_str().replace(":", "")) < int("093000") or True:
            HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id)
@@ -364,8 +420,8 @@
        thread_id = random.randint(0, 100000)
        l2_log.threadIds[code] = thread_id
        l2_data_count = len(_datas)
        l2_log.info(code, hx_logger_l2_orderdetail,
                    f"{code}#耗时:{use_time}-{thread_id}#数量:{l2_data_count}#{_datas[-1]}")
        # l2_log.info(code, hx_logger_l2_orderdetail,
        #             f"{code}#耗时:{use_time}-{thread_id}#数量:{l2_data_count}#{_datas[-1]}")
        # l2_data_log.l2_time_log(code, "开始处理L2逐笔委托")
        try:
@@ -384,34 +440,15 @@
    @classmethod
    def l2_market_data(cls, code, data):
        def update_kpl_jx_block(code_, buy_1_price_, limit_up_price_):
            # ----------------------------------板块相关------------------------------
            try:
                if code_ in cls.__updating_jx_blocks_codes:
                    return
                cls.__updating_jx_blocks_codes.add(code_)
                cls.__KPLCodeJXBlockManager.load_jx_blocks(code_, buy_1_price_, limit_up_price_,
                                                           kpl_data_manager.KPLLimitUpDataRecordManager.get_current_reasons())
                # 更新板块信息
                latest_current_limit_up_records = kpl_data_manager.get_latest_current_limit_up_records()
                CodePlateKeyBuyManager.update_can_buy_blocks(code_,
                                                             kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas,
                                                             kpl_data_manager.KPLLimitUpDataRecordManager.total_datas,
                                                             latest_current_limit_up_records,
                                                             block_info.get_before_blocks_dict(),
                                                             kpl_data_manager.KPLLimitUpDataRecordManager.get_current_limit_up_reason_codes_dict())
            finally:
                cls.__updating_jx_blocks_codes.discard(code_)
        time_str = f"{data['dataTimeStamp']}"
        if time_str.startswith("9"):
            time_str = "0" + time_str
        time_str = time_str[:6]
        time_str = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}"
        time_str = l2_huaxin_util.convert_time(time_str)
        buy_1_price, buy_1_volume = data["buy"][0]
        sell_1_price, sell_1_volume = data["sell"][0]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        # 涨幅
        price = data['lastPrice']
        code_price_manager.CurrentPriceManager.set_current_price(code, price)
        code_price_manager.Buy1PriceManager().set_latest_buy1_money(code, buy_1_price, buy_1_volume)
@@ -428,11 +465,11 @@
        except Exception as e:
            logger_debug.exception(e)
        if limit_up_price is not None:
        pre_close_price = CodePrePriceManager.get_price_pre_cache(code)
        if pre_close_price is not None:
            average_rate = None
            try:
                average_price = data["totalValueTrade"] / data["totalVolumeTrade"]
                pre_close_price = CodePrePriceManager.get_price_pre_cache(code)
                average_rate = round((average_price - pre_close_price) / pre_close_price, 4)
            except:
                pass
@@ -441,33 +478,10 @@
                                                          limit_up_price,
                                                          sell_1_price, sell_1_volume // 100, average_rate)
            latest_3m_buy1_money_list = code_price_manager.Buy1PriceManager().get_latest_3m_buy1_money_list(code)
            # -----------------------------重新计算L撤后---------------------------
            # 暂时不更新,无意义
            # 如果时涨停状态
            # if abs(float(limit_up_price) - float(buy_1_price)) < 0.001:
            #     # 是否处于下单状态
            #     state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
            #     if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or constant.TEST:
            #         if latest_3m_buy1_money_list and tool.trade_time_sub(latest_3m_buy1_money_list[-1][0],
            #                                                              latest_3m_buy1_money_list[0][0]) >= 2 * 60:
            #             # 2分钟以内,标准差在10%以内
            #             c_start_index = None
            #             for i in range(len(latest_3m_buy1_money_list) - 1, -1, -1):
            #                 if tool.trade_time_sub(latest_3m_buy1_money_list[-1][0],
            #                                        latest_3m_buy1_money_list[i][0]) >= 2 * 60:
            #                     c_start_index = i
            #                     break
            #             if c_start_index is not None:
            #                 latest_3m_buy1_money_list = copy.deepcopy(latest_3m_buy1_money_list[c_start_index:])
            #                 latest_3m_buy1_money_list = [x[1] for x in latest_3m_buy1_money_list]
            #                 avg_val = numpy.mean(numpy.array(latest_3m_buy1_money_list))
            #                 max_val = max(latest_3m_buy1_money_list)
            #                 min_val = min(latest_3m_buy1_money_list)
            #                 if abs(max_val - avg_val) / avg_val < 0.1 and abs(min_val - avg_val) / avg_val < 0.1:
            #                     # 买1封单额平稳
            #                     LCancelBigNumComputer().re_compute_l_down_watch_indexes(code)
            threading.Thread(target=lambda: update_kpl_jx_block(code, buy_1_price, limit_up_price), daemon=True).start()
            # 拉取总大单成交
        threading.Thread(
            target=lambda: radical_buy_data_manager.TotalDealBigOrderInfoManager.update_big_order_info(code, data[
                "totalValueTrade"]), daemon=True).start()
        async_log_util.info(hx_logger_l2_market_data, f"{code}#{data}")
@@ -475,14 +489,35 @@
        L2MarketSellManager().set_current_total_sell_data(code, time_str,
                                                          data["totalAskVolume"] * data["avgAskPrice"],
                                                          data["totalAskVolume"], sell_1_info, data.get("sell"))
        # 炸板
        if sell_1_info and sell_1_info[1] > 0:
            if BeforeSubDealBigOrderManager().is_need_update(code):
                #  炸板更新数据
                cls.__sell_thread_pool.submit(
                    lambda: radical_buy_data_manager.pull_pre_deal_big_orders(code))
        if data["sell"] and len(data["sell"]) > 1 and data["sell"][1][1] > 0:
            # 出现卖二
            radical_buy_strategy.clear_data(code, force=True, market_time_str=time_str)
        # 设置扫入数据
        RadicalCodeMarketInfoManager().set_market_info(code, time_str, round(float(limit_up_price), 2), data["buy"][0], sell_1_info)
        RadicalCodeMarketInfoManager().set_market_info(code, time_str, limit_up_price, data["buy"][0],
                                                       sell_1_info)
        # 判断是否下单
        state = CodesTradeStateManager().get_trade_state_cache(code)
        if not trade_util.is_can_order_by_state(state):
            # 不处于可下单状态
            RadicalBuyDataManager().market_info_change(code)
        # 是否即将炸开
        if RadicalCodeMarketInfoManager().is_almost_open_limit_up(code):
            # 即将炸开
            total_deal_big_order_info = radical_buy_data_manager.get_total_deal_big_order_info(code, limit_up_price)
            if total_deal_big_order_info and total_deal_big_order_info[0] <= 0:
                EveryLimitupBigDealOrderManager.clear(code, f"板上放量:{time_str}")
                # 大单足够
                # l2_trade_single_callback.process_limit_up_active_buy(code, [], is_almost_open_limit_up=True,
                #                                                      l2_market_time_str=time_str)
    @classmethod
    def trading_order_canceled(cls, code, order_no):
@@ -616,6 +651,9 @@
    __radical_buy_by_blocks_result_cache = {}
    def OnTradeSingle(self, code, big_buy_order_count, _type, data):
        # 暂时不处理
        if True:
            return
        # 只处理深证的票
        try:
            # 判断是否下单
@@ -704,11 +742,15 @@
        except Exception as e:
            logger_debug.exception(e)
    def __process_limit_up_active_buy(self, code, transaction_datas):
    def process_limit_up_active_buy(self, code, transaction_datas, is_almost_open_limit_up=False,
                                    l2_market_time_str='', no_left_limit_up_sell=False):
        """
        处理涨停主动买
        @param no_left_limit_up_sell: 是否还有剩余涨停卖尚未成交
        @param code:
        @param transaction_datas:
        @param is_almost_open_limit_up: 是否即将炸板
        @param l2_market_time_str: 时间
        @return: 是否清除本次上板数据
        """
        __start_time = time.time()
@@ -718,53 +760,33 @@
            if not trade_util.is_can_order_by_state(state):
                # 不处于可下单状态
                return True
            async_log_util.info(logger_l2_radical_buy, f"涨停主动买:{code}-{transaction_datas[-1]}")
            if transaction_datas:
                l2_log.info(code, logger_l2_radical_buy, f"涨停主动买:{code}-{transaction_datas[-1]}")
            else:
                l2_log.info(code, logger_l2_radical_buy, f"即将炸板:{code}-{is_almost_open_limit_up}-{l2_market_time_str}")
            deal_codes = RadicalBuyDealCodesManager().get_deal_codes()
            # 判断今日扫入的代码数量是否大于阈值
            radical_buy_setting = BuyMoneyAndCountSetting().get_radical_buy_setting()
            MAX_COUNT = 4 if radical_buy_setting is None else radical_buy_setting[0]
            if not GreenListCodeManager().is_in_cache(code):
            if not WantBuyCodesManager().is_in_cache(code):
                # 加绿不判断板块是否成交
                if len(deal_codes) >= MAX_COUNT:
                    async_log_util.info(logger_l2_radical_buy, f"扫入成交代码个数大于{MAX_COUNT}个:{code}-{deal_codes}")
                    l2_log.info(code, logger_l2_radical_buy, f"扫入成交代码个数大于{MAX_COUNT}个:{code}-{deal_codes}")
                    return True
            if code in deal_codes:
                async_log_util.info(logger_l2_radical_buy, f"该代码已经成交:{code}")
                l2_log.info(code, logger_l2_radical_buy, f"该代码已经成交:{code}")
                return True
            # 单票是否可买
            can_buy_result = RadicalBuyDataManager.is_code_can_buy(code)
            if can_buy_result[0]:
                # 获取激进买的板块
                result_cache = self.__radical_buy_by_blocks_result_cache.get(code)
                if not result_cache or result_cache[0] < time.time():
                    # 不存在/过期
                    yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
                    if yesterday_codes is None:
                        yesterday_codes = set()
                    # 计算是否可以扫入
                    radical_result = RadicalBuyBlockManager.is_radical_buy(code, yesterday_codes)
                    async_log_util.info(logger_l2_radical_buy, f"计算板块结果:{code}-{radical_result}")
                    result_cache = (time.time() + 3, radical_result)
                    self.__radical_buy_by_blocks_result_cache[code] = result_cache
                    RadicalBuyDealCodesManager().set_code_blocks(code, radical_result[0])
                # 取缓存
                result = result_cache[1]
                if result[0]:
                f_buy_blocks, orgin_buy_blocks = radical_buy_strategy.compute_can_radical_buy_blocks(code, deal_codes)
                if orgin_buy_blocks:
                    if not f_buy_blocks:
                        return True
                    # 买入的板块
                    buy_blocks = result[0]
                    # 如果关键词包含已成交的原因就不再下单
                    # 获取已经成交代码的板块
                    try:
                        # ---------------判断板块是否还可以买入----------------
                        f_buy_blocks = radical_buy_data_manager.is_block_can_radical_buy(code, buy_blocks, deal_codes)
                        if not f_buy_blocks:
                            return True
                        buy_blocks = f_buy_blocks
                    except Exception as e:
                        logger_debug.exception(e)
                    buy_blocks = f_buy_blocks
                    # 判断当前时间段是否可以买入
                    mode = OrderBeginPosInfo.MODE_RADICAL
                    can_buy, money, msg = BuyMoneyUtil.get_buy_data(tool.get_now_time_str(), mode,
@@ -773,42 +795,44 @@
                                                                    DealAndDelegateWithBuyModeDataManager().get_delegates_codes_info(
                                                                        mode))
                    if not can_buy:
                        async_log_util.info(logger_l2_radical_buy, f"当前时间段已不能扫入:{code}-{msg}")
                        l2_log.info(code, logger_l2_radical_buy, f"当前时间段已不能扫入:{code}-{msg}")
                        return True
                    # -----根据成交比例判断是否可买------
                    result_by_volume = radical_buy_strategy.process_limit_up_active_buy_deal(code, transaction_datas)
                    async_log_util.info(logger_l2_radical_buy, f"量买入结果判断:{code}, 结果:{result_by_volume} 板块:{buy_blocks}")
                    result_by_volume = radical_buy_strategy.process_limit_up_active_buy_deal(code, transaction_datas,
                                                                                             is_almost_open_limit_up,
                                                                                             no_left_limit_up_sell=no_left_limit_up_sell)
                    l2_log.info(code, logger_l2_radical_buy, f"量买入结果判断:{code}, 结果:{result_by_volume} 板块:{buy_blocks}")
                    in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks()
                    buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b),
                                              in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks]
                    if result_by_volume[0] != radical_buy_strategy.BUY_MODE_NONE:
                        if not GreenListCodeManager().is_in_cache(code):
                        if not WantBuyCodesManager().is_in_cache(code):
                            # 加绿的不需要判断如下问题
                            if tool.get_now_time_as_int() < 93100:
                                radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code)
                                async_log_util.info(logger_l2_radical_buy,
                                                    f"09:31之前不交易:{code}")
                                return True
                            # if tool.get_now_time_as_int() < 93100:
                            #     radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code)
                            #     async_log_util.info(logger_l2_radical_buy,
                            #                         f"09:31之前不交易:{code}")
                            #     return True
                            # 判断是否开得太高
                            open_price = L1DataManager.get_open_price(code)
                            if not radical_buy_strategy.is_can_buy_with_open_price(code, open_price):
                                async_log_util.info(logger_l2_radical_buy,
                                                    f"开得太高:{code}")
                                l2_log.info(code, logger_l2_radical_buy, f"开得太高:{code}")
                                radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code)
                                return True
                            if not RadicalCodeMarketInfoManager().is_opened_limit_up(code):
                                async_log_util.info(logger_l2_radical_buy,
                                                    f"没有炸过板:{code}")
                                return True
                            # if not RadicalCodeMarketInfoManager().is_opened_limit_up(code):
                            #     # 辨识度的票首封可买
                            #
                            #     async_log_util.info(logger_l2_radical_buy,
                            #                         f"没有炸过板:{code}")
                            #     return True
                        radical_buy_data_manager.ExcludeIndexComputeCodesManager.remove_code(code)
                        if result_by_volume[0] == radical_buy_strategy.BUY_MODE_DIRECT and not tool.is_sh_code(code):
                        if result_by_volume[0] == radical_buy_strategy.BUY_MODE_DIRECT:
                            # 上证不能根据成交买入
                            refer_sell_data = L2MarketSellManager().get_refer_sell_data(code,
                                                                                        l2_huaxin_util.convert_time(
                                                                                            transaction_datas[-1][3]))
                            latest_deal_time = l2_huaxin_util.convert_time(transaction_datas[-1][3])
                            refer_sell_data = L2MarketSellManager().get_refer_sell_data(code, latest_deal_time)
                            total_datas = l2_data_util.local_today_datas.get(code)
                            buy_single_index, buy_exec_index = total_datas[-1]["index"], total_datas[-1]["index"]
                            buy_volume_rate = L2TradeDataProcessor.volume_rate_info[code][0]
@@ -816,6 +840,13 @@
                            if refer_sell_data:
                                sell_info = (refer_sell_data[0], refer_sell_data[1])
                            threshold_money = 0
                            every_deal_orders = EveryLimitupBigDealOrderManager.list_big_buy_deal_orders(code)
                            if every_deal_orders:
                                min_order_no_info = min(every_deal_orders, key=lambda x: x[0])
                                min_order_no = min_order_no_info[0]
                            else:
                                min_order_no = transaction_datas[-1][6]
                            order_begin_pos_info = OrderBeginPosInfo(buy_single_index=buy_single_index,
                                                                     buy_exec_index=buy_exec_index,
                                                                     buy_compute_index=buy_exec_index,
@@ -823,48 +854,145 @@
                                                                     max_num_set=set(),
                                                                     buy_volume_rate=buy_volume_rate,
                                                                     mode=OrderBeginPosInfo.MODE_RADICAL,
                                                                     mode_desc=f"扫入买入:{buy_blocks}",
                                                                     mode_desc=f"扫入买入:{buy_blocks}, 大单成交最小订单号:{min_order_no}",
                                                                     sell_info=sell_info,
                                                                     threshold_money=threshold_money)
                                                                     threshold_money=threshold_money,
                                                                     min_order_no= min_order_no
                                                                     )
                            L2TradeDataProcessor.save_order_begin_data(code, order_begin_pos_info)
                            buy_result = L2TradeDataProcessor.start_buy(code, total_datas[-1], total_datas[-1]["index"],
                                                                        True, block_info=buy_blocks_with_money)
                            if buy_result:
                                # 下单成功
                                radical_buy_data_manager.BlockPlaceOrderRecordManager().add_record(code, buy_blocks)
                                radical_buy_strategy.clear_data(code)
                                radical_buy_strategy.clear_data(code, force=True)
                                RDCancelBigNumComputer().clear_data(code)
                                # 大单成交足够
                                RadicalBuyDataManager().big_order_deal_enough(code)
                            return True
                        else:
                            if transaction_datas:
                                latest_buy_no = transaction_datas[-1][6]
                                latest_deal_time = l2_huaxin_util.convert_time(transaction_datas[-1][3])
                            else:
                                # 如果没有成交数据,就取最近的买单号
                                total_datas = l2_data_util.local_today_datas.get(code)
                                latest_buy_no = 0
                                for index in range(total_datas[-1]["index"], -1, -1):
                                    if L2DataUtil.is_buy(total_datas[index]["val"]):
                                        latest_buy_no = int(total_datas[index]["val"]["orderNo"])
                                        break
                                latest_deal_time = l2_market_time_str
                            RadicalBuyDealCodesManager.buy_by_l2_delegate_expire_time_dict[code] = (
                                time.time() + 30, transaction_datas[-1][6], buy_blocks,
                                l2_huaxin_util.convert_time(transaction_datas[-1][3]), buy_blocks_with_money)
                                time.time() + 60, latest_buy_no, buy_blocks,
                                latest_deal_time, buy_blocks_with_money, is_almost_open_limit_up)
                            return False
                    else:
                        async_log_util.info(logger_l2_radical_buy, f"不能下单:{code}-{result_by_volume}")
                        l2_log.info(code, logger_l2_radical_buy, f"不能下单:{code}-{result_by_volume}")
                        return False
                else:
                    volume_rate = code_volumn_manager.CodeVolumeManager().get_volume_rate(code)
                    async_log_util.info(logger_l2_radical_buy, f"没有可扫入的板块:{code},量比:{volume_rate}")
                    l2_log.info(code, logger_l2_radical_buy, f"没有可扫入的板块:{code},量比:{volume_rate}")
                    return True
            else:
                async_log_util.info(logger_l2_radical_buy, f"目前代码不可交易:{code}-{can_buy_result[1]}")
                l2_log.info(code, logger_l2_radical_buy, f"目前代码不可交易:{code}-{can_buy_result[1]}")
                return True
        except Exception as e:
            async_log_util.info(logger_debug, f"激进买计算异常:{str(e)}")
            l2_log.info(code, logger_debug, f"激进买计算异常:{str(e)}")
            logger_debug.exception(e)
        finally:
            use_time = time.time() - __start_time
            if use_time > 0.005:
                async_log_util.info(logger_debug, f"扫入处理时长:{code}-{use_time}")
                l2_log.info(code, logger_debug, f"扫入处理时长:{code}-{use_time}")
    def OnLimitUpActiveBuy(self, code, transaction_datas):
        can_clear_before_data = self.__process_limit_up_active_buy(code, transaction_datas)
    def OnLimitUpActiveBuy(self, code, transaction_datas, no_left_limit_up_sell):
        can_clear_before_data = self.process_limit_up_active_buy(code, transaction_datas,
                                                                 no_left_limit_up_sell=no_left_limit_up_sell)
        if can_clear_before_data:
            # 清除
            EveryLimitupBigDealOrderManager.clear(code)
            EveryLimitupBigDealOrderManager.clear(code, "处理涨停成交数据")
        pass
    def OnLastLimitUpSellDeal(self, code, data):
        """
        最后一笔涨停卖数据成交
        @param code:
        @param data:  (data['SecurityID'], data['TradePrice'], data['TradeVolume'], data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], data['SellNo'], data['ExecType'])
        @return:
        """
        if True:
            return
        if data[6] < data[7]:
            # 非主动买
            return
        # 根据板块判断是否可买
        state = CodesTradeStateManager().get_trade_state_cache(code)
        if not trade_util.is_can_order_by_state(state):
            # 不处于可下单状态
            return
        l2_log.info(code, logger_l2_radical_buy, f"最后一笔涨停卖被吃:{code}-{data}")
        deal_codes = RadicalBuyDealCodesManager().get_deal_codes()
        # 判断今日扫入的代码数量是否大于阈值
        radical_buy_setting = BuyMoneyAndCountSetting().get_radical_buy_setting()
        MAX_COUNT = 4 if radical_buy_setting is None else radical_buy_setting[0]
        if not WantBuyCodesManager().is_in_cache(code):
            # 加绿不判断板块是否成交
            if len(deal_codes) >= MAX_COUNT:
                l2_log.info(code, logger_l2_radical_buy, f"扫入成交代码个数大于{MAX_COUNT}个:{code}-{deal_codes}")
                return
        if code in deal_codes:
            l2_log.info(code, logger_l2_radical_buy, f"该代码已经成交:{code}")
            return
        # 单票是否可买
        can_buy_result = RadicalBuyDataManager.is_code_can_buy(code)
        if not can_buy_result[0]:
            return
        # 获取激进买的板块
        f_buy_blocks, orgin_buy_blocks = radical_buy_strategy.compute_can_radical_buy_blocks(code, deal_codes)
        if not orgin_buy_blocks:
            l2_log.info(code, logger_l2_radical_buy, f"没有可扫入的板块:{code}")
            return
        if not f_buy_blocks:
            return
        # 买入的板块
        buy_blocks = f_buy_blocks
        # 判断当前时间段是否可以买入
        mode = OrderBeginPosInfo.MODE_RADICAL
        can_buy, money, msg = BuyMoneyUtil.get_buy_data(tool.get_now_time_str(), mode,
                                                        DealAndDelegateWithBuyModeDataManager().get_deal_codes_info(
                                                            mode),
                                                        DealAndDelegateWithBuyModeDataManager().get_delegates_codes_info(
                                                            mode))
        if not can_buy:
            l2_log.info(code, logger_l2_radical_buy, f"当前时间段已不能扫入:{code}-{msg}")
            return
        in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks()
        buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b),
                                  in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks]
        if not WantBuyCodesManager().is_in_cache(code):
            # 判断是否开得太高
            open_price = L1DataManager.get_open_price(code)
            if not radical_buy_strategy.is_can_buy_with_open_price(code, open_price):
                l2_log.info(code, logger_l2_radical_buy, f"开得太高:{code}")
                radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code)
                return
        radical_buy_data_manager.ExcludeIndexComputeCodesManager.remove_code(code)
        # 根据L2下单
        latest_buy_no = data[6]
        latest_deal_time = l2_huaxin_util.convert_time(data[3])
        # 清除大单委托数据
        EveryLimitupBigDelegateOrderManager.clear(code, '')
        l2_log.info(code, logger_l2_trade, f"计算完板块与大单,准备下单:{data}")
        RadicalBuyDealCodesManager.buy_by_l2_delegate_expire_time_dict[code] = (
            time.time() + 1, latest_buy_no, buy_blocks,
            latest_deal_time, buy_blocks_with_money, False)
# 回调
@@ -873,33 +1001,58 @@
my_trade_response = MyTradeResponse()
def run_l2_market_info_reciever(queues: list):
    """
    接收L2 market数据
    @param queues:
    @return:
    """
    def recieve_data(queue):
        while True:
            try:
                d = queue.get()
                # {"type": "l2_market", "data": (code, data)}
                if d["type"] == "l2_market":
                    code, market_data = d["data"]
                    my_l2_data_callback.OnMarketData(code, market_data)
            except:
                pass
    for q in queues:
        threading.Thread(target=recieve_data, args=(q,), daemon=True).start()
# 预埋单
def __test_pre_place_order():
    logger_debug.info("进入预埋单测试")
    codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()
    if codes:
        for code in codes:
            # 获取昨日收盘价格
            limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
            if not limit_up_price:
                init_data_util.re_set_price_pre(code)
    logger_debug.info(f"进入预埋单测试:{codes}")
    try:
        if codes:
            for code in codes:
                # 获取昨日收盘价格
                limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
            if not limit_up_price:
                logger_debug.info(f"没有获取到涨停价:{code}")
                continue
            shadow_price = tool.get_shadow_price(limit_up_price)
            if not constant.TRADE_ENABLE:
                return
            try:
                volume = tool.get_buy_volume_by_money(limit_up_price, constant.AVAILABLE_BUY_MONEYS[0])
                result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, volume, limit_up_price,
                                                bolcking=False,
                                                shadow_price=shadow_price, shadow_volume=volume)
                async_log_util.info(logger_trade, f"{code}下单结束:{result}")
                buy_open_limit_up_strategy.BuyOpenLimitupDataManager().set_place_order_info(code, volume, volume,
                                                                                            result.get("order_ref"))
            except Exception as e:
                pass
                if not limit_up_price:
                    init_data_util.re_set_price_pre(code, force=True)
                    limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
                if not limit_up_price:
                    logger_debug.info(f"没有获取到涨停价:{code}")
                    continue
                shadow_price = tool.get_shadow_price(limit_up_price)
                if not constant.TRADE_ENABLE:
                    return
                try:
                    volume = tool.get_buy_volume_by_money(limit_up_price, constant.AVAILABLE_BUY_MONEYS[0])
                    result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, volume, limit_up_price,
                                                    blocking=False,
                                                    shadow_price=shadow_price, shadow_volume=volume)
                    l2_log.info(code, logger_trade, f"{code}下单结束:{result}")
                    buy_open_limit_up_strategy.BuyOpenLimitupDataManager().set_place_order_info(code, volume, volume,
                                                                                                result.get("order_ref"))
                except Exception as e:
                    logger_debug.exception(e)
    except Exception as e:
        logger_debug.exception(e)
def __subscript_fixed_codes_l2():
@@ -915,29 +1068,55 @@
            init_data_util.re_set_price_pre(code)
            limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        min_volume = int(round(50 * 10000 / limit_up_price))
        special_volumes = BuyMoneyUtil.get_possible_buy_volumes(limit_up_price)
        special_volumes |= set([tool.get_buy_volume_by_money(limit_up_price, x) for x in constant.AVAILABLE_BUY_MONEYS])
        # 传递笼子价
        add_datas.append(
            # (代码, 最小量, 涨停价,影子订单价格,买量, 特殊价格)
            (code, min_volume, limit_up_price, round(tool.get_shadow_price(limit_up_price), 2),
             tool.get_buy_volume(limit_up_price),
             [tool.get_buy_volume_by_money(limit_up_price, x) for x in constant.AVAILABLE_BUY_MONEYS]))
             list(special_volumes)))
    huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.push(add_datas, 0)
def __update_yesterday_kpl_limit_up_datas():
    day = tool.get_now_date_str()
    day = HistoryKDatasUtils.get_previous_trading_date(day)
    results = kpl_api.getHistoryLimitUpInfo(day)
    result_list = kpl_util.parseDaBanData(json.dumps({"list": results, "errcode": 0}), kpl_util.DABAN_TYPE_LIMIT_UP)
    kpl_data_manager.KPLLimitUpDataRecordManager.save_record(day, result_list, set_not_open=True)
    logger_debug.info("更新昨日开盘啦实时涨停数据")
# 做一些初始化的操作
def __init():
    def run_pending():
        # 更新自由流通市值
        schedule.every().day.at("15:10:00").do(zyltgb_util.update_all_zylt_volumes)
        schedule.every().day.at("01:05:00").do(__test_pre_place_order)
        # 测试下单
        schedule.every().day.at("01:02:00").do(__test_pre_place_order)
        # 订阅固定的代码
        schedule.every().day.at("09:10:00").do(__subscript_fixed_codes_l2)
        # 更新K线
        schedule.every().day.at("08:00:01").do(history_k_data_manager.update_history_k_bars)
        schedule.every().day.at("08:30:01").do(history_k_data_manager.update_history_k_bars)
        schedule.every().day.at("09:00:01").do(history_k_data_manager.update_history_k_bars)
        # 更新账户信息
        schedule.every().day.at("09:00:01").do(huaxin_trade_data_update.add_money_list)
        schedule.every().day.at("09:15:20").do(huaxin_trade_data_update.add_money_list)
        schedule.every().day.at("09:15:20").do(huaxin_trade_data_update.add_money_list)
        # 更新昨日实时涨停数据
        schedule.every().day.at("07:58:00").do(__update_yesterday_kpl_limit_up_datas)
        while True:
            schedule.run_pending()
            time.sleep(1)
            try:
                schedule.run_pending()
            except:
                pass
            finally:
                time.sleep(1)
            # 9点半后终止运行
            # if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") > 0:
            #     break
@@ -948,9 +1127,31 @@
    threading.Thread(target=run_pending, daemon=True).start()
    l2_data_util.load_l2_data_all(True)
    # L2成交信号回调
    L2TradeSingleDataManager.set_callback(MyL2TradeSingleCallback())
    global l2_trade_single_callback
    l2_trade_single_callback = MyL2TradeSingleCallback()
    data_callback.l2_trade_single_callback = l2_trade_single_callback
    L2TradeSingleDataManager.set_callback(l2_trade_single_callback)
    # 加载自由流通量
    global_data_loader.load_zyltgb_volume_from_db()
    # 获取最近7天涨停数最多的板块
    # try:
    #     if not KPLPlateForbiddenManager().list_all_cache() and tool.get_now_time_as_int() > int("070000"):
    #         # 没有添加过的时候需要重新添加
    #         datas_ = LatestLimitUpBlockManager().statistics_limit_up_block_infos()
    #         if datas_:
    #             for data_ in datas_:
    #                 # 连续2天的板块就不买
    #                 if data_[2] >= 2:
    #                     KPLPlateForbiddenManager().save_plate(data_[0])
    # except:
    #     pass
    # 初始化数据
    BuyMoneyAndCountSetting()
    gpcode_manager.WantBuyCodesManager()
    # 加载历史K线数据
    HistoryKDataManager().load_data()
def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr):