Administrator
2023-10-16 dfc036d63be5d12261ca14c3abb8959a547ba5ee
L前撤修改/华鑫逐笔成交数据处理独立
4个文件已修改
1个文件已添加
306 ■■■■■ 已修改文件
l2/cancel_buy_strategy.py 86 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_manager.py 105 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 81 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py
@@ -842,26 +842,33 @@
        total_datas = local_today_datas.get(code)
        if total_datas:
            # 计算的上截至位距离下截至位纯买额要小于2.5倍m值
            thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
            thresh_hold_money = int(thresh_hold_money * 2.5)
            threshold_num = thresh_hold_money // (float(gpcode_manager.get_limit_up_price(code)) * 100)
            # thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
            # thresh_hold_money = int(thresh_hold_money * 2.5)
            min_num = int(5000 / (float(gpcode_manager.get_limit_up_price(code))))
            # 统计净涨停买的数量
            total_num = 0
            not_cancel_indexes = []
            re_start_index = start_index
            for j in range(end_index, start_index, -1):
            for j in range(start_index, end_index):
                data = total_datas[j]
                val = data['val']
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                if val["num"] < min_num:
                    continue
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code,
                                                                                                         j,
                                                                                                         total_datas,
                                                                                                         local_today_canceled_buyno_map.get(
                                                                                                             code))
                if left_count > 0:
                    total_num += val['num']
                    if total_num >= threshold_num:
                        re_start_index = j - 1
                    not_cancel_indexes.append(j)
            if not_cancel_indexes:
                temp_count = len(not_cancel_indexes)
                # 取后1/3的数据
                temp_index = int(temp_count * 2 / 3)
                re_start_index = not_cancel_indexes[temp_index]
            MIN_MONEYS = [300, 200, 100, 50]
            watch_indexes = set()
@@ -899,13 +906,12 @@
    # 设置真实下单位置
    def set_real_place_order_index(self, code, index, buy_single_index=None):
        self.__real_place_order_index_dict[code] = index
        if buy_single_index:
            self.compute_watch_index(code, buy_single_index, index)
        if self.__last_trade_progress_dict.get(code):
            self.compute_watch_index(code, self.__last_trade_progress_dict.get(code), index)
            self.__compute_trade_progress_near_by_indexes(code, self.__last_trade_progress_dict.get(code) + 1, index)
        else:
            if buy_single_index:
                self.compute_watch_index(code, buy_single_index, index)
                self.__compute_trade_progress_near_by_indexes(code, buy_single_index, index)
            self.__compute_trade_progress_near_by_indexes(code, buy_single_index, index)
    # 计算范围内的成交位临近未撤大单
    def __compute_trade_progress_near_by_indexes(self, code, start_index, end_index):
@@ -947,18 +953,19 @@
        # 重新计算成交位置临近大单撤单
        self.__compute_trade_progress_near_by_indexes(code, index + 1, self.__real_place_order_index_dict.get(code))
        try:
            # 已经有计算的无法触发计算
            old_watch_indexes = self.__get_watch_indexes_cache(code)
            if old_watch_indexes and self.__last_trade_progress_dict.get(code):
                return
        finally:
            self.__last_trade_progress_dict[code] = index
        if self.__real_place_order_index_dict.get(code):
            # 触发计算
            self.compute_watch_index(code, self.__last_trade_progress_dict.get(code),
                                     self.__real_place_order_index_dict.get(code))
        # 成交进度与L下撤无关
        # try:
        #     # 已经有计算的无法触发计算
        #     old_watch_indexes = self.__get_watch_indexes_cache(code)
        #     if old_watch_indexes and self.__last_trade_progress_dict.get(code):
        #         return
        # finally:
        #     self.__last_trade_progress_dict[code] = index
        #
        # if self.__real_place_order_index_dict.get(code):
        #     # 触发计算
        #     self.compute_watch_index(code, self.__last_trade_progress_dict.get(code),
        #                              self.__real_place_order_index_dict.get(code))
    def __compute_need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, is_first_code):
        watch_indexes = self.__get_watch_indexes_cache(code)
