Administrator
2024-01-10 a3a15c958b4a3e04f6dd90ea52904ff1a48f75a9
G撤修改
3个文件已修改
1个文件已添加
109 ■■■■■ 已修改文件
l2/cancel_buy_strategy.py 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_manager.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
msg/push_msg_manager.py 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py
@@ -35,7 +35,7 @@
    SecondCancelBigNumComputer().set_real_place_order_index(code, index)
    LCancelBigNumComputer().set_real_place_order_index(code, index, buy_single_index=buy_single_index)
    HourCancelBigNumComputer().set_real_place_order_index(code, index, buy_single_index)
    GCancelBigNumComputer().set_real_place_order_index(code, index)
    GCancelBigNumComputer().set_real_place_order_index(code, index, buy_single_index)
class SecondCancelBigNumComputer:
@@ -1680,6 +1680,9 @@
# ---------------------------------G撤-------------------------------
class GCancelBigNumComputer:
    __real_place_order_index_dict = {}
    __trade_progress_index_dict = {}
    __watch_indexes_dict = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
@@ -1687,15 +1690,65 @@
            cls.__instance = super(GCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
        return cls.__instance
    def set_real_place_order_index(self, code, index):
    def set_real_place_order_index(self, code, index, buy_single_index):
        self.__real_place_order_index_dict[code] = index
        start_index = buy_single_index
        if code in self.__trade_progress_index_dict:
            start_index = self.__trade_progress_index_dict.get(code)
        self.__commpute_watch_indexes(code, start_index, index)
    def clear(self, code=None):
        if code:
            if code in self.__real_place_order_index_dict:
                self.__real_place_order_index_dict.pop(code)
            if code in self.__watch_indexes_dict:
                self.__watch_indexes_dict.pop(code)
            if code in self.__trade_progress_index_dict:
                self.__trade_progress_index_dict.pop(code)
        else:
            self.__real_place_order_index_dict.clear()
            self.__watch_indexes_dict.clear()
            self.__trade_progress_index_dict.clear()
    def __commpute_watch_indexes(self, code, traded_index, real_order_index):
        total_datas = local_today_datas.get(code)
        watch_indexes = set()
        for i in range(traded_index, real_order_index):
            # 判断是否有未撤的大单
            data = total_datas[i]
            val = data["val"]
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if val["num"] * float(val["price"]) < 30000:
                continue
            # 是否已撤单
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, i,
                                                                                                     total_datas,
                                                                                                     local_today_canceled_buyno_map.get(
                                                                                                         code))
            if left_count > 0:
                watch_indexes.add(i)
        if not watch_indexes:
            temp_list = []
            for i in range(traded_index, real_order_index):
                data = total_datas[i]
                val = data["val"]
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                if val["num"] * float(val["price"]) < 5000:
                    continue
                temp_list.append((val["num"], data))
                if len(temp_list) > 15:
                    break
            temp_list.sort(key=lambda x: x[0], reverse=True)
            if temp_list:
                watch_indexes.add(temp_list[0][1]["index"])
        self.__watch_indexes_dict[code] = watch_indexes
    def set_trade_progress(self, code, buy_single_index, index):
        if self.__trade_progress_index_dict.get(code) != index:
            self.__trade_progress_index_dict[code] = index
            self.__commpute_watch_indexes(code, index, self.__real_place_order_index_dict.get(code))
    def need_cancel(self, code, buy_exec_index, start_index, end_index):
        if code not in self.__real_place_order_index_dict:
@@ -1706,17 +1759,21 @@
        if tool.trade_time_sub(total_datas[end_index]["val"]["time"], total_datas[buy_exec_index]["val"]["time"]) > 15:
            return False, None, "下单15s内才生效"
        watch_indexes = self.__watch_indexes_dict.get(code)
        if watch_indexes is None:
            watch_indexes = set()
        for i in range(start_index, end_index + 1):
            data = total_datas[i]
            val = data["val"]
            if not L2DataUtil.is_limit_up_price_buy_cancel(val):
                continue
            if val["num"] * float(val["price"]) < 30000:
            if val["num"] * float(val["price"]) < 5000:
                continue
            buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(data,
                                                                                                local_today_buyno_map.get(
                                                                                                    code))
            if buy_index is not None and buy_index < real_place_order_index:
            if buy_index is not None and buy_index < real_place_order_index and buy_index in watch_indexes:
                return True, data, ""
        return False, None, ""
l2/l2_transaction_data_manager.py
@@ -10,7 +10,8 @@
from db.redis_manager_delegate import RedisUtils
from l2 import l2_data_util, l2_data_manager, transaction_progress, l2_data_source_util
from l2.cancel_buy_strategy import LCancelRateManager, LCancelBigNumComputer, \
    SecondCancelBigNumComputer, HourCancelBigNumComputer, FastCancelBigNumComputer, UCancelBigNumComputer
    SecondCancelBigNumComputer, HourCancelBigNumComputer, FastCancelBigNumComputer, UCancelBigNumComputer, \
    GCancelBigNumComputer
from l2.l2_data_manager_new import L2TradeDataProcessor
from l2.l2_data_util import L2DataUtil, local_today_canceled_buyno_map
from log_module import async_log_util
@@ -144,8 +145,11 @@
                async_log_util.info(logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code,
                                    buy_progress_index)
                GCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index, buy_progress_index)
                LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index, buy_progress_index,
                                                           total_datas)
                FastCancelBigNumComputer().set_trade_progress(code, buy_progress_index)
                SecondCancelBigNumComputer().set_transaction_index(
                    code,
msg/push_msg_manager.py
New file
@@ -0,0 +1,32 @@
"""
推送消息管理器
"""
import concurrent.futures
from utils import middle_api_protocol
TYPE_ORDER_ALMOST_DEAL = "order_almost_deal"  # the order is about to be filled
TYPE_DELEGATE_QUEUE_CHANGE = "delegate_queue_change"  # the delegate (order) queue changed
# Worker pool that __push_msg submits its push jobs to
thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=5)
def push_order_almost_deal(code, code_name):
    """Push an "order almost dealt" message carrying the code and its name."""
    payload = {"code": code, "code_name": code_name}
    __push_msg(TYPE_ORDER_ALMOST_DEAL, payload)
def push_delegate_queue_update():
    """Push a "delegate queue changed" message; it carries no data payload."""
    __push_msg(msg_type=TYPE_DELEGATE_QUEUE_CHANGE)
def __push_msg(msg_type, data=None):
    """Queue a push message on ``thread_pool`` and return immediately.

    :param msg_type: value sent under the ``"type"`` key
    :param data: optional payload sent under the ``"data"`` key; omitted
        from the message when falsy
    """

    def _send():
        # The message is assembled inside the worker, as before
        message = {"type": msg_type, "data": data} if data else {"type": msg_type}
        middle_api_protocol.request(middle_api_protocol.load_push_msg(message))

    thread_pool.submit(_send)
test/l2_trade_test.py
@@ -85,11 +85,11 @@
                except Exception as e:
                    pass
    @unittest.skip("跳过此单元测试")
    # @unittest.skip("跳过此单元测试")
    def test_trade(self):
        trade_manager.TradeStateManager().open_buy()
        threading.Thread(target=async_log_util.run_sync, daemon=True).start()
        code = "603161"
        code = "002186"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
@@ -220,7 +220,7 @@
        l2.l2_data_util.local_today_datas[code] = total_datas
        l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, total_datas, True)
    # @unittest.skip("跳过此单元测试")
    @unittest.skip("跳过此单元测试")
    def test_block(self):
        code = "603778"
        # KPLCodeJXBlockManager().load_jx_blocks(code, 23.52,23.62,