Administrator
2025-05-23 c285883d71ef8a362b012983dadc7ce4256b40f6
bug修复
12个文件已修改
716 ■■■■ 已修改文件
l2_data_parser.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.py 140 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_block.py 173 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_code_attribute.py 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_k_datas.py 109 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_l2.py 87 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_log.py 102 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_marshal.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_mul_queue.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_shared_memery.py 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_strategy.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_parser.py
@@ -276,8 +276,10 @@
        print(f"处理第{index}批次")
        df["TradePrice"] = df["TradePrice"].apply(str_to_float)
        df["SecurityID"] = df["SecurityID"].apply(code_format)
        df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
        # 计算成交金额
        df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
        # 按SecurityID和BuyNo分组
        grouped = df.groupby(['SecurityID', 'BuyNo'])
@@ -331,6 +333,9 @@
        df = df[df["TickType"] == 'T']
        df["Price"] = df["Price"].apply(str_to_float)
        df["SecurityID"] = df["SecurityID"].apply(code_format)
        df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
        # 计算成交金额
        df['TradeMoney'] = df["TradeMoney"].apply(str_to_float)
        # 按SecurityID和BuyNo分组
test/l2_trade_test.py
@@ -17,16 +17,14 @@
from l2.l2_sell_manager import L2MarketSellManager
from l2.l2_transaction_data_manager import HuaXinSellOrderStatisticManager
from log_module import log, log_export, async_log_util
from trade.huaxin import huaxin_trade_api
from trade.radical_buy_data_manager import RedicalBuyDataManager
from trade.buy_radical.radical_buy_data_manager import RadicalBuyDataManager, RadicalBuyBlockManager
from utils import tool, init_data_util
from db import redis_manager_delegate as redis_manager
from l2 import l2_log, l2_data_manager, transaction_progress, l2_data_manager_new, l2_transaction_data_processor, \
    cancel_buy_strategy
from l2.transaction_progress import TradeBuyQueue
from third_data import kpl_util, kpl_data_manager, block_info
from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager, CodePlateKeyBuyManager, \
    RadicalBuyBlockManager
from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager
from third_data.kpl_data_manager import KPLDataManager
from trade import trade_data_manager, current_price_process_manager, l2_trade_util, trade_manager
import l2.l2_data_manager_new, l2.l2_data_manager, l2.l2_data_util, l2.cancel_buy_strategy
@@ -159,7 +157,7 @@
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            init_data_util.re_set_price_pre(code)
        current_price_process_manager.set_trade_price(code, round(float(gpcode_manager.get_limit_up_price(code)), 2))
        current_price_process_manager.set_trade_price(code, gpcode_manager.get_limit_up_price_as_num(code))
        pss_server, pss_strategy = multiprocessing.Pipe()
        # huaxin_trade_api.run_pipe_trade(pss_server, None, None)
@@ -239,7 +237,7 @@
    # @unittest.skip("跳过此单元测试")
    def test_block(self):
        codes_str = "603778"
        codes_str = "300390"
        codes = codes_str.split(",")  # ["002889", "300337", "001298", "002771"]
        kpl_data_manager.KPLLimitUpDataRecordManager.load_total_datas()
        kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(),
@@ -278,7 +276,7 @@
            result = RadicalBuyBlockManager.is_radical_buy(code, yesterday_codes)
            print(code, result)
            if result[0]:
                can_buy_result = RedicalBuyDataManager.can_buy(code)
                can_buy_result = RadicalBuyDataManager.is_code_can_buy(code)
                if can_buy_result[0]:
                    print("可以买", code, result)
                else:
@@ -318,6 +316,13 @@
        for d in fdatas:
            l2_transaction_data_processor.HuaXinTransactionDatasProcessor().process_huaxin_transaction_datas(code, d)
    def test_cancel(self):
        """
        Drive RDCancelBigNumComputer.need_cancel for one code with a stubbed trade progress.

        The return value of need_cancel is neither printed nor asserted, and the
        patched class attributes are not restored afterwards.
        """
        code = "603300"
        l2.l2_data_util.load_l2_data(code)
        # Replace the traded-index lookup with a mock that always returns (3742, False).
        TradeBuyQueue.get_traded_index = mock.Mock(return_value=(3742, False))
        # Seed the name-mangled private watch-index cache on the class for this code.
        l2.cancel_buy_strategy.RDCancelBigNumComputer._RDCancelBigNumComputer__watch_indexes_cache ={ code: {3686, 3691, 3693, 3694, 3699, 3700}}
        l2.cancel_buy_strategy.RDCancelBigNumComputer().need_cancel(code, 4370, 4372)
class TestTradedProgress(unittest.TestCase):
    @unittest.skip("跳过此单元测试")
test/test.py
@@ -1,26 +1,126 @@
import base64
import json
import os
import queue
import time
from code_attribute import global_data_loader
from settings.trade_setting import TradeBlockBuyModeManager
from third_data.history_k_data_util import HistoryKDatasUtils
import xlwt
from code_attribute import gpcode_manager
from l2.huaxin import l2_huaxin_util
from utils import tool
def test_active_buy():
    """
    Check a fixed rank against a time-bucketed table of rank caps.

    The first bucket whose upper bound is not earlier than the hard-coded
    current time decides the result.
    @return: True when that bucket's cap is greater than the current rank, otherwise False
    """
    current_rank = 2
    bucket_ends = ("10:00:00", "10:30:00", "11:00:00", "13:00:00", "13:30:00", "14:00:00", "14:30:00",
                   "15:00:00")
    rank_caps = (3, 3, 2, 2, 1, 0, 0, 0)
    now_value = int("09:36:00".replace(':', ''))
    for bucket_end, rank_cap in zip(bucket_ends, rank_caps):
        if now_value > int(bucket_end.replace(':', '')):
            # Current time is past this bucket; try the next one
            continue
        return rank_cap > current_rank
    return False
