| | |
# Class-level thread pool with at most 10 worker threads.
__process_l1_data_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
# Class-level set, initially empty. Judging by the name it tracks codes whose
# blocks are being updated - TODO confirm; no use of it is visible here.
__updating_jx_blocks_codes = set()
| | | |
@classmethod
def sell(cls, datas):
    """Evaluate the cached sell rules against a batch of L1 quotes and submit sells.

    A rule fires for a quote of the same code when the quote's buy-1 price,
    rounded to 2 decimals, is less than or equal to the rule's buy-1 price and,
    if the rule defines a buy-1 volume, that volume is greater than or equal
    to the quote's buy-1 volume. Each rule id is handled at most once per call.

    @param datas: iterable of tuples laid out as
                  (code, price, change pct, volume, update time, buy-1 price, buy-1 volume)
    @return: None
    """
    rules = TradeRuleManager().list_can_excut_rules_cache(types=[TradeRuleManager.TYPE_SELL])
    if not rules:
        return
    # Rule ids already handled during this call.
    excuted_rule_ids = set()
    for d in datas:
        code = d[0]
        buy1_price = d[5]
        buy1_volume = d[6]
        if not buy1_volume:
            # Quotes without a buy-1 volume never trigger a rule.
            continue
        for rule in rules:
            if rule.code != code:
                continue
            # -------- decide whether the rule can be executed --------
            if not (round(float(buy1_price), 2) <= round(float(rule.buy1_price), 2)):
                continue
            # The price condition is met.
            if rule.buy1_volume:
                if not (rule.buy1_volume >= buy1_volume):
                    continue
                # Both price and volume conditions are met.
                async_log_util.info(logger_trade, f"触发卖规则:量触发{buy1_volume}/{rule.buy1_volume}")
            else:
                # The rule has no volume condition: the price alone triggers it.
                async_log_util.info(logger_trade, f"触发卖规则:价格触发{buy1_price}/{rule.buy1_price}")

            # Capture the id up front so that releasing the lock never depends
            # on whatever the refreshed lookup below returns.
            rule_id = rule.id_
            # Acquire the sell lock for this rule.
            TradeRuleManager().require_sell_lock(rule_id)
            try:
                if rule_id in excuted_rule_ids:
                    continue
                excuted_rule_ids.add(rule_id)
                # Re-read the rule to get its latest execution state.
                latest_rule = TradeRuleManager().get_by_id(rule_id)
                if latest_rule is None or latest_rule.excuted:
                    continue
                # Submit the sell order.
                limit_down_price = gpcode_manager.get_limit_down_price(code)
                limit_up_price = gpcode_manager.get_limit_up_price(code)
                huaxin_sell_util.start_sell(code, latest_rule.sell_volume, latest_rule.sell_price_type,
                                            limit_up_price, limit_down_price, buy1_price)
                TradeRuleManager().excuted(rule_id)
            except Exception as e:
                logger_debug.exception(e)
            finally:
                # Always release the lock taken above, even on `continue` or error.
                TradeRuleManager().release_sell_lock(rule_id)
| | | |
| | | # 保存现价 |
| | | @classmethod |
| | | def __save_l1_current_price(cls, datas): |
| | |
| | | else: |
| | | cls.__process_l1_data_thread_pool.submit( |
| | | lambda: HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id)) |
| | | |
@classmethod
def set_l1_trade_codes_info(cls, data_json):
    """Handle one L1 trade-data message: store the quotes, then run the sell rules.

    @param data_json: mapping with a "request_id" entry and a "data" entry,
                      where data_json["data"]["data"] holds the quote list
    @return: None
    """
    payload = data_json["data"]
    # Looked up but not used afterwards; kept so a missing key still raises.
    request_id = data_json["request_id"]
    quotes = payload["data"]
    cls.__save_l1_current_price(quotes)
    cls.sell(quotes)
| | | |
| | | @classmethod |
| | | def l2_order(cls, code, _datas, timestamp): |
| | |
def trading_order_canceled(cls, code, order_no):
    """No-op: accepts a code and an order number and does nothing."""
    pass
| | | |
@classmethod
def test_sell(cls):
    """Feed two hard-coded quotes for code 600571 through `sell`."""
    # Tuple layout: (code, price, change pct, volume, update time, buy-1 price, buy-1 volume)
    sample_quotes = [
        ("600571", price, 9.96, 100000000, tool.get_now_time_str(), price, 10210)
        for price in (12.14, 12.04)
    ]
    cls.sell(sample_quotes)
| | | |
| | | |
| | | def clear_invalid_client(): |
| | | logger_system.info(f"trade_server clear_invalid_client 线程ID:{tool.get_thread_id()}") |
| | |
| | | pass |
| | | finally: |
| | | time.sleep(2) |
| | | |
| | | |
def __recv_pipe_l1(queue_l1_w_strategy_r: multiprocessing.Queue):
    """Consume JSON messages from the L1 queue forever and dispatch target-code updates.

    Returns immediately (after logging the thread id) when the queue is None.
    Only messages of type "set_target_codes" are forwarded, and only when their
    "time" field is at most 10 seconds (in milliseconds) behind the current time.
    Any exception raised while handling a message is logged and the loop goes on.

    @param queue_l1_w_strategy_r: queue to read from, or None
    """
    logger_system.info(f"trade_server __recv_pipe_l1 线程ID:{tool.get_thread_id()}")
    if queue_l1_w_strategy_r is None:
        return
    while True:
        try:
            raw = queue_l1_w_strategy_r.get()
            if not raw:
                continue
            message = json.loads(raw)
            kind = message["type"]
            sent_at = message.get("time")
            if kind != "set_target_codes":
                continue
            async_log_util.info(logger_l2_codes_subscript, f"策略接收到数据")
            # Drop messages that are more than 10s old.
            age_ms = time.time() * 1000 - sent_at
            if age_ms > 10 * 1000:
                continue
            TradeServerProcessor.set_target_codes(message)
        except Exception as e:
            logger_debug.exception(e)
