admin
2025-06-11 0ee16ce71f0412506d4d33c88ade65878b3719c5
data_server.py
@@ -1,16 +1,27 @@
import concurrent.futures
import copy
import hashlib
import http
import json
import logging
import socketserver
import time
from http.server import BaseHTTPRequestHandler
import urllib.parse as urlparse
from log_module.log import hx_logger_l2_transaction
import psutil
import constant
from db import redis_manager_delegate as redis_manager, mysql_data_delegate as mysql_data
from db.redis_manager_delegate import RedisUtils
from log_module import log_export, async_log_util
from log_module.log import hx_logger_l2_transaction, logger_debug, logger_request_api, logger_system
from strategy import data_cache
from strategy.kpl_data_manager import KPLMarketsSiftPlateLogManager, KPLMarketStockHeatLogManager
from strategy.trade_setting import TradeSetting
from trade import huaxin_trade_api, huaxin_trade_data_update
from trade.huaxin_trade_record_manager import DelegateRecordManager, DealRecordManager, MoneyManager, PositionManager
from utils import tool, huaxin_util
from utils import tool, huaxin_util, socket_util
class DataServer(BaseHTTPRequestHandler):
@@ -32,6 +43,9 @@
        if url.path == "/get_position_list":
            # 获取持仓列表
            results = PositionManager.get_position_cache()
            results = copy.deepcopy(results)
            for r in results:
                r["auto_sell"] = 1 if r["securityID"] in data_cache.LIMIT_UP_SELL_CODES else 0
            response_data = json.dumps({"code": 0, "data": results})
        elif url.path == "/get_money":
            # 获取资金信息
