Administrator
2024-08-06 fda20e935bea4870544695185ec9427dc66fb168
bug修复/H撤更新/接口修改
10个文件已修改
221 ■■■■ 已修改文件
cancel_strategy/s_l_h_cancel_strategy.py 94 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/first_target_code_data_processor.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/gpcode_manager.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_target_codes_manager.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/data_server.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_block.py 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/history_k_data_manager.py 47 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_limit_up_data_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/init_data_util.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cancel_strategy/s_l_h_cancel_strategy.py
@@ -1,3 +1,4 @@
import copy
import json
import time
@@ -1654,7 +1655,7 @@
        try:
            LCancelOutOfDateWatchIndexesManager().process(code, start_index, end_index)
        except Exception as e:
            l2_log.l_cancel_debug("L后稳定更新出错:{}",str(e))
            l2_log.l_cancel_debug("L后稳定更新出错:{}", str(e))
        # 下单位临近撤
        can_cancel, cancel_data = False, None
        try:
@@ -1887,6 +1888,7 @@
    # L撤触发的代码
    __l_cancel_triggered_codes = set()
    __h_cancel_update_time_cache = {}
    __instance = None
@@ -1932,6 +1934,9 @@
                                                               trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_H,
                                                               buy_single_index,
                                                               list(indexes)))
        self.__set_watch_indexes(code, indexes)
    def __set_watch_indexes(self, code, indexes):
        CodeDataCacheUtil.set_cache(self.__cancel_watch_indexs_cache, code, indexes)
        key = f"h_cancel_watch_indexs-{code}"
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
@@ -2052,8 +2057,91 @@
        if watch_indexes or watch_indexes_up:
            watch_indexes |= watch_indexes_up
            self.__save_watch_index_set(code, buy_single_index, watch_indexes)
            self.__h_cancel_update_time_cache[code] = total_datas[-1]["val"]["time"]
            l2_log.h_cancel_debug(code, f"设置监听范围, 数据范围:{real_place_order_index}-{end_index} 监听范围-{watch_indexes}")
        # 设置真实下单位置
    def __remove_cancel_long_time(self, code, buy_single_index):
        """
        Refresh the H-cancel watch set of ``code``: drop watched indexes whose cancel record is
        more than 30 minutes older than the latest L2 record, then add at most the same number
        of replacement indexes and save the result.

        Nothing is changed when there is no watch set, when no update time has been recorded
        for ``code``, when the last update is less than 30 * 60 away from now (presumably
        seconds - confirm against tool.trade_time_sub), when nothing qualifies for removal,
        or when the real place-order index / transaction progress index is unavailable.
        @param code: security code whose watch set is refreshed
        @param buy_single_index: passed through unchanged to __save_watch_index_set
        @return: None
        """
        watch_indexes = self.__get_watch_index_set_cache(code)
        if not watch_indexes:
            return
        if code not in self.__h_cancel_update_time_cache or tool.trade_time_sub(tool.get_now_time_str(),
                                                                                self.__h_cancel_update_time_cache[
                                                                                    code]) < 30 * 60:
            # Never updated before, or the last update is less than 30 minutes old: do not update
            return
        # Work on a copy so the cached set is not modified while it is being evaluated
        watch_indexes = copy.deepcopy(watch_indexes)
        # Remove entries that were cancelled more than half an hour ago
        total_datas = local_today_datas.get(code)
        remove_indexes = set()
        for i in watch_indexes:
            cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
                                                                                                  i,
                                                                                                  total_datas,
                                                                                                  local_today_canceled_buyno_map.get(
                                                                                                      code))
            if cancel_data and tool.trade_time_sub(total_datas[-1]["val"]["time"],
                                                   cancel_data["val"]["time"]) > 60 * 30:
                # Cancelled more than 30 minutes before the latest record: mark for removal
                remove_indexes.add(i)
        if not remove_indexes:
            return
        real_place_order_index = self.__SCancelBigNumComputer.get_real_place_order_index_cache(code)
        # NOTE(review): a real place-order index of 0 is also treated as missing here - confirm intended
        if not real_place_order_index:
            return
        transaction_index = self.__transaction_progress_index_dict.get(code)
        if transaction_index is None:
            return
        # The start point is found by counting 3 qualifying orders upward from the real place-order position
        start_index = real_place_order_index
        count = 0
        for i in range(real_place_order_index - 1, transaction_index, -1):
            data = total_datas[i]
            val = data['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            # Skip orders whose price * num is below 50 * 100
            if float(val['price']) * val['num'] < 50 * 100:
                continue
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
                                                                                                     total_datas,
                                                                                                     local_today_canceled_buyno_map.get(
                                                                                                         code))
            if left_count > 0:
                count += 1
                start_index = i
                if count >= 3:
                    break
        watch_indexes = watch_indexes - remove_indexes
        # Newly included indexes
        add_indexes = set()
        # NOTE(review): the range end is exclusive, so the latest record's own index is never scanned - confirm intended
        for i in range(start_index, total_datas[-1]["index"]):
            data = total_datas[i]
            val = data['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if i in watch_indexes:
                continue
            if float(val['price']) * val['num'] < 50 * 100:
                continue
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
                                                                                                     total_datas,
                                                                                                     local_today_canceled_buyno_map.get(
                                                                                                         code))
            if left_count > 0:
                add_indexes.add(i)
                # Add no more replacements than the number of indexes being removed
                if len(add_indexes) >= len(remove_indexes):
                    break
        watch_indexes |= add_indexes
        l2_log.h_cancel_debug(code, f"H撤更新:新增索引-{add_indexes} 删除索引-{remove_indexes}")
        self.__save_watch_index_set(code, buy_single_index, watch_indexes)
        # Record the update time (the time of the latest L2 record)
        self.__h_cancel_update_time_cache[code] = total_datas[-1]["val"]["time"]
    def __need_compute_watch_indexes(self, code, transaction_index):
        """
@@ -2214,6 +2302,10 @@
            if rate >= threshold_rate:
                l2_log.h_cancel_debug(code, f"撤单比例:{rate}")
                return True, total_data[-1]
        try:
            self.__remove_cancel_long_time(code, buy_single_index)
        except Exception as e:
            l2_log.h_cancel_debug(code, f"更新H撤囊括范围出错:{str(e)}")
        return False, None
    # 下单成功
code_attribute/first_target_code_data_processor.py
@@ -12,6 +12,7 @@
from code_attribute.gpcode_manager import WantBuyCodesManager
from log_module import async_log_util
from log_module.log import logger_first_code_record, logger_l2_codes_subscript
from third_data import history_k_data_manager
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi
@@ -69,7 +70,7 @@
                # 获取涨停价
                _limit_up_price = gpcode_manager.get_limit_up_price(code)
                if not _limit_up_price:
                    init_data_util.re_set_price_pres([code], True)
                    history_k_data_manager.re_set_price_pres([code], True)
                    # 再次获取涨停价
                    _limit_up_price = gpcode_manager.get_limit_up_price(code)
                if _limit_up_price:
@@ -85,7 +86,7 @@
        if gpcode_manager.get_limit_up_price(code) is None:
            need_get_limit_up_codes.add(code)
    if need_get_limit_up_codes:
        init_data_util.re_set_price_pres(list(need_get_limit_up_codes), True)
        history_k_data_manager.re_set_price_pres(list(need_get_limit_up_codes), True)
    logger_l2_codes_subscript.info(f"{request_id}加载l2代码涨停价结束")
    # 获取60天最大记录
    for code in codes:
@@ -107,6 +108,9 @@
                    volumes_data = HistoryKDataManager().get_history_bars(code, latest_trading_date)
                if not volumes_data:
                    volumes_data = init_data_util.get_volumns_by_code(code, 150)
                    async_log_util.info(logger_l2_codes_subscript, f"{request_id}从网络加载K线数据:{code}")
                if not volumes_data:
                    continue
                volumes = init_data_util.parse_max_volume(code, volumes_data[:90],
                                                          code_nature_analyse.is_new_top(code,
                                                                                         limit_up_price,
@@ -115,7 +119,7 @@
                                                              code,
                                                              limit_up_price,
                                                              volumes_data[:90]))
                logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes)
                async_log_util.info(logger_first_code_record, f"{code} 获取到首板60天最大量:{volumes}")
                code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2], volumes[3])
                # 保存K线形态
@@ -226,7 +230,7 @@
    # 获取涨停价
    if temp_codes:
        # 获取涨停价
        init_data_util.re_set_price_pres(temp_codes)
        history_k_data_manager.re_set_price_pres(temp_codes)
        # 重新获取涨停价
        for code in temp_codes:
            limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -252,7 +256,7 @@
            gpcode_manager.FirstCodeManager().add_limited_up_record([code])
        pricePre = gpcode_manager.CodePrePriceManager.get_price_pre_cache(code)
        if pricePre is None:
            init_data_util.re_set_price_pres([code])
            history_k_data_manager.re_set_price_pres([code])
        rate = round((float(price) - pricePre) * 100 / pricePre, 1)
        prices.append(
code_attribute/gpcode_manager.py
@@ -595,8 +595,7 @@
    # 设置收盘价
    @classmethod
    def set_price_pre(cls, code, price, force=False):
        codes = get_gp_list()
        if code not in codes and not FirstCodeManager().is_in_first_record_cache(code) and not force:
        if code in cls.__price_pre_cache and not force:
            return
        price = round(float(price), 2)
        logger_pre_close_price.info(f"{code}-{price}")
huaxin_client/l1_client.py
@@ -277,11 +277,13 @@
                    plist.append(d)
            flist.sort(key=lambda x: x[2], reverse=True)
            # 正式交易之前先处理比较少的数据,不然处理时间久造成数据拥堵
            MAX_COUNT = 1000
            MAX_COUNT = 500
            if now_time_int < int("092600"):
                MAX_COUNT = 100
            elif now_time_int < int("092800"):
                MAX_COUNT = 200
            elif now_time_int < int("092800"):
                MAX_COUNT = 300
            elif now_time_int < int("092900"):
                MAX_COUNT = 400
            datas = flist[:MAX_COUNT]
            # 将持仓股加入进去
            datas.extend(plist)
l2/huaxin/huaxin_target_codes_manager.py
@@ -11,7 +11,7 @@
from db import redis_manager_delegate as redis_manager
from log_module import async_log_util
from log_module.log import logger_l2_codes_subscript, logger_debug
from third_data import kpl_data_manager, kpl_api
from third_data import kpl_data_manager, kpl_api, history_k_data_manager
from trade import current_price_process_manager
from utils import tool, global_util, init_data_util
@@ -80,7 +80,10 @@
            if not gpcode_manager.get_limit_up_price(code):
                need_get_limit_up_codes.append(code)
        if need_get_limit_up_codes:
            init_data_util.re_set_price_pres(need_get_limit_up_codes, True)
            # 加载昨日收盘价
            async_log_util.info(logger_l2_codes_subscript, f"({request_id})准备加载昨日收盘价")
            history_k_data_manager.re_set_price_pres(need_get_limit_up_codes, True)
            async_log_util.info(logger_l2_codes_subscript, f"({request_id})昨日收盘价加载完成")
        for d in datas:
            code = d[0]
@@ -113,7 +116,7 @@
                if zylt:
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if not limit_up_price:
                        init_data_util.re_set_price_pre(code, True)
                        history_k_data_manager.re_set_price_pres([code], True)
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if limit_up_price:
                        zylt = int(zylt / d[1] * float(limit_up_price))
servers/data_server.py
@@ -9,7 +9,7 @@
from code_attribute.gpcode_manager import BlackListCodeManager
from l2.l2_transaction_data_manager import HuaXinBuyOrderManager
from log_module.log import logger_system, logger_debug, logger_kpl_limit_up, logger_request_api
from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager
from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager, CodeLimitUpSequenceManager
from utils import global_util, tool, data_export_util
from code_attribute import gpcode_manager
from log_module import log, log_analyse, log_export, async_log_util
@@ -711,6 +711,15 @@
            # 获取手续费详情
            try:
                fdata = {"delegates": {}}
                # 获取本月的手续费
                end_date = tool.get_now_date_str("%Y%m%d")
                start_date = f"{end_date[:6]}01"
                delegates = trade_data_manager.AccountMoneyManager().get_delegated_count_info(start_date, end_date)
                deal_count = trade_data_manager.AccountMoneyManager().get_deal_count(start_date, end_date)
                cost = sum([round(0.1 * x[1], 2) for x in delegates])
                make = deal_count * 5
                fdata["month_commission"] = round(make - cost, 2)
                # 计算当日手续费详情
                delegates = trade_data_manager.AccountMoneyManager().get_delegated_count_info()
                delegates = [{"count": x[1], "price": 0.1, "money": round(0.1 * x[1], 2)} for x in delegates]
                fdata["delegates"]["buy"] = delegates[0]
@@ -719,7 +728,7 @@
                fdata["delegates"]["sell"] = delegates[3]
                deal_count = trade_data_manager.AccountMoneyManager().get_deal_count()
                fdata["deal"] = {"count": deal_count, "price": 5, "money": round(5 * deal_count, 2)}
                fdata["commission"] =  trade_data_manager.AccountMoneyManager().get_commission_cache()
                fdata["commission"] = trade_data_manager.AccountMoneyManager().get_commission_cache()
                response_data = json.dumps({"code": 0, "data": fdata})
            except Exception as e:
                logger_debug.exception(e)
@@ -803,6 +812,10 @@
                        LatestLimitUpBlockManager().set_current_limit_up_data(tool.get_now_date_str(), result_list_)
                    except:
                        pass
                    try:
                        CodeLimitUpSequenceManager().set_current_limit_up_datas(result_list_)
                    except:
                        pass
                    self.__kplDataManager.save_data(type_, result_list_)
            except Exception as e:
                logger_debug.exception(e)
test/test_block.py
@@ -1,21 +1,33 @@
from third_data import  kpl_data_manager, kpl_util
from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager
from third_data import kpl_data_manager, kpl_util, block_info
from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager, CodeLimitUpSequenceManager
from utils import tool
def block_run():
    """
    Manual check: load the current limit-up data, save it as today's record, feed it to
    LatestLimitUpBlockManager and run its limit-up block statistics.
    """
    current_datas = kpl_data_manager.KPLDataManager.get_data(kpl_util.KPLDataType.LIMIT_UP)
    kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(),current_datas)
    # NOTE(review): this call repeats the previous line and differs only in spacing;
    # it looks like the old and new sides of a diff shown together - confirm one should be dropped.
    kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), current_datas)
    LatestLimitUpBlockManager().set_current_limit_up_data(tool.get_now_date_str(), current_datas)
    LatestLimitUpBlockManager().statistics_limit_up_block_infos()
def get_code_current_block():
    """
    Manual check: feed the current limit-up data to CodeLimitUpSequenceManager and print the
    limit-up sequence entry of one hard-coded code, if there is one.
    """
    # NOTE(review): the return value of this call is discarded - confirm whether it is still needed
    kpl_data_manager.get_current_limit_up_data_records(10)
    current_datas = kpl_data_manager.KPLDataManager.get_data(kpl_util.KPLDataType.LIMIT_UP)
    kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), current_datas)
    CodeLimitUpSequenceManager().set_current_limit_up_datas(current_datas)
    # Hard-coded sample code used for the manual check
    code = "301136"
    limit_up_sequence = CodeLimitUpSequenceManager.get_current_limit_up_sequence(code)
    if limit_up_sequence:
        # Prints items 0 and 1, then item 2 and the difference between items 2 and 3
        print(
            f"{limit_up_sequence[0]}-{limit_up_sequence[1]}({limit_up_sequence[2]}&{limit_up_sequence[2] - limit_up_sequence[3]})")
if __name__ == "__main__":
    # Script entry point: runs both manual checks in order.
    # The commented-out lines below are earlier experiments kept for reference.
    block_run()
    get_code_current_block()
    # print(code_plate_key_manager.ForbiddenBlockManager().get_blocks())
    # code_plate_key_manager.ForbiddenBlockManager().add("测试2")
    # code_plate_key_manager.ForbiddenBlockManager().add("测试3")
    # print(code_plate_key_manager.ForbiddenBlockManager().get_blocks())
    # print( code_plate_key_manager.ForbiddenBlockManager().is_in("测试"))
    # print(code_plate_key_manager.ForbiddenBlockManager().is_in("测试1"))
    # RedisUtils.run_loop()
    # RedisUtils.run_loop()
third_data/history_k_data_manager.py
@@ -1,11 +1,13 @@
"""
历史K线管理
"""
import copy
import datetime
import os
import threading
import constant
from code_attribute import gpcode_manager
from huaxin_client import l1_subscript_codes_manager
from log_module.log import logger_debug
from third_data import history_k_data_util
@@ -45,9 +47,24 @@
    return len(codes)
def re_set_price_pres(codes, force=False):
    """
    Set the previous close price for every code in ``codes``.

    Each code is first looked up through HistoryKDataManager.get_pre_close for today's date;
    a hit is written with CodePrePriceManager.set_price_pre. All codes that miss are handed,
    in one batch, to init_data_util.re_set_price_pres.
    @param codes: iterable of security codes
    @param force: forwarded unchanged to both setters
    @return: None
    """
    today = tool.get_now_date_str()
    # Codes with no cached close price; resolved through the fallback loader below
    missing_codes = []
    for code in codes:
        cached_close = HistoryKDataManager().get_pre_close(code, today)
        if cached_close is None:
            missing_codes.append(code)
            continue
        gpcode_manager.CodePrePriceManager.set_price_pre(code, cached_close, force)
    if missing_codes:
        init_data_util.re_set_price_pres(missing_codes, force)
class HistoryKDataManager:
    __instance = None
    __db = 0
    __history_k_day_datas = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
@@ -100,15 +117,20 @@
        path_str = f"{cache_dir}/{file_name}"
        if os.path.exists(path_str) and not force:
            return
        if day not in self.__history_k_day_datas:
            self.__history_k_day_datas[day] = {}
        if datas:
            self.__history_k_day_datas[day][code] = datas
            # 将日期格式化
            fdatas = []
            for d in datas:
                for k in d:
                    if type(d[k]) == datetime.datetime:
                        d[k] = d[k].strftime("%Y-%m-%d %H:%M:%S")
        with open(path_str, encoding="utf-8", mode='w') as f:
            f.write(f"{datas}")
                dd = copy.deepcopy(d)
                for k in dd:
                    if type(dd[k]) == datetime.datetime:
                        dd[k] = dd[k].strftime("%Y-%m-%d %H:%M:%S")
                fdatas.append(dd)
            with open(path_str, encoding="utf-8", mode='w') as f:
                f.write(f"{fdatas}")
        self.__del_outdate_datas(code)
    def get_history_bars(self, code, day):
@@ -118,6 +140,8 @@
        @param day:
        @return:
        """
        if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]:
            return self.__history_k_day_datas[day][code]
        cache_dir = self.__get_cache_dir()
        file_name = f"{day}_{code}.txt"
        path_str = f"{cache_dir}/{file_name}"