@@ -1018,10 +1025,16 @@
        watch_indexes = self.__get_near_by_trade_progress_indexes_cache(code)
        if not watch_indexes:
            return False, None
        # 计算监听的总条数
        total_count = 0
        for wi in watch_indexes:
            total_count += total_data[wi]["re"]
        # 权重
        WATCH_INDEX_WEIGHTS = [3, 2, 1]
        total_count_weight = 0
        for wi in range(0, len(watch_indexes)):
            if wi < len(WATCH_INDEX_WEIGHTS):
                total_count_weight += WATCH_INDEX_WEIGHTS[wi]
            else:
                total_count_weight += WATCH_INDEX_WEIGHTS[-1]
        # 判断撤单中是否有监听中的索引
        need_compute = False
        for i in range(start_index, end_index + 1):
@@ -1036,8 +1049,10 @@
                    need_compute = True
                    break
        if need_compute:
            watch_indexes_list = list(watch_indexes)
            watch_indexes_list.sort()
            # 计算撤单比例
            canceled_count = 0
            canceled_count_weight = 0
            canceled_indexes = []
            for wi in watch_indexes:
                canceled_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code,
@@ -1047,8 +1062,13 @@
                                                                                                            code))
                if canceled_data:
                    canceled_indexes.append(canceled_data["index"])
                    canceled_count += total_data[wi]["re"]
            rate = round(canceled_count / total_count, 3)
                    canceled_count_weight += total_data[wi]["re"]
                    pos_index = watch_indexes_list.index(wi)
                    if pos_index < len(WATCH_INDEX_WEIGHTS):
                        canceled_count_weight += WATCH_INDEX_WEIGHTS[pos_index]
                    else:
                        canceled_count_weight += WATCH_INDEX_WEIGHTS[-1]
            rate = round(canceled_count_weight / total_count_weight, 3)
            thresh_cancel_rate = LCancelRateManager.get_cancel_rate(code, is_up=True)
            l2_log.l_cancel_debug(code, f"计算范围:{start_index}-{end_index},成交位临近已撤单比例:{rate}/{thresh_cancel_rate}")
            if rate >= thresh_cancel_rate:
@@ -1058,7 +1078,7 @@
                if real_place_order_index and trade_progress_index:
                    total_num = 0
                    thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
                    thresh_hold_money = thresh_hold_money * 2
                    thresh_hold_money = thresh_hold_money * 3
                    # 阈值为3倍m值
                    thresh_hold_num = thresh_hold_money // (float(gpcode_manager.get_limit_up_price(code)) * 100)
                    for i in range(trade_progress_index + 1, real_place_order_index):
@@ -1089,10 +1109,8 @@
    def need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, is_first_code):
        if buy_exec_index is None:
            return False, "尚未找到下单位置", ""
        time_space = tool.trade_time_sub(total_data[start_index]["val"]["time"],
                                         total_data[buy_exec_index]["val"]["time"])
        # 守护S撤以外的数据
        if int(tool.get_now_time_str().replace(":", "")) > int("145700"):
        if int(tool.get_now_time_str().replace(":", "")) > int("145700") and not constant.TEST:
            return False, None, ""
        # 下单位临近撤
        can_cancel, cancel_data = False, None
l2/l2_transaction_data_manager.py
New file
@@ -0,0 +1,105 @@
"""
L2成交数据处理器
"""
import time
from code_attribute import gpcode_manager
from l2 import l2_data_util, l2_data_manager, transaction_progress
from l2.cancel_buy_strategy import LCancelRateManager, DCancelBigNumComputer, LCancelBigNumComputer, \
    SecondCancelBigNumComputer