class Test:
    """Small helper whose only method prints a timestamped argument and then blocks."""

    def print_log(self, arg):
        """
        Print the current epoch time together with the argument, then sleep for 2 seconds.
        @param arg: value interpolated into the printed message
        """
        stamp = time.time()
        message = f"arg:{arg}"
        print(stamp, message)
        time.sleep(2)
if __name__ == "__main__":
    # Scratch check: decode a hard-coded Base64 string as UTF-8 text and print it.
    print( base64.b64decode("WwogICAgWwogICAgICAgICLkuK3lrZflpLQiLAogICAgICAgIDIKICAgIF0KXQ==").decode('utf-8'))
def parse_big_buy_order_count(path_, limit_up_price_dict=None, max_time=None):
    """
    Count the large orders recorded in one day's log file, grouped by code.

    The payload of each line is the text after the first " - " separator; it is
    evaluated as a Python expression and indexed as data[0] (code), data[1]
    (record type) and data[2] (detail tuple). Blank lines are ignored.
    @param path_: path of the UTF-8 encoded log file
    @param limit_up_price_dict: currently unused (the price filter is disabled); kept so callers stay compatible
    @param max_time: optional bound; records whose l2_huaxin_util.convert_time(data[2][3]) is greater are skipped
    @return: {code: [record count, sum of data[2][2]]}
    """
    code_big_order_count_dict = {}
    with open(path_, mode='r', encoding='utf-8') as f:
        for line in f:
            data = line[line.find(" - ") + 3:].strip()
            if not data:
                # Blank/whitespace-only lines (e.g. a trailing newline) carry no record
                continue
            # NOTE(review): eval executes whatever the log line contains; only use on
            # trusted, locally produced logs (consider ast.literal_eval if lines are pure literals).
            data = eval(data)
            # Only records whose type field is 0 are counted
            if data[1] != 0:
                continue
            # Skip records whose data[2][2] value is below 1,000,000
            if data[2][2] < 1000000:
                continue
            if max_time and l2_huaxin_util.convert_time(data[2][3]) > max_time:
                continue
            stats = code_big_order_count_dict.setdefault(data[0], [0, 0])
            stats[0] += 1
            stats[1] += data[2][2]
    return code_big_order_count_dict
def __load_limit_up_price():
    """
    Collect limit-up prices from the K-line files in the hard-coded data directory.

    The code is the first 6 characters of the second "_"-separated part of each
    file name; the first line of each file is evaluated as a Python expression
    that yields the bar records.
    @return: {date: {code: limit-up price rounded to 2 decimals}}
    """
    k_line_dir = r"D:\回测数据\K线数据"
    prices_by_date = {}
    for file_name in os.listdir(k_line_dir):
        code = file_name.split("_")[1][:6]
        with open(f"{k_line_dir}\\{file_name}", encoding='utf-8', mode='r') as f:
            first_line = f.readlines()[0]
            bars = eval(first_line)
            # {date: limit-up price}; the date is the first 10 characters of "bob"
            price_by_date = {}
            for bar in bars:
                day = bar["bob"][:10]
                price_by_date[day] = gpcode_manager.get_limit_up_price_by_preprice(bar["sec_id"],
                                                                                   bar["pre_close"])
            for day, price in price_by_date.items():
                prices_by_date.setdefault(day, {})[code] = round(float(price), 2)
    return prices_by_date
def export(fdatas, dates, file_name):
    """
    Write per-code, per-date statistics to an .xls workbook.

    Row 0 holds the dates starting at column 1. Each following row starts with a
    code and holds one "<count>/<average in units of 10000>万" cell per date, or
    an empty cell when the count is 0 or the code has no entry for that date.
    @param fdatas: {code: {date: [count, total]}}
    @param dates: dates in the column order to write
    @param file_name: output path handed to Workbook.save
    """
    wb = xlwt.Workbook(encoding="utf-8")
    ws = wb.add_sheet('sheet1')
    for col, date in enumerate(dates, start=1):
        ws.write(0, col, date)
    for row, (code, date_count_dict) in enumerate(fdatas.items(), start=1):
        ws.write(row, 0, code)
        for col, date in enumerate(dates, start=1):
            # A code may lack an entry for some date; treat that like a zero count
            # instead of failing on None.
            d = date_count_dict.get(date) or (0, 0)
            if d[0] > 0:
                average_big_money = d[1] // d[0]
                ws.write(row, col, f"{d[0]}/{round(average_big_money / 10000, 1)}万")
            else:
                ws.write(row, col, "")
    wb.save(file_name)
if __name__ == '__main__':
    # Scratch check of the "test empty() before a non-blocking get" pattern.
    # The queue is freshly created, so the guarded branch is skipped here.
    q = queue.Queue()
    if not  q.empty():
        data = q.get(block=False)
        print(data)
