From 32203dcb2d06b93e4b6c81f9121b00531a91395e Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期五, 06 六月 2025 18:43:07 +0800
Subject: [PATCH] bug修复

---
 api/outside_api_command_manager.py |  273 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 273 insertions(+), 0 deletions(-)

diff --git a/api/outside_api_command_manager.py b/api/outside_api_command_manager.py
new file mode 100644
index 0000000..ba16e7a
--- /dev/null
+++ b/api/outside_api_command_manager.py
@@ -0,0 +1,273 @@
+"""
+澶栭儴鎺ュ彛绠$悊
+"""
+import json
+import logging
+import random
+import socket
+import threading
+import time
+
+# 蹇冭烦淇℃伅
+from huaxin_client import socket_util
+from huaxin_client.client_network import SendResponseSkManager
+from log_module.log import logger_system, logger_request_api
+from utils import middle_api_protocol, tool
+
+MSG_TYPE_HEART = "heart"
+# 鍛戒护淇℃伅
+MSG_TYPE_CMD = "cmd"
+
+CLIENT_TYPE_TRADE = "trade_low_suction"
+
+# 蹇冭烦鏃堕棿闂撮殧
+HEART_SPACE_TIME = 3
+
+TRADE_DIRECTION_BUY = 1
+TRADE_DIRECTION_SELL = 2
+
+TRADE_TYPE_ORDER = 1
+TRADE_TYPE_CANCEL_ORDER = 2
+
+# 鏁版嵁鎿嶄綔
+OPERRATE_SET = 1  # 璁剧疆
+OPERRATE_DELETE = 2  # 鍒犻櫎
+OPERRATE_GET = 3  # 鑾峰彇
+OPERRATE_ADD = 4  # 鏂板
+
+# 浠g爜鍚嶅崟绫诲瀷
+CODE_LIST_WHITE = "white"
+CODE_LIST_BLACK = "black"
+CODE_LIST_WANT = "want"
+CODE_LIST_PAUSE_BUY = "pause_buy"
+CODE_LIST_MUST_BUY = "must_buy"
+CODE_LIST_GREEN = "green"
+
+# 绫诲瀷
+API_TYPE_TRADE = "trade"  # 浜ゆ槗
+API_TYPE_TRADE_STATE = "trade_state"  # 浜ゆ槗鐘舵��
+API_TYPE_TRADE_MODE = "trade_mode"  # 浜ゆ槗妯″紡
+API_TYPE_SELL_RULE = "sell_rule"  # 鍗栧嚭瑙勫垯
+API_TYPE_CODE_LIST = "code_list"  # 浠g爜鍚嶅崟
+API_TYPE_EXPORT_L2 = "export_l2"  # 瀵煎嚭L2鏁版嵁
+API_TYPE_INIT = "init"  # 鍒濆鍖�
+API_TYPE_REFRESH_TRADE_DATA = "refresh_trade_data"  # 浜ゆ槗鏁版嵁鍒锋柊
+API_TYPE_CODE_ATRRIBUTE = "code_attribute"  # 浠g爜灞炴��
+API_TYPE_CODE_TRADE_STATE = "code_trade_state"  # 浠g爜浜ゆ槗鐘舵��
+API_TYPE_GET_ENV = "get_env"  # 鑾峰彇鐜淇℃伅
+API_TYPE_SYNC_L1_TARGET_CODES = "sync_l1_subscript_codes"  # 鍚屾L1闇�瑕佽闃呯殑浠g爜
+API_TYPE_SYSTEM_LOG = "system_log"  # 绯荤粺鏃ュ織
+API_TYPE_GET_FROM_DATA_SERVER = "get_from_data_server"  # 浠庢暟鎹湇鍔″櫒鎷夊彇鏁版嵁
+API_TYPE_CODE_TRADE_INFO = "code_trade_info"  # 浠g爜浜ゆ槗淇℃伅
+API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT = "l2_listen_active_count"  # L2鏈夋晥鐩戝惉鏁伴噺
+API_TYPE_SAVE_RUNNING_DATA = "save_running_data"  # 淇濆瓨杩愯鏃舵暟鎹�
+API_TYPE_GET_CODE_POSITION_INFO = "get_code_position_info"  # 鑾峰彇浠g爜鎸佷粨淇℃伅
+API_TYPE_COMMON_REQUEST = "common_request"  # 閫氱敤璇锋眰
+
+
+class ActionCallback(object):
+    # 浜ゆ槗
+    def OnTrade(self, client_id, request_id, data):
+        pass
+
+    # 浜ゆ槗鐘舵��
+    def OnTradeState(self, client_id, request_id, data):
+        pass
+
+    # 浜ゆ槗妯″紡
+    def OnTradeMode(self, client_id, request_id, data):
+        pass
+
+    # 鍗栧嚭瑙勫垯
+    def OnSellRule(self, client_id, request_id, data):
+        pass
+
+    # 浠g爜鍚嶅崟
+    def OnCodeList(self, client_id, request_id, data):
+        pass
+
+    def OnExportL2(self, client_id, request_id, data):
+        pass
+
+    def OnEveryDayInit(self, client_id, request_id, data):
+        pass
+
+    def OnRefreshTradeData(self, client_id, request_id, data):
+        pass
+
+    def OnGetCodeAttribute(self, client_id, request_id, data):
+        pass
+
+    def OnGetCodeTradeState(self, client_id, request_id, data):
+        pass
+
+    def OnGetEnvInfo(self, client_id, request_id, data):
+        pass
+
+    def OnSyncL2SubscriptCodes(self, client_id, request_id):
+        pass
+
+    def OnGetFromDataServer(self, client_id, request_id, data):
+        pass
+
+    # 浠g爜鐨勪氦鏄撲俊鎭�
+    def OnGetCodeTradeInfo(self, client_id, request_id, data):
+        pass
+
+    def OnGetActiveListenCount(self, client_id, request_id):
+        pass
+
+    def OnSaveRunningData(self, client_id, request_id):
+        pass
+
+    def OnGetCodePositionInfo(self, client_id, request_id, data):
+        pass
+
+    def OnCommonRequest(self, client_id, request_id, data):
+        pass
+
+
+# 浜ゆ槗鎸囦护绠$悊
+# 浜ゆ槗鎸囦护绠$悊
+@tool.singleton
+class ApiCommandManager:
+    trade_ls_client_dict = {}
+    trade_ls_client_count = 0
+
+    def __init__(self, addr, port, trade_action_callback, trade_ls_client_count=20):
+        self.trade_ls_client_dict.clear()
+        self.trade_ls_client_count = trade_ls_client_count
+        self.action_callback = trade_action_callback
+        self.ip_port = (addr, port)
+
+        for i in range(trade_ls_client_count):
+            result = self.__create_and_run_client(CLIENT_TYPE_TRADE, i)
+            self.trade_ls_client_dict[result[0]] = result[1]
+
+    def __create_client(self, client_type, rid):
+        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 鐢熸垚socket锛岃繛鎺erver
+        # client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
+        # client.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 60 * 1000, 30 * 1000))
+        client.connect(self.ip_port)
+        client.send(SendResponseSkManager.format_response(
+            json.dumps({"type": "register", "data": {"client_type": client_type}, "rid": rid}).encode("utf-8")))
+        client.recv(1024)
+        return client
+
+    def __create_and_run_client(self, type, index=None):
+        key = f"{type}_{round(time.time() * 1000)}_{random.randint(0, 1000)}"
+        if index is not None:
+            key += f"_{index}"
+        sk = self.__create_client(type, key)
+        # 鍙戦�佸績璺�
+        self.__heartbeats_thread(type, key, sk)
+        self.__listen_command_thread(type, key, sk)
+        # print("create_and_run_client success", type, key)
+        logger_request_api.info(f"鍒涘缓鏈湴socket璇锋眰瀹㈡埛绔細{type}")
+        return key, sk
+
+    # 鍚彇鎸囦护
+    def __listen_command(self, _type, client_id, sk):
+        while True:
+            try:
+                result = socket_util.recv_data(sk)[0]
+                if result:
+                    start_time = time.time()
+                    try:
+                        print("鎺ユ敹鏁版嵁", _type, result)
+                        result_json = json.loads(result)
+                        if result_json["type"] == MSG_TYPE_HEART:
+                            # 杩斿洖鍐呭
+                            sk.send(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8'))
+                            continue
+
+                        data = result_json["data"]
+                        content_type = data["type"]
+                        # print("鎺ユ敹鍐呭", data)
+                        request_id = result_json.get('request_id')
+                        if not socket_util.is_client_params_sign_right(result_json):
+                            # print("绛惧悕閿欒")
+                            # 绛惧悕鍑洪敊
+                            SendResponseSkManager.send_error_response(_type, request_id, client_id,
+                                                                      {"code": -1, "msg": "绛惧悕閿欒"})
+                            continue
+                        if content_type == API_TYPE_COMMON_REQUEST:
+                            self.action_callback.OnCommonRequest(client_id, request_id, data)
+                    except Exception as e:
+                        logging.exception(e)
+                    finally:
+                        use_time = int(time.time() - start_time)
+                        if use_time > 5:
+                            result_json = json.loads(result)
+                            logger_request_api.info(f"瓒呮椂5s浠ヤ笂锛歿result_json['data']['type']}")
+                        # 鍙戦�佸搷搴�
+                        sk.send(json.dumps({"type": "cmd_recieve"}).encode('utf-8'))
+                else:
+                    raise Exception("鎺ユ敹鐨勫唴瀹逛负绌�")
+
+            except Exception as e:
+                logging.exception(e)
+                if _type == CLIENT_TYPE_TRADE:
+                    if client_id in self.trade_ls_client_dict:
+                        self.trade_ls_client_dict.pop(client_id)
+                try:
+                    sk.close()
+                except:
+                    pass
+                    # 缁撴潫褰撳墠鐨勬秷鎭惊鐜�
+                break
+
+    def __heart_beats(self, _type, client_id, sk):
+        while True:
+            try:
+                sk.send(socket_util.load_header(json.dumps({"type": "heart", "client_id": client_id}).encode('utf-8')))
+                # print("蹇冭烦淇℃伅鍙戦�佹垚鍔�", client_id)
+            except Exception as e:
+                if _type == CLIENT_TYPE_TRADE:
+                    if client_id in self.trade_ls_client_dict:
+                        self.trade_ls_client_dict.pop(client_id)
+                try:
+                    sk.close()
+                except:
+                    pass
+                    # 缁撴潫褰撳墠鐨勬秷鎭惊鐜�
+                break
+            time.sleep(HEART_SPACE_TIME)
+
+    def __listen_command_thread(self, _type, rid, sk):
+        t1 = threading.Thread(target=lambda: self.__listen_command(_type, rid, sk))
+        t1.setDaemon(True)
+        t1.start()
+
+    def __heartbeats_thread(self, _type, rid, sk):
+        t1 = threading.Thread(target=lambda: self.__heart_beats(_type, rid, sk))
+        t1.setDaemon(True)
+        t1.start()
+
+    def __maintain_client(self):
+        logger_system.info(f"outside_api __maintain_client 绾跨▼ID:{tool.get_thread_id()}")
+        while True:
+            try:
+                if len(self.trade_ls_client_dict) < self.trade_ls_client_count:
+                    for i in range(self.trade_ls_client_count - len(self.trade_ls_client_dict)):
+                        result = self.__create_and_run_client(CLIENT_TYPE_TRADE)
+                        self.trade_ls_client_dict[result[0]] = result[1]
+            except:
+                pass
+            time.sleep(1)
+
+    # 缁存姢杩炴帴鏁扮殑绋冲畾
+    def run(self, blocking=True):
+        # 缁存姢client
+        if blocking:
+            self.__maintain_client()
+        else:
+            t1 = threading.Thread(target=lambda: self.__maintain_client())
+            t1.setDaemon(True)
+            t1.start()
+
+
+if __name__ == "__main__":
+    manager = ApiCommandManager(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT, ActionCallback())
+    manager.run()
+    input()

--
Gitblit v1.8.0