import hashlib import json import logging import multiprocessing import socket import socketserver import threading import time import psutil import inited_data from code_attribute import gpcode_manager from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager from db.redis_manager_delegate import RedisUtils from l2 import l2_data_manager_new from l2.huaxin import huaxin_target_codes_manager from log_module.log import logger_system, logger_l2_codes_subscript from third_data import block_info, kpl_api from third_data.code_plate_key_manager import KPLCodeJXBlockManager from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi from third_data.kpl_data_manager import KPLDataManager, KPLLimitUpDataRecordManager from third_data.kpl_util import KPLDataType from trade import trade_manager, trade_huaxin, l2_trade_util, trade_constant from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager, \ huaxin_trade_data_update from trade.huaxin.huaxin_trade_api import ClientSocketManager from trade.huaxin.huaxin_trade_order_processor import TradeResultProcessor, HuaxinOrderEntity from utils import socket_util, tool, huaxin_util, data_export_util class MyTCPServer(socketserver.TCPServer): def __init__(self, server_address, RequestHandlerClass): socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=True) # 如果使用异步的形式则需要再重写ThreadingTCPServer class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass class MyBaseRequestHandle(socketserver.BaseRequestHandler): __inited = False def setup(self): self.__init() @classmethod def __init(cls): if cls.__inited: return True cls.__inited = True cls.__req_socket_dict = {} def __is_sign_right(self, data_json): list_str = [] sign = data_json["sign"] data_json.pop("sign") for k in data_json: list_str.append(f"{k}={data_json[k]}") list_str.sort() __str = "&".join(list_str) + "JiaBei@!*." md5 = hashlib.md5(__str.encode(encoding='utf-8')).hexdigest() if md5 != sign: raise Exception("签名出错") def handle(self): host = self.client_address[0] super().handle() sk: socket.socket = self.request while True: return_str = "" try: data, header = socket_util.recv_data(sk) if data: data_str = data data_json = json.loads(data_str) type_ = data_json['type'] is_sign_right = socket_util.is_client_params_sign_right(data_json) # ------客户端请求接口------- if type_ == 'buy': # 验证签名 if not is_sign_right: raise Exception("签名错误") codes_data = data_json["data"] code = codes_data["code"] volume = codes_data["volume"] price = codes_data["price"] try: if not code: raise Exception("请上传code") if not volume: raise Exception("请上传volume") if round(float(price), 2) <= 0: prices = HistoryKDatasUtils.get_now_price([code]) if not prices: raise Exception("现价获取失败") price = prices[0][1] # 下单 result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, volume, round(float(price), 2)) if result: resultJSON = result # # {'code': 0, 'data': {'sinfo': 'b_600480_1689060343812', 'securityId': '600480', # 'orderLocalId': '0190000809', 'orderStatus': '7', 'statusMsg': # '10932:产品状态资源访问授权不足', 'orderSysID': '110010190000809', 'accountId': # '38800001334901'}} if resultJSON['code'] == 0: try: resultJSON = resultJSON['data'] statusCode = resultJSON['orderStatus'] if statusCode == huaxin_util.TORA_TSTP_OST_Rejected: # 交易所拒绝 raise Exception(resultJSON['statusMsg']) else: # code, orderStatus, orderRef, accountID, orderSysID, insertTime=None order = HuaxinOrderEntity(resultJSON['securityId'], statusCode, resultJSON['orderRef'], resultJSON['accountID'], resultJSON['orderSysID'], resultJSON['insertTime'], acceptTime=resultJSON['acceptTime'], direction=resultJSON['direction']) TradeResultProcessor.order_success(order) return_str = json.dumps({"code": 0}) finally: # 更新委托列表 huaxin_trade_data_update.add_delegate_list("接口") huaxin_trade_data_update.add_money_list() else: raise Exception(resultJSON['msg']) break except Exception as e: raise e elif type_ == 'cancel_order': # 验证签名 if not is_sign_right: raise Exception("签名错误") codes_data = data_json["data"] code = codes_data["code"] orderSysID = codes_data.get("orderSysID") accountId = codes_data.get("accountId") if code and orderSysID and accountId: result = huaxin_trade_api.cancel_order(huaxin_trade_api.TRADE_DIRECTION_BUY, code, orderSysID, True) print(result) if result["code"] == 0: if result["data"]["cancel"] == 1: # 撤单成功 TradeResultProcessor.cancel_order_success(code, accountId, orderSysID) return_str = json.dumps({"code": 0}) else: # 撤单失败 raise Exception(result["data"]["errorMsg"]) else: raise Exception(result["msg"]) elif code: state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code) if state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or state == trade_constant.TRADE_STATE_BUY_DELEGATED or state == trade_constant.TRADE_STATE_BUY_CANCEL_ING: try: l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销") return_str = json.dumps({"code": 0}) except Exception as e: logging.exception(e) return_str = json.dumps({"code": 2, "msg": str(e)}) else: return_str = json.dumps({"code": 1, "msg": "未处于可撤单状态"}) else: return_str = json.dumps({"code": 1, "msg": "请上传代码"}) break elif type_ == 'sell': # 验证签名 if not is_sign_right: raise Exception("签名错误") codes_data = data_json["data"] code = codes_data["code"] volume = codes_data["volume"] price = codes_data["price"] # 是否强制卖0/1 force_sell = codes_data["force"] # TODO 强制卖策略 if volume == 0: # 查询持仓量 volume = huaxin_trade_record_manager.PositionManager.get_code_volume(code) if volume <= 0: raise Exception("代码无持仓") if not price: # 获取现价 prices = HistoryKDatasUtils.get_now_price([code]) if not prices: raise Exception("现价获取失败") # 已现价的5档价卖 price = prices[0][1] - 0.04 result = huaxin_trade_api.order(huaxin_trade_api.TRADE_DIRECTION_SELL, code, volume, price) if result["code"] == 0: if result["data"]["orderStatus"] == huaxin_util.TORA_TSTP_OST_Rejected or ( type(result["data"]["orderStatus"]) == int and result["data"]["orderStatus"] < 0): raise Exception(result["data"]["statusMsg"]) else: return_str = json.dumps({"code": 0, "msg": ""}) else: raise Exception(result["msg"]) print("---卖出结果----") print(result) break elif type_ == 'delegate_list': # 委托列表 update_time = data_json["data"]["update_time"] # 是否可撤 0/1 can_cancel = data_json["data"]["can_cancel"] results, update_time = None, None if can_cancel: results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day( tool.get_now_date_str("%Y%m%d"), None, [huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded]) else: results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day( tool.get_now_date_str("%Y%m%d"), update_time) return_str = json.dumps( {"code": 0, "data": {"list": results, "updateTime": update_time}, "msg": "请上传代码"}) break elif type_ == 'deal_list': # 成交列表 results = huaxin_trade_record_manager.DealRecordManager.list_by_day( tool.get_now_date_str("%Y%m%d")) return_str = json.dumps( {"code": 0, "data": {"list": results}, "msg": ""}) elif type_ == 'position_list': # 持仓股列表 results, update_time = huaxin_trade_record_manager.PositionManager.list_by_day( tool.get_now_date_str("%Y%m%d")) return_str = json.dumps( {"code": 0, "data": {"list": results}, "msg": ""}) elif type_ == 'money_list': # 资金详情 money_data = huaxin_trade_record_manager.MoneyManager.get_data() return_str = json.dumps( {"code": 0, "data": money_data, "msg": ""}) elif type_ == 'sync_trade_data': # 同步交易数据 sync_type = data_json["data"]["type"] if sync_type == "delegate_list": huaxin_trade_data_update.add_delegate_list("接口") elif sync_type == "deal_list": huaxin_trade_data_update.add_deal_list() elif sync_type == "money": huaxin_trade_data_update.add_money_list() elif sync_type == "position_list": huaxin_trade_data_update.add_position_list() return_str = json.dumps( {"code": 0, "data": {}, "msg": ""}) elif type_ == "get_huaxin_subscript_codes": # 获取华鑫订阅的代码 codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes() fresults = [] if codes: for code in codes: code_name = gpcode_manager.get_code_name(code) fresults.append((code, code_name)) return_str = json.dumps( {"code": 0, "data": {"count": len(fresults), "list": fresults}, "msg": ""}) elif type_ == "export_l2_data": # 导出L2数据 code = data_json["data"]["code"] try: data_export_util.export_l2_excel(code) return_str = json.dumps( {"code": 0, "data": {}, "msg": ""}) except Exception as e: logging.exception(e) return_str = json.dumps( {"code": 1, "msg": str(e)}) elif type_ == 'everyday_init': # 每日初始化 inited_data.everyday_init() return_str = json.dumps( {"code": 0, "data": {}, "msg": ""}) elif type_ == 'huaxin_channel_state': # 华鑫通道状态 types = [huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_TRADE, huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_CMD_L2, huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_DEAL_LIST, huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_DELEGATE_LIST, huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_MONEY, huaxin_trade_api.ClientSocketManager.CLIENT_TYPE_POSITION_LIST] fdata = {} for t in types: client_list = huaxin_trade_api.ClientSocketManager.list_client(t) client_state_list = [] for client in client_list: # 判断是否已经上锁 lock_state = huaxin_trade_api.ClientSocketManager.is_client_locked(client[0]) lock_state_desc = "" if lock_state is None: lock_state_desc = "未知" elif lock_state: lock_state_desc = "已锁" else: lock_state_desc = "未锁" client_state_list.append((client[0], lock_state_desc)) fdata[t] = client_state_list return_str = json.dumps( {"code": 0, "data": fdata, "msg": ""}) elif type_ == 'juejin_is_valid': # 掘金是否可用 try: date = JueJinApi.get_previous_trading_date(tool.get_now_date_str()) if date: return_str = json.dumps( {"code": 0, "msg": ""}) except Exception as e: return_str = json.dumps( {"code": 0, "msg": str(e)}) elif type_ == 'get_env_info': # 获取环境信息 fdata = {} try: date = JueJinApi.get_previous_trading_date(tool.get_now_date_str()) if date: fdata["juejin"] = 1 except Exception as e: fdata["juejin"] = 0 fdata["kpl"] = {} # 获取开盘啦数据 kpl_types = [KPLDataType.LIMIT_UP.value, KPLDataType.JINGXUAN_RANK.value, KPLDataType.INDUSTRY_RANK.value] for kpl_type in kpl_types: if kpl_type in KPLDataManager.kpl_data_update_info: fdata["kpl"][kpl_type] = KPLDataManager.kpl_data_update_info.get(kpl_type) try: # 验证redis RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test") fdata["redis"] = 1 except: fdata["redis"] = 0 try: # 验证mysql mysql_data.Mysqldb().select_one("select 1") fdata["mysql"] = 1 except: fdata["mysql"] = 0 try: # redis异步任务数量 fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count() except: pass # 获取CPU与内存适用情况 memory_info = psutil.virtual_memory() cpu_percent = psutil.cpu_percent(interval=1) fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent} return_str = json.dumps( {"code": 0, "data": fdata, "msg": ""}) elif type_ == 'test_redis': redis = redis_manager.RedisManager(5).getRedisNoPool() try: _start_time = time.time() times = [] for i in range(0, 100): RedisUtils.sadd(redis, "test_set", f"000000:{i}", auto_free=False) times.append(time.time() - _start_time) _start_time = time.time() for i in range(0, 20): RedisUtils.smembers(redis, "test_set", auto_free=False) times.append(time.time() - _start_time) return_str = json.dumps( {"code": 0, "data": times, "msg": ""}) finally: redis.close() # RedisUtils.realse(redis) # 查询委托列表 elif type_ == 'test': # 卖出 # trade_api.order(trade_api.TRADE_DIRECTION_SELL, "600854", 100, 5.45) result = huaxin_trade_api.get_deal_list() print("\n\n---成交列表----") for d in result["data"]: print(d) result = huaxin_trade_api.get_delegate_list(True) print("\n\n---可撤委托----") for d in result["data"]: print(d) result = huaxin_trade_api.get_delegate_list(False) print("\n\n---全部委托----") for d in result["data"]: print(d) result = huaxin_trade_api.get_position_list() print("\n\n---持仓列表----") for d in result["data"]: print(d) result = huaxin_trade_api.get_money() print("\n\n---账户列表----") for d in result["data"]: print(d) elif type_ == 'test_l2': codes_data = data_json["data"] result = huaxin_trade_api.set_l2_codes_data(codes_data) print("\n\n---L2设置结果----") print(result) break # sk.close() except Exception as e: logging.exception(e) return_str = json.dumps({"code": 401, "msg": str(e)}) break finally: sk.sendall(socket_util.load_header(return_str.encode(encoding='utf-8'))) def finish(self): super().finish() def __set_target_codes(queue_other_w_l2_r: multiprocessing.Queue): logger_system.info("启动读取L2订阅代码队列") while True: try: _datas = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.pop() if _datas: times = _datas[0] datas = _datas[1] request_id = _datas[2] logger_l2_codes_subscript.info("({})读取L2代码处理队列:数量-{}", request_id, len(datas)) # 只处理20s内的数据 if time.time() - times < 20: # 获取涨停列表中的数据 # datas中的数据格式:(代码, 现价, 涨幅, 量, 时间) if not datas: # 没有数据需要处理 continue # 再次获取代码 codes = [d[0] for d in datas] for code in codes: block_info.init_code(code) root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, "data": datas} queue_other_w_l2_r.put_nowait(json.dumps(root_data)) # 如果在9:24-9:29 需要加载板块 if int("092400") < int(tool.get_now_time_str().replace(":", "")) < int("092900"): for d in datas: threading.Thread(target=lambda: KPLCodeJXBlockManager().load_jx_blocks(d[0], gpcode_manager.get_price( d[0]), float(d[2]), KPLLimitUpDataRecordManager.get_current_reasons()), daemon=True).start() time.sleep(0.2) logger_l2_codes_subscript.info("({})发送到华鑫L2代码处理队列:数量-{}", request_id, len(datas)) except Exception as e: logging.exception(e) logger_l2_codes_subscript.exception(e) finally: time.sleep(0.01) def __read_sync_task(pipe): logger_system.info("启动读取数据同步服务") while True: try: if pipe: val = pipe.recv() if val: print("接收到更新任务:", val) val = json.loads(val) type_ = val["type"] if type_ == "want_list": print("want_list before", gpcode_manager.WantBuyCodesManager().list_code_cache()) gpcode_manager.WantBuyCodesManager().sync() print("want_list after", gpcode_manager.WantBuyCodesManager().list_code_cache()) elif type_ == "white_list": print("white_list before", gpcode_manager.WhiteListCodeManager().list_codes_cache()) gpcode_manager.WhiteListCodeManager().sync() print("white_list after", gpcode_manager.WhiteListCodeManager().list_codes_cache()) elif type_ == "black_list": print("black_list before", gpcode_manager.BlackListCodeManager().list_codes_cache()) gpcode_manager.BlackListCodeManager().sync() print("black_list after", gpcode_manager.BlackListCodeManager().list_codes_cache()) elif type_ == "pause_buy_list": print("pause_buy_list before", gpcode_manager.PauseBuyCodesManager().list_code_cache()) gpcode_manager.PauseBuyCodesManager().sync() print("pause_buy_list after", gpcode_manager.PauseBuyCodesManager().list_code_cache()) elif type_ == "trade_state": print("trade_state before", trade_manager.TradeStateManager().is_can_buy_cache()) trade_manager.TradeStateManager().sync() print("trade_state after", trade_manager.TradeStateManager().is_can_buy_cache()) elif type_ == "trade_mode": print("trade_mode before", trade_manager.TradeTargetCodeModeManager().get_mode_cache()) trade_manager.TradeTargetCodeModeManager().sync() print("trade_mode after", trade_manager.TradeTargetCodeModeManager().get_mode_cache()) except Exception as e: logging.exception(e) finally: time.sleep(1) def run(pipe_server, queue_other_w_l2_r, queue_l1_trade_r_strategy_w): logger_system.info("create TradeApiServer") logger_system.info(f"trade_api_server 线程ID:{tool.get_thread_id()}") # 拉取交易信息 huaxin_trade_data_update.run(queue_l1_trade_r_strategy_w, queue_other_w_l2_r) # t1 = threading.Thread(target=lambda: __set_target_codes(queue_other_w_l2_r), daemon=True) t1.start() t1 = threading.Thread(target=lambda: __read_sync_task(pipe_server), daemon=True) t1.start() while True: time.sleep(5) # 托管环境下不开启接口 # laddr = "0.0.0.0", 10009 # tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 注意:参数是MyBaseRequestHandle # tcpserver.serve_forever() if __name__ == "__main__": pass