Administrator
2024-06-28 4bb98ab3b49687265fb60754d07610d50b3b8431
水下捞相关数据删除/优化目标代码处理
7个文件已修改
340 ■■■■■ 已修改文件
code_attribute/first_target_code_data_processor.py 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/code_queue_distribute_manager.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
inited_data.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_target_codes_manager.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 137 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/current_price_process_manager.py 25 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_data_manager.py 112 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/first_target_code_data_processor.py
@@ -73,43 +73,14 @@
                    dataList.append({"code": code, "price": f"{_limit_up_price}", "volume": "0",
                                     "volumeUnit": 0, "time": "00:00:00", "zyltgb": "100",
                                     "zyltgbUnit": 0})
    # ---保存未筛选的首板代码
    new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes)
    # 保存自由流通股本,暂时不保存
    # if dataList:
    #     zyltgb_list = []
    #     for data in dataList:
    #         code = data["code"]
    #         if code in global_util.zyltgb_map:
    #             continue
    #         zyltgb_list.append(
    #             {"code": code, "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgbUnit"]})
    #     if zyltgb_list:
    #         ZYLTGBUtil.save_list(zyltgb_list)
    #         global_data_loader.load_zyltgb()
    # 获取昨日收盘价
    need_get_limit_up_codes = set()
    for code in codes:
        # 如果涨停价是空值就需要设置昨日收盘价格
        if gpcode_manager.get_limit_up_price(code) is None:
            init_data_util.re_set_price_pres([code], True)
    # 板块关键字准备  暂时删除
    # for code in codes:
    #     if __CodesPlateKeysManager.get_history_limit_up_reason(code) is None:
    #         # 从数据库加载历史涨停原因
    #         __CodesPlateKeysManager.set_history_limit_up_reason(code,
    #                                                             KPLLimitUpDataRecordManager.get_latest_blocks_set(
    #                                                                 code))
    #     if __CodesPlateKeysManager.get_blocks(code) is None:
    #         try:
    #             results = kpl_api.getStockIDPlate(code)
    #             bs = [r[1] for r in results]
    #             __CodesPlateKeysManager.set_blocks(code, bs)
    #         except Exception as e:
    #             logging.exception(e)
    #             pass
            need_get_limit_up_codes.add(code)
    if need_get_limit_up_codes:
        init_data_util.re_set_price_pres(list(need_get_limit_up_codes), True)
    logger_l2_codes_subscript.info(f"{request_id}加载l2代码涨停价结束")
    # 获取60天最大记录
    for code in codes:
@@ -208,20 +179,6 @@
    # 初始化板块信息,暂时删除
    # for code in codes:
    #     block_info.init_code(code)
    if new_add_codes:
        gpcode_manager.FirstGPCodesManager().set_first_gp_codes_with_data(HistoryKDatasUtils.get_gp_latest_info(codes,
                                                                                                                fields="symbol,sec_name,sec_type,sec_level"))
        # 加入首板历史记录
        logger_first_code_record.info("新增首板:{}", new_add_codes)
        # 移除代码
        if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS:
            listen_codes = gpcode_manager.get_listen_codes()
            for lc in listen_codes:
                if not gpcode_manager.is_in_gp_pool(lc):
                    # 移除代码
                    l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")
    # 保存现价
    if dataList:
        situation = MarketSituationManager().get_situation_cache()
huaxin_client/code_queue_distribute_manager.py
@@ -76,11 +76,14 @@
        callback_info = self.get_available_callback()
        if not callback_info:
            distibuted_callbacks_ids = set()
            need_release_codes = set()
            for code in self.distibuted_code_callback_dict:
                distibuted_callbacks_ids.add(self.distibuted_code_callback_dict[code][0])
                # 如果代码没在目标代码中就移除
                if target_codes and code not in target_codes:
                    self.release_distribute_callback(code)
                    need_release_codes.add(code)
            for c in need_release_codes:
                self.release_distribute_callback(c)
            logger_local_huaxin_l2_error.info(f"已经分配的代码:{self.distibuted_code_callback_dict.keys()}")
            logger_local_huaxin_l2_error.info(f"已经分配的callbackid:{distibuted_callbacks_ids}")
            # 删除已经没在目标代码中的分配
