import concurrent.futures
import copy
import hashlib
import hmac
import http
import json
import logging
import socketserver
from http.server import BaseHTTPRequestHandler
import urllib.parse as urlparse

import psutil

from db import redis_manager_delegate as redis_manager, mysql_data_delegate as mysql_data
from db.redis_manager_delegate import RedisUtils
from log_module import log_export
from log_module.log import hx_logger_l2_transaction, logger_debug, logger_request_api
from strategy import data_cache
from strategy.kpl_data_manager import KPLStockOfMarketsPlateLogManager
from strategy.trade_setting import TradeSetting
from trade import huaxin_trade_api, huaxin_trade_data_update
from trade.huaxin_trade_record_manager import DelegateRecordManager, DealRecordManager, MoneyManager, PositionManager
from utils import tool, huaxin_util, socket_util


class DataServer(BaseHTTPRequestHandler):
    """HTTP handler exposing trade data queries (GET) and trade commands (POST).

    Every response is sent with status 200 and a JSON body; failures are
    reported through the ``code`` / ``msg`` fields of that body.
    """

    __data_process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    def log_message(self, format, *args):
        """Disable the default per-request logging of the base handler."""
        pass

    def do_GET(self):
        """Dispatch a GET request by path and reply with a JSON document.

        Query parameters are flattened to their first value. An unknown path
        yields an empty body. Any exception raised while building the response
        is logged and reported as ``{"code": 1, "msg": ...}`` so the client
        always receives a reply.
        """
        response_data = ""
        try:
            url = urlparse.urlparse(self.path)
            # Keep only the first value of each query parameter.
            params_dict = {k: v[0] for k, v in urlparse.parse_qs(url.query).items()}

            if url.path == "/get_position_list":
                # Position list; deep-copied so the cached entries are not mutated.
                results = copy.deepcopy(PositionManager.get_position_cache())
                for r in results:
                    r["auto_sell"] = 1 if r["securityID"] in data_cache.LIMIT_UP_SELL_CODES else 0
                response_data = json.dumps({"code": 0, "data": results})

            elif url.path == "/get_money":
                # Account money information.
                result = MoneyManager.get_cache()
                response_data = json.dumps({"code": 0, "data": result})

            elif url.path == "/get_deal_list":
                # Today's deals, each annotated with the security name.
                results = DealRecordManager.list_by_day(tool.get_now_date_str("%Y%m%d"))
                if results:
                    for d in results:
                        d["securityName"] = data_cache.DataCache().code_name_dict.get(
                            tool.get_symbol(d["securityID"]))
                response_data = json.dumps({"code": 0, "data": results})

            elif url.path == "/get_delegate_list":
                # Today's delegates (orders). "can_cancel" selects open (truthy)
                # or finished (0) orders; when omitted, no status filter is passed.
                print("参数", params_dict)
                can_cancel = params_dict.get("can_cancel")
                order_status = []
                if can_cancel is not None:
                    if int(can_cancel):
                        print("获取未结委托")
                        order_status = [huaxin_util.TORA_TSTP_OST_Cached,
                                        huaxin_util.TORA_TSTP_OST_Unknown,
                                        huaxin_util.TORA_TSTP_OST_Accepted,
                                        huaxin_util.TORA_TSTP_OST_PartTraded]
                    else:
                        print("获取已结委托")
                        order_status = [huaxin_util.TORA_TSTP_OST_AllTraded,
                                        huaxin_util.TORA_TSTP_OST_PartTradeCanceled,
                                        huaxin_util.TORA_TSTP_OST_AllCanceled,
                                        huaxin_util.TORA_TSTP_OST_Rejected]
                results = DelegateRecordManager.list_by_day(tool.get_now_date_str("%Y%m%d"), None,
                                                            orderStatus=order_status)
                response_data = json.dumps({"code": 0, "data": results})

            elif url.path == "/refresh_trade_data":
                # Trigger a refresh of one kind of trade data.
                _type = params_dict.get("type")
                if _type == "money":
                    huaxin_trade_data_update.add_money_list()
                elif _type == "delegate":
                    huaxin_trade_data_update.add_delegate_list("手动刷新")
                elif _type == "deal":
                    huaxin_trade_data_update.add_deal_list()
                elif _type == "position":
                    huaxin_trade_data_update.add_position_list()
                response_data = json.dumps({"code": 0, "data": {}})

            elif url.path == "/get_market_info":
                # Latest cached market info for a JSON-encoded list of codes;
                # codes without cached data are skipped.
                codes = json.loads(params_dict.get("codes"))
                fdatas = []
                for code in codes:
                    data = data_cache.latest_code_market_info_dict.get(code)
                    if data:
                        fdatas.append(data)
                response_data = json.dumps({"code": 0, "data": fdatas})

            elif url.path == "/get_buy_money":
                # Money used for each buy.
                money = data_cache.BUY_MONEY_PER_CODE
                response_data = json.dumps({"code": 0, "data": {"money": money}})

            elif url.path == "/get_trade_settings":
                fdata = {"running": TradeSetting().get_running(),
                         "auto_sell": TradeSetting().get_auto_sell(),
                         "auto_buy": TradeSetting().get_auto_buy()}
                response_data = json.dumps({"code": 0, "data": fdata})

            elif url.path == "/set_trade_settings":
                # Only the settings present in the query string are updated.
                running = params_dict.get("running")
                auto_sell = params_dict.get("auto_sell")
                auto_buy = params_dict.get("auto_buy")
                if running is not None:
                    TradeSetting().set_running(int(running))
                if auto_sell is not None:
                    TradeSetting().set_auto_sell(int(auto_sell))
                if auto_buy is not None:
                    TradeSetting().set_auto_buy(int(auto_buy))
                response_data = json.dumps({"code": 0, "data": {}})

            elif url.path == "/get_env":
                response_data = self.__build_env_response()

            elif url.path == "/load_kpl_stock_of_markets_plate":
                # Load the KPL plate log data.
                KPLStockOfMarketsPlateLogManager().load_data()
                response_data = json.dumps({"code": 0, "msg": "暂无内容"})

            elif url.path == "/get_kpl_stock_of_markets_plate":
                # Latest KPL plate record whose first field is <= the requested
                # time (defaults to the current time).
                print("==========get_kpl_stock_of_markets_plate==========")
                try:
                    time_str = params_dict.get("time")
                    if not time_str:
                        time_str = tool.get_now_time_str()
                    fdatas = KPLStockOfMarketsPlateLogManager().get_filter_log_datas()
                    response_data = json.dumps({"code": 1, "msg": "暂无内容"})
                    # Scan from the newest record backwards.
                    for record in reversed(fdatas):
                        if record[0] <= time_str:
                            response_data = json.dumps({"code": 0, "data": record})
                            break
                except Exception as e:
                    logging.exception(e)
                    response_data = json.dumps({"code": 1, "msg": str(e)})

            elif url.path == "/get_kpl_market_strong_records":
                # KPL market strength records as
                # (seconds since 09:15:00, time, strength).
                # NOTE(review): the "time" query parameter is not applied as a
                # filter here — confirm whether filtering was intended.
                datas = log_export.load_kpl_market_strong()
                fdatas = [(tool.trade_time_sub(data[0], "09:15:00"), data[0], data[1]) for data in datas]
                response_data = json.dumps({"code": 0, "data": fdatas})
        except Exception as e:
            # Report the failure to the client instead of dropping the connection.
            logging.exception(e)
            response_data = json.dumps({"code": 1, "msg": str(e)})

        self.__send_response(response_data)

    def __build_env_response(self):
        """Return the JSON body for ``/get_env``: health of redis, mysql, the
        trade channel, plus CPU and memory usage.

        NOTE: checks for the juejin data source and KPL update info used to
        live here and are currently disabled.
        """
        try:
            fdata = {}
            try:
                # Verify redis connectivity.
                RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test")
                fdata["redis"] = 1
            except Exception:
                fdata["redis"] = 0
            try:
                # Verify mysql connectivity.
                mysql_data.Mysqldb().select_one("select 1")
                fdata["mysql"] = 1
            except Exception:
                fdata["mysql"] = 0
            try:
                # Number of pending redis async tasks (best effort; omitted on failure).
                fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count()
            except Exception:
                pass
            # Trade channel reachability.
            try:
                can_access = huaxin_trade_api.test_trade_channel()
                fdata["trade_channel_access"] = 1 if can_access else 0
            except Exception as e:
                logger_debug.exception(e)
                fdata["trade_channel_access"] = 0
            # CPU and memory usage; cpu_percent(interval=1) blocks for one second.
            memory_info = psutil.virtual_memory()
            cpu_percent = psutil.cpu_percent(interval=1)
            fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent}
            return json.dumps({"code": 0, "data": fdata, "msg": ""})
        except Exception as e:
            return json.dumps({"code": 1, "msg": str(e)})

    @classmethod
    def __is_sign_right(cls, params):
        """Return True when ``params["sign"]`` matches the expected MD5 signature.

        The signature is the MD5 hex digest of the sorted ``key=value`` pairs
        (excluding ``sign``) joined with ``&`` and followed by a fixed salt.
        Non-dict input, or a missing / non-string ``sign``, is rejected.
        """
        if not isinstance(params, dict):
            # The request body was not a JSON object, so it cannot carry a signature.
            return False
        sign = params.get("sign")
        if not isinstance(sign, str):
            return False
        ps = sorted(f"{k}={v}" for k, v in params.items() if k != 'sign')
        source_str = "&".join(ps) + "!@#lowSU*^cTion8888"
        md5_hexdigest = hashlib.md5(source_str.encode('utf-8')).hexdigest()
        # Constant-time comparison; bytes are used so a non-ASCII sign cannot raise.
        return hmac.compare_digest(md5_hexdigest.encode('utf-8'), sign.encode('utf-8'))

    def do_POST(self):
        """Dispatch a POST request by path and reply with a JSON document.

        All paths except ``/trade_callback`` and ``/upload_deal_big_orders``
        require a valid signature (code 1001 otherwise). Any exception is
        reported as ``{"code": 1, "msg": ...}``. The response is always sent
        from the ``finally`` clause, including on early ``return``.
        """
        result_str = ""
        try:
            path = self.path
            print("接收到POST请求:", str(path))
            url = urlparse.urlparse(path)
            if url.path == "/trade_callback":
                # Trade callback data is forwarded as a string.
                body = self.__parse_request()
                if not isinstance(body, str):
                    huaxin_trade_api.add_trade_callback_data(json.dumps(body))
                else:
                    huaxin_trade_api.add_trade_callback_data(body)
                result_str = json.dumps({"code": 0})

            elif url.path == "/buy":
                params = self.__parse_request()
                # Signature verification.
                if not self.__is_sign_right(params):
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                print("买入", params)
                logger_request_api.info(f"买入:{params}")
                code = params.get("code")
                volume = params.get("volume")
                price = params.get("price")
                if not price:
                    # No price supplied: derive it from the latest cached market data.
                    data = data_cache.latest_code_market_info_dict.get(code)
                    if not data:
                        raise Exception("没有获取到L1数据")
                    pre_price = data[1]
                    # Falls back to data[5][0][0] when data[2] is falsy
                    # (presumably the first quote level — TODO confirm).
                    current_price = data[2] if data[2] else data[5][0][0]
                    price = tool.get_buy_max_price(current_price)
                    # Never bid above the limit-up price.
                    price = min(price, tool.get_limit_up_price(code, pre_price))
                else:
                    price = round(float(price), 2)
                result = huaxin_trade_api.order(1, code, volume, price, blocking=True)
                result_str = json.dumps(result)

            elif url.path == "/sell":
                params = self.__parse_request()
                # Signature verification.
                if not self.__is_sign_right(params):
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                try:
                    print("卖出", params)
                    code = params.get("code")
                    volume = params.get("volume")
                    price = params.get("price")
                    if not price:
                        # No price supplied: derive it from the latest cached market data.
                        data = data_cache.latest_code_market_info_dict.get(code)
                        if not data:
                            raise Exception("没有获取到L1数据")
                        pre_price = data[1]
                        current_price = data[2] if data[2] else data[5][0][0]
                        price = tool.get_buy_min_price(current_price)
                        # Never ask below the limit-down price.
                        price = max(price, tool.get_limit_down_price(code, pre_price))
                    else:
                        # float() so a string price is accepted, as in /buy.
                        price = round(float(price), 2)
                    result = huaxin_trade_api.order(2, code, volume, price, blocking=True)
                    result_str = json.dumps(result)
                finally:
                    logger_request_api.info(f"卖出:{params}")

            elif url.path == "/set_buy_money":
                # Set the money used for each buy.
                params = self.__parse_request()
                # Signature verification.
                if not self.__is_sign_right(params):
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                print("每次买入的金额", params)
                money = params.get("money")
                if money is None:
                    result_str = json.dumps({"code": 1, "msg": "未上传金额"})
                    return
                data_cache.BUY_MONEY_PER_CODE = int(money)
                result_str = json.dumps({"code": 0})

            elif url.path == "/set_limit_up_sell":
                # Enable or disable limit-up selling for one code.
                params = self.__parse_request()
                # Signature verification.
                if not self.__is_sign_right(params):
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                print("每次买入的金额", params)
                code = params.get("code")
                enable = params.get("enable")
                if code is None or enable is None:
                    result_str = json.dumps({"code": 1, "msg": "上传数据缺失"})
                    return
                if int(enable):
                    data_cache.LIMIT_UP_SELL_CODES.add(code)
                else:
                    data_cache.LIMIT_UP_SELL_CODES.discard(code)
                result_str = json.dumps({"code": 0})

            elif url.path == "/cancel_order":
                params = self.__parse_request()
                # Signature verification.
                if not self.__is_sign_right(params):
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                print("撤单", params)
                direction = params.get("direction")
                code = params.get("code")
                orderSysID = params.get("orderSysID")  # system order id
                result = huaxin_trade_api.cancel_order(direction, code, orderSysID, blocking=True)
                result_str = json.dumps(result)

            elif url.path == "/upload_deal_big_orders":
                # Uploaded big-order deal data is only written to the L2 transaction log.
                datas = self.rfile.read(int(self.headers['content-length']))
                _str = str(datas, encoding="gbk")
                hx_logger_l2_transaction.info(_str)
                result_str = json.dumps({"code": 0})
        except Exception as e:
            result_str = json.dumps({"code": 1, "msg": str(e)})
        finally:
            self.__send_response(result_str)

    def __send_response(self, data):
        """Send ``data`` (a str) to the client as a 200 JSON response."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())

    def __parse_request(self):
        """Read the request body, decode it as GBK and parse it as JSON.

        Returns the parsed JSON value, or the raw decoded string when the body
        is not valid JSON.
        """
        datas = self.rfile.read(int(self.headers['content-length']))
        _str = str(datas, encoding="gbk")
        try:
            return json.loads(_str)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            return _str


class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTP server that handles each request in its own thread."""
    pass


def run(addr="0.0.0.0", port=12881):
    """Start the HTTP server on ``addr:port`` and serve until interrupted.

    Errors raised while creating or running the server are logged rather than
    propagated.
    """
    handler = DataServer
    try:
        httpd = ThreadedHTTPServer((addr, port), handler)
        print("HTTP server is at: http://%s:%d/" % (addr, port))
        httpd.serve_forever()
    except Exception as e:
        # Keep the best-effort contract (no re-raise) but do not hide the failure.
        logging.exception(e)


if __name__ == "__main__":
    run()