"""Middle API server.

A threaded TCP server that reads one JSON request from a client socket,
dispatches it on the request's ``type`` field (an int or a string) and writes
a JSON response back through ``socket_util.load_header``.
"""
import hashlib
import json
import logging
import random
import socket
import socketserver
import threading
import time

import constant
import socket_manager
import trade_manager
from code_attribute.code_price_manager import CodesLimitRateManager
from db import mysql_data, redis_manager
from db.redis_manager import RedisUtils
from log_module import log, request_log_util
from log_module.log import logger_request_debug
from middle_l1_data_server import L1DataManager
from output import push_msg_manager
from utils import socket_util, hosting_api_util, huaxin_trade_record_manager, huaxin_util, tool, global_data_cache_util
from utils.history_k_data_util import HistoryKDatasUtils, JueJinApi
from utils.huaxin_trade_record_manager import PositionManager


class MyTCPServer(socketserver.TCPServer):
    """TCPServer that always binds and activates on construction."""

    def __init__(self, server_address, RequestHandlerClass):
        socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=True)


# To serve requests asynchronously the threading mix-in has to be layered on
# top of the TCP server.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    pass


class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Request handler that decodes a JSON request and dispatches on its ``type``."""

    __inited = False

    def setup(self):
        self.__init()

    @classmethod
    def __init(cls):
        """Run the one-time class initialisation; return True if already done."""
        if cls.__inited:
            return True
        cls.__inited = True
        cls.__req_socket_dict = {}

    def __is_sign_right(self, data_json):
        """Verify the ``sign`` field of *data_json* against an MD5 of its other fields.

        The ``sign`` key is removed from *data_json* as a side effect. The
        remaining ``key=value`` pairs are sorted, joined with ``&`` and
        suffixed with a fixed salt before hashing.

        Raises:
            Exception: if the computed digest differs from the supplied sign.
        """
        sign = data_json.pop("sign")
        pairs = sorted(f"{k}={v}" for k, v in data_json.items())
        # NOTE(review): the salt is hard-coded in source.
        payload = "&".join(pairs) + "JiaBei@!*."
        digest = hashlib.md5(payload.encode(encoding='utf-8')).hexdigest()
        if digest != sign:
            raise Exception("签名出错")

    def handle(self):
        """Read one request from the socket, dispatch it and send the response.

        Any exception raised while processing is logged and reported to the
        client as ``{"code": 401, "msg": ...}``. A response (possibly built
        from an empty string) is always sent in the ``finally`` clause.

        NOTE(review): the source's indentation was lost; this assumes the
        trailing ``break`` sat at the outer ``try`` level, i.e. exactly one
        request is served per connection.
        """
        super().handle()
        sk: socket.socket = self.request
        return_str = ""
        try:
            data, _header = socket_util.recv_data(sk)
            if data:
                data_json = json.loads(data)
                type_ = data_json['type']
                try:
                    request_log_util.request_info("middle_api_server", f"请求开始:{type_}")
                    if type(type_) is int:
                        # Numeric request types
                        return_str = self.process_num_type(sk, type_, data)
                    else:
                        return_str = self._process_named_type(sk, type_, data_json)
                finally:
                    request_log_util.request_info("middle_api_server", f"请求结束:{type_}")
        except Exception as e:
            logging.exception(e)
            logger_request_debug.exception(e)
            return_str = json.dumps({"code": 401, "msg": str(e)})
        finally:
            sk.sendall(socket_util.load_header(return_str.encode(encoding='utf-8')))

    @staticmethod
    def _require_sign(is_sign_right):
        """Raise if the request signature was not validated."""
        if not is_sign_right:
            raise Exception("签名错误")

    @staticmethod
    def _fetch_now_price(code):
        """Return the current price of *code*; raise if it cannot be fetched."""
        prices = HistoryKDatasUtils.get_now_price([code])
        if not prices:
            raise Exception("现价获取失败")
        return prices[0][1]

    def _resolve_buy_price(self, code, price, price_type):
        """Derive an order price from *price_type* when no usable price was sent.

        price_type: None/0 = price-cage price, 1 = limit-down price,
        2 = limit-up price, 3 = current price, 4 = "buy-5" price (current
        price minus 0.04). Any other value leaves *price* unchanged.
        """
        if price_type is None:
            price_type = 0  # default to the price-cage price
        pre_close = HistoryKDatasUtils.get_gp_latest_info([code], "sec_id,pre_close")[0]["pre_close"]
        if price_type == 0:
            now_price = self._fetch_now_price(code)
            limit_up_price = round(float(tool.get_limit_up_price_by_preprice(code, pre_close)), 2)
            return min(tool.get_buy_max_price(now_price), limit_up_price)
        if price_type == 1:
            return round(float(tool.get_limit_down_price_by_preprice(code, pre_close)), 2)
        if price_type == 2:
            return round(float(tool.get_limit_up_price_by_preprice(code, pre_close)), 2)
        if price_type == 3:
            return self._fetch_now_price(code)
        if price_type == 4:
            return self._fetch_now_price(code) - 0.04
        return price

    def _handle_buy(self, codes_data):
        """Place a buy order described by *codes_data* and return the response JSON.

        Returns an empty string when the trade call yields a falsy result.

        Raises:
            Exception: if no code was supplied, the current price cannot be
                fetched, or the trade call reports a non-zero code.
        """
        code = codes_data["code"]
        money = codes_data.get("money")
        volume = codes_data.get("volume")
        price = codes_data.get("price")
        price_type = codes_data.get("price_type")
        if not code:
            raise Exception("请上传code")
        if not price or round(float(price), 2) <= 0:
            price = self._resolve_buy_price(code, price, price_type)
        if not volume and money:
            # Whole lots of 100 shares affordable with `money`, at least one lot.
            # NOTE(review): the minimum-lot check is assumed to belong to this
            # branch; the original indentation was lost.
            volume = (money // int(round(float(price) * 100))) * 100
            if volume < 100:
                volume = 100
        # Place the order
        result = hosting_api_util.trade_order(hosting_api_util.TRADE_DIRECTION_BUY, code, volume, price)
        if not result:
            return ""
        print("下单结果:", result)
        if result['code'] != 0:
            raise Exception(result['msg'])
        return json.dumps({"code": 0})

    @staticmethod
    def _channel_stats(channels, now_time_str):
        """Return ``(total, available, active)`` counts for *channels*.

        A channel counts as available when its second field is falsy and as
        active when ``tool.trade_time_sub(now, third_field) < 60``.
        NOTE(review): the two checks are assumed to be independent; the
        original indentation was lost.
        """
        available_count = 0
        active_count = 0
        for t in channels:
            if not t[1]:
                available_count += 1
            if tool.trade_time_sub(now_time_str, t[2]) < 60:
                active_count += 1
        return len(channels), available_count, active_count

    @staticmethod
    def _hold_push_connection(sk):
        """Block on *sk* until the peer disconnects or a socket error occurs.

        Keeps the handler (and therefore the registered socket) alive so
        pushed messages can be delivered on it.
        """
        while True:
            try:
                buf = sk.recv(1024)
                if not buf:
                    # recv() returning b"" means the peer closed the
                    # connection; without this check the loop never ends.
                    print("数据断开")
                    break
                print(f"收到数据:{buf.decode(encoding='utf-8')}")
                time.sleep(1)
            except Exception:
                print("数据断开")
                break

    def _process_named_type(self, sk, type_, data_json):
        """Dispatch a request whose ``type`` is not an int; return the response JSON.

        Returns an empty string for unknown types. Exceptions propagate to
        ``handle``, which turns them into a 401 response.
        """
        # Evaluated for every named request; only the trade-related branches
        # below enforce it.
        is_sign_right = socket_util.is_client_params_sign_right(data_json)

        # ------ client request interfaces ------
        if type_ == 'buy':
            self._require_sign(is_sign_right)
            return self._handle_buy(data_json["data"])

        if type_ == 'cancel_order':
            self._require_sign(is_sign_right)
            codes_data = data_json["data"]
            code = codes_data["code"]
            orderSysID = codes_data.get("orderSysID")
            accountId = codes_data.get("accountId")
            if not code:
                return json.dumps({"code": 1, "msg": "请上传代码"})
            result = hosting_api_util.trade_cancel_order(hosting_api_util.TRADE_DIRECTION_BUY, code,
                                                         accountId, orderSysID, True)
            print("---撤单结果----")
            print(result)
            if result["code"] != 0:
                raise Exception(result["msg"])
            return json.dumps({"code": 0})

        if type_ == 'sell':
            self._require_sign(is_sign_right)
            codes_data = data_json["data"]
            code = codes_data["code"]
            volume = codes_data["volume"]
            price_type = codes_data["price_type"]
            result = hosting_api_util.trade_order(hosting_api_util.TRADE_DIRECTION_SELL, code, volume, '',
                                                  price_type=price_type)
            if result["code"] != 0:
                raise Exception(result["msg"])
            return_str = json.dumps(result)
            print("---卖出结果----")
            print(result)
            return return_str

        if type_ == 'get_code_position_info':
            self._require_sign(is_sign_right)
            code = data_json["data"]["code"]
            return json.dumps(hosting_api_util.get_code_position_info(code))

        if type_ == 'common':
            params = data_json["data"]
            ctype = params.get("ctype")
            # These ctypes are routed to the trade/sell client channel.
            trade_sell_types = {"get_current_l1_codes", "get_positions", "get_l2_deal_price",
                                "buy_cb_for_commission", "sell_cb_for_commission", "get_deal_queue",
                                "auto_cancel_sell_mode"}
            if ctype in trade_sell_types:
                result = hosting_api_util.common_request(
                    params, client_type=socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL)
            else:
                result = hosting_api_util.common_request(params)
            return json.dumps(result)

        if type_ == 'get_cost_price':
            # Cost price of a position
            code = data_json["data"]["code"]
            try:
                price = PositionManager.get_cost_price(code)
                return json.dumps({"code": 0, "data": {"price": price}})
            except Exception as e:
                return json.dumps({"code": 1, "msg": str(e)})

        if type_ == 'delegate_list':
            # Delegate (order) list
            update_time = data_json["data"]["update_time"]
            # Whether only cancelable orders are wanted: 0/1
            can_cancel = data_json["data"]["can_cancel"]
            if can_cancel:
                results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                    tool.get_now_date_str("%Y%m%d"), None,
                    [huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded])
            else:
                # The client's update_time is forwarded here; it used to be
                # overwritten with None before this call.
                results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
                    tool.get_now_date_str("%Y%m%d"), update_time)
            # NOTE(review): the success response carries the "please upload
            # code" message; left as-is because clients may already see it.
            return json.dumps(
                {"code": 0, "data": {"list": results, "updateTime": update_time}, "msg": "请上传代码"})

        if type_ == 'deal_list':
            # Deal list
            results = huaxin_trade_record_manager.DealRecordManager.list_by_day(
                tool.get_now_date_str("%Y%m%d"))
            return json.dumps({"code": 0, "data": {"list": results}, "msg": ""})

        if type_ == 'position_list':
            # Position list
            results, _update_time = huaxin_trade_record_manager.PositionManager.list_by_day(
                tool.get_now_date_str("%Y%m%d"))
            return json.dumps({"code": 0, "data": {"list": results}, "msg": ""})

        if type_ == 'money_list':
            # Account money details
            money_data = huaxin_trade_record_manager.MoneyManager.get_data()
            return json.dumps({"code": 0, "data": money_data, "msg": ""})

        if type_ == 'sync_trade_data':
            # Synchronise trade data on both client channels
            sync_type = data_json["data"]["type"]
            hosting_api_util.refresh_trade_data(sync_type)
            hosting_api_util.refresh_trade_data(
                sync_type, blocking=False,
                client_type=socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL)
            return json.dumps({"code": 0, "data": {}, "msg": ""})

        if type_ == "get_huaxin_subscript_codes":
            # Codes subscribed on Huaxin, sorted by their cached rate
            fresults = global_data_cache_util.huaxin_subscript_codes
            try:
                rate_results_dict = global_data_cache_util.huaxin_subscript_codes_rate
                fdata = [(r[0], r[1], rate_results_dict.get(r[0], 0), r[2]) for r in fresults]
                fdata.sort(key=lambda item: item[2], reverse=True)
            except Exception:
                # Best effort: fall back to the unsorted cached list.
                fdata = fresults
            update_time = global_data_cache_util.huaxin_subscript_codes_update_time
            if update_time is None:
                update_time = ''
            return json.dumps(
                {"code": 0, "data": {"count": len(fresults), "list": fdata, "update_time": update_time},
                 "msg": ""})

        if type_ == "get_huaxin_position_subscript_codes":
            # Position codes subscribed on Huaxin
            fresults = global_data_cache_util.huaxin_position_subscript_codes
            update_time = global_data_cache_util.huaxin_position_subscript_codes_update_time
            if update_time is None:
                update_time = ''
            return json.dumps(
                {"code": 0, "data": {"count": len(fresults), "list": fresults, "update_time": update_time},
                 "msg": ""})

        if type_ == "export_l2_data":
            # Export L2 data
            code = data_json["data"]["code"]
            hosting_api_util.export_l2_data(code)
            return json.dumps({"code": 0, "data": {}, "msg": ""})

        if type_ == 'everyday_init':
            # Daily initialisation
            hosting_api_util.everyday_init()
            return json.dumps({"code": 0, "data": {}, "msg": ""})

        if type_ == 'huaxin_channel_state':
            # Huaxin channel state (currently always an empty object)
            return json.dumps({"code": 0, "data": {}, "msg": ""})

        if type_ == 'juejin_is_valid':
            # Whether the JueJin API is usable
            # NOTE(review): a failure is reported with code 0 and the error in
            # msg, and a falsy date yields an empty response; both kept as-is.
            try:
                date = JueJinApi.get_previous_trading_date(tool.get_now_date_str())
                if date:
                    return json.dumps({"code": 0, "msg": ""})
            except Exception as e:
                return json.dumps({"code": 0, "msg": str(e)})
            return ""

        if type_ == 'get_env_info':
            # Environment information
            return json.dumps(hosting_api_util.get_env_info())

        if type_ == 'sync_l1_subscript_codes':
            # Synchronise L1 subscribed codes
            return json.dumps(hosting_api_util.sync_l1_subscript_codes())

        if type_ == 'get_system_logs':
            # System logs
            start_index = data_json["data"]["start_index"]
            count = data_json["data"]["count"]
            return json.dumps(hosting_api_util.get_system_logs(start_index, count))

        if type_ == 'test_redis':
            # Time a batch of 100 SADDs and a batch of 20 SMEMBERS.
            # NOTE(review): per-batch timing is assumed; the original
            # indentation was lost.
            redis = redis_manager.RedisManager(5).getRedisNoPool()
            try:
                _start_time = time.time()
                times = []
                for i in range(0, 100):
                    RedisUtils.sadd(redis, "test_set", f"000000:{i}", auto_free=False)
                times.append(time.time() - _start_time)
                _start_time = time.time()
                for i in range(0, 20):
                    RedisUtils.smembers(redis, "test_set", auto_free=False)
                times.append(time.time() - _start_time)
                return json.dumps({"code": 0, "data": times, "msg": ""})
            finally:
                redis.close()

        if type_ == 'get_code_trade_info':
            # Trade information of a code
            code = data_json["data"]["code"]
            return json.dumps(hosting_api_util.get_code_trade_info(code))

        if type_ == 'get_l2_listen_active_count':
            return json.dumps(hosting_api_util.get_l2_listen_active_count())

        if type_ == "trade_server_channels":
            trade_channels = socket_manager.ClientSocketManager.list_client(
                socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL)
            common_channels = socket_manager.ClientSocketManager.list_client(
                socket_manager.ClientSocketManager.CLIENT_TYPE_COMMON)
            now_time_str = tool.get_now_time_str()
            data = {
                "trade": self._channel_stats(trade_channels, now_time_str),
                "common": self._channel_stats(common_channels, now_time_str),
            }
            return json.dumps({"code": 0, "data": data})

        if type_ == "save_running_data":
            return json.dumps(hosting_api_util.save_running_data())

        if type_ == "add_sell_rule":
            result = hosting_api_util.sell_rule(hosting_api_util.OPERRATE_ADD, data=data_json["data"])
            return json.dumps(result)

        if type_ == "update_sell_rule":
            result = hosting_api_util.sell_rule(hosting_api_util.OPERRATE_SET, data=data_json["data"])
            return json.dumps(result)

        if type_ == "del_sell_rule":
            id_ = data_json["data"]["id"]
            result = hosting_api_util.sell_rule(hosting_api_util.OPERRATE_DELETE, data={"id": id_})
            return json.dumps(result)

        if type_ == "list_sell_rule":
            result = hosting_api_util.sell_rule(hosting_api_util.OPERRATE_GET, data={})
            return json.dumps(result)

        if type_ == "register_msg_receiver":
            msg_types = data_json["data"]["types"]
            try:
                push_msg_manager.SocketManager.regirster_socket(sk, msg_types)
                result = {"code": 0}
            except Exception as e:
                result = {"code": 1, "msg": str(e)}
            return_str = json.dumps(result)
            # Answer immediately, then keep the connection open for pushes.
            sk.sendall(socket_util.load_header(return_str.encode(encoding='utf-8')))
            self._hold_push_connection(sk)
            # NOTE(review): handle() sends this response again once the
            # connection has ended, as the original code did.
            return return_str

        if type_ == "get_latest_cancel_orders":
            # Most recent cancelled orders
            wanted_keys = ["securityID", "securityName", "direction", "orderSysID", "acceptTime",
                           "cancelTime", "limitPrice", "volume"]
            results = huaxin_trade_record_manager.DelegateRecordManager.list_latest_cancel_records(10)
            fresults = []
            for result in results:
                temp = {key: result[key] for key in result if key in wanted_keys}
                # Filter out virtual orders (volume <= 100) and sell orders.
                # The values are converted before comparing; the original
                # wrapped the whole comparison in int() by mistake.
                if int(temp["volume"]) <= 100:
                    continue
                if int(temp["direction"]) != 0:
                    continue
                fresults.append(temp)
            # NOTE(review): the payload is JSON-encoded twice; kept because
            # existing clients may already decode it twice.
            return json.dumps(json.dumps({"code": 0, "data": fresults}))

        if type_ == "get_buy1_info":
            code = data_json["data"]["code"]
            result = hosting_api_util.common_request(
                {"ctype": "get_buy1_info", "code": code},
                client_type=socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL)
            return json.dumps(result)

        if type_ == "get_l1_data":
            results = L1DataManager().get_current_l1_data()
            return json.dumps({"code": 0, "data": results})

        if type_ == "set_l1_codes":
            codes = data_json["data"]["codes"]
            # Store the target L1 codes
            L1DataManager().save_target_codes(codes)
            return json.dumps({"code": 0, "data": {}})

        return ""

    @classmethod
    def process_num_type(cls, sk, type, _str):
        """Handle a request whose ``type`` is an int and return the response JSON.

        Args:
            sk: the client socket (unused here).
            type: numeric request type.
            _str: the raw JSON request text.

        Returns:
            The JSON response string, ``""`` for an unknown type, or
            ``{"code": 1, "msg": ...}`` if processing raised.
        """
        return_str = ""
        try:
            # type -> code list: add codes / remove codes / fetch the list
            add_types = {
                201: hosting_api_util.CODE_LIST_BLACK,      # add to black list
                202: hosting_api_util.CODE_LIST_WHITE,      # add to white list
                401: hosting_api_util.CODE_LIST_WANT,       # add to want-buy list
                411: hosting_api_util.CODE_LIST_PAUSE_BUY,  # add to pause-buy list
                421: hosting_api_util.CODE_LIST_MUST_BUY,   # add to must-buy (red) list
                441: hosting_api_util.CODE_LIST_GREEN,      # add to green list
            }
            remove_types = {
                203: hosting_api_util.CODE_LIST_BLACK,
                204: hosting_api_util.CODE_LIST_WHITE,
                402: hosting_api_util.CODE_LIST_WANT,
                412: hosting_api_util.CODE_LIST_PAUSE_BUY,
                422: hosting_api_util.CODE_LIST_MUST_BUY,
                442: hosting_api_util.CODE_LIST_GREEN,
            }
            list_types = {
                301: hosting_api_util.CODE_LIST_BLACK,
                302: hosting_api_util.CODE_LIST_WHITE,
                403: hosting_api_util.CODE_LIST_WANT,
                413: hosting_api_util.CODE_LIST_PAUSE_BUY,
                423: hosting_api_util.CODE_LIST_MUST_BUY,
                443: hosting_api_util.CODE_LIST_GREEN,
            }
            if type in add_types:
                codes = json.loads(_str)["data"]["codes"]
                for code in codes:
                    hosting_api_util.add_code_list(code, add_types[type])
                return_str = json.dumps({"code": 0})
            elif type in remove_types:
                codes = json.loads(_str)["data"]["codes"]
                for code in codes:
                    hosting_api_util.remove_code_list(code, remove_types[type])
                return_str = json.dumps({"code": 0})
            elif type in list_types:
                return_str = json.dumps(hosting_api_util.get_code_list(list_types[type]))
            elif type == 420:
                # Whether the order of the first code can be cancelled
                code = json.loads(_str)["data"]["codes"][0]
                result = hosting_api_util.get_code_trade_state(code)
                state = result["data"]["state"]
                if state not in (trade_manager.TRADE_STATE_BUY_CANCEL_SUCCESS,
                                 trade_manager.TRADE_STATE_BUY_SUCCESS):
                    return_str = json.dumps({"code": 0, "msg": "可以取消"})
                else:
                    return_str = json.dumps({"code": 1, "msg": "不可以取消"})
            elif type == 430:
                # Query which lists (want/white/black/pause-buy) a code is on
                code = json.loads(_str)["data"]["code"]
                return_str = json.dumps(hosting_api_util.get_code_attribute(code))
            elif type == 501:
                # Set the system trade state
                is_open = json.loads(_str)["data"]["open"]
                hosting_api_util.set_trade_state(bool(is_open))
                return_str = json.dumps({"code": 0, "msg": ("开启成功" if is_open else "关闭成功")})
            elif type == 502:
                # Get the system trade state
                return_str = json.dumps(hosting_api_util.get_trade_state())
            elif type == 503:
                # Set the trade mode for target codes
                mode = json.loads(_str)["data"]["mode"]
                hosting_api_util.set_trade_mode(mode)
                return_str = json.dumps({"code": 0, "data": {"mode": mode}})
            elif type == 504:
                # Get the trade mode for target codes
                return_str = json.dumps(hosting_api_util.get_trade_mode())
        except Exception as e:
            return_str = json.dumps({"code": 1, "msg": str(e)})
        return return_str

    def finish(self):
        super().finish()


def run():
    """Create the threaded middle API server and serve forever."""
    print("create middle_api_server")
    laddr = "0.0.0.0", constant.MIDDLE_API_SERVER_PORT
    print("middle_api_server is at: http://%s:%d/" % (laddr))
    # The second argument is the request handler class itself.
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)
    tcpserver.serve_forever()


if __name__ == "__main__":
    pass