Administrator
2024-08-29 7cd10cd4896bd15345968cde6c76cee33b367533
bug修复
8个文件已修改
116 ■■■■ 已修改文件
huaxin_client/l1_client.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_target_codes_manager.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 32 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/huaxin_trade_server.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/history_k_data_manager.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/init_data_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client.py
@@ -121,20 +121,7 @@
            return
        if pMarketDataField.SecurityName.find("ST") >= 0:
            return
        close_price = round(pMarketDataField.UpperLimitPrice / out_tool.get_limit_up_rate(pMarketDataField.SecurityID),
                            2)
        try:
            # 测试L1数据
            if pMarketDataField.SecurityID == '600636' or pMarketDataField.SecurityID == '002430' or pMarketDataField.SecurityID == '300466':
                d = {"SecurityID": pMarketDataField.SecurityID, "PreClosePrice": pMarketDataField.PreClosePrice,
                     "LastPrice": pMarketDataField.LastPrice, "BidPrice1": pMarketDataField.BidPrice1,
                     "BidVolume1": pMarketDataField.BidVolume1, "AskVolume1": pMarketDataField.AskVolume1,
                     "AskPrice1": pMarketDataField.AskPrice1, "UpperLimitPrice": pMarketDataField.UpperLimitPrice,
                     "UpdateTime": pMarketDataField.UpdateTime, "UpdateMillisec": pMarketDataField.UpdateMillisec}
                logger_local_huaxin_l1.info(f"L1数据:{d}")
        except:
            pass
        close_price = pMarketDataField.PreClosePrice
        lastPrice = pMarketDataField.LastPrice
        if pMarketDataField.BidPrice1:
            lastPrice = pMarketDataField.BidPrice1
@@ -142,19 +129,9 @@
        if out_tool.get_limit_up_rate(pMarketDataField.SecurityID) > 1.1001:
            # 涨停板20%以上的打折
            rate = rate / 2
        # print(pMarketDataField.SecurityID, pMarketDataField.SecurityName, rate, pMarketDataField.Volume)
        level1_data_dict[pMarketDataField.SecurityID] = (
            pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time(),
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1)
        # print(
        #     "SecurityID[%s] SecurityName[%s] LastPrice[%.2f] Volume[%d] Turnover[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d] UpperLimitPrice[%.2f] LowerLimitPrice[%.2f]"
        #     % (pMarketDataField.SecurityID, pMarketDataField.SecurityName, pMarketDataField.LastPrice,
        #        pMarketDataField.Volume,
        #        pMarketDataField.Turnover, pMarketDataField.BidPrice1, pMarketDataField.BidVolume1,
        #        pMarketDataField.AskPrice1,
        #        pMarketDataField.AskVolume1, pMarketDataField.UpperLimitPrice, pMarketDataField.LowerLimitPrice))
__latest_subscript_codes = set()
@@ -296,7 +273,6 @@
            # (代码,现价,涨幅,量,时间)
            list_ = [level1_data_dict[k] for k in level1_data_dict]
            flist = []
            plist = []
            now_time_int = int(tool.get_now_time_str().replace(":", ""))
            threshold_rate = constant.L1_MIN_RATE_PRE if now_time_int < int(
                "094000") else constant.L1_MIN_RATE
@@ -304,8 +280,6 @@
                if d[2] >= threshold_rate:
                    # 涨幅小于5%的需要删除
                    flist.append(d)
                if d[0] in __position_codes:
                    plist.append(d)
            flist.sort(key=lambda x: x[2], reverse=True)
            # 正式交易之前先处理比较少的数据,不然处理时间久造成数据拥堵
            MAX_COUNT = 500
@@ -316,8 +290,6 @@
            elif now_time_int < int("092900"):
                MAX_COUNT = 400
            datas = flist[:MAX_COUNT]
            # 将持仓股加入进去
            datas.extend(plist)
            if len(datas) > 0:
                logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas))
                __upload_codes_info(queue_l1_w_strategy_r, datas)