from l2.l2_data_manager_new import L2TradeDataProcessor
from l2.l2_data_util import L2DataUtil
from log_module import async_log_util
from log_module.log import hx_logger_l2_transaction, logger_l2_trade_buy_queue, hx_logger_l2_upload
from trade import current_price_process_manager, trade_manager, l2_trade_factor
from trade.deal_big_money_manager import DealOrderNoManager
from trade.l2_trade_factor import L2PlaceOrderParamsManager
class HuaXinTransactionDatasProcessor:
    """Processor for HuaXin L2 tick-by-tick transaction (deal) records.

    Holds one class-level ``TradeBuyQueue`` that receives the latest traded
    index for each code.
    """

    __TradeBuyQueue = transaction_progress.TradeBuyQueue()

    @classmethod
    def process_huaxin_transaction_datas(cls, code, datas):
        """Process one batch of transaction records for ``code``.

        Steps, in order:

        1. Record the trade price taken from the last record (``datas[-1][1]``).
        2. Find the trade-progress index: the newest record whose order number
           (``d[6]``) is present in today's buy-order map.
        3. Register traded limit-up big-money buy orders and, if any were
           found, refresh the "big order deal / m value" rate.
        4. If a trade-progress index was found, push it to the trade queue and
           to the D / L / Second cancel computers; a D-cancel may trigger a
           buy cancellation.

        :param code: security code.
        :param datas: indexable sequence of transaction records; field ``[1]``
            is used as the price and field ``[6]`` as the buy order number
            (NOTE(review): field meaning inferred from usage - confirm against
            the HuaXin record layout).
        :return: None. Exceptions raised inside the processing section are
            logged to ``hx_logger_l2_transaction`` and not propagated.
        """
        # An empty batch has no last record; return early instead of raising
        # IndexError outside the try block.
        if not datas:
            return

        # Set the trade price from the most recent record.
        current_price_process_manager.set_trade_price(code, datas[-1][1])
        total_datas = l2_data_util.local_today_datas.get(code)
        start_time = time.time()
        try:
            buyno_map = l2_data_util.local_today_buyno_map.get(code)
            if not buyno_map:
                # Reload the L2 data only when the code's trade state is not
                # TRADE_STATE_NOT_TRADE.
                trade_state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                if trade_state != trade_manager.TRADE_STATE_NOT_TRADE:
                    l2_data_util.load_l2_data(code)
                    buyno_map = l2_data_util.local_today_buyno_map.get(code)
            if buyno_map is None:
                buyno_map = {}

            # Walk the batch from newest to oldest; the first known buy order
            # number gives the trade-progress index.
            buy_progress_index = None
            for d in reversed(datas):
                buy_no = f"{d[6]}"
                if buy_no in buyno_map:
                    buy_progress_index = buyno_map[buy_no]["index"]
                    async_log_util.info(hx_logger_l2_transaction, f"{code}成交进度:{buy_progress_index}")
                    break

            # ------ collect the distinct order numbers of this batch ------
            origin_ordernos = {f"{d[6]}" for d in datas}

            # ------ register traded limit-up big-money buy orders ------
            big_money_count = 0
            for buy_no in origin_ordernos:
                if buy_no not in buyno_map:
                    continue
                val = buyno_map[buy_no]["val"]
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                if l2_data_util.is_big_money(val):
                    big_money_count += 1
                    DealOrderNoManager().add_orderno(code, buy_no)

            if big_money_count > 0:
                # Ratio of traded big-order volume to the m-value volume.
                total_deal_nums = DealOrderNoManager().get_deal_nums(code, buyno_map)
                thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
                limit_up_price = gpcode_manager.get_limit_up_price(code)
                if limit_up_price:
                    thresh_hold_num = thresh_hold_money // (float(limit_up_price) * 100)
                    # The floor division can yield 0; skip the rate update
                    # rather than raise ZeroDivisionError, which would abort
                    # the trade-progress handling below.
                    if thresh_hold_num > 0:
                        rate = round(total_deal_nums / thresh_hold_num, 2)
                        LCancelRateManager().set_big_num_deal_rate(code, rate)

            if buy_progress_index is not None:
                # Only the buy execution index is needed from the cached
                # compute-start data (a 7-item tuple).
                (_, buy_exec_index, _, _, _, _, _) = \
                    l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(code)
                cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index)
                async_log_util.info(logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code,
                                    buy_progress_index)
                limit_up_price = gpcode_manager.get_limit_up_price(code)
                # A buy_exec_index of 0 is falsy and therefore excluded here,
                # matching the original check.
                if buy_exec_index and buy_exec_index > -1:
                    m_base_val = L2PlaceOrderParamsManager.get_base_m_val(code)
                    need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
                                                                                  buy_progress_index,
                                                                                  buy_exec_index,
                                                                                  total_datas,
                                                                                  m_base_val,
                                                                                  limit_up_price)
                    if need_cancel:
                        L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")
                LCancelBigNumComputer().set_trade_progress(code, buy_progress_index, total_datas)
                SecondCancelBigNumComputer().set_transaction_index(code, buy_progress_index)
        except Exception as e:
            hx_logger_l2_transaction.exception(e)
        finally:
            # Only log processing time when it exceeds 10 ms.
            use_time = int((time.time() - start_time) * 1000)
            if use_time > 10:
                async_log_util.info(hx_logger_l2_upload, f"{code}处理成交用时:{use_time}")