# NOTE: the guard compares against '__main__1', which does not match when the
# file is run as a script, so this block is effectively disabled.
if __name__ == '__main__1':
    # Load the limit-up price of every code for each day
    limit_up_price_date_code_dict = __load_limit_up_price()
    print(limit_up_price_date_code_dict["2025-04-22"]["002719"])
    dir_path = r"D:\回测数据\大单数据"
    files = os.listdir(dir_path)
    dates = []
    big_order_count_date_code_dict = {}
    for file_name in files:
        path_ = f"{dir_path}\\{file_name}"
        # The date is the 10 characters that precede the 4-character file extension
        date = file_name[-14:-4]
        dates.append(date)
        code_big_order_count_dict = parse_big_buy_order_count(path_, limit_up_price_date_code_dict[date])
        print(code_big_order_count_dict)
        big_order_count_date_code_dict[date] = code_big_order_count_dict
    # Reshape into {code: {date: per-date statistics}}
    fdata = {}
    for date in big_order_count_date_code_dict:
        for code in big_order_count_date_code_dict[date]:
            if code not in fdata:
                fdata[code] = {}
            fdata[code][date] = big_order_count_date_code_dict[date][code]
    for code in fdata:
        for date in dates:
            if date not in fdata[code]:
                # Fill missing dates with zeros
                fdata[code][date] = [0, 0]
    # Newest date first
    dates.sort(reverse=True)
    export(fdata, dates, "D:/test.xls")
test/test_block.py
@@ -1,14 +1,21 @@
import copy
import json
import time
from db import redis_manager
from db.redis_manager_delegate import RedisUtils
from third_data import kpl_data_manager, kpl_util, kpl_api
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager, KPLCodeJXBlockManager
import constant
from code_attribute import global_data_loader
from l2 import code_price_manager
from log_module import async_log_util
from log_module.log import logger_debug
from third_data import kpl_data_manager, kpl_util, block_info, kpl_api
from third_data.code_plate_key_manager import RealTimeKplMarketData
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager
from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager, CodeLimitUpSequenceManager
from third_data.third_blocks_manager import BlockMapManager, CodeThirdBlocksManager, SOURCE_TYPE_KPL
from utils import tool
from utils.kpl_data_db_util import KPLLimitUpDataUtil
from trade.buy_radical import new_block_processor
from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager
from trade.buy_radical.radical_buy_data_manager import RadicalBuyDataManager, RadicalBuyBlockManager
from utils import tool, init_data_util
def block_run():
@@ -30,23 +37,135 @@
            f"{limit_up_sequence[0]}-{limit_up_sequence[1]}({limit_up_sequence[2]}&{limit_up_sequence[2] - limit_up_sequence[3]})")
if __name__ == "__main__":
    # Scratch check: format two timestamps as HH:MM:SS and print a marker when
    # tool.trade_time_sub reports a difference of at least 10 * 60
    # (presumably seconds - TODO confirm against tool.trade_time_sub).
    limit_up_timestamp = time.time()
    current_before_codes_info = [("000333", time.time())]
    if tool.trade_time_sub(tool.timestamp_format(limit_up_timestamp, '%H:%M:%S'),
                           tool.timestamp_format(current_before_codes_info[-1][1], '%H:%M:%S')) >= 10 * 60:
        print("123123")
def test_block():
    """
    Manually evaluate the radical-buy decision for a hard-coded list of codes.

    Loads and saves the current limit-up records, then for every code initialises
    block info and price data, asks RadicalBuyBlockManager.is_radical_buy and, when
    its first element is truthy, RadicalBuyDataManager.is_code_can_buy. Results are
    only printed; nothing is asserted or returned.
    """
    codes_str = "002845"
    codes = codes_str.split(",")  # ["002889", "300337", "001298", "002771"]
    kpl_data_manager.KPLLimitUpDataRecordManager.load_total_datas()
    kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(),
                                                             kpl_data_manager.KPLDataManager.get_data(
                                                                 kpl_util.KPLDataType.LIMIT_UP))
    for code in codes:
        # KPLCodeJXBlockManager().load_jx_blocks(code, 23.52,23.62,
        #                                        kpl_data_manager.KPLLimitUpDataRecordManager.get_current_reasons())
        #
        block_info.init_code(code)
        # latest_current_limit_up_records = kpl_data_manager.get_latest_current_limit_up_records()
    # NOTE(review): the disabled snippets below look like leftovers from an earlier revision.
    # code = "603825"
    # k3 = CodesHisReasonAndBlocksManager().get_history_blocks(code)
    # print(k3)
    # k3 = CodesHisReasonAndBlocksManager().get_history_blocks(code)
    # print(k3)
    # print(KPLLimitUpDataUtil.get_latest_block_infos(code="000561"))
    # print(code_plate_key_manager.ForbiddenBlockManager().get_blocks())
    # code_plate_key_manager.ForbiddenBlockManager().add("test2")
    # code_plate_key_manager.ForbiddenBlockManager().add("test3")
    # print(code_plate_key_manager.ForbiddenBlockManager().get_blocks())
    # print( code_plate_key_manager.ForbiddenBlockManager().is_in("test"))
    # print(code_plate_key_manager.ForbiddenBlockManager().is_in("test1"))
    # RedisUtils.run_loop()
        init_data_util.re_set_price_pre(code, True)
        limit_up_data = kpl_data_manager.KPLLimitUpDataRecordManager.record_code_dict.get(code)
        if limit_up_data:
            # NOTE(review): limit_up_time is assigned but not used later in this function.
            limit_up_time = tool.to_time_str(limit_up_data[2])
        RadicalBuyBlockManager.set_current_limit_up_datas(
            kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas)
        # CodePlateKeyBuyManager.update_can_buy_blocks(code,
        #                                              kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas,
        #                                              kpl_data_manager.KPLLimitUpDataRecordManager.total_datas,
        #                                              latest_current_limit_up_records,
        #                                              block_info.get_before_blocks_dict(),
        #                                              kpl_data_manager.KPLLimitUpDataRecordManager.get_current_limit_up_reason_codes_dict())
        # can_buy_result = CodePlateKeyBuyManager.can_buy(code)
        # print(can_buy_result)
        # if can_buy_result:
        #     if can_buy_result[0]:
        #         blocks = ",".join([f"{x[0]}-{x[1] + 1}({x[2]}&{x[3] - x[2]})" for x in can_buy_result[0]])
        #         print(blocks)
        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
        # The lookup may return None; fall back to an empty set
        if yesterday_codes is None:
            yesterday_codes = set()
        result = RadicalBuyBlockManager.is_radical_buy(code, yesterday_codes)
        print(code, result)
        if result[0]:
            can_buy_result = RadicalBuyDataManager.is_code_can_buy(code)
            if can_buy_result[0]:
                print("可以买", code, result)
            else:
                print("不可以买", code, can_buy_result)
    # l2.l2_data_manager_new.L2TradeDataProcessor.can_buy_first(code, None)