@@ -83,9 +97,157 @@
            fdatas = []
            for code in codes:
                data = data_cache.latest_code_market_info_dict.get(code)
                # logger_debug.info(f"获取L1行情接口:{code}-{data}")
                if data:
                    fdatas.append(data)
            response_data = json.dumps({"code": 0, "data": fdatas})
        elif url.path == "/get_buy_money":
            # 获取每次买入的金额
            money = data_cache.BUY_MONEY_PER_CODE
            response_data = json.dumps({"code": 0, "data": {"money": money}})
        elif url.path == "/get_trade_settings":
            fdata = {"running": TradeSetting().get_running(), "auto_sell": TradeSetting().get_auto_sell(),
                     "auto_buy": TradeSetting().get_auto_buy()}
            response_data = json.dumps({"code": 0, "data": fdata})
        elif url.path == "/set_trade_settings":
            logger_debug.info(f"set_trade_settings: {params_dict}")
            running = params_dict.get("running")
            auto_sell = params_dict.get("auto_sell")
            auto_buy = params_dict.get("auto_buy")
            if running is not None:
                TradeSetting().set_running(int(running))
            if auto_sell is not None:
                TradeSetting().set_auto_sell(int(auto_sell))
            if auto_buy is not None:
                TradeSetting().set_auto_buy(int(auto_buy))
            response_data = json.dumps({"code": 0, "data": {}})
        elif url.path == "/get_env":
            request_id = params_dict.get("request_id")
            use_time_list = []
            try:
                __start_time = time.time()
                fdata = {}
                # try:
                #     date = HistoryKDatasUtils.get_trading_dates(tool.date_sub(tool.get_now_date_str(), 10),
                #                                                 tool.get_now_date_str())
                #     if date:
                #         fdata["juejin"] = 1
                # except Exception as e:
                #     fdata["juejin"] = 0
                # fdata["kpl"] = {}
                # # 获取开盘啦数据
                # kpl_types = [KPLDataType.LIMIT_UP.value, KPLDataType.JINGXUAN_RANK.value,
                #              KPLDataType.INDUSTRY_RANK.value]
                # for kpl_type in kpl_types:
                #     if kpl_type in KPLDataManager.kpl_data_update_info:
                #         fdata["kpl"][kpl_type] = KPLDataManager.kpl_data_update_info.get(kpl_type)
                try:
                    # 验证redis
                    RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test")
                    fdata["redis"] = 1
                except:
                    fdata["redis"] = 0
                use_time_list.append(("验证redis", time.time() - __start_time))
                try:
                    # 验证mysql
                    mysql_data.Mysqldb().select_one("select 1")
                    fdata["mysql"] = 1
                except:
                    fdata["mysql"] = 0
                use_time_list.append(("验证mysql", time.time() - __start_time))
                try:
                    # redis异步任务数量
                    fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count()
                except:
                    pass
                use_time_list.append(("验证异步任务数量", time.time() - __start_time))
                # 获取交易通道
                try:
                    can_access = huaxin_trade_api.test_trade_channel()
                    fdata["trade_channel_access"] = 1 if can_access else 0
                except Exception as e:
                    logger_debug.exception(e)
                    fdata["trade_channel_access"] = 0
                use_time_list.append(("验证交易通道", time.time() - __start_time))
                # Collect CPU and memory usage percentages
                memory_info = psutil.virtual_memory()
                cpu_percent = psutil.cpu_percent(interval=1)
                fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent}
                use_time_list.append(("获取设备资源占用", time.time() - __start_time))
                # Assemble the success response from the collected checks
                result = {"code": 0, "data": fdata, "msg": ""}
                # print("OnGetEnvInfo 成功")
                response_data = json.dumps(result)
            except Exception as e:
                response_data = json.dumps({"code": 1, "msg": str(e)})
                logger_debug.error(f"环境获取异常:{request_id}")
                logger_debug.exception(e)
            finally:
                if use_time_list and use_time_list[-1][1] > 10:
                    logger_debug.warning(f"环境获取时间大于10s({request_id}):{use_time_list}")
        # 获取板块强度数据
        elif url.path == "/load_kpl_market_sift_plate":
            # 加载数据
            KPLMarketsSiftPlateLogManager().load_data()
            response_data = json.dumps({"code": 0, "msg": "暂无内容"})
        elif url.path == "/get_kpl_market_sift_plate":
            # 获取开盘啦流入板块详细信息
            print("==========get_kpl_market_sift_plate==========")
            try:
                time_str = params_dict.get("time")
                if not time_str:
                    time_str = tool.get_now_time_str()
                fdatas = KPLMarketsSiftPlateLogManager().get_filter_log_datas()
                response_data = json.dumps({"code": 1, "msg": "暂无内容"})
                for i in range(len(fdatas) - 1, -1, -1):
                    if fdatas[i][0] <= time_str:
                        response_data = json.dumps({"code": 0, "data": fdatas[i]})
                        break
            except Exception as e:
                logging.exception(e)
                response_data = json.dumps({"code": 1, "msg": str(e)})
        # 获取个股强度数据
        elif url.path == "/load_kpl_market_stock_heat":
            # 加载数据
            KPLMarketStockHeatLogManager().load_data()
            response_data = json.dumps({"code": 0, "msg": "暂无内容"})
        elif url.path == "/get_kpl_market_stock_heat":
            # Return the most recent stock-heat log entry at or before the requested time
            print("==========get_kpl_stock_of_markets_plate==========")
            try:
                time_str = params_dict.get("time")
                if not time_str:
                    time_str = tool.get_now_time_str()
                fdatas = KPLMarketStockHeatLogManager().get_filter_log_datas()
                response_data = json.dumps({"code": 1, "msg": "暂无内容"})
                for i in range(len(fdatas) - 1, -1, -1):
                    if fdatas[i][0] <= time_str:
                        response_data = json.dumps({"code": 0, "data": fdatas[i]})
                        break
            except Exception as e:
                logging.exception(e)
                response_data = json.dumps({"code": 1, "msg": str(e)})
        elif url.path == "/get_kpl_market_strong_records":
            # 获取开盘啦市场强度记录
            time_str = params_dict.get("time")
            if not time_str:
                time_str = tool.get_now_time_str()
            datas = log_export.load_kpl_market_strong()
            fdatas = []
            for data in datas:
                # (距离09:15:00的秒数, 时间, 强度)
                fdatas.append((tool.trade_time_sub(data[0], "09:15:00"), data[0], data[1]))
            response_data = json.dumps({"code": 0, "data": fdatas})
        self.send_response(200)
        # 发给请求客户端的响应数据
        self.send_header('Content-type', 'application/json')
@@ -117,12 +279,13 @@
            print("接收到POST请求:", str(path))
            url = urlparse.urlparse(path)
            if url.path == "/trade_callback":
                # Receive trade callback data and hand it to huaxin_trade_api
                body = self.__parse_request()
                if type(body) != str:
                    huaxin_trade_api.add_trade_callback_data(json.dumps(body))
                else:
                    huaxin_trade_api.add_trade_callback_data(body)
                if constant.IS_SIMULATED_TRADE:
                    # Receive trade callback data and hand it to huaxin_trade_api
                    body = self.__parse_request()
                    if type(body) != str:
                        huaxin_trade_api.add_trade_callback_data(json.dumps(body))
                    else:
                        huaxin_trade_api.add_trade_callback_data(body)
                result_str = json.dumps({"code": 0})
            elif url.path == "/buy":
                # 签名验证
@@ -131,6 +294,7 @@
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                print("买入", params)
                logger_request_api.info(f"买入:{params}")
                # 买入
                code = params.get("code")  # 代码
                volume = params.get("volume")  # 量
