Administrator
2023-09-27 f5b416d9cb5214ff0e46ca6305098085d2fe8742
L2与交易之间的通信采用队列方式/实现L撤单比例动态计算
12个文件已修改
314 ■■■■■ 已修改文件
huaxin_client/command_manager.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
inited_data.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/data_server.py 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/deal_big_money_manager.py 128 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 64 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/command_manager.py
@@ -74,20 +74,14 @@
        return cls._instance
    @classmethod
    def init(cls, trade_action_callback: TradeActionCallback, pipe_l2, queue_strategy_trade_read: multiprocessing.Queue):
    def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read: multiprocessing.Queue):
        cls.action_callback = trade_action_callback
        cls.pipe_l2 = pipe_l2
        cls.queue_strategy_trade_read = queue_strategy_trade_read
    @classmethod
    def process_command(cls, _type, client_id, result_json, sk=None):
        async_log_util.info(logger_local_huaxin_contact_debug, f"process_command: {result_json}")
        # 查看是否是设置L2的代码
        if _type == CLIENT_TYPE_CMD_L2:
            cls.pipe_l2.send(
                json.dumps({"type": "set_l2_codes", "data": result_json["data"]}))
            return
        try:
            data = result_json["data"]
            request_id = result_json.get('request_id')
huaxin_client/l2_client.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import json
import logging
import multiprocessing
import os
import queue
import threading
@@ -560,11 +561,11 @@
    api.Init()
def __receive_from_pipe_trade(pipe):
def __receive_from_pipe_trade(queue_trade_w_l2_r: multiprocessing.Queue):
    logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            value = pipe.recv()
            value = queue_trade_w_l2_r.get()
            if value:
                value = value.decode("utf-8")
                data = json.loads(value)
@@ -594,13 +595,13 @@
pipe_strategy = None
def run(pipe_trade, _pipe_strategy, _l2_data_callback: l2_data_transform_protocol.L2DataCallBack) -> None:
def run(queue_trade_w_l2_r:multiprocessing.Queue, _pipe_strategy, _l2_data_callback: l2_data_transform_protocol.L2DataCallBack) -> None:
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        log.close_print()
        if pipe_trade is not None:
            t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(pipe_trade), daemon=True)
        if queue_trade_w_l2_r is not None:
            t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(queue_trade_w_l2_r), daemon=True)
            t1.start()
        if _pipe_strategy is not None:
            global pipe_strategy
huaxin_client/l2_data_manager.py
@@ -1,8 +1,6 @@
# -*- coding: utf-8 -*-
import contextlib
import json
import logging
import mmap
import queue
import random
import threading
huaxin_client/trade_client.py
@@ -157,8 +157,6 @@
        ret = api.ReqOrderInsert(req_field, self.req_id)
        if ret != 0:
            raise Exception('ReqOrderInsert fail, ret[%d]' % ret)
        if l2pipe is not None:
            l2pipe.send(json.dumps({"type": "listen_volume", "data": {"code": code, "volume": count}}).encode('utf-8'))
        async_log_util.info(logger_trade, f"{code}华鑫本地真实下单结束")
        return