def update_history_limit_up_codes():
    """
    Fetch and store the historical limit-up records for the trading day after 2024-12-19.

    The fetched list is wrapped as {"list": ..., "errcode": 0}, serialised to JSON,
    parsed with kpl_util.parseDaBanData and saved with set_not_open=True.
    """
    trading_day = "2024-12-19"
    for _ in range(1):
        trading_day = HistoryKDatasUtils.get_next_trading_date(trading_day)
        raw_results = kpl_api.getHistoryLimitUpInfo(trading_day)
        payload = json.dumps({"list": raw_results, "errcode": 0})
        parsed_records = kpl_util.parseDaBanData(payload, kpl_util.DABAN_TYPE_LIMIT_UP)
        kpl_data_manager.KPLLimitUpDataRecordManager.save_record(trading_day, parsed_records,
                                                                 set_not_open=True)
def do_limit_up(result_list_):
    """
    Detect new blocks (themes) in a limit-up list and register their codes.

    Failures are logged through logger_debug instead of being silently dropped;
    the function never raises for them. An empty or None input is a no-op.
    @param result_list_: limit-up records; element [0] and element [5] of each record
                         are passed to screen_new_blocks_with_limit_up_datas
    @return: None
    """

    def request_new_blocks_codes(blocks_info, all_new_blocks):
        """
        Request the component codes of new blocks and register the qualifying ones.
        @param blocks_info: [(block name, block code)]
        @param all_new_blocks: collection of all new block names, forwarded unchanged
        @return: None
        """
        # The lookup may return None (test_block in this file guards for it as well)
        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() or set()
        for bi in blocks_info:
            result = json.loads(kpl_api.getCodesByPlate(bi[1]))
            component_codes = set()
            for d in result["list"]:
                if d[0] in yesterday_codes:
                    continue
                # Normalise d[6] by the code's limit-up rate step and skip values below 5
                rate = d[6] / int(round((tool.get_limit_up_rate(d[0]) - 1) * 10))
                if rate < 5:
                    continue
                component_codes.add(d[0])
            if component_codes:
                # Add the codes to the new block
                new_block_processor.process_new_block_by_component_codes(bi[0], component_codes,
                                                                         all_new_blocks)

    try:
        if not result_list_:
            return
        # New blocks: {block name: codes}
        new_block_codes = new_block_processor.screen_new_blocks_with_limit_up_datas(
            [(x[0], x[5]) for x in result_list_])
        if not new_block_codes:
            return
        # Map each filtered block name (record[2]) to record[15], used below as the plate code
        records = KPLLimitUpDataRecordManager.total_datas
        block_plate_code_dict = {kpl_util.filter_block(x[2]): x[15] for x in records}
        for b in new_block_codes:
            for c in new_block_codes[b]:
                new_block_processor.process_new_block_by_limit_up_list(c, b)
        update_new_block_plates = [(r, block_plate_code_dict[r]) for r in new_block_codes
                                   if r in block_plate_code_dict]
        if update_new_block_plates:
            # Fetch the codes that belong to each new block
            request_new_blocks_codes(update_new_block_plates, new_block_codes.keys())
    except Exception as e:
        # Best effort: report the failure rather than hiding it
        logger_debug.exception(e)
if __name__ == "__main__":
    try:
        global_data_loader.load_zyltgb()
        KPLLimitUpDataRecordManager.load_total_datas()
        # Compute today's newly added themes from the first line of a cached limit-up log
        with open("D:/trade_cache/2025-04-10_limit_up.log") as f:
            first_line = f.readline()
        do_limit_up(json.loads(first_line))
        code = "002163"
        print(LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code))
        print(BlockSpecialCodesManager().get_code_blocks(code))
        print(BlockSpecialCodesManager().get_code_blocks_dict())
    except Exception as e:
        # Report the failure instead of exiting silently with no output
        logger_debug.exception(e)