test/l2_trade_test.py
@@ -84,13 +84,18 @@
                except Exception as e:
                    pass
    @unittest.skip("跳过此单元测试")
    # @unittest.skip("跳过此单元测试")
    def test_trade(self):
        threading.Thread(target=async_log_util.run_sync, daemon=True).start()
        code = "603178"
        code = "002771"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
        l2.l2_data_util.local_today_num_operate_map.get(code).clear()
        l2.l2_data_util.local_today_buyno_map.get(code).clear()
        l2.l2_data_util.local_today_canceled_buyno_map.get(code).clear()
        if total_datas[0]["index"] > 0:
            # 拼接数据
            for i in range(0, total_datas[0]["index"]):
@@ -171,6 +176,7 @@
            l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0,
                                                                          0)
    @unittest.skip("跳过此单元测试")
    def test_place_order(self):
        code = "002241"
        l2.l2_data_util.load_l2_data(code)
trade/huaxin/huaxin_trade_api.py
@@ -10,6 +10,8 @@
import time
import concurrent.futures
from code_attribute import gpcode_manager
from huaxin_client import constant as huaxin_client_constant
from log_module import async_log_util
from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop, hx_logger_trade_callback, logger_trade, \
    logger_system
@@ -65,16 +67,28 @@
                                acceptTime = data.get("acceptTime")
                                insertDate = data.get("insertDate")
                                direction = data.get("direction")
                                limitPrice = data.get("limitPrice")
                                volume = data.get("volume")
                                order = HuaxinOrderEntity(code, orderStatus, orderRef, accountID, orderSysID,
                                                          insertTime=insertTime, acceptTime=acceptTime,
                                                          insertDate=insertDate, direction=direction)
                                TradeResultProcessor.process_order(order)
                                # 订单相关回调
                                # 重新请求委托列表与资金
                                huaxin_trade_data_update.add_delegate_list("来自交易管道")
                                huaxin_trade_data_update.add_deal_list()
                                huaxin_trade_data_update.add_money_list()
                                # 获取涨停价
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                if limit_up_price and volume == huaxin_client_constant.SHADOW_ORDER_VOLUME and abs(
                                        float(limitPrice) - float(limit_up_price)) < 0.01:
                                    # 影子订单变化
                                    # 如果是影子订单就不应该更新
                                    pass
                                else:
                                    # 订单相关回调
                                    # 重新请求委托列表与资金
                                    huaxin_trade_data_update.add_delegate_list("来自交易管道")
                                    huaxin_trade_data_update.add_deal_list()
                                    huaxin_trade_data_update.add_money_list()
                            # print("响应结果:", data_json['data'])
                        finally:
                            pass