@@ -135,6 +159,17 @@
                return datas
        return None
    def get_pre_close(self, code, day):
        """
        获取之前的收盘价
        @param code:
        @param day:
        @return:
        """
        if day in self.__history_k_day_datas and code in self.__history_k_day_datas[day]:
            return self.__history_k_day_datas[day][code][0]["close"]
        return None
    def get_history_bars_codes(self, day):
        """
        获取某一天的历史K线的代码数据
third_data/kpl_limit_up_data_manager.py
@@ -52,7 +52,7 @@
        records = get_today_history_limit_up_datas_cache()
        # 按代码排序
        # {"代码":(代码,涨停原因, 涨停时间, 几版)}
        current_code_block_dict = {x[0]: (x[0], x[2], x[5], x[4]) for x in current_limit_up_datas}
        current_code_block_dict = {x[0]: (x[0], x[5], x[2], x[4]) for x in current_limit_up_datas}
        record_code_block_dict = {x[3]: (x[3], x[2], x[5], x[12]) for x in records}
        # 根据涨停原因统计
        # {"板块":{代码}}
utils/init_data_util.py
@@ -12,6 +12,7 @@
def re_set_price_pres(codes, force=False):
    # 通过历史数据缓存获取
    result = HistoryKDatasUtils.get_gp_latest_info(codes)
    for item in result:
        symbol = item['symbol']
@@ -24,12 +25,14 @@
# 获取最近一次涨停/涨停下一个交易日的最大值
def get_volumns_by_code(code, count=60) -> object:
    """
    Fetch history bars for ``code`` and return them ordered by "bob", largest first.

    @param code: security code
    @param count: number of bars requested from HistoryKDatasUtils.get_history_tick_n
    @return: the fetched list, sorted in place by descending "bob"
             (presumably the bar's begin time - confirm), or None when nothing was returned
    """
    requested_fields = "open,high,low,close,volume,pre_close,bob,amount"
    bars = HistoryKDatasUtils.get_history_tick_n(code, count, requested_fields)
    if bars:
        # Sort in place so the caller gets the same list object that was fetched
        bars.sort(key=lambda bar: bar["bob"], reverse=True)
        return bars
    return None
def parse_max_volume(code,  datas, is_new_or_near_top=False):
def parse_max_volume(code, datas, is_new_or_near_top=False):
    result = __parse_max_volume(code, datas, is_new_or_near_top)
    refer_index = result[3]
    # 计算最低价