@@ -623,7 +621,13 @@
                                   pOrderField.OrderRef, pOrderField.OrderLocalID,
                                   pOrderField.LimitPrice, pOrderField.VolumeTotalOriginal, pOrderField.OrderSysID,
                                   pOrderField.OrderStatus, pOrderField.InsertTime))
            if pOrderField.OrderStatus != traderapi.TORA_TSTP_OST_Unknown:
            if pOrderField.OrderStatus == traderapi.TORA_TSTP_OST_Unknown:
                if queue_trade_w_l2_r is not None:
                    queue_trade_w_l2_r.put_nowait(
                        json.dumps({"type": "listen_volume", "data": {"code": pOrderField.SecurityID,
                                                                      "volume": pOrderField.VolumeTotalOriginal}}).encode(
                            'utf-8'))
            else:
                order_data = {"sinfo": pOrderField.SInfo, "securityID": pOrderField.SecurityID,
                              "orderLocalID": pOrderField.OrderLocalID,
                              "direction": pOrderField.Direction, "orderSysID": pOrderField.OrderSysID,
@@ -1071,14 +1075,15 @@
addr, port = constant.SERVER_IP, constant.SERVER_PORT
def run(trade_response_: TradeResponse = None, pipe_l2=None, queue_strategy_trade_write_=None,
def run(trade_response_: TradeResponse = None, queue_trade_w_l2_r_: multiprocessing.Queue = None,
        queue_strategy_trade_write_=None,
        queue_strategy_trade_read=None):
    try:
        logger_system.info("交易进程ID:{}", os.getpid())
        logger_system.info(f"trade 线程ID:{tool.get_thread_id()}")
        __init_trade_data_server()
        global l2pipe
        l2pipe = pipe_l2
        global queue_trade_w_l2_r
        queue_trade_w_l2_r = queue_trade_w_l2_r_
        global queue_strategy_trade_write
        queue_strategy_trade_write = queue_strategy_trade_write_
@@ -1091,7 +1096,7 @@
        global tradeCommandManager
        tradeCommandManager = command_manager.TradeCommandManager()
        tradeCommandManager.init(MyTradeActionCallback(), l2pipe, queue_strategy_trade_read)
        tradeCommandManager.init(MyTradeActionCallback(), queue_strategy_trade_read)
        logger_system.info("华鑫交易服务启动")
        tradeCommandManager.run()
    except Exception as e:
inited_data.py
@@ -12,6 +12,7 @@
from db.redis_manager_delegate import RedisUtils
from ths import client_manager
import constant
from trade.deal_big_money_manager import DealOrderNoManager
from trade.trade_manager import AccountAvailableMoneyManager
from utils import global_util, tool
import threading
@@ -73,7 +74,7 @@
    AccountAvailableMoneyManager()
    # 9点25之前删除所有代码
    if tool.trade_time_sub(tool.get_now_time_str(), "09:25:00") <= 0 or True:
    if tool.trade_time_sub(tool.get_now_time_str(), "09:25:00") <= 0:
        # 删除L2监听代码
        gpcode_manager.clear_listen_codes()
        # 删除首板代码
@@ -94,6 +95,8 @@
        LCancelBigNumComputer().clear()
        # 清除D撤数据
        DCancelBigNumComputer().clear()
        # 清除大单成交数据
        DealOrderNoManager().clear()
# 每日初始化
l2/cancel_buy_strategy.py
@@ -692,13 +692,18 @@
    # 设置板块涨停数量(除开自己)
    @classmethod
    def set_block_limit_up_count(cls, code, count):
        cls.__block_limit_up_count_dict[code] = count
    def set_block_limit_up_count(cls, reason_codes_dict):
        for reason in reason_codes_dict:
            codes = reason_codes_dict[reason]
            for c in codes:
                cls.__block_limit_up_count_dict[c] = len(codes) - 1
    # 设置大单成交金额➗固定m值比例
    @classmethod
    def set_big_num_deal_rate(cls, code, rate):
        cls.__big_num_deal_rate_dict[code] = rate
        logger_l2_l_cancel.debug(code, f"设置大单成交金额比值:{rate}")
# 计算成交位置之后的大单(特定笔数)的撤单比例
@@ -871,8 +876,8 @@
        MAX_COUNT = 10
        watch_indexes = set()
        total_num = 0
        thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
        threshold_num = thresh_hold_money // (float(gpcode_manager.get_limit_up_price(code)) * 100)
        # thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
        # threshold_num = thresh_hold_money // (float(gpcode_manager.get_limit_up_price(code)) * 100)
        for i in range(start_index, end_index):
            data = total_datas[i]
            val = data['val']
@@ -888,7 +893,7 @@
            if left_count > 0:
                total_num += val['num'] * left_count
                watch_indexes.add(i)
                if len(watch_indexes) >= MAX_COUNT or total_num >= threshold_num:
                if len(watch_indexes) >= MAX_COUNT:
                    break
        # 保存数据
        l2_log.l_cancel_debug(code, f"设置成交位临近撤单监控范围:{watch_indexes} 计算范围:{start_index}-{end_index}")
@@ -1031,6 +1036,8 @@
    # 开始撤单
    def start_cancel(self, code, buy_no, total_datas, buy_order_no_map, local_operate_map, m_val_num):
        # TODO 暂时注释掉G撤
        return False,"暂时不执行G撤"
        thresh_num = int(m_val_num * 1)
        place_order_index = self.__SecondCancelBigNumComputer.get_real_place_order_index_cache(code)
        if place_order_index is None:
l2/l2_data_manager_new.py
@@ -213,7 +213,7 @@
    __last_buy_single_dict = {}
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
    __latest_process_order_unique_keys = {}
    __latest_process_not_order_unique_keys = {}
    __latest_process_not_order_unique_keys_count = {}
    # 初始化
    __TradePointManager = l2_data_manager.TradePointManager()
    __SecondCancelBigNumComputer = SecondCancelBigNumComputer()
@@ -762,7 +762,8 @@
                return False, True, f"尚未获取到当前成交价"
            if float(limit_up_price) - float(trade_price) > 0.00001:
                # 计算信号起始位置到当前的手数
                buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(code)
                buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
                    code)
                num_operate_map = local_today_num_operate_map.get(code)
                total_num = 0
                for i in range(buy_single_index, total_data[-1]["index"] + 1):
@@ -776,9 +777,9 @@
                                                                                                          num_operate_map)
                    total_num += left_count * val["num"]
                m_base_val = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
                thresh_hold_num = m_base_val//(float(gpcode_manager.get_limit_up_price(code))*100)
                thresh_hold_num = m_base_val // (float(gpcode_manager.get_limit_up_price(code)) * 100)
                if total_num < thresh_hold_num * 2:
                    return False, False, f"当前成交价({trade_price})尚未在0档及以内 且 纯买额({total_num})小于2倍M值({thresh_hold_num*2})"
                    return False, False, f"当前成交价({trade_price})尚未在0档及以内 且 纯买额({total_num})小于2倍M值({thresh_hold_num * 2})"
            # 判断成交进度是否距离我们的位置很近
            trade_index, is_default = cls.__TradeBuyQueue.get_traded_index(code)
            if False and not is_default and trade_index:
@@ -946,12 +947,14 @@
        if compute_end_index < compute_start_index:
            return
        unique_key = f"{compute_start_index}-{compute_end_index}"
        if cls.__latest_process_not_order_unique_keys.get(code) == unique_key:
        unique_key = f"{code}-{compute_start_index}-{compute_end_index}"
        if cls.__latest_process_not_order_unique_keys_count.get(unique_key) and cls.__latest_process_not_order_unique_keys_count.get(unique_key) > 2:
            async_log_util.error(logger_l2_error,
                                 f"重复处理数据:code-{code} start_index-{compute_start_index} end_index-{compute_end_index}")
            return
        cls.__latest_process_not_order_unique_keys[code] = unique_key
        if unique_key not in cls.__latest_process_not_order_unique_keys_count:
            cls.__latest_process_not_order_unique_keys_count[unique_key] = 0
        cls.__latest_process_not_order_unique_keys_count[unique_key] += 1
        _start_time = tool.get_now_timestamp()
        total_datas = local_today_datas[code]
@@ -972,7 +975,8 @@
            if cls.__last_buy_single_dict.get(code) == _index:
                has_single = None
                _index = None
            if _index == 106:
                print("进入调试")
            buy_single_index = _index
            if has_single:
                cls.__last_buy_single_dict[code] = buy_single_index
