From a0f4a1d5bed0b4be8be122e90d2f95b76f178a94 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期四, 21 十一月 2024 17:41:22 +0800 Subject: [PATCH] 精简代码/代码归类 --- servers/huaxin_trade_server.py | 142 ++--------------------------------------------- 1 files changed, 6 insertions(+), 136 deletions(-) diff --git a/servers/huaxin_trade_server.py b/servers/huaxin_trade_server.py index c72c2a5..5623b2b 100644 --- a/servers/huaxin_trade_server.py +++ b/servers/huaxin_trade_server.py @@ -291,59 +291,6 @@ __process_l1_data_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) __updating_jx_blocks_codes = set() - @classmethod - def sell(cls, datas): - rules = TradeRuleManager().list_can_excut_rules_cache(types=[TradeRuleManager.TYPE_SELL]) - excuted_rule_ids = set() - if rules: - for d in datas: - code = d[0] - # 鏍煎紡 (浠g爜,鐜颁环,娑ㄥ箙,閲�,鏇存柊鏃堕棿,涔�1浠锋牸,涔�1閲�) - buy1_volume = d[6] - buy1_price = d[5] - if buy1_volume: - for r in rules: - # 鐢熸晥鏃堕棿 - if r.code == code: - # --------鍒ゆ柇鏄惁鍙互鎵ц-------- - can_excute = False - if round(float(buy1_price), 2) <= round(float(r.buy1_price), 2): - # 浠锋牸宸茬粡瑙﹀彂 - if r.buy1_volume: - if r.buy1_volume >= buy1_volume: - # 閲忎环瑙﹀彂 - can_excute = True - async_log_util.info(logger_trade, f"瑙﹀彂鍗栬鍒欙細閲忚Е鍙憑buy1_volume}/{r.buy1_volume}") - else: - can_excute = True - async_log_util.info(logger_trade, f"瑙﹀彂鍗栬鍒欙細浠锋牸瑙﹀彂{buy1_price}/{r.buy1_price}") - # 浠锋牸瑙﹀彂 - # 鑾峰彇浠锋牸绫诲瀷 - if not can_excute: - continue - - # 璇锋眰鍗栧嚭閿� - TradeRuleManager().require_sell_lock(r.id_) - try: - if r.id_ in excuted_rule_ids: - continue - excuted_rule_ids.add(r.id_) - # 鑾峰彇鏈�鏂扮殑鎵ц鐘跺喌 - r = TradeRuleManager().get_by_id(r.id_) - if r.excuted: - continue - # 鎻愪氦鍗� - limit_down_price = gpcode_manager.get_limit_down_price(code) - limit_up_price = gpcode_manager.get_limit_up_price(code) - huaxin_sell_util.start_sell(code, r.sell_volume, r.sell_price_type, limit_up_price, - limit_down_price, - buy1_price) - TradeRuleManager().excuted(r.id_) - except Exception as e: - logger_debug.exception(e) - finally: - TradeRuleManager().release_sell_lock(r.id_) - # 淇濆瓨鐜颁环 @classmethod def __save_l1_current_price(cls, datas): @@ -400,14 +347,6 @@ else: cls.__process_l1_data_thread_pool.submit( lambda: HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id)) - - @classmethod - def set_l1_trade_codes_info(cls, data_json): - data = data_json["data"] - request_id = data_json["request_id"] - datas = data["data"] - cls.__save_l1_current_price(datas) - cls.sell(datas) @classmethod def l2_order(cls, code, _datas, timestamp): @@ -536,13 +475,6 @@ def trading_order_canceled(cls, code, order_no): pass - @classmethod - def test_sell(cls): - # (浠g爜, 鐜颁环, 娑ㄥ箙, 閲�, 鏇存柊鏃堕棿, 涔�1浠锋牸, 涔�1閲�) - datas = [("600571", 12.14, 9.96, 100000000, tool.get_now_time_str(), 12.14, 10210), - ("600571", 12.04, 9.96, 100000000, tool.get_now_time_str(), 12.04, 10210)] - cls.sell(datas) - def clear_invalid_client(): logger_system.info(f"trade_server clear_invalid_client 绾跨▼ID:{tool.get_thread_id()}") @@ -553,28 +485,6 @@ pass finally: time.sleep(2) - - -def __recv_pipe_l1(queue_l1_w_strategy_r: multiprocessing.Queue): - logger_system.info(f"trade_server __recv_pipe_l1 绾跨▼ID:{tool.get_thread_id()}") - if queue_l1_w_strategy_r is not None: - while True: - try: - val = queue_l1_w_strategy_r.get() - if val: - val = json.loads(val) - # print("鏀跺埌鏉ヨ嚜L1鐨勬暟鎹細", val["type"]) - # 澶勭悊鏁版嵁 - type_ = val["type"] - timestamp = val.get("time") - # 澶т簬10s鐨勬暟鎹斁寮冨鐞� - if type_ == "set_target_codes": - async_log_util.info(logger_l2_codes_subscript, f"绛栫暐鎺ユ敹鍒版暟鎹�") - if time.time() * 1000 - timestamp > 10 * 1000: - continue - TradeServerProcessor.set_target_codes(val) - except Exception as e: - logger_debug.exception(e) # 鎺掑緱澶繙鎾ゅ崟 @@ -641,28 +551,6 @@ logger_debug.exception(e) finally: time.sleep(3) - - -def __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r: multiprocessing.Queue): - logger_system.info(f"trade_server __recv_pipe_l1_trade 绾跨▼ID:{tool.get_thread_id()}") - if queue_l1_trade_w_strategy_r is not None: - while True: - try: - val = queue_l1_trade_w_strategy_r.get() - if val: - async_log_util.info(logger_local_huaxin_l1_trade_info, f"瀹㈡埛绔帴鏀讹細{val}") - val = json.loads(val) - # print("鏀跺埌鏉ヨ嚜L1鐨勬暟鎹細", val["type"]) - # 澶勭悊鏁版嵁 - type_ = val["type"] - if type_ == "upload_l1_trade_datas": - # 澶勭悊涓撲负浜ゆ槗鎻愪緵鐨凩1鏁版嵁 - TradeServerProcessor.set_l1_trade_codes_info(val) - async_log_util.info(logger_local_huaxin_l1_trade_info, val) - - except Exception as e: - logger_local_huaxin_l1_trade_info.exception(e) - logging.exception(e) class MyL2DataCallback(l2_data_transform_protocol.L2DataCallBack): @@ -873,7 +761,8 @@ result_by_volume = radical_buy_strategy.process_limit_up_active_buy_deal(code, transaction_datas) async_log_util.info(logger_l2_radical_buy, f"閲忎拱鍏ョ粨鏋滃垽鏂細{code}, 缁撴灉锛歿result_by_volume} 鏉垮潡锛歿buy_blocks}") in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks() - buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b),in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks] + buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b), + in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks] if result_by_volume[0] != radical_buy_strategy.BUY_MODE_NONE: if tool.get_now_time_as_int() < 93200: radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code) @@ -1018,19 +907,17 @@ threading.Thread(target=run_pending, daemon=True).start() l2_data_util.load_l2_data_all(True) + # L2鎴愪氦淇″彿鍥炶皟 L2TradeSingleDataManager.set_callback(MyL2TradeSingleCallback()) # 鍔犺浇鑷敱娴侀�氶噺 global_data_loader.load_zyltgb_volume_from_db() -def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, - queue_l1_trade_w_strategy_r, trade_ipc_addr): +def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr): """ @param queue_strategy_r_trade_w: - @param queue_l1_w_strategy_r: @param queue_strategy_w_trade_r: @param queue_strategy_w_trade_r_for_read: - @param queue_l1_trade_w_strategy_r: @param trade_ipc_addr: 浜ゆ槗IPC鍦板潃锛氾紙涓嬪崟ipc鍦板潃,鎾ゅ崟ipc鍦板潃锛� @return: """ @@ -1050,32 +937,15 @@ huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr) - # 鐩戝惉l1閭h竟浼犺繃鏉ョ殑浠g爜 - t1 = threading.Thread(target=lambda: __recv_pipe_l1(queue_l1_w_strategy_r), daemon=True) - t1.start() - - # 鐩戝惉l1浜ゆ槗閭h竟浼犺繃鏉ョ殑浠g爜 - t1 = threading.Thread(target=lambda: __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r), daemon=True) - t1.start() - # 涓嬪崟璺濈澶繙鍙栨秷璁㈠崟 t1 = threading.Thread(target=lambda: __cancel_buy_for_too_far(), daemon=True) t1.start() - # 鍚屾寮傛鏃ュ織 - t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True) - t1.start() - - # 鍚屾L2鐨勫紓姝ユ棩蹇� - l2_log.codeLogQueueDistributeManager.run_async() - - t1 = threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True) - t1.start() - - logger_system.info("create TradeServer") + # 娓呯悊鏃犵敤鐨勫鎴风 t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True) t1.start() + logger_system.info("create TradeServer") laddr = "0.0.0.0", 10008 try: tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle -- Gitblit v1.8.0