test/test_code_attribute.py
@@ -1,4 +1,4 @@
from code_attribute import code_nature_analyse
from code_attribute import code_nature_analyse, code_volumn_manager
from utils import init_data_util, tool
@@ -23,35 +23,20 @@
def __get_refer_volume_info():
    """
    Compute reference volume figures for a hard-coded code and store them.

    Fetches 150 volume records, derives the maximum-volume tuple from the first 90
    entries, derives the 5-day maximum volume, saves both through CodeVolumeManager
    and prints the tuple.
    """
    # NOTE(review): the first code/limit_up_price pair is overwritten immediately below.
    code = "600053"
    limit_up_price = 14.91
    code = "301511"
    limit_up_price = 4.82
    volumes_data = init_data_util.get_volumns_by_code(code, 150)
    # Drop the first record
    volumes_data = volumes_data[1:]
    # NOTE(review): this result is replaced by the parse_max_volume_new call below.
    volumes = init_data_util.parse_max_volume(code, volumes_data[:90],
                                              code_nature_analyse.is_new_top(code,
                                                                             limit_up_price,
                                                                             volumes_data[
                                                                             :90]) or code_nature_analyse.is_near_top(
                                                  code,
                                                  limit_up_price,
                                                  volumes_data[:90]))
    # volumes_data = volumes_data[1:]
    volumes = init_data_util.parse_max_volume_new(code, volumes_data[:90])
    max_volume_in_5_days = init_data_util.parse_max_volume_in_days(volumes_data, 5)
    code_volumn_manager.CodeVolumeManager().set_histry_volumn(code, volumes[0], volumes[1], volumes[2], volumes[3],
                                                              max_volume_in_5_days)
    print(volumes)
if __name__ == "__main__":
    code = "301337"
    limit_up_price = 34.56
    volumes_data = init_data_util.get_volumns_by_code(code, 150)
    volumes = init_data_util.parse_max_volume(code, volumes_data[:90],
                                              code_nature_analyse.is_new_top(code,
                                                                             limit_up_price,
                                                                             volumes_data[
                                                                             :90]) or code_nature_analyse.is_near_top(
                                                  code,
                                                  limit_up_price,
                                                  volumes_data[:90]))
    # 保存K线形态
    k_format = code_nature_analyse.get_k_format(code, limit_up_price, volumes_data)
    __get_refer_volume_info()
    # is_too_high()
    # code = "601022"
    # volumes_data = init_data_util.get_volumns_by_code(code, 150)
test/test_k_datas.py
@@ -1,21 +1,100 @@
import threading
import time
from code_attribute import code_nature_analyse, gpcode_manager
from log_module.log import logger_debug
from third_data import history_k_data_manager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import JueJinApi
from trade import l2_trade_util
from utils import tool, init_data_util
if __name__ == '__main__':
    # Scratch entry point: earlier experiments are kept below as disabled code;
    # the only live statements assign two sample values.
    # code = "603960"
    # day = tool.get_now_date_str()
    # datas = init_data_util.get_volumns_by_code(code, 150)
    # HistoryKDataManager().save_history_bars(code, day, datas, force=True)
    # print(HistoryKDataManager().get_history_bars(code, day))
    # print(HistoryKDataManager().get_pre_close(code, day))
    # history_k_data_manager.re_set_price_pres([code])
    # volumes_data = init_data_util.get_volumns_by_code("600211", 150)
    # for volume in volumes_data:
    #     print(volume)
    code = "000333"
    now_day = "2024-10-10"
def __update():
    """Wait 5 seconds, then trigger history_k_data_manager.update_history_k_bars()."""
    delay_seconds = 5
    time.sleep(delay_seconds)
    history_k_data_manager.update_history_k_bars()
def __is_normal_in_5d(code):
    """
    Tell whether the code was in a normal state on its latest 5 history records.

    Queries the instrument history from 30 days before today up to yesterday,
    prints the raw result, and inspects the last 5 entries.
    @param code: code to check
    @return: True when every inspected entry has "sec_level" equal to 1
    """
    today = tool.get_now_date_str()
    symbols = JueJinApi.get_juejin_code_list_with_prefix([code])
    start_day = tool.date_sub(today, 30)
    end_day = tool.date_sub(today, 1)
    instruments = JueJinApi.get_history_instruments(symbols, start_day, end_day)
    print(instruments)
    return all(entry["sec_level"] == 1 for entry in instruments[-5:])
if __name__ == '__main__1':
    codes = {'300947', '002025', '002875', '000757', '603076', '600785', '300766', '600595', '603511', '002205',
             '002943', '002802', '002715', '603967', '600818', '000796', '600960', '603882', '002883', '603578',
             '600650', '603028', '002905', '300058', '000503', '603228', '603920', '603693', '002209', '300819',
             '002105', '300155', '002152', '002982', '600588', '001282', '605186', '600889', '002364', '603106',
             '300451', '600506', '002559', '002363', '603888', '605259', '605069', '002622', '001258', '601566',
             '002946', '002733', '002448', '000056', '600101', '603979', '002800', '603881', '003010', '002295',
             '600322', '603580', '300153', '605398', '300946', '603506', '601218', '603938', '600120', '601619',
             '002229', '002990', '002298', '603305', '002188', '002560', '300244', '001313', '000899', '600319',
             '603183', '603876', '002662', '601579', '605333', '000818', '300069', '000048', '002881', '600590',
             '002261', '002734', '600868', '002285', '002847', '002444', '000715', '601777', '000710', '002133',
             '600143', '600280', '605011', '600728', '300488', '002286', '600718', '002354', '002475', '600114',
             '603312', '300454', '300941', '000151', '600708', '002163', '002305', '002851', '002173', '002785',
             '600032', '000920', '002913', '001389', '603121', '603063', '002384', '300067', '603273', '600602',
             '000599', '603517', '002335', '002529', '002463', '600719', '601921', '603280', '600336', '600533',
             '002358', '002357', '600979', '301396', '002068', '605255', '002248', '000722', '000688', '002518',
             '603001', '600318', '600095', '300746', '001333', '002976', '002122', '301230', '603829', '300599',
             '001279', '000007', '603269', '002743', '603286', '002134', '603193', '600376', '002970', '603860',
             '002044', '000657', '002273', '605228', '002953', '002484', '600758', '600456', '605388', '603729',
             '300288', '001298', '300528', '300643', '002707', '603108', '605162', '000592', '603716', '000558',
             '600173', '002380', '300534', '600238', '002394', '603901', '002418', '002965', '002296', '603566',
             '002779', '000888', '600825', '603268', '003038', '002993', '000659', '002580', '300253', '603215',
             '002725', '603038', '002938', '002666', '002599', '600665', '603500', '002065', '002760', '301299',
             '300078', '002436', '601869', '603610', '002888', '000815', '600183', '603238', '002829', '603296',
             '603950', '603109', '000428'}
    print(",".join(codes))