@@ -1069,6 +1073,7 @@
    # compute_data_count 用于计算的l2数据数量
    @classmethod
    def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index):
        second_930 = 9 * 3600 + 30 * 60 + 0
        # 倒数100条数据查询
        datas = local_today_datas[code]
@@ -1093,7 +1098,9 @@
                    continue
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 寻找前面continue_count-1个涨停买
                # for j in range(start_index - 1, -1, -1):
                #     if  datas[j]["val"]
                if last_index is None or (datas[last_index]["val"]["time"] == datas[i]["val"]["time"]):
                    if start is None:
                        start = i
main.py
@@ -22,7 +22,7 @@
# from huaxin_api import trade_client, l2_client, l1_client
def createTradeServer(pipe_server, queue_strategy_r_trade_w, pipe_l1, pipe_l2, ptl2_l2, psl2_l2, queue_strategy_w_trade_r):
def createTradeServer(pipe_server, queue_strategy_r_trade_w: multiprocessing.Queue, pipe_l1, pipe_l2, queue_trade_w_l2_r: multiprocessing.Queue, psl2_l2, queue_strategy_w_trade_r: multiprocessing.Queue):
    logger_system.info("策略进程ID:{}", os.getpid())
    log.close_print()
    # 初始化参数
@@ -43,7 +43,7 @@
    #
    # 启动L2订阅服务
    t1 = threading.Thread(target=huaxin_client.l2_client.run, name="l2_client",
                          args=(ptl2_l2, psl2_l2, huaxin_trade_server.my_l2_data_callback),
                          args=(queue_trade_w_l2_r, psl2_l2, huaxin_trade_server.my_l2_data_callback),
                          daemon=True)
    t1.start()
    #
@@ -83,8 +83,8 @@
        # 策略与server间的通信
        pss_server, pss_strategy = multiprocessing.Pipe()
        # 交易与l2之间的通信
        ptl2_trade, ptl2_l2 = multiprocessing.Pipe()
        # 交易写L2读
        queue_trade_w_l2_r = multiprocessing.Queue()
        # 策略与l2之间的通信
        psl2_strategy, psl2_l2 = multiprocessing.Pipe()
@@ -107,11 +107,11 @@
        # 交易进程
        tradeProcess = multiprocessing.Process(
            target=lambda: huaxin_client.trade_client.run(None, ptl2_trade, queue_strategy_r_trade_w, queue_strategy_w_trade_r))
            target=lambda: huaxin_client.trade_client.run(None, queue_trade_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r))
        tradeProcess.start()
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, queue_strategy_w_trade_r)
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, pl1t_strategy, psl2_strategy, queue_trade_w_l2_r, psl2_l2, queue_strategy_w_trade_r)
        # 将tradeServer作为主进程
        l1Process.join()
test/l2_trade_test.py
@@ -86,8 +86,7 @@
    # @unittest.skip("跳过此单元测试")
    def test_trade(self):
        threading.Thread(target=async_log_util.run_sync,daemon=True).start()
        code = "000010"
        code = "002640"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
third_data/data_server.py
@@ -10,7 +10,7 @@
from utils import global_util, tool
from code_attribute import gpcode_manager
from log_module import log, log_analyse, log_export
from l2 import code_price_manager, l2_data_util, l2_data_manager_new
from l2 import code_price_manager, l2_data_util, l2_data_manager_new, cancel_buy_strategy
from l2.cancel_buy_strategy import HourCancelBigNumComputer
from output.limit_up_data_filter import IgnoreCodeManager
from third_data import kpl_util, kpl_data_manager, kpl_api, block_info
@@ -495,6 +495,18 @@
                        code_price_manager.Buy1PriceManager().set_limit_up_time(code, limit_up_time)
                add_codes = codes_set - self.__latest_limit_up_codes_set
                self.__latest_limit_up_codes_set = codes_set
                if limit_up_reasons:
                    # 统计涨停原因的票的个数
                    limit_up_reason_code_dict = {}
                    for code in limit_up_reasons:
                        b = limit_up_reasons[code]
                        if b not in limit_up_reason_code_dict:
                            limit_up_reason_code_dict[b] = set()
                        limit_up_reason_code_dict[b].add(code)
                    cancel_buy_strategy.LCancelRateManager.set_block_limit_up_count(limit_up_reason_code_dict)
                if add_codes:
                    for code in add_codes:
                        # 根据涨停原因判断是否可以买
