From 6fbaa1012ca1b10689f7baad3a5e808ffc3c06b2 Mon Sep 17 00:00:00 2001
From: admin <admin@example.com>
Date: 星期五, 20 六月 2025 14:13:11 +0800
Subject: [PATCH] 1.精选流入09:25起开始进入数据 2.修复日志数据的None BUG 3.分离出手动拉黑后的决策日志和手动拉黑前的决策日志

---
 data_server.py |  313 +++++++++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 287 insertions(+), 26 deletions(-)

diff --git a/data_server.py b/data_server.py
index e93689c..3e39b96 100644
--- a/data_server.py
+++ b/data_server.py
@@ -1,15 +1,28 @@
 import concurrent.futures
+import copy
 import hashlib
 import http
 import json
+import logging
 import socketserver
+import time
 from http.server import BaseHTTPRequestHandler
 import urllib.parse as urlparse
 
+import psutil
+
+import constant
+from db import redis_manager_delegate as redis_manager, mysql_data_delegate as mysql_data
+from db.redis_manager_delegate import RedisUtils
+from log_module import log_export, async_log_util
+from log_module.log import hx_logger_l2_transaction, logger_debug, logger_request_api, logger_system
 from strategy import data_cache
+from strategy.forbidden_plates_manager import ForbiddenPlatesManager
+from strategy.kpl_data_manager import KPLMarketsSiftPlateLogManager, KPLMarketStockHeatLogManager
+from strategy.trade_setting import TradeSetting
 from trade import huaxin_trade_api, huaxin_trade_data_update
 from trade.huaxin_trade_record_manager import DelegateRecordManager, DealRecordManager, MoneyManager, PositionManager
-from utils import tool, huaxin_util
+from utils import tool, huaxin_util, socket_util
 
 
 class DataServer(BaseHTTPRequestHandler):
@@ -31,6 +44,9 @@
         if url.path == "/get_position_list":
             # 鑾峰彇鎸佷粨鍒楄〃
             results = PositionManager.get_position_cache()
+            results = copy.deepcopy(results)
+            for r in results:
+                r["auto_sell"] = 1 if r["securityID"] in data_cache.LIMIT_UP_SELL_CODES else 0
             response_data = json.dumps({"code": 0, "data": results})
         elif url.path == "/get_money":
             # 鑾峰彇璧勯噾淇℃伅
@@ -82,9 +98,164 @@
             fdatas = []
             for code in codes:
                 data = data_cache.latest_code_market_info_dict.get(code)
+                # logger_debug.info(f"鑾峰彇L1琛屾儏鎺ュ彛锛歿code}-{data}")
                 if data:
                     fdatas.append(data)
             response_data = json.dumps({"code": 0, "data": fdatas})