l2/huaxin/huaxin_target_codes_manager.py
@@ -89,22 +89,23 @@
            code = d[0]
            if not tool.is_can_buy_code(code):
                continue
            price = d[1]
            # 如果现价是0.0就取买1价
            price = d[1] if d[1] > 0 else d[5]
            # 格式 (代码,现价,涨幅,量,更新时间,买1价格,买1量)
            # 剔除昨日涨停的票
            if code in yesterday_codes:
                continue
            # 剔除股价大于40块的票
            if d[1] > constant.MAX_SUBSCRIPT_CODE_PRICE:
            if price > constant.MAX_SUBSCRIPT_CODE_PRICE:
                continue
            # 获取自由流通市值
            # -------获取自由流通市值-------
            if code not in global_util.zyltgb_map:
                # 获取自由流通量
                zylt = None
                zylt_volume = global_util.zylt_volume_map.get(code)
                if zylt_volume and d[1] > 0:
                    zylt = zylt_volume * d[1]
                if zylt_volume and price > 0:
                    zylt = zylt_volume * price
                if not zylt:
                    try:
                        __start_time = time.time()
@@ -122,7 +123,7 @@
                        history_k_data_manager.re_set_price_pres([code], True)
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if limit_up_price:
                        zylt = int(zylt / d[1] * float(limit_up_price))
                        zylt = int(zylt / price * float(limit_up_price))
                    global_util.zyltgb_map[code] = int(zylt)
            # 保存今日实时量
            temp_volumns.append((code, d[3]))