@@ -510,7 +522,7 @@
                                        if not current_limit_up_datas:
                                            current_limit_up_datas = []
                                        if not limit_up_record_datas:
                                            limit_up_record_datas=[]
                                            limit_up_record_datas = []
                                        if CodePlateKeyBuyManager.is_need_cancel(code, limit_up_reasons.get(code),
                                                                                 current_limit_up_datas,
                                                                                 limit_up_record_datas,
@@ -518,7 +530,9 @@
                                                                                 before_blocks_dict):
                                            pass
                                            # TODO 测试暂时注释
                                            # l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, f"涨停原因({ limit_up_reasons.get(code)})不是老大撤单", "板块撤")
                                            l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code,
                                                                                                f"涨停原因({limit_up_reasons.get(code)})不是老大撤单",
                                                                                                "板块撤")
                            except Exception as e:
                                logger_debug.exception(e)
                kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), result_list_)
trade/deal_big_money_manager.py
@@ -9,16 +9,17 @@
from l2 import l2_data_util, l2_data_source_util
class DealComputeProgressManager:
# 成交进度计算
class DealOrderNoManager:
    __db = 2
    __redisManager = redis_manager.RedisManager(2)
    __deal_compute_progress_cache = {}
    __deal_orderno_cache = {}
    __last_progress = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(DealComputeProgressManager, cls).__new__(cls, *args, **kwargs)
            cls.__instance = super(DealOrderNoManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
@@ -30,90 +31,63 @@
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "deal_compute_info-*")
            keys = RedisUtils.keys(__redis, "deal_orderno-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = RedisUtils.smembers(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.__deal_compute_progress_cache, code, val)
                val = set(val)
                tool.CodeDataCacheUtil.set_cache(cls.__deal_orderno_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    # 获取成交计算进度
    def __get_deal_compute_progress(self, code):
        val = RedisUtils.get(self.__get_redis(), f"deal_compute_info-{code}")
        if val is None:
            return -1, 0
        val = json.loads(val)
        return val[0], val[1]
    # 添加订单号
    def __add_orderno(self, code, orderno):
        RedisUtils.sadd_async(self.__db, f"deal_orderno-{code}", orderno)
        RedisUtils.expire_async(self.__db, f"deal_orderno-{code}", tool.get_expire())
    # 获取成交计算进度
    def get_deal_compute_progress_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__deal_compute_progress_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return -1, 0
    # 移除订单号
    def __remove_orderno(self, code, orderno):
        RedisUtils.srem_async(self.__db, f"deal_orderno-{code}", orderno)
        RedisUtils.expire_async(self.__db, f"deal_orderno-{code}", tool.get_expire())
    # 清除数据
    def clear(self):
        self.__deal_orderno_cache.clear()
        keys = RedisUtils.keys(self.__get_redis(), "deal_orderno-*")
        for k in keys:
            RedisUtils.delete_async(self.__db, k)
    def remove_orderno(self, code, orderno):
        if code in self.__deal_orderno_cache:
            if orderno in self.__deal_orderno_cache[code]:
                self.__deal_orderno_cache[code].discard(orderno)
                self.__remove_orderno(code, orderno)
    def add_orderno(self, code, orderno):
        if code not in self.__deal_orderno_cache:
            self.__deal_orderno_cache[code] = set()
        self.__deal_orderno_cache[code].add(orderno)
        self.__add_orderno(code, orderno)
    # 设置成交进度
    def __set_deal_compute_progress(self, code, index, money):
        tool.CodeDataCacheUtil.set_cache(self.__deal_compute_progress_cache, code, (index, money))
        RedisUtils.setex_async(self.__db, f"deal_compute_info-{code}", tool.get_expire(), json.dumps((index, money)))
    # 设置成交进度
    def set_trade_progress(self, code, progress, total_data, local_today_num_operate_map):
        if self.__last_progress.get(code) == progress:
            return
        self.__last_progress[code] = progress
        # 计算从开始位置到成交位置
        c_index, deal_num = self.get_deal_compute_progress_cache(code)
        process_index = c_index
        for i in range(c_index + 1, progress):
            data = total_data[i]
            val = data['val']
            process_index = i
            # 是否有大单
            if not l2_data_util.is_big_money(val):
                continue
            if l2_data_util.L2DataUtil.is_limit_up_price_buy(val):
                # 是否已经取消
                cancel_data = self.__get_cancel_data(code, data, local_today_num_operate_map)
                if cancel_data is None:
                    deal_num += val["num"] * data["re"]
                    self.__save_traded_index(code, data["index"])
        self.__set_deal_compute_progress(code, process_index, deal_num)
    def get_deal_big_money_num(self, code):
        if code in self.__deal_compute_progress_cache:
            return self.__deal_compute_progress_cache.get(code)[1]
        compute_index, num = self.get_deal_compute_progress_cache(code)
        return num
    def __get_cancel_data(self, code, buy_data, local_today_num_operate_map):
        val = buy_data['val']
        cancel_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(val["num"], "1", val["price"]))
        if cancel_datas:
            for cancel_data in cancel_datas:
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
                                                                                                 local_today_num_operate_map)
                if buy_index == buy_data["index"]:
                    return cancel_data
        return None
    def __save_traded_index(self, code, index):
        RedisUtils.sadd(self.__get_redis(), f"deal_indexes-{code}", index)
        RedisUtils.expire(self.__get_redis(), f"deal_indexes-{code}", tool.get_expire())
    def __get_traded_indexes(self, code):
        return RedisUtils.smembers(self.__get_redis(), f"deal_indexes-{code}")
    # 获取成交的索引
    def get_traded_indexes(self, code):
        return self.__get_traded_indexes(code)
    def get_deal_nums(self, code, orderno_map: dict):
        if code not in self.__deal_orderno_cache:
            return 0
        total_num = 0
        for orderno in self.__deal_orderno_cache[code]:
            if orderno_map:
                if orderno in orderno_map:
                    data = orderno_map[orderno]
                    total_num += data["val"]["num"] * data["re"]
        return total_num
