import concurrent.futures
import copy
import hashlib
import hmac
import http
import json
import logging
import socketserver
import time
from http.server import BaseHTTPRequestHandler
import urllib.parse as urlparse

import psutil

import constant
from db import redis_manager_delegate as redis_manager, mysql_data_delegate as mysql_data
from db.redis_manager_delegate import RedisUtils
from log_module import log_export, async_log_util
from log_module.log import hx_logger_l2_transaction, logger_debug, logger_request_api
from strategy import data_cache
from strategy.kpl_data_manager import KPLMarketsSiftPlateLogManager, KPLMarketStockHeatLogManager
from strategy.trade_setting import TradeSetting
from trade import huaxin_trade_api, huaxin_trade_data_update
from trade.huaxin_trade_record_manager import DelegateRecordManager, DealRecordManager, MoneyManager, PositionManager
from utils import tool, huaxin_util, socket_util


class DataServer(BaseHTTPRequestHandler):
    """HTTP request handler exposing trade data (GET) and trade commands (POST) as JSON.

    Every request is answered with HTTP 200 and a JSON body; failures are
    reported through a non-zero ``code`` field in that body. Unknown paths
    get an empty body.
    """

    # Class-level worker pool; not referenced by any handler in this file.
    __data_process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    # Suffix appended to the sorted "k=v" pairs before hashing a request signature.
    # NOTE(review): this shared secret is hard-coded in source; consider loading it from configuration.
    __SIGN_SALT = "!@#lowSU*^cTion8888"

    def log_message(self, format, *args):
        """Disable the default per-request access log of BaseHTTPRequestHandler."""
        pass

    # ------------------------------------------------------------------ GET

    def do_GET(self):
        """Handle a GET request and reply with a JSON body.

        Any exception raised while building the response is logged and
        reported to the client as ``{"code": 1, "msg": <error text>}`` instead
        of dropping the connection without an answer.
        """
        try:
            url = urlparse.urlparse(self.path)
            # Only the first value of each query parameter is kept.
            params_dict = {k: v[0] for k, v in urlparse.parse_qs(url.query).items()}
            response_data = self.__handle_get(url.path, params_dict)
        except Exception as e:
            logger_debug.exception(e)
            response_data = json.dumps({"code": 1, "msg": str(e)})
        self.__send_response(response_data)

    def __handle_get(self, path, params_dict):
        """Build the JSON response string for a GET ``path``.

        :param path: URL path without the query string.
        :param params_dict: query parameters (first value of each key).
        :return: JSON text, or "" when the path is not recognised.
        """
        if path == "/get_position_list":
            # Position list; work on a copy so the cached records are not mutated.
            results = copy.deepcopy(PositionManager.get_position_cache())
            for r in results:
                r["auto_sell"] = 1 if r["securityID"] in data_cache.LIMIT_UP_SELL_CODES else 0
            return json.dumps({"code": 0, "data": results})

        if path == "/get_money":
            # Account money information.
            return json.dumps({"code": 0, "data": MoneyManager.get_cache()})

        if path == "/get_deal_list":
            # Today's deals, each enriched with the security name.
            results = DealRecordManager.list_by_day(tool.get_now_date_str("%Y%m%d"))
            if results:
                for d in results:
                    d["securityName"] = data_cache.DataCache().code_name_dict.get(tool.get_symbol(d["securityID"]))
            return json.dumps({"code": 0, "data": results})

        if path == "/get_delegate_list":
            # Today's delegates (orders). `can_cancel` selects open (truthy) or
            # closed (0) orders; when absent, the status filter is left empty.
            print("参数", params_dict)
            can_cancel = params_dict.get("can_cancel")
            order_status = []
            if can_cancel is not None:
                if int(can_cancel):
                    print("获取未结委托")
                    order_status = [huaxin_util.TORA_TSTP_OST_Cached, huaxin_util.TORA_TSTP_OST_Unknown,
                                    huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded]
                else:
                    print("获取已结委托")
                    order_status = [huaxin_util.TORA_TSTP_OST_AllTraded,
                                    huaxin_util.TORA_TSTP_OST_PartTradeCanceled,
                                    huaxin_util.TORA_TSTP_OST_AllCanceled, huaxin_util.TORA_TSTP_OST_Rejected]
            results = DelegateRecordManager.list_by_day(tool.get_now_date_str("%Y%m%d"), None,
                                                        orderStatus=order_status)
            return json.dumps({"code": 0, "data": results})

        if path == "/refresh_trade_data":
            # Trigger a refresh of one kind of trade data; unknown types are ignored.
            _type = params_dict.get("type")
            if _type == "money":
                huaxin_trade_data_update.add_money_list()
            elif _type == "delegate":
                huaxin_trade_data_update.add_delegate_list("手动刷新")
            elif _type == "deal":
                huaxin_trade_data_update.add_deal_list()
            elif _type == "position":
                huaxin_trade_data_update.add_position_list()
            return json.dumps({"code": 0, "data": {}})

        if path == "/get_market_info":
            # Latest cached market info for a JSON-encoded list of codes;
            # codes without cached data are omitted.
            codes = json.loads(params_dict.get("codes"))
            market_infos = (data_cache.latest_code_market_info_dict.get(code) for code in codes)
            fdatas = [info for info in market_infos if info]
            return json.dumps({"code": 0, "data": fdatas})

        if path == "/get_buy_money":
            # Amount of money used for each buy.
            return json.dumps({"code": 0, "data": {"money": data_cache.BUY_MONEY_PER_CODE}})

        if path == "/get_trade_settings":
            fdata = {"running": TradeSetting().get_running(), "auto_sell": TradeSetting().get_auto_sell(),
                     "auto_buy": TradeSetting().get_auto_buy()}
            return json.dumps({"code": 0, "data": fdata})

        if path == "/set_trade_settings":
            # Only the settings present in the query string are updated.
            running = params_dict.get("running")
            auto_sell = params_dict.get("auto_sell")
            auto_buy = params_dict.get("auto_buy")
            if running is not None:
                TradeSetting().set_running(int(running))
            if auto_sell is not None:
                TradeSetting().set_auto_sell(int(auto_sell))
            if auto_buy is not None:
                TradeSetting().set_auto_buy(int(auto_buy))
            return json.dumps({"code": 0, "data": {}})

        if path == "/get_env":
            return self.__get_env_info(params_dict.get("request_id"))

        if path == "/load_get_kpl_market_sift_plate":
            # Load the plate strength data.
            KPLMarketsSiftPlateLogManager().load_data()
            return json.dumps({"code": 0, "msg": "暂无内容"})

        if path == "/get_kpl_market_sift_plate":
            print("==========get_kpl_market_sift_plate==========")
            return self.__get_kpl_log_record(KPLMarketsSiftPlateLogManager, params_dict)

        if path == "/load_kpl_market_stock_heat":
            # Load the per-stock strength data.
            KPLMarketStockHeatLogManager().load_data()
            return json.dumps({"code": 0, "msg": "暂无内容"})

        if path == "/get_kpl_market_stock_heat":
            print("==========get_kpl_stock_of_markets_plate==========")
            return self.__get_kpl_log_record(KPLMarketStockHeatLogManager, params_dict)

        if path == "/get_kpl_market_strong_records":
            # Market strength records as (seconds since 09:15:00, time, strength).
            # NOTE(review): the `time` query parameter was read here but never
            # used to filter the records; all records are returned.
            datas = log_export.load_kpl_market_strong()
            fdatas = [(tool.trade_time_sub(data[0], "09:15:00"), data[0], data[1]) for data in datas]
            return json.dumps({"code": 0, "data": fdatas})

        return ""

    def __get_env_info(self, request_id):
        """Probe redis, mysql, the trade channel and device load; return the JSON report.

        Each probe failure is recorded as 0 in the report rather than raised.
        A warning is logged when the cumulative elapsed time exceeds 10 seconds.

        :param request_id: caller-supplied id, used only in log messages.
        """
        use_time_list = []  # (step label, seconds elapsed since start) per finished step
        try:
            start_time = time.time()
            fdata = {}
            try:
                # Redis check.
                RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test")
                fdata["redis"] = 1
            except Exception:
                fdata["redis"] = 0
            use_time_list.append(("验证redis", time.time() - start_time))
            try:
                # MySQL check.
                mysql_data.Mysqldb().select_one("select 1")
                fdata["mysql"] = 1
            except Exception:
                fdata["mysql"] = 0
            use_time_list.append(("验证mysql", time.time() - start_time))
            try:
                # Number of pending redis async tasks; the key is omitted on failure.
                fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count()
            except Exception as e:
                logger_debug.exception(e)
            use_time_list.append(("验证异步任务数量", time.time() - start_time))
            try:
                # Trade channel check.
                can_access = huaxin_trade_api.test_trade_channel()
                fdata["trade_channel_access"] = 1 if can_access else 0
            except Exception as e:
                logger_debug.exception(e)
                fdata["trade_channel_access"] = 0
            use_time_list.append(("验证交易通道", time.time() - start_time))
            # CPU and memory usage. cpu_percent(interval=1) blocks for one second.
            memory_info = psutil.virtual_memory()
            cpu_percent = psutil.cpu_percent(interval=1)
            # "memery" (sic) is the key clients receive; do not rename.
            fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent}
            use_time_list.append(("获取设备资源占用", time.time() - start_time))
            return json.dumps({"code": 0, "data": fdata, "msg": ""})
        except Exception as e:
            logger_debug.error(f"环境获取异常:{request_id}")
            logger_debug.exception(e)
            return json.dumps({"code": 1, "msg": str(e)})
        finally:
            if use_time_list and use_time_list[-1][1] > 10:
                logger_debug.warning(f"环境获取时间大于10s({request_id}):{use_time_list}")

    @staticmethod
    def __get_kpl_log_record(manager_cls, params_dict):
        """Return the last filtered log record whose first element is <= the requested time.

        The records of ``manager_cls().get_filter_log_datas()`` are scanned from
        the end of the list. The ``time`` query parameter defaults to the
        current time string.

        :return: JSON with ``code`` 0 and the record, or ``code`` 1 when nothing
                 matches or an error occurs.
        """
        try:
            time_str = params_dict.get("time") or tool.get_now_time_str()
            fdatas = manager_cls().get_filter_log_datas()
            for record in reversed(fdatas):
                if record[0] <= time_str:
                    return json.dumps({"code": 0, "data": record})
            return json.dumps({"code": 1, "msg": "暂无内容"})
        except Exception as e:
            logging.exception(e)
            return json.dumps({"code": 1, "msg": str(e)})

    # ----------------------------------------------------------------- POST

    @classmethod
    def __is_sign_right(cls, params):
        """Check the ``sign`` field of a request against the MD5 of its other fields.

        The expected signature is the hex MD5 of the "k=v" pairs (excluding
        ``sign``), sorted and joined with "&", followed by the shared salt.

        :param params: parsed request body; anything but a dict is rejected.
        :return: True when the signature matches, False otherwise.
        """
        if not isinstance(params, dict):
            # __parse_request returns the raw text (or a non-object JSON value)
            # when the body is not a JSON object; such a request cannot be signed.
            return False
        sign = params.get("sign")
        if not isinstance(sign, str):
            return False
        pairs = sorted(f"{k}={v}" for k, v in params.items() if k != 'sign')
        source_str = "&".join(pairs) + cls.__SIGN_SALT
        expected = hashlib.md5(source_str.encode('utf-8')).hexdigest()
        # Constant-time comparison so the check does not leak how many leading
        # characters of the signature were correct.
        return hmac.compare_digest(expected.encode('utf-8'), sign.encode('utf-8', 'replace'))

    def do_POST(self):
        """Handle a POST request and always reply with a JSON body.

        Exceptions are logged and reported as ``{"code": 1, "msg": <error text>}``.
        """
        result_str = ""
        try:
            print("接收到POST请求:", str(self.path))
            result_str = self.__handle_post(urlparse.urlparse(self.path).path)
        except Exception as e:
            logger_debug.exception(e)
            result_str = json.dumps({"code": 1, "msg": str(e)})
        finally:
            self.__send_response(result_str)

    def __handle_post(self, path):
        """Build the JSON response string for a POST ``path`` ("" when unknown)."""
        if path == "/trade_callback":
            if constant.IS_SIMULATED_TRADE:
                # Forward the callback payload as a string.
                body = self.__parse_request()
                if not isinstance(body, str):
                    body = json.dumps(body)
                huaxin_trade_api.add_trade_callback_data(body)
            # NOTE(review): the callback is acknowledged whether or not simulated
            # trading is enabled — confirm this matches the intended behaviour.
            return json.dumps({"code": 0})

        if path == "/upload_deal_big_orders":
            return self.__upload_deal_big_orders()

        # Endpoints below require a valid request signature.
        signed_handlers = {
            "/buy": self.__buy,
            "/sell": self.__sell,
            "/set_buy_money": self.__set_buy_money,
            "/set_limit_up_sell": self.__set_limit_up_sell,
            "/cancel_order": self.__cancel_order,
        }
        handler = signed_handlers.get(path)
        if handler is None:
            return ""
        params = self.__parse_request()
        if not self.__is_sign_right(params):
            return json.dumps({"code": 1001, "msg": "签名错误"})
        return handler(params)

    def __buy(self, params):
        """Place a buy order; without an explicit price, derive one from cached market data."""
        print("买入", params)
        logger_request_api.info(f"买入:{params}")
        code = params.get("code")
        volume = params.get("volume")
        price = params.get("price")
        if not price:
            # No price supplied: derive it from the latest cached market info.
            data = data_cache.latest_code_market_info_dict.get(code)
            if not data:
                raise Exception("没有获取到L1数据")
            # presumably data[1] is the previous close and data[2] the latest
            # price, with data[5][0][0] as fallback — TODO confirm the layout.
            pre_price = data[1]
            current_price = data[2] if data[2] else data[5][0][0]
            price = tool.get_buy_max_price(current_price)
            price = min(price, tool.get_limit_up_price(code, pre_price))
        else:
            price = round(float(price), 2)
        result = huaxin_trade_api.order(1, code, volume, price, blocking=True)
        return json.dumps(result)

    def __sell(self, params):
        """Place a sell order; without an explicit price, derive one from cached market data."""
        try:
            print("卖出", params)
            code = params.get("code")
            volume = params.get("volume")
            price = params.get("price")
            if not price:
                # No price supplied: derive it from the latest cached market info.
                data = data_cache.latest_code_market_info_dict.get(code)
                if not data:
                    raise Exception("没有获取到L1数据")
                pre_price = data[1]
                current_price = data[2] if data[2] else data[5][0][0]
                # Prefer the latest deal price when one is cached.
                latest_deal_price = data_cache.latest_deal_price_dict.get(code)
                if latest_deal_price:
                    current_price = round(float(latest_deal_price), 2)
                    async_log_util.info(logger_debug, f"根据成交价卖出:{code}-{latest_deal_price}")
                price = tool.get_buy_min_price(current_price)
                price = max(price, tool.get_limit_down_price(code, pre_price))
            else:
                # float() first: the price may arrive as a string, which round() rejects.
                price = round(float(price), 2)
            result = huaxin_trade_api.order(2, code, volume, price, blocking=True)
            return json.dumps(result)
        finally:
            # Logged whether or not the order succeeded.
            logger_request_api.info(f"卖出:{params}")

    def __set_buy_money(self, params):
        """Set the amount of money used for each buy."""
        print("每次买入的金额", params)
        money = params.get("money")
        if money is None:
            return json.dumps({"code": 1, "msg": "未上传金额"})
        money = int(money)
        logger_debug.info(f"设置开仓金额:{money}")
        data_cache.BUY_MONEY_PER_CODE = money
        return json.dumps({"code": 0})

    def __set_limit_up_sell(self, params):
        """Add a code to, or remove it from, ``data_cache.LIMIT_UP_SELL_CODES``."""
        print("设置涨停卖出", params)
        code = params.get("code")
        enable = params.get("enable")
        if code is None or enable is None:
            return json.dumps({"code": 1, "msg": "上传数据缺失"})
        if int(enable):
            data_cache.LIMIT_UP_SELL_CODES.add(code)
        else:
            data_cache.LIMIT_UP_SELL_CODES.discard(code)
        return json.dumps({"code": 0})

    def __cancel_order(self, params):
        """Cancel an order identified by direction, code and system order id."""
        print("撤单", params)
        direction = params.get("direction")
        code = params.get("code")
        orderSysID = params.get("orderSysID")
        result = huaxin_trade_api.cancel_order(direction, code, orderSysID, blocking=True)
        return json.dumps(result)

    def __upload_deal_big_orders(self):
        """Store uploaded big-order deals per code and log the raw payload."""
        raw = self.rfile.read(int(self.headers['content-length']))
        _str = str(raw, encoding="gbk")
        for d in json.loads(_str):
            # Only records with d[1] == 0 are kept (presumably buy-side orders — TODO confirm).
            if d[1] != 0:
                continue
            data_cache.big_order_deal_dict.setdefault(d[0], []).append(d)
        hx_logger_l2_transaction.info(_str)
        return json.dumps({"code": 0})

    # -------------------------------------------------------------- helpers

    def __send_response(self, data):
        """Send ``data`` (str) to the client as an HTTP 200 JSON response."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())

    def __parse_request(self):
        """Read the request body (GBK-decoded) and return it parsed as JSON.

        :return: the decoded JSON value, or the raw text when it is not valid JSON.
        """
        raw = self.rfile.read(int(self.headers['content-length']))
        _str = str(raw, encoding="gbk")
        try:
            return json.loads(_str)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            return _str


class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTPServer that handles each request in its own thread."""
    pass


def run(addr="0.0.0.0", port=12881):
    """Start the HTTP server and serve until interrupted.

    Start-up and serving errors are logged instead of being raised, so a
    failure (for example, the port already being in use) makes this function
    return rather than crash the caller.

    :param addr: interface address to bind to.
    :param port: TCP port to listen on.
    """
    handler = DataServer
    try:
        httpd = ThreadedHTTPServer((addr, port), handler)
        print("HTTP server is at: http://%s:%d/" % (addr, port))
        httpd.serve_forever()
    except Exception as e:
        logger_debug.exception(e)


if __name__ == "__main__":
    run()