inited_data.py
@@ -65,8 +65,6 @@
    L2LimitUpSellStatisticUtil().clear()
    # 重置所有的大单数据
    big_money_num_manager.reset_all()
    # 清除水下捞数据
    __actualPriceProcessor.clear_under_water_data()
    # 载入行业股票代码
    global_data_loader.load_industry()
    # 载入代码自由流通市值
l2/huaxin/huaxin_target_codes_manager.py
@@ -120,14 +120,6 @@
                    # 保存自由流通股本
                    ZYLTGBUtil.save_async(code, zylt, price)
                    global_util.zyltgb_map[code] = int(zylt)
            # 自由流通市值不符合标准
            # zyltgb = global_util.zyltgb_map.get(code)
            # if zyltgb:
            #     zyltgb_as_yi = round(zyltgb / 100000000, 2)
            #     if zyltgb_as_yi < zyltgb_thresholds[0] or zyltgb_as_yi > zyltgb_thresholds[1]:
            #         if not want_codes or code not in want_codes:
            #             # 想买单中的不能排除
            #             continue
            # 保存今日实时量
            temp_volumns.append((code, d[3]))
l2/l2_data_manager_new.py
@@ -705,11 +705,8 @@
                # 不处于可下单状态
                return False
            __start_time = tool.get_now_timestamp()
            can, need_clear_data, reason = False, False, ""
            if not is_first_code:
                can, need_clear_data, reason = cls.__can_buy(code)
            else:
                can, need_clear_data, reason = cls.__can_buy_first(code)
            can, need_clear_data, reason = cls.__can_buy_first(code)
            # __start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - __start_time, "最后判断是否能下单", force=True)
            # 删除虚拟下单
