import hashlib import json import logging import socket import socketserver from log_module import request_log_util from log_module.log import logger_request_debug from utils import socket_util, hosting_api_util """ 可转债外部接口 """ class MyTCPServer(socketserver.TCPServer): def __init__(self, server_address, RequestHandlerClass): socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=True) # 如果使用异步的形式则需要再重写ThreadingTCPServer class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass class MyBaseRequestHandle(socketserver.BaseRequestHandler): __inited = False def setup(self): self.__init() @classmethod def __init(cls): if cls.__inited: return True cls.__inited = True cls.__req_socket_dict = {} def __is_sign_right(self, data_json): list_str = [] sign = data_json["sign"] data_json.pop("sign") for k in data_json: list_str.append(f"{k}={data_json[k]}") list_str.sort() __str = "&".join(list_str) + "JiaBei@!*." md5 = hashlib.md5(__str.encode(encoding='utf-8')).hexdigest() if md5 != sign: raise Exception("签名出错") def handle(self): host = self.client_address[0] super().handle() sk: socket.socket = self.request while True: return_str = "" try: data, header = socket_util.recv_data(sk) if data: data_str = data # print("收到数据------", f"{data_str[:20]}......{data_str[-20:]}") data_json = json.loads(data_str) type_ = data_json['type'] try: request_log_util.request_info("middle_cb_api_server", f"请求开始:{type_}") is_sign_right = socket_util.is_client_params_sign_right(data_json) # ------客户端请求接口------- if type_ == 'buy': # 验证签名 if not is_sign_right: raise Exception("签名错误") codes_data = data_json["data"] code = codes_data["code"] volume = codes_data["volume"] price = codes_data["price"] price_type = codes_data["price_type"] try: if not code: raise Exception("请上传code") if not volume: raise Exception("请上传volume") # 下单 result = hosting_api_util.trade_order_for_cb(hosting_api_util.TRADE_DIRECTION_BUY, code, volume, price, price_type) if result: resultJSON = result print("下单结果:", resultJSON) if resultJSON['code'] == 0: return_str = json.dumps({"code": 0}) else: raise Exception(resultJSON['msg']) break except Exception as e: raise e elif type_ == 'cancel_order': # 验证签名 if not is_sign_right: raise Exception("签名错误") codes_data = data_json["data"] code = codes_data["code"] orderSysID = codes_data.get("orderSysID") accountId = codes_data.get("accountId") if code: result = hosting_api_util.trade_cancel_order(hosting_api_util.TRADE_DIRECTION_BUY, code, accountId, orderSysID, True) print("---撤单结果----") print(result) if result["code"] == 0: return_str = json.dumps({"code": 0}) else: raise Exception(result["msg"]) else: return_str = json.dumps({"code": 1, "msg": "请上传代码"}) break elif type_ == 'sell': # 验证签名 if not is_sign_right: raise Exception("签名错误") codes_data = data_json["data"] code = codes_data["code"] volume = codes_data["volume"] price_type = codes_data["price_type"] result = hosting_api_util.trade_order_for_cb(hosting_api_util.TRADE_DIRECTION_SELL, code, volume, '', price_type=price_type) if result["code"] == 0: return_str = json.dumps(result) else: raise Exception(result["msg"]) print("---卖出结果----") print(result) break elif type_ == 'get_code_position_info': # 验证签名 if not is_sign_right: raise Exception("签名错误") codes_data = data_json["data"] code = codes_data["code"] result = hosting_api_util.get_position_for_cb(code) return_str = json.dumps(result) break elif type_ == 'get_account_money': # 验证签名 if not is_sign_right: raise Exception("签名错误") result = hosting_api_util.get_account_money_for_cb() return_str = json.dumps(result) break elif type_ == 'refresh_trade_data': # 验证签名 data = data_json["data"] refresh_type = data["ctype"] result = hosting_api_util.refresh_trade_data_for_cb(refresh_type) return_str = json.dumps(result) break elif type_ == 'common': params = data_json["data"] result = hosting_api_util.common_request_for_cb(params) return_str = json.dumps(result) break finally: request_log_util.request_info("middle_cb_api_server", f"请求结束:{type_}") break # sk.close() except Exception as e: logging.exception(e) logger_request_debug.exception(e) return_str = json.dumps({"code": 401, "msg": str(e)}) break finally: sk.sendall(socket_util.load_header(return_str.encode(encoding='utf-8'))) def finish(self): super().finish() def run(port): print("create middle_cb_api_server") laddr = "0.0.0.0", port print("middle_cb_api_server is at: http://%s:%d/" % (laddr)) tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 注意:参数是MyBaseRequestHandle tcpserver.serve_forever() if __name__ == "__main__": run(9005)