def get_deal_big_money_num(code):
    val = DealComputeProgressManager().get_deal_compute_progress_cache(code)
    return val[1]
    val = DealOrderNoManager().get_deal_nums(code, l2_data_util.local_today_buyno_map.get(code))
    return val
if __name__ == "__main__":
    pass
trade/huaxin/huaxin_trade_server.py
@@ -22,7 +22,7 @@
import constant
import inited_data
import outside_api_command_manager
from code_attribute import gpcode_manager, code_volumn_manager
from code_attribute import gpcode_manager, code_volumn_manager, global_data_loader
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from huaxin_client import l1_subscript_codes_manager, l2_data_transform_protocol
@@ -31,10 +31,11 @@
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress, \
    l2_data_log
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer, \
    GCancelBigNumComputer, SecondCancelBigNumComputer
    GCancelBigNumComputer, SecondCancelBigNumComputer, LCancelRateManager
from l2.huaxin import huaxin_target_codes_manager
from l2.huaxin.huaxin_target_codes_manager import HuaXinL1TargetCodesManager
from l2.l2_data_manager_new import L2TradeDataProcessor
from l2.l2_data_util import L2DataUtil
from log_module import async_log_util, log_export
from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue, \
@@ -44,7 +45,9 @@
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
from third_data.kpl_data_manager import KPLDataManager
from third_data.kpl_util import KPLDataType
from trade import deal_big_money_manager, current_price_process_manager, trade_huaxin, trade_manager, l2_trade_util
from trade import deal_big_money_manager, current_price_process_manager, trade_huaxin, trade_manager, l2_trade_util, \
    l2_trade_factor