@@ -821,136 +818,6 @@
        #                 return False, "14:00后老大都开盘涨停,老二不能撤单"
        return True, ""
    # 是否可以买
    # 返回是否可以买,是否需要清除之前的买入信息,原因
    @classmethod
    def __can_buy(cls, code):
        __start_time = t.time()
        if not cls.__TradeStateManager.is_can_buy_cache():
            return False, True, f"今日已禁止交易"
        # 之前的代码
        # 首板代码且尚未涨停过的不能下单
        # is_limited_up = gpcode_manager.FirstCodeManager().is_limited_up(code)
        # if not is_limited_up:
        #     gpcode_manager.FirstCodeManager().add_limited_up_record([code])
        #     place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(
        #         code)
        #     if place_order_count == 0:
        #         trade_data_manager.PlaceOrderCountManager().place_order(code)
        #     return False, True, "首板代码,且尚未涨停过"
        try:
            # 买1价格必须为涨停价才能买
            # buy1_price = cls.buy1PriceManager().get_price(code)
            # if buy1_price is None:
            #     return False, "买1价尚未获取到"
            # limit_up_price = gpcode_manager.get_limit_up_price(code)
            # if limit_up_price is None:
            #     return False, "尚未获取到涨停价"
            # if abs(float(buy1_price) - float(limit_up_price)) >= 0.001:
            #     return False, "买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price)
            # 从买入信号起始点到当前数据末尾的纯买手数与当前的卖1做比较,如果比卖1小则不能买入
            total_datas = local_today_datas[code]
            if total_datas[-1]["index"] + 1 > len(total_datas):
                return False, True, "L2数据错误"
            try:
                sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code)
                l2_log.buy_debug(code, "卖1信息为:({},{},{})", sell1_time, sell1_price, sell1_volumn)
                if sell1_time is not None and sell1_volumn > 0:
                    # 获取执行位信息
                    order_begin_pos = cls.__get_order_begin_pos(code)
                    buy_nums = order_begin_pos.num
                    for i in range(order_begin_pos.buy_exec_index + 1, total_datas[-1]["index"] + 1):
                        _val = total_datas[i]["val"]
                        # 涨停买
                        if L2DataUtil.is_limit_up_price_buy(_val):
                            # 涨停买
                            buy_nums += _val["num"] * total_datas[i]["re"]
                        elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                            buy_nums -= _val["num"] * total_datas[i]["re"]
                    if buy_nums < sell1_volumn * 0.49:
                        return False, False, "纯买量({})小于卖1量的49%{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time)
            except Exception as e:
                logging.exception(e)
            # 量比超过1.3的不能买
            volumn_rate = cls.volume_rate_info[code][0]
            if volumn_rate >= 1.3:
                return False, False, "最大量比超过1.3不能买"
            limit_up_time = cls.__LimitUpTimeManager.get_limit_up_time_cache(code)
            if limit_up_time is not None:
                limit_up_time_seconds = l2.l2_data_util.L2DataUtil.get_time_as_second(
                    limit_up_time)
                if limit_up_time_seconds >= l2.l2_data_util.L2DataUtil.get_time_as_second(
                        "13:00:00"):
                    return False, False, "二板下午涨停的不能买,涨停时间为{}".format(limit_up_time)
                if limit_up_time_seconds >= l2.l2_data_util.L2DataUtil.get_time_as_second("14:55:00"):
                    return False, False, "14:55后涨停的不能买,涨停时间为{}".format(limit_up_time)
            # 同一板块中老二后面的不能买
            industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
            if industry is None:
                return True, False, "没有获取到行业"
            codes_index = industry_codes_sort.sort_codes(codes, code)
            if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
                # 当老大老二当前没涨停
                return False, False, "同一板块中老三,老四,...不能买"
            if cls.__codeActualPriceProcessor.is_under_water(code, total_datas[-1]["val"]["time"]):
                # 水下捞且板块中的票小于16不能买
                # if global_util.industry_hot_num.get(industry) is not None and global_util.industry_hot_num.get(
                #         industry) <= 16:
                #     return False, "水下捞,板块中的票小于2只,为{}".format(global_util.industry_hot_num.get(industry))
                # 水下捞自由流通市值大于老大的不要买
                if codes_index.get(code) != 0:
                    # 获取老大的市值
                    for c in codes_index:
                        if codes_index.get(c) == 0 and global_util.zyltgb_map.get(code) > global_util.zyltgb_map.get(c):
                            return False, False, "水下捞,不是老大,且自由流通市值大于老大"
            # 13:30后涨停,本板块中涨停票数<29不能买
            # if limit_up_time is not None:
            #     if int(limit_up_time.replace(":", "")) >= 133000 and global_util.industry_hot_num.get(industry) is not None:
            #         if global_util.industry_hot_num.get(industry) < 16:
            #             return False, "13:30后涨停,本板块中涨停票数<16不能买"
            if codes_index.get(code) is not None and codes_index.get(code) == 1:
                # 如果老大已经买成功了, 老二就不需要买了
                first_codes = []
                for key in codes_index:
                    if codes_index.get(key) == 0:
                        first_codes.append(key)
                # 暂时注释掉
                # for key in first_codes:
                #     state = trade_manager.get_trade_state(key)
                #     if state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                #         # 老大已经买成功了
                #         return False, "老大{}已经买成功,老二无需购买".format(key)
                #
                # # 有9点半涨停的老大才能买老二,不然不能买
                # # 获取老大的涨停时间
                # for key in first_codes:
                #     # 找到了老大
                #     time_ = limit_up_time_manager.get_limit_up_time(key)
                #     if time_ == "09:30:00":
                #         return True, "9:30涨停的老大,老二可以下单"
                # return False, "老大非9:30涨停,老二不能下单"
            # 过时  老二,本板块中涨停票数<29 不能买
            # if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get(
            #         industry) is not None:
            #     if global_util.industry_hot_num.get(industry) < 29:
            #         return False, "老二,本板块中涨停票数<29不能买"
            # 可以下单
            return True, False, None
        finally:
            # l2_data_log.l2_time(code, round((t.time() - __start_time) * 1000), "是否可以下单计算")
            pass
    @classmethod
    def __can_buy_first(cls, code):
