import concurrent.futures
import hashlib
import hmac
import http
import json
import socketserver
import urllib.parse as urlparse
from http.server import BaseHTTPRequestHandler

from strategy import data_cache
from trade import huaxin_trade_api, huaxin_trade_data_update
from trade.huaxin_trade_record_manager import DelegateRecordManager, DealRecordManager, MoneyManager, PositionManager
from utils import tool, huaxin_util


class DataServer(BaseHTTPRequestHandler):
    """HTTP request handler for trade data queries (GET) and trade actions (POST).

    Every reply is sent with HTTP status 200 and a JSON body; failures are
    reported through the ``code`` / ``msg`` fields of that body.
    """

    __data_process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    def log_message(self, format, *args):
        """Disable the default per-request log output."""
        pass

    def do_GET(self):
        """Serve the read-only query endpoints and the manual refresh endpoint.

        Query-string parameters are flattened to their first value. Unknown
        paths reply with an empty body. Any exception raised while building
        the reply is reported as ``{"code": 1, "msg": ...}``, matching
        ``do_POST``.
        """
        url = urlparse.urlparse(self.path)
        params_dict = {k: v[0] for k, v in urlparse.parse_qs(url.query).items()}
        response_data = ""
        try:
            if url.path == "/get_position_list":
                # Position list
                results = PositionManager.get_position_cache()
                response_data = json.dumps({"code": 0, "data": results})
            elif url.path == "/get_money":
                # Account money information
                result = MoneyManager.get_data()
                response_data = json.dumps({"code": 0, "data": result})
            elif url.path == "/get_deal_list":
                # Today's deal list
                results = DealRecordManager.list_by_day(tool.get_now_date_str("%Y%m%d"))
                response_data = json.dumps({"code": 0, "data": results})
            elif url.path == "/get_delegate_list":
                # Today's delegate (order) list.
                # "can_cancel" selects cancellable vs. finished orders; when it
                # is not supplied the status filter stays empty (fetch all).
                can_cancel = params_dict.get("can_cancel")
                order_status = []
                if can_cancel is not None:
                    # Query values are strings, so "0" / "false" must be
                    # recognised explicitly (a non-empty string is truthy).
                    wants_cancellable = can_cancel.strip().lower() not in ("", "0", "false", "no")
                    if wants_cancellable:
                        order_status = [huaxin_util.TORA_TSTP_OST_Cached,
                                        huaxin_util.TORA_TSTP_OST_Unknown,
                                        huaxin_util.TORA_TSTP_OST_Accepted,
                                        huaxin_util.TORA_TSTP_OST_PartTraded]
                    else:
                        order_status = [huaxin_util.TORA_TSTP_OST_AllTraded,
                                        huaxin_util.TORA_TSTP_OST_PartTradeCanceled,
                                        huaxin_util.TORA_TSTP_OST_AllCanceled,
                                        huaxin_util.TORA_TSTP_OST_Rejected]
                results = DelegateRecordManager.list_by_day(tool.get_now_date_str("%Y%m%d"), None,
                                                            orderStatus=order_status)
                response_data = json.dumps({"code": 0, "data": results})
            elif url.path == "/refresh_trade_data":
                # Manually trigger a refresh of one kind of trade data
                _type = params_dict.get("type")
                if _type == "money":
                    huaxin_trade_data_update.add_money_list()
                elif _type == "delegate":
                    huaxin_trade_data_update.add_delegate_list("手动刷新")
                elif _type == "deal":
                    huaxin_trade_data_update.add_deal_list()
                elif _type == "position":
                    huaxin_trade_data_update.add_position_list()
                response_data = json.dumps({"code": 0, "data": {}})
            elif url.path == "/get_market_info":
                # Latest market info for a JSON-encoded list of codes;
                # codes without cached data are skipped.
                codes = json.loads(params_dict.get("codes"))
                fdatas = []
                for code in codes:
                    data = data_cache.latest_code_market_info_dict.get(code)
                    if data:
                        fdatas.append(data)
                response_data = json.dumps({"code": 0, "data": fdatas})
        except Exception as e:
            # Reply with an error body instead of dropping the connection.
            response_data = json.dumps({"code": 1, "msg": str(e)})
        self.__send_response(response_data)

    def __is_sign_right(self, params):
        """Return True when ``params["sign"]`` matches the expected signature.

        The signature is the MD5 hex digest of the sorted ``key=value`` pairs
        (excluding ``sign``) joined with ``&`` and followed by a fixed suffix.
        Non-dict payloads and missing / non-string signatures are rejected.
        """
        if not isinstance(params, dict):
            # __parse_request returns the raw string when the body is not JSON.
            return False
        sign = params.get("sign")
        if not isinstance(sign, str):
            return False
        pairs = sorted(f"{k}={v}" for k, v in params.items() if k != 'sign')
        source_str = "&".join(pairs) + "!@#lowSU*^cTion8888"
        expected = hashlib.md5(source_str.encode('utf-8')).hexdigest()
        # Constant-time comparison; bytes are used so that any client-supplied
        # text (including non-ASCII) can be compared without raising.
        return hmac.compare_digest(expected.encode('utf-8'), sign.encode('utf-8', 'surrogatepass'))

    def __handle_order(self, direction, label):
        """Verify the request signature and place an order.

        :param direction: first argument passed to ``huaxin_trade_api.order``
            (``do_POST`` uses 1 for /buy and 2 for /sell).
        :param label: text printed in front of the request parameters.
        :return: JSON string to send back to the client.
        """
        params = self.__parse_request()
        # Signature verification
        if not self.__is_sign_right(params):
            return json.dumps({"code": 1001, "msg": "签名错误"})
        print(label, params)
        code = params.get("code")  # security code
        volume = params.get("volume")  # volume
        price = round(params.get("price"), 2)  # price
        result = huaxin_trade_api.order(direction, code, volume, price)
        return json.dumps({"code": 0, "data": result})

    def do_POST(self):
        """Handle trade callbacks and signed buy / sell requests.

        Any exception is reported as ``{"code": 1, "msg": ...}``; a reply is
        always sent, even for unknown paths (empty body).
        """
        result_str = ""
        try:
            url = urlparse.urlparse(self.path)
            if url.path == "/trade_callback":
                # Receive pushed callback data (original note: Kaipanla data);
                # it is always forwarded as a string.
                body = self.__parse_request()
                if not isinstance(body, str):
                    body = json.dumps(body)
                huaxin_trade_api.add_trade_callback_data(body)
                result_str = json.dumps({"code": 0})
            elif url.path == "/buy":
                # Buy
                result_str = self.__handle_order(1, "买入")
            elif url.path == "/sell":
                # Sell
                result_str = self.__handle_order(2, "卖出")
        except Exception as e:
            result_str = json.dumps({"code": 1, "msg": str(e)})
        finally:
            self.__send_response(result_str)

    def __send_response(self, data):
        """Send ``data`` (a str) to the client as a 200 JSON response."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())

    def __parse_request(self):
        """Read the request body and decode it.

        The body is decoded as GBK. Returns the parsed JSON value when the
        body is valid JSON, otherwise the decoded string itself.
        """
        datas = self.rfile.read(int(self.headers['content-length']))
        _str = str(datas, encoding="gbk")
        try:
            return json.loads(_str)
        except ValueError:
            # Not JSON (json.JSONDecodeError is a ValueError): hand back the text.
            return _str


class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTP server that handles each request in its own thread."""
    pass


def run(addr="0.0.0.0", port=12881):
    """Start the HTTP server and block serving requests.

    :param addr: address to bind to.
    :param port: TCP port to listen on.

    Errors raised while starting or serving are printed rather than
    propagated, so the caller is never interrupted by a server failure.
    """
    handler = DataServer
    try:
        # The context manager closes the listening socket on exit.
        with ThreadedHTTPServer((addr, port), handler) as httpd:
            print("HTTP server is at: http://%s:%d/" % (addr, port))
            httpd.serve_forever()
    except Exception as e:
        print("HTTP server stopped with error: %s" % e)