| | | |
| | | |
| | | # 排得太远撤单 |
| | |
| | | logger_debug.exception(e) |
| | | finally: |
| | | time.sleep(3) |
| | | |
| | | |
def __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r: multiprocessing.Queue):
    """Consume JSON messages from the L1 trade queue forever and hand them to the processor.

    Returns immediately (after logging the thread id) when the queue is None.
    Every non-empty raw message is logged; messages of type
    "upload_l1_trade_datas" are passed to
    TradeServerProcessor.set_l1_trade_codes_info. Exceptions are logged and
    the loop continues.

    @param queue_l1_trade_w_strategy_r: queue to read from, or None
    """
    logger_system.info(f"trade_server __recv_pipe_l1_trade 线程ID:{tool.get_thread_id()}")
    if queue_l1_trade_w_strategy_r is None:
        return
    while True:
        try:
            raw = queue_l1_trade_w_strategy_r.get()
            if not raw:
                continue
            async_log_util.info(logger_local_huaxin_l1_trade_info, f"客户端接收:{raw}")
            message = json.loads(raw)
            if message["type"] == "upload_l1_trade_datas":
                # L1 data supplied specifically for trading.
                TradeServerProcessor.set_l1_trade_codes_info(message)
                # NOTE(review): the original indentation is lost; this log call is
                # assumed to belong to the "upload_l1_trade_datas" branch - confirm.
                async_log_util.info(logger_local_huaxin_l1_trade_info, message)
        except Exception as e:
            logger_local_huaxin_l1_trade_info.exception(e)
            logging.exception(e)
| | | |
| | | |
| | | class MyL2DataCallback(l2_data_transform_protocol.L2DataCallBack): |
| | |
| | | result_by_volume = radical_buy_strategy.process_limit_up_active_buy_deal(code, transaction_datas) |
| | | async_log_util.info(logger_l2_radical_buy, f"量买入结果判断:{code}, 结果:{result_by_volume} 板块:{buy_blocks}") |
| | | in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks() |
| | | buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b),in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks] |
| | | buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b), |
| | | in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks] |
| | | if result_by_volume[0] != radical_buy_strategy.BUY_MODE_NONE: |
| | | if tool.get_now_time_as_int() < 93200: |
| | | radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code) |
| | |
| | | |
| | | threading.Thread(target=run_pending, daemon=True).start() |
| | | l2_data_util.load_l2_data_all(True) |
| | | # L2成交信号回调 |
| | | L2TradeSingleDataManager.set_callback(MyL2TradeSingleCallback()) |
| | | # 加载自由流通量 |
| | | global_data_loader.load_zyltgb_volume_from_db() |
| | | |
| | | |
| | | def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, |
| | | queue_l1_trade_w_strategy_r, trade_ipc_addr): |
| | | def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr): |
| | | """ |
| | | @param queue_strategy_r_trade_w: |
| | | @param queue_l1_w_strategy_r: |
| | | @param queue_strategy_w_trade_r: |
| | | @param queue_strategy_w_trade_r_for_read: |
| | | @param queue_l1_trade_w_strategy_r: |
| | | @param trade_ipc_addr: 交易IPC地址:(下单ipc地址,撤单ipc地址) |
| | | @return: |
| | | """ |
| | |
| | | huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read, trade_ipc_addr) |
| | | |
| | | # 监听l1那边传过来的代码 |
| | | t1 = threading.Thread(target=lambda: __recv_pipe_l1(queue_l1_w_strategy_r), daemon=True) |
| | | t1.start() |
| | | |
| | | # 监听l1交易那边传过来的代码 |
| | | t1 = threading.Thread(target=lambda: __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r), daemon=True) |
| | | t1.start() |
| | | |
| | | # 下单距离太远取消订单 |
| | | t1 = threading.Thread(target=lambda: __cancel_buy_for_too_far(), daemon=True) |
| | | t1.start() |
| | | |
| | | # 同步异步日志 |
| | | t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True) |
| | | t1.start() |
| | | |
| | | # 同步L2的异步日志 |
| | | l2_log.codeLogQueueDistributeManager.run_async() |
| | | |
| | | t1 = threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True) |
| | | t1.start() |
| | | |
| | | logger_system.info("create TradeServer") |
| | | # 清理无用的客户端 |
| | | t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True) |
| | | t1.start() |
| | | |
| | | logger_system.info("create TradeServer") |
| | | laddr = "0.0.0.0", 10008 |
| | | try: |
| | | tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 注意:参数是MyBaseRequestHandle |