trade/current_price_process_manager.py
@@ -8,7 +8,6 @@
from l2.huaxin import huaxin_target_codes_manager
from log_module import async_log_util
from log_module.log import logger_l2_codes_subscript
from ths import client_manager
import constant
from code_attribute import gpcode_manager
from utils import tool, import_util
@@ -24,16 +23,7 @@
def accept_prices(prices, request_id=None):
    # 获取首板代码
    first_codes = gpcode_manager.FirstGPCodesManager().get_first_gp_codes_cache()
    print("总价格代码数量:", len(prices))
    if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS:
        __actualPriceProcessor.save_current_price_codes_count(len(prices))
    # 采集的代码数量不对, 暂时不需要
    # if len(gpcode_manager.get_gp_list()) - len(prices) > 10:
    #     logger_l2_codes_subscript.info("采集到的代码数量不正确:{}", len(prices))
    #     return
    now_str = tool.get_now_time_str()
    # 获取想买单
    want_codes = gpcode_manager.WantBuyCodesManager().list_code_cache()
@@ -41,7 +31,6 @@
        _code_list = []
        _delete_list = []
        temp_prices = []
        temp_rates = []
        for d in prices:
            code, price = d["code"], float(d["price"])
            temp_prices.append((code, price))
@@ -55,8 +44,9 @@
                if trade_state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_constant.TRADE_STATE_BUY_DELEGATED:
                    is_want_buy = True
                rate = round((price - pricePre) * 100 / pricePre, 2)
                if first_codes and code in first_codes:
                    rate = rate / 2
                if tool.is_ge_code(rate):
                    # 创业板的涨幅需要打折
                    rate = rate/2
                if rate >= 0 and not trade_manager.ForbiddenBuyCodeByScoreManager().is_in_cache(code):
                    # 暂存涨幅为正的代码
                    _code_list.append((rate, code, 1 if is_want_buy else 0))
@@ -66,11 +56,6 @@
                    # 暂存涨幅为负的代码
                    _delete_list.append((rate, code, 0))
                try:
                    temp_rates.append((code, rate))
                except Exception as e:
                    logging.exception(e)
                try:
                    __actualPriceProcessor.save_current_price(code, price,
                                                              gpcode_manager.get_limit_up_price_by_preprice(code,
                                                                                                            pricePre) == tool.to_price(
@@ -79,7 +64,6 @@
                    logging.exception(e)
                    logger_l2_codes_subscript.exception(e)
        gpcode_manager.set_prices(temp_prices)
        __actualPriceProcessor.process_rates(temp_rates, now_str)
        # -------------------------------处理交易位置分配---------------------------------
        # 排序
        new_code_list = sorted(_code_list, key=lambda e: (e.__getitem__(2), e.__getitem__(0)), reverse=True)
@@ -100,6 +84,9 @@
            new_code_list.remove(item)
        # 截取前几个代码填充
        add_list = new_code_list[:max_count]
        async_log_util.info(logger_l2_codes_subscript,
                            f"({request_id})需要订阅的代码:{add_list}")
        # 后面的代码全部删除
        _delete_list.extend(new_code_list[max_count:])
trade/trade_data_manager.py
@@ -191,7 +191,6 @@
# 代码实时价格管理器
class CodeActualPriceProcessor:
    __under_water_last_time_cache = {}
    __code_current_rate_cache = {}
    __code_current_rate_latest = {}
    __db = 0
@@ -212,12 +211,6 @@
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "under_water_last_time-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                tool.CodeDataCacheUtil.set_cache(cls.__under_water_last_time_cache, code, val)
            keys = RedisUtils.keys(__redis, "code_current_rate-*")
            for k in keys:
                code = k.split("-")[-1]
@@ -227,49 +220,6 @@
            pass
        finally:
            RedisUtils.realse(__redis)
    # 保存跌价的时间
    def __save_down_price_time(self, code, time_str):
        key = "under_water_last_time-{}".format(code)
        tool.CodeDataCacheUtil.set_cache(self.__under_water_last_time_cache, code, time_str)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), time_str)
    def __remove_down_price_time(self, code):
        key = "under_water_last_time-{}".format(code)
        tool.CodeDataCacheUtil.clear_cache(self.__under_water_last_time_cache, code)
        RedisUtils.delete(self.__get_redis(), key)
    def __get_last_down_price_time(self, code):
        key = "under_water_last_time-{}".format(code)
        return RedisUtils.get(self.__get_redis(), key)
    def __get_last_down_price_time_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__under_water_last_time_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    def __increment_down_price_time(self, code, seconds):
        key = "under_water_seconds-{}".format(code)
        RedisUtils.incrby(
            self.__get_redis(), key, seconds)
        # 设置个失效时间
        RedisUtils.expire(self.__get_redis(), key, tool.get_expire())
    def __get_down_price_time_as_seconds(self, code):
        key = "under_water_seconds-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None
        else:
            return int(val)
    # 清除所有的水下捞数据
    def clear_under_water_data(self):
        key = "under_water_*"
        keys = RedisUtils.keys(self.__get_redis(), key)
        for k in keys:
            RedisUtils.delete(self.__get_redis(), k)
    def __save_current_price_codes_count(self, count):
        key = "current_price_codes_count"
