From 287c506725b2d970f721f80169f83c2418cb0991 Mon Sep 17 00:00:00 2001 From: admin <weikou2014> Date: 星期三, 04 六月 2025 18:39:02 +0800 Subject: [PATCH] 添加新版低吸中间服务器 --- socket_manager.py | 4 + middle_api_server.py | 4 middle_ls_api_server.py | 93 +++++++++++++++++++++++++++++++ utils/hosting_api_util.py | 16 +++++ utils/huaxin_trade_record_manager.py | 6 +- main.py | 5 + middle_server.py | 16 +++++ low_suction_proxy_server.py | 4 utils/global_data_cache_util.py | 1 9 files changed, 141 insertions(+), 8 deletions(-) diff --git a/low_suction_proxy_server.py b/low_suction_proxy_server.py index e525c0b..ccd4be6 100644 --- a/low_suction_proxy_server.py +++ b/low_suction_proxy_server.py @@ -19,8 +19,8 @@ logger = logging.getLogger("http.server") logger.setLevel(logging.CRITICAL) -# 183.234.94.164/125.93.72.196 -REAL_HOST, REAL_PORT = "183.234.94.164", 12881 +# 183.234.94.163/125.93.72.195 +REAL_HOST, REAL_PORT = "183.234.94.163", 12881 class DataServer(BaseHTTPRequestHandler): diff --git a/main.py b/main.py index b43a17c..2826191 100644 --- a/main.py +++ b/main.py @@ -5,9 +5,12 @@ import data_server import middle_api_server import middle_cb_api_server +import middle_ls_api_server import middle_server from log_module import async_log_util +# cd /usr/local/middle_server_source/gp-server +# PYTHONPATH=../../gp_server_source/lib python3 main.py if __name__ == "__main__": t1 = threading.Thread(target=lambda: middle_api_server.run(), daemon=True) t1.start() @@ -20,6 +23,8 @@ # 鍙浆鍊篈PI绔彛涓�13008 t1 = threading.Thread(target=lambda: middle_cb_api_server.run(13008), daemon=True) t1.start() + t1 = threading.Thread(target=lambda: middle_ls_api_server.run(14008), daemon=True) + t1.start() # t1 = threading.Thread(target=lambda: middle_l1_data_server.run(12881), daemon=True) # t1.start() diff --git a/middle_api_server.py b/middle_api_server.py index 85ce1f1..60690fe 100644 --- a/middle_api_server.py +++ b/middle_api_server.py @@ -200,7 +200,7 @@ params = data_json["data"] ctype = params.get("ctype") trade_sell_types = {"get_current_l1_codes", "get_positions", "get_l2_deal_price", - "buy_cb_for_commission", "sell_cb_for_commission", "get_deal_queue"} + "buy_cb_for_commission", "sell_cb_for_commission", "get_deal_queue", "auto_cancel_sell_mode"} if ctype in trade_sell_types: result = hosting_api_util.common_request(params, client_type=socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL) @@ -266,7 +266,7 @@ fdata = [] try: # 鑾峰彇褰撳墠娑ㄥ仠姣斾緥 - rate_results_dict = CodesLimitRateManager.get_price_rates(set([r[0] for r in fresults])) + rate_results_dict = global_data_cache_util.huaxin_subscript_codes_rate for r in fresults: fdata.append( (r[0], r[1], rate_results_dict.get(r[0]) if r[0] in rate_results_dict else 0, diff --git a/middle_ls_api_server.py b/middle_ls_api_server.py new file mode 100644 index 0000000..553ceca --- /dev/null +++ b/middle_ls_api_server.py @@ -0,0 +1,93 @@ +import hashlib +import json +import logging +import socket +import socketserver + +from log_module import request_log_util +from log_module.log import logger_request_debug +from utils import socket_util, hosting_api_util + +""" +浣庡惛澶栭儴鎺ュ彛 +""" + + +class MyTCPServer(socketserver.TCPServer): + def __init__(self, server_address, RequestHandlerClass): + socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=True) + + +# 濡傛灉浣跨敤寮傛鐨勫舰寮忓垯闇�瑕佸啀閲嶅啓ThreadingTCPServer +class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass + + +class MyBaseRequestHandle(socketserver.BaseRequestHandler): + __inited = False + + def setup(self): + self.__init() + + @classmethod + def __init(cls): + if cls.__inited: + return True + cls.__inited = True + cls.__req_socket_dict = {} + + def __is_sign_right(self, data_json): + list_str = [] + sign = data_json["sign"] + data_json.pop("sign") + for k in data_json: + list_str.append(f"{k}={data_json[k]}") + list_str.sort() + __str = "&".join(list_str) + "JiaBei@!*." + md5 = hashlib.md5(__str.encode(encoding='utf-8')).hexdigest() + if md5 != sign: + raise Exception("绛惧悕鍑洪敊") + + def handle(self): + host = self.client_address[0] + super().handle() + sk: socket.socket = self.request + while True: + return_str = "" + try: + data, header = socket_util.recv_data(sk) + if data: + data_str = data + # print("鏀跺埌鏁版嵁------", f"{data_str[:20]}......{data_str[-20:]}") + data_json = json.loads(data_str) + type_ = data_json['type'] + try: + request_log_util.request_info("middle_ls_api_server", f"璇锋眰寮�濮嬶細{type_}") + is_sign_right = socket_util.is_client_params_sign_right(data_json) + # ------瀹㈡埛绔姹傛帴鍙�------- + if type_ == 'common': + params = data_json["data"] + result = hosting_api_util.common_request_for_low_suction(params) + return_str = json.dumps(result) + break + finally: + request_log_util.request_info("middle_ls_api_server", f"璇锋眰缁撴潫锛歿type_}") + break + # sk.close() + except Exception as e: + logging.exception(e) + logger_request_debug.exception(e) + return_str = json.dumps({"code": 401, "msg": str(e)}) + break + finally: + sk.sendall(socket_util.load_header(return_str.encode(encoding='utf-8'))) + + def finish(self): + super().finish() + + +def run(port): + print("create middle_ls_api_server") + laddr = "0.0.0.0", port + print("middle_ls_api_server is at: http://%s:%d/" % (laddr)) + tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle + tcpserver.serve_forever() \ No newline at end of file diff --git a/middle_server.py b/middle_server.py index b7228df..c2606c5 100644 --- a/middle_server.py +++ b/middle_server.py @@ -161,6 +161,15 @@ global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str() finally: sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) + elif data_json["type"] == "l2_subscript_codes_rate": + # 璁剧疆璁㈤槄鐨勪唬鐮佺殑娑ㄥ箙 + try: + data = data_json["data"] + datas = data["data"] + # print("l2_subscript_codes", data_json) + global_data_cache_util.huaxin_subscript_codes_rate = datas + finally: + sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) elif data_json["type"] == "l2_position_subscript_codes": # 璁剧疆璁㈤槄鐨勪唬鐮� try: @@ -360,6 +369,13 @@ result_str = json.dumps({"code": 1, "msg": str(e)}) sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8'))) break + elif data_json["type"] == 'low_suction': + # TODO 浣庡惛閫氶亾 + datas = data_json["data"] + pass + + + except Exception as e: log.logger_tuoguan_request_debug.exception(e) finally: diff --git a/socket_manager.py b/socket_manager.py index 0c492d3..1c1200c 100644 --- a/socket_manager.py +++ b/socket_manager.py @@ -9,6 +9,7 @@ CLIENT_TYPE_COMMON = "common" CLIENT_TYPE_TRADE = "trade" CLIENT_TYPE_TRADE_SELL = "trade_sell" + CLIENT_TYPE_TRADE_LOW_SUCTION = "trade_low_suction" # 鍙浆鍊哄鎴风 CLIENT_TYPE_TRADE_CB = "trade_cb" @@ -18,7 +19,8 @@ @classmethod def add_client(cls, _type, rid, sk): - if _type == cls.CLIENT_TYPE_COMMON or _type == cls.CLIENT_TYPE_TRADE or _type == cls.CLIENT_TYPE_TRADE_SELL or _type == cls.CLIENT_TYPE_TRADE_CB: + if _type in {cls.CLIENT_TYPE_COMMON, cls.CLIENT_TYPE_TRADE, cls.CLIENT_TYPE_TRADE_SELL, + cls.CLIENT_TYPE_TRADE_CB, cls.CLIENT_TYPE_TRADE_LOW_SUCTION}: # 浜ゆ槗鍒楄〃 if _type not in cls.socket_client_dict: cls.socket_client_dict[_type] = [] diff --git a/utils/global_data_cache_util.py b/utils/global_data_cache_util.py index 02236ad..8d61023 100644 --- a/utils/global_data_cache_util.py +++ b/utils/global_data_cache_util.py @@ -3,6 +3,7 @@ """ huaxin_subscript_codes = [] huaxin_subscript_codes_update_time =None +huaxin_subscript_codes_rate = {} huaxin_position_subscript_codes = [] huaxin_position_subscript_codes_update_time =None \ No newline at end of file diff --git a/utils/hosting_api_util.py b/utils/hosting_api_util.py index b40c603..8ddf2ed 100644 --- a/utils/hosting_api_util.py +++ b/utils/hosting_api_util.py @@ -432,5 +432,21 @@ return __read_response(client, request_id, blocking, timeout=10) +def common_request_for_low_suction(params, blocking=True): + """ + 閫氱敤璇锋眰 + :param params: + :param blocking: + :return: + """ + data = {"type": API_TYPE_COMMON_REQUEST, + "sinfo": f"cb_{API_TYPE_COMMON_REQUEST}_{round(time.time() * 1000)}"} + if params: + for k in params: + data[k] = params[k] + request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE_LOW_SUCTION, data) + return __read_response(client, request_id, blocking, timeout=10) + + if __name__ == "__main__": pass diff --git a/utils/huaxin_trade_record_manager.py b/utils/huaxin_trade_record_manager.py index 537d06f..ab96e73 100644 --- a/utils/huaxin_trade_record_manager.py +++ b/utils/huaxin_trade_record_manager.py @@ -17,7 +17,7 @@ class DelegateRecordManager: key_list = ["id", "orderLocalID", "securityID", "securityName", "direction", "orderSysID", "insertTime", "insertDate", "acceptTime", "cancelTime", "limitPrice", "turnover", "volume", "volumeTraded", - "orderStatus", "orderSubmitStatus", "statusMsg", "createTime", "updateTime", "accountID", "orderRef"] + "orderStatus", "orderSubmitStatus", "statusMsg", "createTime", "updateTime", "accountID", "orderRef", "sinfo"] @classmethod def add(cls, datas): @@ -34,12 +34,12 @@ nameDict = HistoryKDatasUtils.get_gp_codes_names([d['securityID']]) name = nameDict.get(d['securityID']) mysqldb.execute( - "insert into hx_trade_delegate_record values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')" % ( + "insert into hx_trade_delegate_record values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')" % ( _id, d["orderLocalID"], d["securityID"], name, d["direction"], d["orderSysID"], d["insertTime"], d["insertDate"], d["acceptTime"], d["cancelTime"], d["limitPrice"], d["turnover"], d["volume"], d["volumeTraded"], d["orderStatus"], d["orderSubmitStatus"], d["statusMsg"], tool.get_now_datetime_str(), - tool.get_now_datetime_str(), d["accountID"])) + tool.get_now_datetime_str(), d["accountID"], d["orderRef"], d["sinfo"])) else: # 淇敼鏁版嵁 updateDict = {} -- Gitblit v1.8.0