@@ -143,9 +307,9 @@
                    pre_price = data[1]
                    current_price = data[2] if data[2] else data[5][0][0]
                    price = tool.get_buy_max_price(current_price)
                    price = min(price, tool.get_limit_up_price(code,pre_price))
                    price = min(price, tool.get_limit_up_price(code, pre_price))
                else:
                    price = round(params.get("price"), 2)  # 价格
                    price = round(float(params.get("price")), 2)  # 价格
                result = huaxin_trade_api.order(1, code, volume, price, blocking=True)
                result_str = json.dumps(result)
            elif url.path == "/sell":
@@ -155,23 +319,73 @@
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                # 卖出
                print("卖出", params)
                code = params.get("code")  # 代码
                volume = params.get("volume")  # 量
                price = params.get("price")
                if not price:
                    # No price supplied: derive the sell price from the latest cached market data
                    data = data_cache.latest_code_market_info_dict.get(code)
                    if not data:
                        raise Exception("没有获取到L1数据")
                    pre_price = data[1]
                    current_price = data[2] if data[2] else data[5][0][0]
                    price = tool.get_buy_min_price(current_price)
                    price = max(price, tool.get_limit_down_price(code, pre_price))
                try:
                    print("卖出", params)
                    code = params.get("code")  # 代码
                    volume = params.get("volume")  # 量
                    price = params.get("price")
                    if not price:
                        # No price supplied: derive the sell price from the latest cached market data
                        data = data_cache.latest_code_market_info_dict.get(code)
                        if not data:
                            raise Exception("没有获取到L1数据")
                        pre_price = data[1]
                        current_price = data[2] if data[2] else data[5][0][0]
                        # 获取最新成交价格
                        latest_deal_price = data_cache.latest_deal_price_dict.get(code)
                        if latest_deal_price:
                            current_price = round(float(latest_deal_price), 2)
                            async_log_util.info(logger_debug, f"根据成交价卖出:{code}-{latest_deal_price}")
                        price = tool.get_buy_min_price(current_price)
                        price = max(price, tool.get_limit_down_price(code, pre_price))
                    else:
                        price = round(params.get("price"), 2)  # 价格
                    result = huaxin_trade_api.order(2, code, volume, price, blocking=True)
                    result_str = json.dumps(result)
                finally:
                    logger_request_api.info(f"卖出:{params}")
            elif url.path == "/set_buy_money":
                # 设置每次买入的金额
                params = self.__parse_request()
                # 签名验证
                if not self.__is_sign_right(params):
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                # Read the requested amount and store it as data_cache.BUY_MONEY_PER_CODE
                print("每次买入的金额", params)
                money = params.get("money")  # 金额
                if money is None:
                    result_str = json.dumps({"code": 1, "msg": "未上传金额"})
                    return
                money = int(money)
                logger_debug.info(f"设置开仓金额:{money}")
                data_cache.BUY_MONEY_PER_CODE = money
                result_str = json.dumps({"code": 0})
            elif url.path == "/set_limit_up_sell":
                # Add a code to, or remove it from, data_cache.LIMIT_UP_SELL_CODES
                params = self.__parse_request()
                # 签名验证
                if not self.__is_sign_right(params):
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                # Read the target code and the enable flag
                print("每次买入的金额", params)
                code = params.get("code")  #代码
                enable = params.get("enable")  # 是否开启
                if code is None or enable is None:
                    result_str = json.dumps({"code": 1, "msg": "上传数据缺失"})
                    return
                enable = int(enable)
                if enable:
                    data_cache.LIMIT_UP_SELL_CODES.add(code)
                else:
                    price = round(params.get("price"), 2)  # 价格
                result = huaxin_trade_api.order(2, code, volume, price, blocking=True)
                result_str = json.dumps(result)
                    data_cache.LIMIT_UP_SELL_CODES.discard(code)
                result_str = json.dumps({"code": 0})
            elif url.path == "/cancel_order":
                params = self.__parse_request()
                # 签名验证
@@ -189,6 +403,16 @@
                # 成交大单传递
                datas = self.rfile.read(int(self.headers['content-length']))
                _str = str(datas, encoding="gbk")
                datas = json.loads(_str)
                for d in datas:
                    if d[1] != 0:
                        continue
                    code, data = d[0], d[2]
                    if code not in data_cache.big_order_deal_dict:
                        data_cache.big_order_deal_dict[code] = []
                    data_cache.big_order_deal_dict[code].append(d)
                    # 获取买大单数量
                    len(data_cache.big_order_deal_dict.get(code, []))
                hx_logger_l2_transaction.info(_str)
                # 记录日志
                result_str = json.dumps({"code": 0})
@@ -227,4 +451,8 @@
        print("HTTP server is at: http://%s:%d/" % (addr, port))
        httpd.serve_forever()
    except Exception as e:
        pass
        logger_system.exception(e)
# Script entry point: call run() only when this module is executed directly,
# not when it is imported by another module.
if __name__ == "__main__":
    run()