@@ -132,7 +133,8 @@
                zyltgb = global_util.zyltgb_map[code]
            # 量的单位为手(不是股)
            # 价格以买1价格处理
            fitem = {"code": code, "price": d[5] if d[5] else d[1], "volume": d[3] // (10000 * 100), "volumeUnit": 1, "time": "00:00:00",
            fitem = {"code": code, "price": price, "volume": d[3] // (10000 * 100), "volumeUnit": 1,
                     "time": "00:00:00",
                     "zyltgb": zyltgb // 10000, "zyltgbUnit": 1}
            flist.append(fitem)
        code_volumn_manager.set_today_volumns(temp_volumns)
l2/l2_data_manager_new.py
@@ -32,7 +32,7 @@
    local_latest_datas, local_today_canceled_buyno_map, local_today_sellno_map
import l2.l2_data_util
from log_module.log import logger_l2_trade_buy, logger_l2_process, logger_l2_error, logger_debug, \
    logger_l2_not_buy_reasons, logger_real_place_order_position
    logger_l2_not_buy_reasons, logger_real_place_order_position, logger_l2_trade_buy_queue
from trade.trade_data_manager import CodeActualPriceProcessor, PlaceOrderCountManager, AccountMoneyManager
@@ -391,6 +391,8 @@
    @classmethod
    def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time):
        now_time_str = tool.get_now_time_str()
        # 将本次中断设置为
        cls.__break_current_batch_data_for_buy_dict[code] = False
        if len(add_datas) > 0:
            # 记录当前批数据的索引
            cls.__processing_data_indexes[code] = (add_datas[0]["index"], add_datas[-1]["index"])
@@ -453,6 +455,7 @@
        #                                    "l2数据预处理时间")
        # 9:29:55开始处理数据
        place_ordered_desc = ""
        if len(add_datas) > 0 and int(tool.get_now_time_str().replace(":", "")) > int("092955"):
            # 是否为首板代码
            is_first_code = True  # gpcode_manager.FirstCodeManager().is_in_first_record(code)
@@ -504,24 +507,29 @@
                state = cls.__CodesTradeStateManager.get_trade_state_cache(code)
                start_index = len(total_datas) - len(add_datas)
                end_index = len(total_datas) - 1
                if state == trade_constant.TRADE_STATE_BUY_DELEGATED or state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or state == trade_constant.TRADE_STATE_BUY_SUCCESS:
                    # 已挂单
                    place_ordered_desc = "已下单"
                    cls.__process_order(code, start_index, end_index, capture_timestamp, is_first_code)
                else:
                    place_ordered_desc = "未下单"
                    # 未挂单,时间相差不大才能挂单
                    # tool.trade_time_sub(latest_time, "09:32:00") < 0
                    if l2.l2_data_util.L2DataUtil.is_same_time(
                            now_time_str, latest_time):
                        cls.__process_not_order(code, start_index, end_index, capture_timestamp, is_first_code)
            l2_log.info(code, logger_l2_process, "code:{} 处理数据范围: {}-{} 处理时间:{} 线程ID:{}", code,
                    else:
                        place_ordered_desc += ":数据延迟"
            l2_log.info(code, logger_l2_process, "code:{} 处理数据范围: {}-{} 处理时间:{} 线程ID:{} 处理情况:{}", code,
                        add_datas[0]["index"],
                        add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                        l2_log.threadIds.get(code))
                        l2_log.threadIds.get(code), place_ordered_desc)
    # 处理未挂单
    @classmethod
    def __process_not_order(cls, code, start_index, end_index, capture_time, is_first_code):
        cls.__break_current_batch_data_for_buy_dict[code] = False
        __start_time = round(t.time() * 1000)
        # 获取阈值
        threshold_money, msg = cls.__get_threshmoney(code)
@@ -1020,6 +1028,7 @@
            # 只有板块满足下单之后才能判断其它条件
            range_indexes = cls.__processing_data_indexes.get(code)
            if range_indexes:
                # 是否是量化单
                is_quantization_result = buy_strategy_util.is_quantization(code, range_indexes[0], range_indexes[1])
                if is_quantization_result[0]:
                    cls.__next_buy_time_dict[code] = is_quantization_result[1]
@@ -1341,6 +1350,7 @@
        # 判断下次买入时间是否正确
        if code in cls.__next_buy_time_dict and t.time() < cls.__next_buy_time_dict[code]:
            l2_log.debug(code, f"下次可下单时间({compute_start_index}-{compute_end_index}):{cls.__next_buy_time_dict[code]}")
            return
        if code in cls.__next_buy_time_dict:
            cls.__next_buy_time_dict.pop(code)
@@ -1377,6 +1387,12 @@
            has_single, _index, sell_info, single_msg, mode = cls.__compute_active_order_begin_pos(code, continue_count,
                                                                                                   compute_start_index,
                                                                                                   compute_end_index)
            if not has_single:
                # 没有信号,如果当前数据是涨停买就记录日志,防止记录过多的日志
                if L2DataUtil.is_limit_up_price_buy(
                        total_datas[compute_start_index]["val"]) and L2DataUtil.is_limit_up_price_buy(total_datas[compute_end_index]["val"]):
                    async_log_util.info(logger_l2_trade_buy_queue, "尚未获取到买入信号: code-{} start_index-{}  end_index-{} msg-{}", code,
                                        compute_start_index, compute_end_index, sell_info)
            fast_msg = None
            if has_single:
                order_begin_pos.mode = mode
@@ -1677,6 +1693,14 @@
    # 计算激进买的下单信号
    @classmethod
    def __compute_active_order_begin_pos(cls, code, continue_count, start_index, end_index):
        """
        计算买入信号
        @param code:
        @param continue_count:
        @param start_index:
        @param end_index:
        @return:
        """
        total_datas = local_today_datas[code]
        start_time_str = total_datas[start_index]["val"]["time"]
        if end_index - start_index + 1 < continue_count:
servers/huaxin_trade_server.py
@@ -533,7 +533,7 @@
                            continue
                        TradeServerProcessor.set_target_codes(val)
            except Exception as e:
                logging.exception(e)
                logger_debug.exception(e)
# 排得太远撤单
test/l2_trade_test.py
@@ -18,7 +18,7 @@
from l2.l2_transaction_data_manager import HuaXinSellOrderStatisticManager
from log_module import log, log_export, async_log_util
from trade.huaxin import huaxin_trade_api
from utils import tool
from utils import tool, init_data_util
from db import redis_manager_delegate as redis_manager
from l2 import l2_log, l2_data_manager, transaction_progress, l2_data_manager_new, l2_transaction_data_processor, \
    cancel_buy_strategy
@@ -92,7 +92,7 @@
        constant.TEST = True
        trade_manager.TradeStateManager().open_buy()
        threading.Thread(target=async_log_util.run_sync, daemon=True).start()
        code = "000691"
        code = "000590"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
@@ -154,11 +154,13 @@
            KPLDataManager().get_from_file(kpl_util.KPLDataType.LIMIT_UP, tool.get_now_date_str()))
        kpl_data_manager.KPLLimitUpDataRecordManager.load_total_datas()
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            init_data_util.re_set_price_pre(code)
        current_price_process_manager.set_trade_price(code, round(float(gpcode_manager.get_limit_up_price(code)), 2))
        pss_server, pss_strategy = multiprocessing.Pipe()
        huaxin_trade_api.run_pipe_trade(pss_server, None, None)
        # huaxin_trade_api.run_pipe_trade(pss_server, None, None)
        for indexs in pos_list:
            l2_log.threadIds[code] = mock.Mock(
third_data/code_plate_key_manager.py
@@ -512,7 +512,7 @@
        return self.__redisManager.getRedis()
    # 返回key集合(排除无效板块),今日涨停原因,今日历史涨停原因,历史涨停原因,二级,精选板块
    def get_plate_keys(self, code, contains_today = True):
    def get_plate_keys(self, code, contains_today=True):
        """
        获取代码的板块: (180天的涨停原因+推荐原因)+今日涨停原因+今日涨停推荐原因+今日推荐原因
        @param code:
@@ -1050,8 +1050,6 @@
        cls.__can_buy_compute_result_dict[code] = (
            can_buy_blocks, unique, msg, can_buy_strong_blocks, keys, active_buy_blocks)
    @classmethod
    def compute_open_limit_up_code_dict_for_radical_buy(cls, current_limit_up_datas):
        """
@@ -1059,17 +1057,18 @@
        @param current_limit_up_datas:
        @return:
        """
        time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00"
        timestamp = time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S'))
        timestamp_start = time.mktime(
            time.strptime(datetime.datetime.now().strftime("%Y-%m-%d") + " 09:25:00", '%Y-%m-%d %H:%M:%S'))
        timestamp_end = time.mktime(time.strptime(datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00", '%Y-%m-%d %H:%M:%S'))
        temp_dict = {}
        for d in current_limit_up_datas:
            code = d[0]
            # d: (代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额,涨停原因代码,涨停原因代码数量)
            # 计算是否开1
            if int(d[2]) >= timestamp:
            if int(d[2]) >= timestamp_end or int(d[2]) < timestamp_start:
                continue
            # 剔除5板以上的
            if d[4].find("连板") > 0 and int(d[4].replace("连板","")) >=5:
            if d[4].find("连板") > 0 and int(d[4].replace("连板", "")) >= 5:
                continue
            buy1_money = huaxin_l1_data_manager.get_buy1_money(code)
            if not buy1_money or buy1_money < 1e8:
@@ -1077,13 +1076,12 @@
            if not tool.is_can_buy_code(code):
                continue
            # 买1是否大于1亿
            blocks =  {d[5]}
            blocks = {d[5]}
            if d[6]:
                blocks |= set(d[6].split("、"))
            blocks-=constant.KPL_INVALID_BLOCKS
            blocks -= constant.KPL_INVALID_BLOCKS
            temp_dict[code] = (kpl_util.get_high_level_count(d[4]), d[6])
        kpl_data_constant.open_limit_up_code_dict_for_radical_buy = temp_dict
    @classmethod
    def is_radical_buy(cls, code):
@@ -1099,9 +1097,6 @@
        # 获取代码的板块
        keys_, k1_, k11_, k2_, k3_, k4_ = cls.__TargetCodePlateKeyManager.get_plate_keys(code, contains_today=False)
        # 获取
if __name__ == "__main__":
third_data/history_k_data_manager.py
@@ -24,7 +24,8 @@
        for code in codes_:
            try:
                datas = init_data_util.get_volumns_by_code(code, 150)
                HistoryKDataManager().save_history_bars(code, datas[0]['bob'].strftime("%Y-%m-%d"), datas)
                if datas:
                    HistoryKDataManager().save_history_bars(code, datas[0]['bob'].strftime("%Y-%m-%d"), datas)
            except Exception as e:
                logger_debug.exception(e)
utils/init_data_util.py
@@ -23,7 +23,7 @@
# 获取近90天的最大量与最近的量
# 获取最近一次涨停/涨停下一个交易日的最大值
def get_volumns_by_code(code, count=60) -> object:
def get_volumns_by_code(code, count=60):
    datas = HistoryKDatasUtils.get_history_tick_n(code, count, "open,high,low,close,volume,pre_close,bob,amount")
    if not datas:
        return None