+        elif url.path == "/get_buy_money":
+            # 鑾峰彇姣忔涔板叆鐨勯噾棰�
+            money = data_cache.BUY_MONEY_PER_CODE
+            response_data = json.dumps({"code": 0, "data": {"money": money}})
+        elif url.path == "/get_trade_settings":
+            fdata = {"running": TradeSetting().get_running(), "auto_sell": TradeSetting().get_auto_sell(),
+                     "auto_buy": TradeSetting().get_auto_buy()}
+            response_data = json.dumps({"code": 0, "data": fdata})
+
+        elif url.path == "/get_env":
+            request_id = params_dict.get("request_id")
+            use_time_list = []
+            try:
+                __start_time = time.time()
+                fdata = {}
+                # try:
+                #     date = HistoryKDatasUtils.get_trading_dates(tool.date_sub(tool.get_now_date_str(), 10),
+                #                                                 tool.get_now_date_str())
+                #     if date:
+                #         fdata["juejin"] = 1
+                # except Exception as e:
+                #     fdata["juejin"] = 0
+                # fdata["kpl"] = {}
+                # # 鑾峰彇寮�鐩樺暒鏁版嵁
+                # kpl_types = [KPLDataType.LIMIT_UP.value, KPLDataType.JINGXUAN_RANK.value,
+                #              KPLDataType.INDUSTRY_RANK.value]
+                # for kpl_type in kpl_types:
+                #     if kpl_type in KPLDataManager.kpl_data_update_info:
+                #         fdata["kpl"][kpl_type] = KPLDataManager.kpl_data_update_info.get(kpl_type)
+
+                try:
+                    # 楠岃瘉redis
+                    RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test")
+                    fdata["redis"] = 1
+                except:
+                    fdata["redis"] = 0
+                use_time_list.append(("楠岃瘉redis", time.time() - __start_time))
+
+                try:
+                    # 楠岃瘉mysql
+                    mysql_data.Mysqldb().select_one("select 1")
+                    fdata["mysql"] = 1
+                except:
+                    fdata["mysql"] = 0
+                use_time_list.append(("楠岃瘉mysql", time.time() - __start_time))
+
+                try:
+                    # redis寮傛浠诲姟鏁伴噺
+                    fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count()
+                except:
+                    pass
+                use_time_list.append(("楠岃瘉寮傛浠诲姟鏁伴噺", time.time() - __start_time))
+
+                # 鑾峰彇浜ゆ槗閫氶亾
+                try:
+                    can_access = huaxin_trade_api.test_trade_channel()
+                    fdata["trade_channel_access"] = 1 if can_access else 0
+                except Exception as e:
+                    logger_debug.exception(e)
+                    fdata["trade_channel_access"] = 0
+                use_time_list.append(("楠岃瘉浜ゆ槗閫氶亾", time.time() - __start_time))
+
+                # 鑾峰彇CPU涓庡唴瀛橀�傜敤鎯呭喌
+                memory_info = psutil.virtual_memory()
+                cpu_percent = psutil.cpu_percent(interval=1)
+                fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent}
+
+                use_time_list.append(("鑾峰彇璁惧璧勬簮鍗犵敤", time.time() - __start_time))
+                # 鑾峰彇浜ゆ槗閫氶亾
+                result = {"code": 0, "data": fdata, "msg": ""}
+                # print("OnGetEnvInfo 鎴愬姛")
+                response_data = json.dumps(result)
+            except Exception as e:
+                response_data = json.dumps({"code": 1, "msg": str(e)})
+                logger_debug.error(f"鐜鑾峰彇寮傚父锛歿request_id}")
+                logger_debug.exception(e)
+            finally:
+                if use_time_list and use_time_list[-1][1] > 10:
+                    logger_debug.warning(f"鐜鑾峰彇鏃堕棿澶т簬10s({request_id}):{use_time_list}")
+        # 鑾峰彇鏉垮潡寮哄害鏁版嵁
+        elif url.path == "/load_kpl_market_sift_plate":
+            # 鍔犺浇鏁版嵁
+            KPLMarketsSiftPlateLogManager().load_data()
+            response_data = json.dumps({"code": 0, "msg": "鏆傛棤鍐呭"})
+        elif url.path == "/get_kpl_market_sift_plate":
+            # 鑾峰彇寮�鐩樺暒娴佸叆鏉垮潡璇︾粏淇℃伅
+            print("==========get_kpl_market_sift_plate==========")
+            try:
+                time_str = params_dict.get("time")
+                if not time_str:
+                    time_str = tool.get_now_time_str()
+                fdatas = KPLMarketsSiftPlateLogManager().get_filter_log_datas()
+                response_data = json.dumps({"code": 1, "msg": "鏆傛棤鍐呭"})
+                for i in range(len(fdatas) - 1, -1, -1):
+                    if fdatas[i][0] <= time_str:
+                        response_data = json.dumps({"code": 0, "data": fdatas[i]})
+                        break
+            except Exception as e:
+                logging.exception(e)
+                response_data = json.dumps({"code": 1, "msg": str(e)})
+
+        # 鑾峰彇涓偂寮哄害鏁版嵁
+        elif url.path == "/load_kpl_market_stock_heat":
+            # 鍔犺浇鏁版嵁
+            KPLMarketStockHeatLogManager().load_data()
+            response_data = json.dumps({"code": 0, "msg": "鏆傛棤鍐呭"})
+        elif url.path == "/get_kpl_market_stock_heat":
+            # 鑾峰彇寮�鐩樺暒娴佸叆鏉垮潡璇︾粏淇℃伅
+            print("==========get_kpl_stock_of_markets_plate==========")
+            try:
+                time_str = params_dict.get("time")
+                if not time_str:
+                    time_str = tool.get_now_time_str()
+                fdatas = KPLMarketStockHeatLogManager().get_filter_log_datas()
+                response_data = json.dumps({"code": 1, "msg": "鏆傛棤鍐呭"})
+                for i in range(len(fdatas) - 1, -1, -1):
+                    if fdatas[i][0] <= time_str:
+                        response_data = json.dumps({"code": 0, "data": fdatas[i]})
+                        break
+            except Exception as e:
+                logging.exception(e)
+                response_data = json.dumps({"code": 1, "msg": str(e)})
+        elif url.path == "/get_kpl_market_strong_records":
+            # 鑾峰彇寮�鐩樺暒甯傚満寮哄害璁板綍
+            time_str = params_dict.get("time")
+            if not time_str:
+                time_str = tool.get_now_time_str()
+            datas = log_export.load_kpl_market_strong()
+            fdatas = []
+            for data in datas:
+                # (璺濈09:15:00鐨勭鏁�, 鏃堕棿, 寮哄害)
+                fdatas.append((tool.trade_time_sub(data[0], "09:15:00"), data[0], data[1]))
+            response_data = json.dumps({"code": 0, "data": fdatas})
+        elif url.path == "/get_place_order_records":
+            datas = data_cache.purchased_stocks_details_list
+            response_data = json.dumps({"code": 0, "data": datas})
+        elif url.path == "/get_forbidden_plates":
+            datas = ForbiddenPlatesManager().list_plates()
+            # human 璁や负璁剧疆  fixed: 鍥哄畾鐨�
+            response_data = json.dumps(
+                {"code": 0, "data": {"human": list(datas), "fixed": list(constant.check_plate_list)}})
+        elif url.path == "/add_forbidden_plate":
+            plate = params_dict.get("plate")
+            ForbiddenPlatesManager().add_plate(plate)
+            response_data = json.dumps({"code": 0, "data": {}})
+        elif url.path == "/remove_forbidden_plate":
+            plate = params_dict.get("plate")
+            ForbiddenPlatesManager().remove_plate(plate)
+            response_data = json.dumps({"code": 0, "data": {}})
+        elif url.path == "/get_market_sift_plate_stock_dict":
+            # 鑾峰彇寮�鐩樺暒鏉垮潡绮鹃�夋祦鍏�
+            data = data_cache.market_sift_plates
+            response_data = json.dumps({"code": 0, "data": data})
+
         self.send_response(200)
         # 鍙戠粰璇锋眰瀹㈡埛绔殑鍝嶅簲鏁版嵁
         self.send_header('Content-type', 'application/json')