if __name__ == '__main__':
    # Scratch evaluation of the "sell pressure released" rules for one hard-coded code.
    code = "002809"
    limit_up_price = 13.01
    volumes_data = init_data_util.get_volumns_by_code(code, 150)
    volumes = init_data_util.parse_max_volume_new(code, volumes_data[:60])
    if code_nature_analyse.is_price_too_high_in_days(code, volumes_data, limit_up_price)[0]:
        pass
    # K-line pattern
    k_format = code_nature_analyse.get_k_format(code, limit_up_price, volumes_data)
    if k_format[10] and k_format[10][0]:
        # There was a limit-up / broken limit-up / limit-down in the last 5 trading days
        price_info = k_format[10][1]
        p_price_info = price_info[0]  # (price, volume)
        t_price_info = price_info[1]  # (price, volume) / None
        # Hard-coded sample of today's real-time volume
        today_volume = int(98.02 * 10000 * 100)
        # Not buyable until one of the rules below passes
        can_buy = False
        # Pressure already released: today's limit-up price >= T high price and the T-day
        # volume >= 101% of the P-day volume. (The volume is index 1; index 0 is the price.)
        if t_price_info and limit_up_price >= t_price_info[0] and t_price_info[1] >= p_price_info[1] * 1.01:
            can_buy = True
        if not can_buy:
            # Pressure largely released today: today's limit-up price >= T high price + 2%
            # and today's volume >= 90% of the T-day volume
            if t_price_info and limit_up_price >= t_price_info[0] * 1.02:
                if today_volume >= t_price_info[1] * 0.9:
                    can_buy = True
        if not can_buy:
            # Pressure strongly released today: with no T high price, today's limit-up price
            # >= P high price + 6% and today's volume >= 101% of the P-day volume
            if not t_price_info and limit_up_price >= p_price_info[0] * 1.06:
                if today_volume >= p_price_info[1] * 1.01:
                    can_buy = True
        if not can_buy:
            print("抛压没释放")
        else:
            print("抛压已释放")
test/test_l2.py
@@ -1,11 +1,80 @@
import csv
import time
import unittest
from unittest import mock
from huaxin_client.l2_client_test import L2TransactionDataManager
from l2 import l2_data_util
from l2.huaxin.huaxin_delegate_postion_manager import RealDelegateOrderPositionManager
from l2.transaction_progress import TradeBuyQueue
from trade.huaxin import huaxin_trade_record_manager
# class TestRealPlaceOrderIndex(unittest.TestCase):
#
#     @unittest.skip("跳过此单元测试")
#     def test_sort(self):
#         l2_data_util.load_l2_data_all()
#         code = "600797"
#         today_datas = l2_data_util.local_today_datas.get(code)
#         RealDelegateOrderPositionManager.place_order(code, [(600, 13.60, 80030), (800, 13.60, 80035)], 23900,
#                                                      today_datas[23900])
#         datas = [today_datas[23903:23930], today_datas[23930:23998], today_datas[814:826]]
#         for data in datas:
#             results = RealDelegateOrderPositionManager.compute_l2_place_order_position(code, data, strict_match=True)
#             print(results)
#
#     @unittest.skip("跳过此单元测试")
#     def test_recompute_real_place_order_index(self):
#         """
#         重新计算真实下单位置
#         @return:
#         """
#         code = "600410"
#         l2_data_util.load_l2_data_all()
#         total_datas = l2_data_util.local_today_datas.get(code)
#         order_info = ([(2700, 7.94, 80160), (2900, 7.94, 80165)], total_datas[7723], time.time(), 7722)
#         TradeBuyQueue.get_traded_index = mock.Mock(return_value=(7722, False))
#
#         huaxin_trade_record_manager.DelegateRecordManager.list_current_delegates = mock.Mock(return_value=[{"acceptTime" : "10:41:55","volume":2700},{"acceptTime" : "10:41:55","volume":2900}])
#
#         result = RealDelegateOrderPositionManager().recompute_l2_place_order_position(code, order_info, 7744, 1)
#         print(result)
# class L2DataTest(unittest.TestCase):
from utils import tool
l2_data_manager_dict = {}
def parse_transaction():
    """
    Replay a transaction CSV into per-code L2TransactionDataManager instances.

    The header row is printed and skipped. Each data row is converted to a dict and
    fed to the manager of its SecurityID (created on first use and kept in
    l2_data_manager_dict); rows with a non-positive price are ignored. Finally the
    sizes of the large buy/sell order queues are printed for every code that has any.
    """
    csv_path = r"E:\测试数据\transaction_test.csv"
    with open(csv_path, 'r', encoding='utf-8') as file:
        reader = csv.reader(file)
        # Header: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp]
        headers = next(reader)
        print("表头:", headers)
        for row in reader:
            item = {
                "SecurityID": row[1],
                # Only the part of the price field before "@" is used
                "TradePrice": round(float(row[3].split("@")[0]), 2),
                "TradeVolume": int(row[4]),
                "OrderTime": int(row[2]),
                "MainSeq": int(row[6]),
                "SubSeq": int(row[7]),
                "BuyNo": int(row[8]),
                "SellNo": int(row[9]),
                "ExecType": int(row[5]),
            }
            if item["TradePrice"] <= 0:
                continue
            security_id = item["SecurityID"]
            if security_id not in l2_data_manager_dict:
                l2_data_manager_dict[security_id] = L2TransactionDataManager(security_id, True)
            l2_data_manager_dict[security_id].add_transaction_data(item)
    for code in l2_data_manager_dict:
        manager: L2TransactionDataManager = l2_data_manager_dict.get(code)
        if manager.big_accurate_buy_order_queue.qsize() or manager.big_accurate_sell_order_queue.qsize():
            print(code, "大买单数量", manager.big_accurate_buy_order_queue.qsize())
            print(code, "大卖单数量", manager.big_accurate_sell_order_queue.qsize())
