Administrator
2024-11-21 a0f4a1d5bed0b4be8be122e90d2f95b76f178a94
servers/huaxin_trade_server.py
@@ -291,59 +291,6 @@
    __process_l1_data_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    __updating_jx_blocks_codes = set()
    @classmethod
    def sell(cls, datas):
        rules = TradeRuleManager().list_can_excut_rules_cache(types=[TradeRuleManager.TYPE_SELL])
        excuted_rule_ids = set()
        if rules:
            for d in datas:
                code = d[0]
                # 格式 (代码,现价,涨幅,量,更新时间,买1价格,买1量)
                buy1_volume = d[6]
                buy1_price = d[5]
                if buy1_volume:
                    for r in rules:
                        # 生效时间
                        if r.code == code:
                            # --------判断是否可以执行--------
                            can_excute = False
                            if round(float(buy1_price), 2) <= round(float(r.buy1_price), 2):
                                # 价格已经触发
                                if r.buy1_volume:
                                    if r.buy1_volume >= buy1_volume:
                                        # 量价触发
                                        can_excute = True
                                        async_log_util.info(logger_trade, f"触发卖规则:量触发{buy1_volume}/{r.buy1_volume}")
                                else:
                                    can_excute = True
                                    async_log_util.info(logger_trade, f"触发卖规则:价格触发{buy1_price}/{r.buy1_price}")
                                    # 价格触发
                                # 获取价格类型
                                if not can_excute:
                                    continue
                                # 请求卖出锁
                                TradeRuleManager().require_sell_lock(r.id_)
                                try:
                                    if r.id_ in excuted_rule_ids:
                                        continue
                                    excuted_rule_ids.add(r.id_)
                                    # 获取最新的执行状况
                                    r = TradeRuleManager().get_by_id(r.id_)
                                    if r.excuted:
                                        continue
                                    # 提交卖
                                    limit_down_price = gpcode_manager.get_limit_down_price(code)
                                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                                    huaxin_sell_util.start_sell(code, r.sell_volume, r.sell_price_type, limit_up_price,
                                                                limit_down_price,
                                                                buy1_price)
                                    TradeRuleManager().excuted(r.id_)
                                except Exception as e:
                                    logger_debug.exception(e)
                                finally:
                                    TradeRuleManager().release_sell_lock(r.id_)
    # 保存现价
    @classmethod
    def __save_l1_current_price(cls, datas):
@@ -400,14 +347,6 @@
        else:
            cls.__process_l1_data_thread_pool.submit(
                lambda: HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id))
    @classmethod
    def set_l1_trade_codes_info(cls, data_json):
        data = data_json["data"]
        request_id = data_json["request_id"]
        datas = data["data"]
        cls.__save_l1_current_price(datas)
        cls.sell(datas)
    @classmethod
    def l2_order(cls, code, _datas, timestamp):
@@ -536,13 +475,6 @@
    def trading_order_canceled(cls, code, order_no):
        pass
    @classmethod
    def test_sell(cls):
        # (代码, 现价, 涨幅, 量, 更新时间, 买1价格, 买1量)
        datas = [("600571", 12.14, 9.96, 100000000, tool.get_now_time_str(), 12.14, 10210),
                 ("600571", 12.04, 9.96, 100000000, tool.get_now_time_str(), 12.04, 10210)]
        cls.sell(datas)
def clear_invalid_client():
    logger_system.info(f"trade_server clear_invalid_client 线程ID:{tool.get_thread_id()}")
@@ -553,28 +485,6 @@
            pass
        finally:
            time.sleep(2)
def __recv_pipe_l1(queue_l1_w_strategy_r: multiprocessing.Queue):
    logger_system.info(f"trade_server __recv_pipe_l1 线程ID:{tool.get_thread_id()}")
    if queue_l1_w_strategy_r is not None:
        while True:
            try:
                val = queue_l1_w_strategy_r.get()
                if val:
                    val = json.loads(val)
                    # print("收到来自L1的数据:", val["type"])
                    # 处理数据
                    type_ = val["type"]
                    timestamp = val.get("time")
                    # 大于10s的数据放弃处理
                    if type_ == "set_target_codes":
                        async_log_util.info(logger_l2_codes_subscript, f"策略接收到数据")
                        if time.time() * 1000 - timestamp > 10 * 1000:
                            continue
                        TradeServerProcessor.set_target_codes(val)
            except Exception as e:
                logger_debug.exception(e)