@@ -116,13 +287,31 @@
             print("鎺ユ敹鍒癙OST璇锋眰锛�", str(path))
             url = urlparse.urlparse(path)
             if url.path == "/trade_callback":
-                # 鎺ュ彈寮�鐩樺暒鏁版嵁
-                body = self.__parse_request()
-                if type(body) != str:
-                    huaxin_trade_api.add_trade_callback_data(json.dumps(body))
-                else:
-                    huaxin_trade_api.add_trade_callback_data(body)
+                if constant.IS_SIMULATED_TRADE:
+                    # 鎺ュ彈寮�鐩樺暒鏁版嵁
+                    body = self.__parse_request()
+                    if type(body) != str:
+                        huaxin_trade_api.add_trade_callback_data(json.dumps(body))
+                    else:
+                        huaxin_trade_api.add_trade_callback_data(body)
                 result_str = json.dumps({"code": 0})
+            elif url.path == "/set_trade_settings":
+                params = self.__parse_request()
+                if not self.__is_sign_right(params):
+                    result_str = json.dumps({"code": 1001, "msg": "绛惧悕閿欒"})
+                    return
+                logger_debug.info(f"set_trade_settings: {params}")
+                running = params.get("running")
+                auto_sell = params.get("auto_sell")
+                auto_buy = params.get("auto_buy")
+                if running is not None:
+                    TradeSetting().set_running(int(running))
+                if auto_sell is not None:
+                    TradeSetting().set_auto_sell(int(auto_sell))
+                if auto_buy is not None:
+                    TradeSetting().set_auto_buy(int(auto_buy))
+                result_str = json.dumps({"code": 0, "data": {}})
+
             elif url.path == "/buy":
                 # 绛惧悕楠岃瘉
                 params = self.__parse_request()