if __name__ == "__main__":
    # codes = ["002194"]
    # for code in codes:
    #     l2_data_util.load_l2_data(code)
    #     L2MarketSellManager().set_current_total_sell_data(code,"10:20:54",1500000,45612,(8.94,1200))
    #     L2TradeDataProcessor.test__compute_active_order_begin_pos(code, 1, 5886, 5896)
    # Scratch computation of a big-deal threshold from a sample zyltgb value
    zyltgb = 310*1e8
    # Express the value in units of 1e8, rounded to one decimal
    zyltgb_unit_y = round(zyltgb / 100000000, 1)
    threshold_big_deal = (2 * zyltgb_unit_y + 339) * 10000
    print(threshold_big_deal)
    # Current time shifted by -3600 via tool.trade_time_add_second, with the colons removed
    print( tool.trade_time_add_second(tool.get_now_time_str(), -3600).replace(":", ""))
    parse_transaction()
test/test_log.py
@@ -1,28 +1,78 @@
from code_attribute.code_data_util import ZYLTGBUtil
from code_attribute.gpcode_manager import CodePrePriceManager
from huaxin_client import l1_subscript_codes_manager
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_error
from third_data import history_k_data_util, kpl_api
import http
import json
import logging
import socketserver
import time
from http.server import BaseHTTPRequestHandler
import dask
from code_attribute.gpcode_manager import BlackListCodeManager
from l2.l2_transaction_data_manager import HuaXinBuyOrderManager
from log_module.log import logger_system, logger_debug, logger_kpl_limit_up, logger_request_api
from third_data.custom_block_in_money_manager import CodeInMoneyManager
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, LimitUpDataConstant
from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager, CodeLimitUpSequenceManager
from third_data.third_blocks_manager import BlockMapManager
from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager
from trade.buy_strategy import OpenLimitUpGoodBlocksBuyStrategy
from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager
from utils import global_util, tool, data_export_util
from code_attribute import gpcode_manager
from log_module import log_analyse, log_export, async_log_util
import urllib.parse as urlparse
from urllib.parse import parse_qs
from output import code_info_output, limit_up_data_filter, output_util, kp_client_msg_manager
from trade import bidding_money_manager, trade_manager, l2_trade_util, trade_record_log_util, trade_constant, \
    trade_data_manager