# 排得太远撤单
@@ -641,28 +551,6 @@
            logger_debug.exception(e)
        finally:
            time.sleep(3)
def __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r: multiprocessing.Queue):
    logger_system.info(f"trade_server __recv_pipe_l1_trade 线程ID:{tool.get_thread_id()}")
    if queue_l1_trade_w_strategy_r is not None:
        while True:
            try:
                val = queue_l1_trade_w_strategy_r.get()
                if val:
                    async_log_util.info(logger_local_huaxin_l1_trade_info, f"客户端接收:{val}")
                    val = json.loads(val)
                    # print("收到来自L1的数据:", val["type"])
                    # 处理数据
                    type_ = val["type"]
                    if type_ == "upload_l1_trade_datas":
                        # 处理专为交易提供的L1数据
                        TradeServerProcessor.set_l1_trade_codes_info(val)
                        async_log_util.info(logger_local_huaxin_l1_trade_info, val)
            except Exception as e:
                logger_local_huaxin_l1_trade_info.exception(e)
                logging.exception(e)
class MyL2DataCallback(l2_data_transform_protocol.L2DataCallBack):
@@ -873,7 +761,8 @@
                    result_by_volume = radical_buy_strategy.process_limit_up_active_buy_deal(code, transaction_datas)
                    async_log_util.info(logger_l2_radical_buy, f"量买入结果判断:{code}, 结果:{result_by_volume} 板块:{buy_blocks}")
                    in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks()
                    buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b),in_blocks.index(b) if b in  in_blocks else -1) for b in buy_blocks]
                    buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b),
                                              in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks]
                    if result_by_volume[0] != radical_buy_strategy.BUY_MODE_NONE:
                        if tool.get_now_time_as_int() < 93200:
                            radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code)
@@ -1018,19 +907,17 @@
    threading.Thread(target=run_pending, daemon=True).start()
    l2_data_util.load_l2_data_all(True)
    # L2成交信号回调
    L2TradeSingleDataManager.set_callback(MyL2TradeSingleCallback())
    # 加载自由流通量
    global_data_loader.load_zyltgb_volume_from_db()
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        queue_l1_trade_w_strategy_r, trade_ipc_addr):
def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr):
    """
    @param queue_strategy_r_trade_w:
    @param queue_l1_w_strategy_r:
    @param queue_strategy_w_trade_r:
    @param queue_strategy_w_trade_r_for_read:
    @param queue_l1_trade_w_strategy_r:
    @param trade_ipc_addr: 交易IPC地址:(下单ipc地址,撤单ipc地址)
    @return:
    """
@@ -1050,32 +937,15 @@
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r,
                                        queue_strategy_w_trade_r_for_read, trade_ipc_addr)
        # 监听l1那边传过来的代码
        t1 = threading.Thread(target=lambda: __recv_pipe_l1(queue_l1_w_strategy_r), daemon=True)
        t1.start()
        # 监听l1交易那边传过来的代码
        t1 = threading.Thread(target=lambda: __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r), daemon=True)
        t1.start()
        # 下单距离太远取消订单
        t1 = threading.Thread(target=lambda: __cancel_buy_for_too_far(), daemon=True)
        t1.start()
        # 同步异步日志
        t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True)
        t1.start()
        # 同步L2的异步日志
        l2_log.codeLogQueueDistributeManager.run_async()
        t1 = threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True)
        t1.start()
        logger_system.info("create TradeServer")
        # 清理无用的客户端
        t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
        t1.start()
        logger_system.info("create TradeServer")
        laddr = "0.0.0.0", 10008
        try:
            tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)  # 注意:参数是MyBaseRequestHandle