from trade.deal_big_money_manager import DealOrderNoManager
from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_data_update, \
    huaxin_trade_record_manager
@@ -314,6 +317,8 @@
                        buyno_map = l2_data_util.local_today_buyno_map.get(code)
                async_log_util.info(hx_logger_l2_transaction,
                                    f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}")
                if buyno_map is None:
                    buyno_map = {}
                buy_progress_index = None
                for i in range(len(datas) - 1, -1, -1):
                    d = datas[i]
@@ -322,6 +327,30 @@
                        async_log_util.info(hx_logger_l2_transaction, f"{code}成交进度:{buyno_map[buy_no]['index']}")
                        buy_progress_index = buyno_map[buy_no]["index"]
                        break
                # ------统计保存成交大单订单号------
                origin_ordernos = set()
                for d in datas:
                    buy_no = f"{d[6]}"
                    origin_ordernos.add(buy_no)
                big_money_count = 0
                for buy_no in origin_ordernos:
                    # 查询是否为大单
                    if buy_no in buyno_map:
                        data = buyno_map[buy_no]
                        val = data["val"]
                        if not L2DataUtil.is_limit_up_price_buy(val):
                            continue
                        if l2_data_util.is_big_money(val):
                            # 涨停买的大单
                            big_money_count += 1
                            DealOrderNoManager().add_orderno(code, buy_no)
                if big_money_count > 0:
                    total_deal_nums = DealOrderNoManager().get_deal_nums(code, buyno_map)
                    thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if limit_up_price:
                        rate = round(total_deal_nums / (thresh_hold_money // (float(limit_up_price) * 100)), 2)
                        LCancelRateManager().set_big_num_deal_rate(code, rate)
                if buy_progress_index is not None:
                    # 获取执行位时间
@@ -403,7 +432,13 @@
        m_val = L2PlaceOrderParamsManager.get_base_m_val(code)
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        m_val_num = int(m_val / (float(limit_up_price) * 100))
        # 处理成交大单
        DealOrderNoManager().remove_orderno(code, f"{order_no}")
        total_deal_nums = DealOrderNoManager().get_deal_nums(code, l2_data_util.local_today_buyno_map.get(code))
        thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
        if limit_up_price:
            rate = round(total_deal_nums / (thresh_hold_money // (float(limit_up_price) * 100)), 2)
            LCancelRateManager().set_big_num_deal_rate(code, rate)
        try:
            need_cancel, msg = cls.__GCancelBigNumComputer.start_cancel(code, f"{order_no}",
                                                                        l2_data_util.local_today_datas.get(
@@ -861,9 +896,18 @@
if __name__ == "__main__":
    print(int(tool.to_time_str(1684132912).replace(":", "")))
    # code = "000536"
    # TradeServerProcessor.l2_transaction(code,
    #                                     [('000536', 2.08, 20000, 143436460, 2014, 31126359, 16718956, 31126358, '1')])
    while True:
        time.sleep(10)
    code = "002640"
    global_data_loader.init()
    l2_data_util.load_l2_data(code, False, False)
    # DealOrderNoManager().add_orderno(code, "18972810")
    # DealOrderNoManager().add_orderno(code, "18972232")
    # DealOrderNoManager().add_orderno(code, "18972434")
    total_deal_nums = DealOrderNoManager().get_deal_nums(code, l2_data_util.local_today_buyno_map.get(code))
    print("成交大单手数", total_deal_nums)
    thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    if limit_up_price:
        rate = round(total_deal_nums / (thresh_hold_money // (float(limit_up_price) * 100)), 2)
        LCancelRateManager().set_big_num_deal_rate(code, rate)
        print("撤单比例", LCancelRateManager().get_cancel_rate(code))