From 287c506725b2d970f721f80169f83c2418cb0991 Mon Sep 17 00:00:00 2001 From: admin <weikou2014> Date: 星期三, 04 六月 2025 18:39:02 +0800 Subject: [PATCH] 添加新版低吸中间服务器 --- middle_api_server.py | 191 +++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 165 insertions(+), 26 deletions(-) diff --git a/middle_api_server.py b/middle_api_server.py index 1a79864..60690fe 100644 --- a/middle_api_server.py +++ b/middle_api_server.py @@ -10,9 +10,13 @@ import constant import socket_manager import trade_manager +from code_attribute.code_price_manager import CodesLimitRateManager from db import mysql_data, redis_manager from db.redis_manager import RedisUtils -from log import logger_request_debug +from log_module import log, request_log_util +from log_module.log import logger_request_debug +from middle_l1_data_server import L1DataManager +from output import push_msg_manager from utils import socket_util, hosting_api_util, huaxin_trade_record_manager, huaxin_util, tool, global_data_cache_util from utils.history_k_data_util import HistoryKDatasUtils, JueJinApi from utils.huaxin_trade_record_manager import PositionManager @@ -65,9 +69,8 @@ # print("鏀跺埌鏁版嵁------", f"{data_str[:20]}......{data_str[-20:]}") data_json = json.loads(data_str) type_ = data_json['type'] - thread_id = random.randint(0, 1000000) try: - logger_request_debug.info(f"middle_api_server 璇锋眰寮�濮�({thread_id})锛歿type_}") + request_log_util.request_info("middle_api_server", f"璇锋眰寮�濮嬶細{type_}") if type(type_) == int: # 澶勭悊鏁板瓧鍨婽YPE return_str = self.process_num_type(sk, type_, data_str) @@ -81,22 +84,59 @@ raise Exception("绛惧悕閿欒") codes_data = data_json["data"] code = codes_data["code"] - volume = codes_data["volume"] - price = codes_data["price"] + money = codes_data.get("money") + volume = codes_data.get("volume") + price = codes_data.get("price") + price_type = codes_data.get("price_type") try: if not code: raise Exception("璇蜂笂浼燾ode") - if not volume: - raise Exception("璇蜂笂浼爒olume") - if round(float(price), 2) <= 0: - prices = HistoryKDatasUtils.get_now_price([code]) - if not prices: - raise Exception("鐜颁环鑾峰彇澶辫触") - price = prices[0][1] + if not price or round(float(price), 2) <= 0: + if price_type is None: + price_type = 0 + # 榛樿涓虹瀛愪环 + pre_close = HistoryKDatasUtils.get_gp_latest_info([code], "sec_id,pre_close")[0][ + "pre_close"] + if price_type == 0: # 浠锋牸绗煎瓙 + # 鑾峰彇鐜颁环 + prices = HistoryKDatasUtils.get_now_price([code]) + if not prices: + raise Exception("鐜颁环鑾峰彇澶辫触") + now_price = prices[0][1] + limit_up_price = round( + float(tool.get_limit_up_price_by_preprice(code, pre_close)), + 2) + price = min(tool.get_buy_max_price(now_price), limit_up_price) + elif price_type == 1: # 璺屽仠浠� + limit_down_price = round( + float(tool.get_limit_down_price_by_preprice(code, pre_close)), + 2) + price = limit_down_price + elif price_type == 2: # 娑ㄥ仠浠� + limit_up_price = round( + float(tool.get_limit_up_price_by_preprice(code, pre_close)), + 2) + price = limit_up_price + elif price_type == 3: # 鐜颁环 + prices = HistoryKDatasUtils.get_now_price([code]) + if not prices: + raise Exception("鐜颁环鑾峰彇澶辫触") + now_price = prices[0][1] + price = now_price + elif price_type == 4: # 涔�5浠� + prices = HistoryKDatasUtils.get_now_price([code]) + if not prices: + raise Exception("鐜颁环鑾峰彇澶辫触") + now_price = prices[0][1] + price = now_price - 0.04 + if not volume and money: + volume = (money // int(round(float(price) * 100))) * 100 + if volume < 100: + volume = 100 # 涓嬪崟 - result = hosting_api_util.trade_order(hosting_api_util.TRADE_DIRECTION_BUY, code, volume, - round(float(price), 2)) + result = hosting_api_util.trade_order(hosting_api_util.TRADE_DIRECTION_BUY, code, + volume, price) if result: resultJSON = result print("涓嬪崟缁撴灉锛�", resultJSON) @@ -157,11 +197,15 @@ break elif type_ == 'common': - # 楠岃瘉绛惧悕 - if not is_sign_right: - raise Exception("绛惧悕閿欒") params = data_json["data"] - result = hosting_api_util.common_request(params) + ctype = params.get("ctype") + trade_sell_types = {"get_current_l1_codes", "get_positions", "get_l2_deal_price", + "buy_cb_for_commission", "sell_cb_for_commission", "get_deal_queue", "auto_cancel_sell_mode"} + if ctype in trade_sell_types: + result = hosting_api_util.common_request(params, + client_type=socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL) + else: + result = hosting_api_util.common_request(params) return_str = json.dumps(result) break @@ -212,16 +256,41 @@ # 鍚屾浜ゆ槗鏁版嵁 sync_type = data_json["data"]["type"] hosting_api_util.refresh_trade_data(sync_type) + hosting_api_util.refresh_trade_data(sync_type, blocking=False, + client_type=socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL) return_str = json.dumps( {"code": 0, "data": {}, "msg": ""}) elif type_ == "get_huaxin_subscript_codes": # 鑾峰彇鍗庨懌璁㈤槄鐨勪唬鐮� fresults = global_data_cache_util.huaxin_subscript_codes + fdata = [] + try: + # 鑾峰彇褰撳墠娑ㄥ仠姣斾緥 + rate_results_dict = global_data_cache_util.huaxin_subscript_codes_rate + for r in fresults: + fdata.append( + (r[0], r[1], rate_results_dict.get(r[0]) if r[0] in rate_results_dict else 0, + r[2])) + fdata.sort(key=lambda r: r[2], reverse=True) + except: + fdata = fresults update_time = global_data_cache_util.huaxin_subscript_codes_update_time if update_time is None: update_time = '' return_str = json.dumps( - {"code": 0, "data": {"count": len(fresults), "list": fresults, "update_time": update_time}, + {"code": 0, + "data": {"count": len(fresults), "list": fdata, "update_time": update_time}, + "msg": ""}) + pass + elif type_ == "get_huaxin_position_subscript_codes": + # 鑾峰彇鍗庨懌璁㈤槄鐨勪唬鐮� + fresults = global_data_cache_util.huaxin_position_subscript_codes + update_time = global_data_cache_util.huaxin_position_subscript_codes_update_time + if update_time is None: + update_time = '' + return_str = json.dumps( + {"code": 0, + "data": {"count": len(fresults), "list": fresults, "update_time": update_time}, "msg": ""}) pass elif type_ == "export_l2_data": @@ -292,7 +361,7 @@ return_str = json.dumps(result) elif type_ == "trade_server_channels": trade_channels = socket_manager.ClientSocketManager.list_client( - socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE) + socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL) common_channels = socket_manager.ClientSocketManager.list_client( socket_manager.ClientSocketManager.CLIENT_TYPE_COMMON) data = {} @@ -320,6 +389,9 @@ elif type_ == "add_sell_rule": result = hosting_api_util.sell_rule(hosting_api_util.OPERRATE_ADD, data=data_json["data"]) return_str = json.dumps(result) + elif type_ == "update_sell_rule": + result = hosting_api_util.sell_rule(hosting_api_util.OPERRATE_SET, data=data_json["data"]) + return_str = json.dumps(result) elif type_ == "del_sell_rule": id_ = data_json["data"]["id"] result = hosting_api_util.sell_rule(hosting_api_util.OPERRATE_DELETE, data={"id": id_}) @@ -331,16 +403,62 @@ code = data_json["data"]["code"] result = hosting_api_util.get_code_position_info(code) return_str = json.dumps(result) - elif type_ == "common": + elif type_ == "register_msg_receiver": params = data_json["data"] - result = hosting_api_util.common_request(params) + msg_types = params["types"] + try: + push_msg_manager.SocketManager.regirster_socket(sk, msg_types) + result = {"code": 0} + except Exception as e: + result = {"code": 1, "msg": str(e)} return_str = json.dumps(result) + sk.sendall(socket_util.load_header(return_str.encode(encoding='utf-8'))) + while True: + try: + buf = sk.recv(1024) + print(f"鏀跺埌鏁版嵁锛歿buf.decode(encoding='utf-8')}") + time.sleep(1) + except: + print("鏁版嵁鏂紑") + break + elif type_ == "get_latest_cancel_orders": + # 鑾峰彇鏈�杩戠殑鎾ゅ崟 + results = huaxin_trade_record_manager.DelegateRecordManager.list_latest_cancel_records(10) + fresults = [] + for result in results: + temp = {} + for key in result: + if key in ["securityID", "securityName", "direction", "orderSysID", "acceptTime", + "cancelTime", "limitPrice", "volume"]: + temp[key] = result[key] + # 杩囨护铏氭嫙鍗曚笌鍗栧崟 + if int(temp["volume"] <= 100): + continue + if int(temp["direction"] != 0): + continue + fresults.append(temp) + return_str = json.dumps(json.dumps({"code": 0, "data": fresults})) + elif type_ == "get_buy1_info": + code = data_json["data"]["code"] + result = hosting_api_util.common_request({"ctype": "get_buy1_info", "code": code}, + client_type=socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL) + return_str = json.dumps(result) + elif type_ == "get_l1_data": + results = L1DataManager().get_current_l1_data() + return_str = json.dumps({"code": 0, "data": results}) + elif type_ == "set_l1_codes": + codes = data_json["data"]["codes"] + # 灏嗕唬鐮佹殏瀛樺埌鏈湴 + # 璁剧疆L1浠g爜 + L1DataManager().save_target_codes(codes) + return_str = json.dumps({"code": 0, "data": {}}) finally: - logger_request_debug.info(f"middle_api_server 璇锋眰缁撴潫({thread_id})锛歿type}") + request_log_util.request_info("middle_api_server", f"璇锋眰缁撴潫锛歿type_}") break # sk.close() except Exception as e: logging.exception(e) + logger_request_debug.exception(e) return_str = json.dumps({"code": 401, "msg": str(e)}) break finally: @@ -448,7 +566,7 @@ return_str = json.dumps({"code": 1, "msg": "涓嶅彲浠ュ彇娑�"}) elif type == 421: - # 鍔犲叆鏆備笉涔� + # 鍔犵孩 data = json.loads(_str) codes = data["data"]["codes"] for code in codes: @@ -456,7 +574,7 @@ return_str = json.dumps({"code": 0}) elif type == 422: - # 绉婚櫎鏆備笉涔� + # 绉荤孩 data = json.loads(_str) codes = data["data"]["codes"] for code in codes: @@ -464,10 +582,31 @@ return_str = json.dumps({"code": 0}) elif type == 423: - # 鏆備笉涔板垪琛� + # 绾㈠崟鍒楄〃 result = hosting_api_util.get_code_list(hosting_api_util.CODE_LIST_MUST_BUY) return_str = json.dumps(result) + elif type == 441: + # 鍔犵豢 + data = json.loads(_str) + codes = data["data"]["codes"] + for code in codes: + hosting_api_util.add_code_list(code, hosting_api_util.CODE_LIST_GREEN) + return_str = json.dumps({"code": 0}) + + elif type == 442: + # 绉荤豢 + data = json.loads(_str) + codes = data["data"]["codes"] + for code in codes: + hosting_api_util.remove_code_list(code, hosting_api_util.CODE_LIST_GREEN) + return_str = json.dumps({"code": 0}) + + elif type == 443: + # 缁垮崟鍒楄〃 + result = hosting_api_util.get_code_list(hosting_api_util.CODE_LIST_GREEN) + return_str = json.dumps(result) + elif type == 430: # 鏌ヨ浠g爜灞炴�� -- Gitblit v1.8.0