@@ -130,6 +319,7 @@
                     result_str = json.dumps({"code": 1001, "msg": "绛惧悕閿欒"})
                     return
                 print("涔板叆", params)
+                logger_request_api.info(f"涔板叆锛歿params}")
                 # 涔板叆
                 code = params.get("code")  # 浠g爜
                 volume = params.get("volume")  # 閲�
@@ -142,9 +332,9 @@
                     pre_price = data[1]
                     current_price = data[2] if data[2] else data[5][0][0]
                     price = tool.get_buy_max_price(current_price)
-                    price = min(price, tool.get_limit_up_price(code,pre_price))
+                    price = min(price, tool.get_limit_up_price(code, pre_price))
                 else:
-                    price = round(params.get("price"), 2)  # 浠锋牸
+                    price = round(float(params.get("price")), 2)  # 浠锋牸
                 result = huaxin_trade_api.order(1, code, volume, price, blocking=True)
                 result_str = json.dumps(result)
             elif url.path == "/sell":
@@ -154,23 +344,73 @@
                     result_str = json.dumps({"code": 1001, "msg": "绛惧悕閿欒"})
                     return
                 # 鍗栧嚭
-                print("鍗栧嚭", params)
-                code = params.get("code")  # 浠g爜
-                volume = params.get("volume")  # 閲�
-                price = params.get("price")
-                if not price:
-                    # 娌℃湁涓婁紶浠锋牸锛屽氨闇�瑕佽幏鍙栨渶杩戠殑浠锋牸杩涜涔板叆
-                    data = data_cache.latest_code_market_info_dict.get(code)
-                    if not data:
-                        raise Exception("娌℃湁鑾峰彇鍒癓1鏁版嵁")
-                    pre_price = data[1]
-                    current_price = data[2] if data[2] else data[5][0][0]
-                    price = tool.get_buy_min_price(current_price)
-                    price = max(price, tool.get_limit_down_price(code, pre_price))
+                try:
+                    print("鍗栧嚭", params)
+                    code = params.get("code")  # 浠g爜
+                    volume = params.get("volume")  # 閲�
+                    price = params.get("price")
+                    if not price:
+                        # 娌℃湁涓婁紶浠锋牸锛屽氨闇�瑕佽幏鍙栨渶杩戠殑浠锋牸杩涜涔板叆
+                        data = data_cache.latest_code_market_info_dict.get(code)
+                        if not data:
+                            raise Exception("娌℃湁鑾峰彇鍒癓1鏁版嵁")
+                        pre_price = data[1]
+                        current_price = data[2] if data[2] else data[5][0][0]
+                        # 鑾峰彇鏈�鏂版垚浜や环鏍�
+                        latest_deal_price = data_cache.latest_deal_price_dict.get(code)
+                        if latest_deal_price:
+                            current_price = round(float(latest_deal_price), 2)
+                            async_log_util.info(logger_debug, f"鏍规嵁鎴愪氦浠峰崠鍑猴細{code}-{latest_deal_price}")
+
+                        price = tool.get_buy_min_price(current_price)
+                        price = max(price, tool.get_limit_down_price(code, pre_price))
+                    else:
+                        price = round(params.get("price"), 2)  # 浠锋牸
+                    result = huaxin_trade_api.order(2, code, volume, price, blocking=True)
+                    result_str = json.dumps(result)
+                finally:
+                    logger_request_api.info(f"鍗栧嚭锛歿params}")
+
+            elif url.path == "/set_buy_money":
+                # 璁剧疆姣忔涔板叆鐨勯噾棰�
+                params = self.__parse_request()
+                # 绛惧悕楠岃瘉
+                if not self.__is_sign_right(params):
+                    result_str = json.dumps({"code": 1001, "msg": "绛惧悕閿欒"})
+                    return
+                # 鍗栧嚭
+                print("姣忔涔板叆鐨勯噾棰�", params)
+                money = params.get("money")  # 閲戦
+                if money is None:
+                    result_str = json.dumps({"code": 1, "msg": "鏈笂浼犻噾棰�"})
+                    return
+                money = int(money)
+
+                logger_debug.info(f"璁剧疆寮�浠撻噾棰濓細{money}")
+                data_cache.BUY_MONEY_PER_CODE = money
+                result_str = json.dumps({"code": 0})
+
+            elif url.path == "/set_limit_up_sell":
+                # 璁剧疆姣忔涔板叆鐨勯噾棰�
+                params = self.__parse_request()
+                # 绛惧悕楠岃瘉
+                if not self.__is_sign_right(params):
+                    result_str = json.dumps({"code": 1001, "msg": "绛惧悕閿欒"})
+                    return
+                # 鍗栧嚭
+                print("姣忔涔板叆鐨勯噾棰�", params)
+                code = params.get("code")  #浠g爜
+                enable = params.get("enable")  # 鏄惁寮�鍚�
+                if code is None or enable is None:
+                    result_str = json.dumps({"code": 1, "msg": "涓婁紶鏁版嵁缂哄け"})
+                    return
+                enable = int(enable)
+                if enable:
+                    data_cache.LIMIT_UP_SELL_CODES.add(code)
                 else:
-                    price = round(params.get("price"), 2)  # 浠锋牸
-                result = huaxin_trade_api.order(2, code, volume, price, blocking=True)
-                result_str = json.dumps(result)
+                    data_cache.LIMIT_UP_SELL_CODES.discard(code)
+                result_str = json.dumps({"code": 0})
+
             elif url.path == "/cancel_order":
                 params = self.__parse_request()
                 # 绛惧悕楠岃瘉
@@ -184,6 +424,23 @@
                 orderSysID = params.get("orderSysID")  # 绯荤粺璁㈠崟缂栧彿
                 result = huaxin_trade_api.cancel_order(direction, code, orderSysID, blocking=True)
                 result_str = json.dumps(result)
+            elif url.path == "/upload_deal_big_orders":
+                # 鎴愪氦澶у崟浼犻��
+                datas = self.rfile.read(int(self.headers['content-length']))
+                _str = str(datas, encoding="gbk")
+                datas = json.loads(_str)
+                for d in datas:
+                    if d[1] != 0:
+                        continue
+                    code, data = d[0], d[2]
+                    if code not in data_cache.big_order_deal_dict:
+                        data_cache.big_order_deal_dict[code] = []
+                    data_cache.big_order_deal_dict[code].append(d)
+                    # 鑾峰彇涔板ぇ鍗曟暟閲�
+                    len(data_cache.big_order_deal_dict.get(code, []))
+                hx_logger_l2_transaction.info(_str)
+                # 璁板綍鏃ュ織
+                result_str = json.dumps({"code": 0})
         except Exception as e:
             result_str = json.dumps({"code": 1, "msg": str(e)})
         finally:
@@ -219,4 +476,8 @@
         print("HTTP server is at: http://%s:%d/" % (addr, port))
         httpd.serve_forever()
     except Exception as e:
-        pass
+        logger_system.exception(e)
+
+
+if __name__ == "__main__":
+    run()

--
Gitblit v1.8.0