import concurrent.futures
# Raise the threshold of the "http.server" logger so that only CRITICAL
# records pass through it.
logger = logging.getLogger("http.server")
logger.setLevel(level=logging.CRITICAL)
class DataServer(BaseHTTPRequestHandler):
    """HTTP handler that answers GET requests with a JSON-typed body."""

    def log_message(self, format, *args):
        """Suppress the per-request log line; nothing is written."""
        return None

    def do_GET(self):
        """Handle a GET request and always reply with status 200.

        The body stays an empty string unless the path is
        "/get_block_codes_money" and the JSON payload is built successfully.
        """
        request_url = urlparse.urlparse(self.path)
        async_log_util.info(logger_request_api, f"开始请求{tool.get_thread_id()}-{request_url}")
        response_data = ""
        if request_url.path == "/get_block_codes_money":
            # Keep only the first value of every query parameter.
            params = {name: values[0] for name, values in parse_qs(request_url.query).items()}
            # A request without a "block" parameter raises KeyError here,
            # outside the try block below.
            block = params['block']
            try:
                # NOTE(review): `data` is not assigned anywhere in this handler;
                # unless a module-level `data` exists this raises NameError,
                # which is logged below and leaves the body empty - confirm.
                response_data = json.dumps({"code": 0, "data": data})
            except Exception as err:
                logger_debug.exception(err)
                logging.exception(err)
        self.send_response(200)
        # Headers and body sent back to the requesting client.
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(response_data.encode())
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTPServer combined with ThreadingMixIn; adds no members of its own."""
def run(addr, port):
    """Create a ThreadedHTTPServer bound to (addr, port) and serve forever.

    Any Exception raised while creating or running the server is logged
    through ``logger_system`` instead of being propagated.
    """
    try:
        server = ThreadedHTTPServer((addr, port), DataServer)
        print("HTTP server is at: http://%s:%d/" % (addr, port))
        server.serve_forever()
    except Exception as err:
        logger_system.exception(err)
        logger_system.error(f"端口服务器:{port} 启动失败")
if __name__ == "__main__":
    # Collect the subscribed codes as decoded strings. The second list is
    # only read when the first one is non-empty, as in the original flow.
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False)
    codes = set()
    if codes_sh:
        codes.update(code_byte.decode() for code_byte in codes_sh)
        codes.update(code_byte.decode() for code_byte in codes_sz)

    # Drop the codes whose volume was already updated today.
    updated_codes = ZYLTGBUtil.get_today_updated_volume_codes()
    codes = codes - set(updated_codes)

    # Fetch the latest price per code; the key is the part of the symbol
    # after the first ".".
    price_datas = history_k_data_util.JueJinApi.get_gp_current_info(codes, "symbol,price")
    price_dict = {x['symbol'].split(".")[1]: x['price'] for x in price_datas}

    for code in price_dict:
        try:
            zylt = kpl_api.getZYLTAmount(code)
            ZYLTGBUtil.save_volume(code, int(zylt / price_dict[code]))
        except Exception as e:
            # Best effort per code: record the failure and continue.
            # Catching Exception (not a bare except) lets KeyboardInterrupt
            # and SystemExit stop the script.
            logging.exception(e)

    run("", 9004)
test/test_marshal.py
@@ -3,7 +3,7 @@
if __name__ == '__main__':
    # Round-trip a marshalled dict through a bounded multiprocessing queue.
    # The payload name no longer shadows the builtin `bytes`, and the
    # unbounded Queue that used to be created and immediately discarded is gone.
    payload = marshal.dumps({"code": 1})
    queue = multiprocessing.Queue(maxsize=1024)
    queue.put_nowait(payload)
    received = queue.get()
    print(marshal.loads(received))
test/test_mul_queue.py
@@ -30,7 +30,7 @@
            logging.exception(e)
if __name__ == "__main__":
    q = multiprocessing.Queue()
    q = multiprocessing.Queue(maxsize=1024)
    while True:
        try:
            val = q.get_nowait()
@@ -41,7 +41,7 @@
            print("出错")
if __name__ == "__main__1":
    # NOTE(review): the guard compares against "__main__1", which does not
    # match the name a directly executed script gets, so this branch looks
    # intentionally disabled - confirm.
    q = multiprocessing.Queue()
    # The unbounded queue above is rebound right away to this bounded one.
    q = multiprocessing.Queue(maxsize=1024)
    # Three processes are created with the same queue as their only argument.
    p1 = multiprocessing.Process(target=run_process1, args=(q,))
    p2 = multiprocessing.Process(target=run_process2, args=(q,))
    p3 = multiprocessing.Process(target=run_process3, args=(q,))
test/test_shared_memery.py
@@ -1,23 +1,33 @@
import multiprocessing.shared_memory
import multiprocessing
import time
from multiprocessing import Manager, Process
def producer(shr):
    """Store the integers 0..9 at indexes 0..9 of the object yielded by ``shr.txn()``."""
    with shr.txn() as slots:
        for value in range(10):
            slots[value] = value
def producer(d):
    """Append 0..9 to the shared list ``d``, clearing it once after 4 is appended.

    A 0.1 second pause follows every append, so readers can observe the
    list growing and being reset. After the call ``d`` holds 5..9 (plus
    whatever other writers added).

    :param d: list-like object supporting ``append`` and slice assignment.
    """
    for i in range(10):
        d.append(i)
        if i == 4:
            # Empty the list in place so every holder of `d` sees the reset.
            d[:] = []
        time.sleep(0.1)
def consumer(shr):
    """Print the items at indexes 0..9 of the object yielded by ``shr.txn()``."""
    with shr.txn() as slots:
        for index in range(10):
            item = slots[index]
            print(item)
def consumer(d):
    """Print ``len(d)`` and the time the lookup took, 10000 times.

    Each round is followed by a one second pause.
    """
    for _ in range(10000):
        began = time.time()
        print(len(d))
        finished = time.time()
        elapsed = finished - began
        print("耗时:", elapsed)
        time.sleep(1)
if __name__ == "__main__":
    # NOTE(review): this first `with` builds p1/p2 around a SharedMemory
    # object, but both names are rebound below before either process is
    # started. Also confirm that SharedMemory supports the `with` protocol.
    with multiprocessing.shared_memory.SharedMemory(create=True, size=80) as shr:
        p1 = multiprocessing.Process(target=producer, args=(shr,))
        p2 = multiprocessing.Process(target=consumer, args=(shr,))
    # A manager-backed list is shared between one producer and the consumers.
    with Manager() as manager:
        d = manager.list()
        p1 = multiprocessing.Process(target=producer, args=(d,))
        # Start 100 consumer processes; only the last one stays bound to p2.
        for i in range(100):
            p2 = multiprocessing.Process(target=consumer, args=(d,))
            p2.start()
        p1.start()
        # NOTE(review): p2 was already started inside the loop above; starting
        # the same Process object a second time is expected to fail - confirm.
        p2.start()
        p1.join()
        # Only the last consumer is joined here.
        p2.join()
        # Wait for a line on stdin before leaving the manager context.
        input()
test/test_strategy.py
@@ -15,7 +15,7 @@
if __name__ == '__main__':
    dates = history_k_data_util.JueJinApi.get_trading_dates("2024-05-01", "2024-08-19")
    dates = history_k_data_util.HistoryKDatasUtils.get_trading_dates("2024-05-01", "2024-08-19")
    datas_dict = {}
    for day in dates:
        datas = __load_limit_up_records(day)