From 6fbaa1012ca1b10689f7baad3a5e808ffc3c06b2 Mon Sep 17 00:00:00 2001 From: admin <admin@example.com> Date: 星期五, 20 六月 2025 14:13:11 +0800 Subject: [PATCH] 1.精选流入09:25起开始进入数据 2.修复日志数据的None BUG 3.分离出手动拉黑后的决策日志和手动拉黑前的决策日志 --- data_server.py | 313 +++++++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 287 insertions(+), 26 deletions(-) diff --git a/data_server.py b/data_server.py index e93689c..3e39b96 100644 --- a/data_server.py +++ b/data_server.py @@ -1,15 +1,28 @@ import concurrent.futures +import copy import hashlib import http import json +import logging import socketserver +import time from http.server import BaseHTTPRequestHandler import urllib.parse as urlparse +import psutil + +import constant +from db import redis_manager_delegate as redis_manager, mysql_data_delegate as mysql_data +from db.redis_manager_delegate import RedisUtils +from log_module import log_export, async_log_util +from log_module.log import hx_logger_l2_transaction, logger_debug, logger_request_api, logger_system from strategy import data_cache +from strategy.forbidden_plates_manager import ForbiddenPlatesManager +from strategy.kpl_data_manager import KPLMarketsSiftPlateLogManager, KPLMarketStockHeatLogManager +from strategy.trade_setting import TradeSetting from trade import huaxin_trade_api, huaxin_trade_data_update from trade.huaxin_trade_record_manager import DelegateRecordManager, DealRecordManager, MoneyManager, PositionManager -from utils import tool, huaxin_util +from utils import tool, huaxin_util, socket_util class DataServer(BaseHTTPRequestHandler): @@ -31,6 +44,9 @@ if url.path == "/get_position_list": # 鑾峰彇鎸佷粨鍒楄〃 results = PositionManager.get_position_cache() + results = copy.deepcopy(results) + for r in results: + r["auto_sell"] = 1 if r["securityID"] in data_cache.LIMIT_UP_SELL_CODES else 0 response_data = json.dumps({"code": 0, "data": results}) elif url.path == "/get_money": # 鑾峰彇璧勯噾淇℃伅 @@ -82,9 +98,164 @@ fdatas = [] for code in codes: data = data_cache.latest_code_market_info_dict.get(code) + # logger_debug.info(f"鑾峰彇L1琛屾儏鎺ュ彛锛歿code}-{data}") if data: fdatas.append(data) response_data = json.dumps({"code": 0, "data": fdatas}) + elif url.path == "/get_buy_money": + # 鑾峰彇姣忔涔板叆鐨勯噾棰� + money = data_cache.BUY_MONEY_PER_CODE + response_data = json.dumps({"code": 0, "data": {"money": money}}) + elif url.path == "/get_trade_settings": + fdata = {"running": TradeSetting().get_running(), "auto_sell": TradeSetting().get_auto_sell(), + "auto_buy": TradeSetting().get_auto_buy()} + response_data = json.dumps({"code": 0, "data": fdata}) + + elif url.path == "/get_env": + request_id = params_dict.get("request_id") + use_time_list = [] + try: + __start_time = time.time() + fdata = {} + # try: + # date = HistoryKDatasUtils.get_trading_dates(tool.date_sub(tool.get_now_date_str(), 10), + # tool.get_now_date_str()) + # if date: + # fdata["juejin"] = 1 + # except Exception as e: + # fdata["juejin"] = 0 + # fdata["kpl"] = {} + # # 鑾峰彇寮�鐩樺暒鏁版嵁 + # kpl_types = [KPLDataType.LIMIT_UP.value, KPLDataType.JINGXUAN_RANK.value, + # KPLDataType.INDUSTRY_RANK.value] + # for kpl_type in kpl_types: + # if kpl_type in KPLDataManager.kpl_data_update_info: + # fdata["kpl"][kpl_type] = KPLDataManager.kpl_data_update_info.get(kpl_type) + + try: + # 楠岃瘉redis + RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test") + fdata["redis"] = 1 + except: + fdata["redis"] = 0 + use_time_list.append(("楠岃瘉redis", time.time() - __start_time)) + + try: + # 楠岃瘉mysql + mysql_data.Mysqldb().select_one("select 1") + fdata["mysql"] = 1 + except: + fdata["mysql"] = 0 + use_time_list.append(("楠岃瘉mysql", time.time() - __start_time)) + + try: + # redis寮傛浠诲姟鏁伴噺 + fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count() + except: + pass + use_time_list.append(("楠岃瘉寮傛浠诲姟鏁伴噺", time.time() - __start_time)) + + # 鑾峰彇浜ゆ槗閫氶亾 + try: + can_access = huaxin_trade_api.test_trade_channel() + fdata["trade_channel_access"] = 1 if can_access else 0 + except Exception as e: + logger_debug.exception(e) + fdata["trade_channel_access"] = 0 + use_time_list.append(("楠岃瘉浜ゆ槗閫氶亾", time.time() - __start_time)) + + # 鑾峰彇CPU涓庡唴瀛橀�傜敤鎯呭喌 + memory_info = psutil.virtual_memory() + cpu_percent = psutil.cpu_percent(interval=1) + fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent} + + use_time_list.append(("鑾峰彇璁惧璧勬簮鍗犵敤", time.time() - __start_time)) + # 鑾峰彇浜ゆ槗閫氶亾 + result = {"code": 0, "data": fdata, "msg": ""} + # print("OnGetEnvInfo 鎴愬姛") + response_data = json.dumps(result) + except Exception as e: + response_data = json.dumps({"code": 1, "msg": str(e)}) + logger_debug.error(f"鐜鑾峰彇寮傚父锛歿request_id}") + logger_debug.exception(e) + finally: + if use_time_list and use_time_list[-1][1] > 10: + logger_debug.warning(f"鐜鑾峰彇鏃堕棿澶т簬10s({request_id}):{use_time_list}") + # 鑾峰彇鏉垮潡寮哄害鏁版嵁 + elif url.path == "/load_kpl_market_sift_plate": + # 鍔犺浇鏁版嵁 + KPLMarketsSiftPlateLogManager().load_data() + response_data = json.dumps({"code": 0, "msg": "鏆傛棤鍐呭"}) + elif url.path == "/get_kpl_market_sift_plate": + # 鑾峰彇寮�鐩樺暒娴佸叆鏉垮潡璇︾粏淇℃伅 + print("==========get_kpl_market_sift_plate==========") + try: + time_str = params_dict.get("time") + if not time_str: + time_str = tool.get_now_time_str() + fdatas = KPLMarketsSiftPlateLogManager().get_filter_log_datas() + response_data = json.dumps({"code": 1, "msg": "鏆傛棤鍐呭"}) + for i in range(len(fdatas) - 1, -1, -1): + if fdatas[i][0] <= time_str: + response_data = json.dumps({"code": 0, "data": fdatas[i]}) + break + except Exception as e: + logging.exception(e) + response_data = json.dumps({"code": 1, "msg": str(e)}) + + # 鑾峰彇涓偂寮哄害鏁版嵁 + elif url.path == "/load_kpl_market_stock_heat": + # 鍔犺浇鏁版嵁 + KPLMarketStockHeatLogManager().load_data() + response_data = json.dumps({"code": 0, "msg": "鏆傛棤鍐呭"}) + elif url.path == "/get_kpl_market_stock_heat": + # 鑾峰彇寮�鐩樺暒娴佸叆鏉垮潡璇︾粏淇℃伅 + print("==========get_kpl_stock_of_markets_plate==========") + try: + time_str = params_dict.get("time") + if not time_str: + time_str = tool.get_now_time_str() + fdatas = KPLMarketStockHeatLogManager().get_filter_log_datas() + response_data = json.dumps({"code": 1, "msg": "鏆傛棤鍐呭"}) + for i in range(len(fdatas) - 1, -1, -1): + if fdatas[i][0] <= time_str: + response_data = json.dumps({"code": 0, "data": fdatas[i]}) + break + except Exception as e: + logging.exception(e) + response_data = json.dumps({"code": 1, "msg": str(e)}) + elif url.path == "/get_kpl_market_strong_records": + # 鑾峰彇寮�鐩樺暒甯傚満寮哄害璁板綍 + time_str = params_dict.get("time") + if not time_str: + time_str = tool.get_now_time_str() + datas = log_export.load_kpl_market_strong() + fdatas = [] + for data in datas: + # (璺濈09:15:00鐨勭鏁�, 鏃堕棿, 寮哄害) + fdatas.append((tool.trade_time_sub(data[0], "09:15:00"), data[0], data[1])) + response_data = json.dumps({"code": 0, "data": fdatas}) + elif url.path == "/get_place_order_records": + datas = data_cache.purchased_stocks_details_list + response_data = json.dumps({"code": 0, "data": datas}) + elif url.path == "/get_forbidden_plates": + datas = ForbiddenPlatesManager().list_plates() + # human 璁や负璁剧疆 fixed: 鍥哄畾鐨� + response_data = json.dumps( + {"code": 0, "data": {"human": list(datas), "fixed": list(constant.check_plate_list)}}) + elif url.path == "/add_forbidden_plate": + plate = params_dict.get("plate") + ForbiddenPlatesManager().add_plate(plate) + response_data = json.dumps({"code": 0, "data": {}}) + elif url.path == "/remove_forbidden_plate": + plate = params_dict.get("plate") + ForbiddenPlatesManager().remove_plate(plate) + response_data = json.dumps({"code": 0, "data": {}}) + elif url.path == "/get_market_sift_plate_stock_dict": + # 鑾峰彇寮�鐩樺暒鏉垮潡绮鹃�夋祦鍏� + data = data_cache.market_sift_plates + response_data = json.dumps({"code": 0, "data": data}) + self.send_response(200) # 鍙戠粰璇锋眰瀹㈡埛绔殑鍝嶅簲鏁版嵁 self.send_header('Content-type', 'application/json') @@ -116,13 +287,31 @@ print("鎺ユ敹鍒癙OST璇锋眰锛�", str(path)) url = urlparse.urlparse(path) if url.path == "/trade_callback": - # 鎺ュ彈寮�鐩樺暒鏁版嵁 - body = self.__parse_request() - if type(body) != str: - huaxin_trade_api.add_trade_callback_data(json.dumps(body)) - else: - huaxin_trade_api.add_trade_callback_data(body) + if constant.IS_SIMULATED_TRADE: + # 鎺ュ彈寮�鐩樺暒鏁版嵁 + body = self.__parse_request() + if type(body) != str: + huaxin_trade_api.add_trade_callback_data(json.dumps(body)) + else: + huaxin_trade_api.add_trade_callback_data(body) result_str = json.dumps({"code": 0}) + elif url.path == "/set_trade_settings": + params = self.__parse_request() + if not self.__is_sign_right(params): + result_str = json.dumps({"code": 1001, "msg": "绛惧悕閿欒"}) + return + logger_debug.info(f"set_trade_settings: {params}") + running = params.get("running") + auto_sell = params.get("auto_sell") + auto_buy = params.get("auto_buy") + if running is not None: + TradeSetting().set_running(int(running)) + if auto_sell is not None: + TradeSetting().set_auto_sell(int(auto_sell)) + if auto_buy is not None: + TradeSetting().set_auto_buy(int(auto_buy)) + result_str = json.dumps({"code": 0, "data": {}}) + elif url.path == "/buy": # 绛惧悕楠岃瘉 params = self.__parse_request() @@ -130,6 +319,7 @@ result_str = json.dumps({"code": 1001, "msg": "绛惧悕閿欒"}) return print("涔板叆", params) + logger_request_api.info(f"涔板叆锛歿params}") # 涔板叆 code = params.get("code") # 浠g爜 volume = params.get("volume") # 閲� @@ -142,9 +332,9 @@ pre_price = data[1] current_price = data[2] if data[2] else data[5][0][0] price = tool.get_buy_max_price(current_price) - price = min(price, tool.get_limit_up_price(code,pre_price)) + price = min(price, tool.get_limit_up_price(code, pre_price)) else: - price = round(params.get("price"), 2) # 浠锋牸 + price = round(float(params.get("price")), 2) # 浠锋牸 result = huaxin_trade_api.order(1, code, volume, price, blocking=True) result_str = json.dumps(result) elif url.path == "/sell": @@ -154,23 +344,73 @@ result_str = json.dumps({"code": 1001, "msg": "绛惧悕閿欒"}) return # 鍗栧嚭 - print("鍗栧嚭", params) - code = params.get("code") # 浠g爜 - volume = params.get("volume") # 閲� - price = params.get("price") - if not price: - # 娌℃湁涓婁紶浠锋牸锛屽氨闇�瑕佽幏鍙栨渶杩戠殑浠锋牸杩涜涔板叆 - data = data_cache.latest_code_market_info_dict.get(code) - if not data: - raise Exception("娌℃湁鑾峰彇鍒癓1鏁版嵁") - pre_price = data[1] - current_price = data[2] if data[2] else data[5][0][0] - price = tool.get_buy_min_price(current_price) - price = max(price, tool.get_limit_down_price(code, pre_price)) + try: + print("鍗栧嚭", params) + code = params.get("code") # 浠g爜 + volume = params.get("volume") # 閲� + price = params.get("price") + if not price: + # 娌℃湁涓婁紶浠锋牸锛屽氨闇�瑕佽幏鍙栨渶杩戠殑浠锋牸杩涜涔板叆 + data = data_cache.latest_code_market_info_dict.get(code) + if not data: + raise Exception("娌℃湁鑾峰彇鍒癓1鏁版嵁") + pre_price = data[1] + current_price = data[2] if data[2] else data[5][0][0] + # 鑾峰彇鏈�鏂版垚浜や环鏍� + latest_deal_price = data_cache.latest_deal_price_dict.get(code) + if latest_deal_price: + current_price = round(float(latest_deal_price), 2) + async_log_util.info(logger_debug, f"鏍规嵁鎴愪氦浠峰崠鍑猴細{code}-{latest_deal_price}") + + price = tool.get_buy_min_price(current_price) + price = max(price, tool.get_limit_down_price(code, pre_price)) + else: + price = round(params.get("price"), 2) # 浠锋牸 + result = huaxin_trade_api.order(2, code, volume, price, blocking=True) + result_str = json.dumps(result) + finally: + logger_request_api.info(f"鍗栧嚭锛歿params}") + + elif url.path == "/set_buy_money": + # 璁剧疆姣忔涔板叆鐨勯噾棰� + params = self.__parse_request() + # 绛惧悕楠岃瘉 + if not self.__is_sign_right(params): + result_str = json.dumps({"code": 1001, "msg": "绛惧悕閿欒"}) + return + # 鍗栧嚭 + print("姣忔涔板叆鐨勯噾棰�", params) + money = params.get("money") # 閲戦 + if money is None: + result_str = json.dumps({"code": 1, "msg": "鏈笂浼犻噾棰�"}) + return + money = int(money) + + logger_debug.info(f"璁剧疆寮�浠撻噾棰濓細{money}") + data_cache.BUY_MONEY_PER_CODE = money + result_str = json.dumps({"code": 0}) + + elif url.path == "/set_limit_up_sell": + # 璁剧疆姣忔涔板叆鐨勯噾棰� + params = self.__parse_request() + # 绛惧悕楠岃瘉 + if not self.__is_sign_right(params): + result_str = json.dumps({"code": 1001, "msg": "绛惧悕閿欒"}) + return + # 鍗栧嚭 + print("姣忔涔板叆鐨勯噾棰�", params) + code = params.get("code") #浠g爜 + enable = params.get("enable") # 鏄惁寮�鍚� + if code is None or enable is None: + result_str = json.dumps({"code": 1, "msg": "涓婁紶鏁版嵁缂哄け"}) + return + enable = int(enable) + if enable: + data_cache.LIMIT_UP_SELL_CODES.add(code) else: - price = round(params.get("price"), 2) # 浠锋牸 - result = huaxin_trade_api.order(2, code, volume, price, blocking=True) - result_str = json.dumps(result) + data_cache.LIMIT_UP_SELL_CODES.discard(code) + result_str = json.dumps({"code": 0}) + elif url.path == "/cancel_order": params = self.__parse_request() # 绛惧悕楠岃瘉 @@ -184,6 +424,23 @@ orderSysID = params.get("orderSysID") # 绯荤粺璁㈠崟缂栧彿 result = huaxin_trade_api.cancel_order(direction, code, orderSysID, blocking=True) result_str = json.dumps(result) + elif url.path == "/upload_deal_big_orders": + # 鎴愪氦澶у崟浼犻�� + datas = self.rfile.read(int(self.headers['content-length'])) + _str = str(datas, encoding="gbk") + datas = json.loads(_str) + for d in datas: + if d[1] != 0: + continue + code, data = d[0], d[2] + if code not in data_cache.big_order_deal_dict: + data_cache.big_order_deal_dict[code] = [] + data_cache.big_order_deal_dict[code].append(d) + # 鑾峰彇涔板ぇ鍗曟暟閲� + len(data_cache.big_order_deal_dict.get(code, [])) + hx_logger_l2_transaction.info(_str) + # 璁板綍鏃ュ織 + result_str = json.dumps({"code": 0}) except Exception as e: result_str = json.dumps({"code": 1, "msg": str(e)}) finally: @@ -219,4 +476,8 @@ print("HTTP server is at: http://%s:%d/" % (addr, port)) httpd.serve_forever() except Exception as e: - pass + logger_system.exception(e) + + +if __name__ == "__main__": + run() -- Gitblit v1.8.0