@@ -315,54 +265,6 @@
            return cache_result[1]
        return None
    def process_rate(self, code, rate, time_str):
        # 保存目前的代码涨幅
        self.__save_current_rate(code, rate)
        # 9点半之前的数据不处理
        if int(time_str.replace(":", "")) < int("093000"):
            return
        # now_str = tool.get_now_time_str()
        if rate >= 0:
            down_start_time = self.__get_last_down_price_time_cache(code)
            if down_start_time is None:
                return
            else:
                # 累计增加时间
                time_second = tool.trade_time_sub(time_str, down_start_time)
                self.__increment_down_price_time(code, time_second)
                # 删除起始时间
                self.__remove_down_price_time(code)
        else:
            # 记录开始值
            if self.__get_last_down_price_time_cache(code) is None:
                self.__save_down_price_time(code, time_str)
    # datas:[(代码,比例)]
    def process_rates(self, datas, time_str):
        # 9点半之前的数据不处理
        if int(time_str.replace(":", "")) < int("093000"):
            return
        # 保存目前的代码涨幅
        self.__save_current_rates(datas)
        # now_str = tool.get_now_time_str()
        for d in datas:
            code, rate = d[0], d[1]
            if rate >= 0:
                down_start_time = self.__get_last_down_price_time_cache(code)
                if down_start_time is None:
                    continue
                else:
                    # 累计增加时间
                    time_second = tool.trade_time_sub(time_str, down_start_time)
                    self.__increment_down_price_time(code, time_second)
                    # 删除起始时间
                    self.__remove_down_price_time(code)
            else:
                # 记录开始值
                if self.__get_last_down_price_time_cache(code) is None:
                    self.__save_down_price_time(code, time_str)
    # 保存现价
    def save_current_price(self, code, price, is_limit_up):
        global_util.cuurent_prices[code] = (price, is_limit_up, round(time.time()))
@@ -378,20 +280,6 @@
    def get_current_price_codes_count(self):
        return self.__get_current_price_codes_count()
    # 是否为水下捞
    def is_under_water(self, code, now_time=None):
        time_seconds = self.__get_down_price_time_as_seconds(code)
        if time_seconds is None:
            return False
        else:
            if time_seconds >= constant.UNDER_WATER_PRICE_TIME_AS_SECONDS:
                if now_time is None:
                    now_time = tool.get_now_time_str()
                space = tool.trade_time_sub(now_time, "09:30:00")
                if space > 0 and time_seconds / space >= 0.2:
                    return True
            return False
    # 当前代码是否涨停
    def current_is_limit_up(self, code):