trade/huaxin/huaxin_trade_server.py
@@ -38,6 +38,7 @@
from l2.huaxin.huaxin_target_codes_manager import HuaXinL1TargetCodesManager
from l2.l2_data_manager_new import L2TradeDataProcessor
from l2.l2_data_util import L2DataUtil
from l2.l2_transaction_data_manager import HuaXinTransactionDatasProcessor
from log_module import async_log_util, log_export
from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue, \
@@ -306,85 +307,7 @@
    def l2_transaction(cls, code, datas):
        async_log_util.info(hx_logger_l2_transaction, f"{code}#{datas}")
        if datas:
            # 设置成交价
            current_price_process_manager.set_trade_price(code, datas[-1][1])
            total_datas = l2_data_util.local_today_datas.get(code)
            __start_time = time.time()
            try:
                buyno_map = l2_data_util.local_today_buyno_map.get(code)
                if not buyno_map:
                    if trade_manager.CodesTradeStateManager().get_trade_state(
                            code) != trade_manager.TRADE_STATE_NOT_TRADE:
                        l2_data_util.load_l2_data(code)
                        buyno_map = l2_data_util.local_today_buyno_map.get(code)
                async_log_util.info(hx_logger_l2_transaction,
                                    f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}")
                if buyno_map is None:
                    buyno_map = {}
                buy_progress_index = None
                for i in range(len(datas) - 1, -1, -1):
                    d = datas[i]
                    buy_no = f"{d[6]}"
                    if buyno_map and buy_no in buyno_map:
                        async_log_util.info(hx_logger_l2_transaction, f"{code}成交进度:{buyno_map[buy_no]['index']}")
                        buy_progress_index = buyno_map[buy_no]["index"]
                        break
                # ------统计保存成交大单订单号------
                origin_ordernos = set()
                for d in datas:
                    buy_no = f"{d[6]}"
                    origin_ordernos.add(buy_no)
                big_money_count = 0
                for buy_no in origin_ordernos:
                    # 查询是否为大单
                    if buy_no in buyno_map:
                        data = buyno_map[buy_no]
                        val = data["val"]
                        if not L2DataUtil.is_limit_up_price_buy(val):
                            continue
                        if l2_data_util.is_big_money(val):
                            # 涨停买的大单
                            big_money_count += 1
                            DealOrderNoManager().add_orderno(code, buy_no)
                if big_money_count > 0:
                    total_deal_nums = DealOrderNoManager().get_deal_nums(code, buyno_map)
                    thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if limit_up_price:
                        rate = round(total_deal_nums / (thresh_hold_money // (float(limit_up_price) * 100)), 2)
                        LCancelRateManager().set_big_num_deal_rate(code, rate)
                if buy_progress_index is not None:
                    # 获取执行位时间
                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
                        code)
                    cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index)
                    num_operate_map = l2_data_util.local_today_num_operate_map.get(
                        code)
                    async_log_util.info(logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code,
                                        buy_progress_index)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if buy_exec_index and buy_exec_index > -1:
                        m_base_val = L2PlaceOrderParamsManager.get_base_m_val(code)
                        need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
                                                                                      buy_progress_index,
                                                                                      buy_exec_index,
                                                                                      total_datas,
                                                                                      m_base_val,
                                                                                      limit_up_price)
                        if need_cancel:
                            L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")
                    LCancelBigNumComputer().set_trade_progress(code, buy_progress_index, total_datas)
                    SecondCancelBigNumComputer().set_transaction_index(
                        code,
                        buy_progress_index)
                else:
                    pass
            except Exception as e:
                hx_logger_l2_transaction.exception(e)
            finally:
                async_log_util.info(hx_logger_l2_upload, f"{code}处理成交用时:{int((time.time() - __start_time) * 1000)}")
            HuaXinTransactionDatasProcessor.process_huaxin_transaction_datas(code,datas)
    @classmethod
    def l2_market_data(cls, code, data):