""" 接受客户端数据的服务器 """ import datetime import json import logging import socketserver import socket import threading import time import alert_util import client_manager import code_volumn_manager import data_process import global_data_loader import global_util import gpcode_manager import authority import juejin import l2_data_log import l2_data_manager import l2_data_manager_new import l2_data_util import limit_up_time_manager import ths_industry_util import ths_util import tool import trade_data_manager import trade_gui import trade_manager import l2_code_operate from code_data_util import ZYLTGBUtil from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \ logger_l2_trade_queue, logger_l2_latest_data from trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager class MyTCPServer(socketserver.TCPServer): def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_juejin=None, pipe_ui=None): self.pipe_juejin = pipe_juejin # 增加的参数 self.pipe_ui = pipe_ui socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=bind_and_activate) # 如果使用异步的形式则需要再重写ThreadingTCPServer class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass class MyBaseRequestHandle(socketserver.BaseRequestHandler): l2_data_error_dict = {} last_trade_delegate_data = None buy1_volumn_manager = THSBuy1VolumnManager() ths_l2_trade_queue_manager = thsl2tradequeuemanager() latest_buy1_volumn_dict = {} buy1_price_manager = Buy1PriceManager() l2_trade_queue_time_dict = {} l2_save_time_dict = {} def setup(self): super().setup() # 可以不调用父类的setup()方法,父类的setup方法什么都没做 # print("----setup方法被执行-----") # print("打印传入的参数:", self.server.pipe) self.l2CodeOperate = l2_code_operate.L2CodeOperate.get_instance() def handle(self): host = self.client_address[0] super().handle() # 可以不调用父类的handler(),方法,父类的handler方法什么都没做 # print("-------handler方法被执行----") # print(self.server) # print(self.request) # 服务 # print("客户端地址:", self.client_address) # 客户端地址 # print(self.__dict__) # print("- " * 30) # print(self.server.__dict__) # print("- " * 30) sk: socket.socket = self.request while True: data = sk.recv(1024000) if len(data) == 0: # print("客户端断开连接") break _str = str(data, encoding="gbk") if len(_str) > 0: # print("结果:",_str) type = data_process.parseType(_str) return_str = "OK" if type == 0: try: origin_start_time = round(time.time() * 1000) __start_time = round(time.time() * 1000) # level2盘口数据 day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data( _str) # 间隔1s保存一条l2的最后一条数据 if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[ code] >= 1000 and len(datas) > 0: self.l2_save_time_dict[code] = origin_start_time logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1]) # 10ms的网络传输延时 capture_timestamp = __start_time - process_time - 10 # print("截图时间:", process_time) __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "截图时间:{} 数据解析时间".format(process_time)) cid, pid = gpcode_manager.get_listen_code_pos(code) __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "l2获取代码位置耗时") # 判断目标代码位置是否与上传数据位置一致 if cid is not None and pid is not None and client == int(cid) and channel == int(pid): try: # 校验客户端代码 l2_code_operate.verify_with_l2_data_pos_info(code, client, channel) __start_time = round(time.time() * 1000) if gpcode_manager.is_listen(code): __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "l2外部数据预处理耗时") l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp) __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "l2数据有效处理外部耗时", False) # 保存原始数据数量 l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) if round(time.time() * 1000) - __start_time > 20: l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "异步保存原始数据条数耗时", False) except l2_data_manager.L2DataException as l: # 单价不符 if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: key = "{}-{}-{}".format(client, channel, code) if key not in self.l2_data_error_dict or round( time.time() * 1000) - self.l2_data_error_dict[key] > 10000: # self.l2CodeOperate.repaire_l2_data(code) # todo 太敏感移除代码 logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg) # 单价不一致时需要移除代码重新添加 l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误") self.l2_data_error_dict[key] = round(time.time() * 1000) except Exception as e: print("异常", str(e), code) logging.exception(e) logger_l2_error.error("出错:{}".format(str(e))) logger_l2_error.error("内容:{}".format(_str)) finally: __end_time = round(time.time() * 1000) # 只记录大于40ms的数据 if __end_time - origin_start_time > 100: l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time, "l2数据处理总耗时", True) except Exception as e: logger_l2_error.exception(e) elif type == 1: # 设置股票代码 data_list, is_add = data_process.parseGPCode(_str) ZYLTGBUtil.save_list(data_list) code_list = [] for data in data_list: code_list.append(data["code"]) # 获取基本信息 code_datas = juejin.JueJinManager.get_gp_latest_info(code_list) if is_add: gpcode_manager.add_gp_list(code_datas) else: gpcode_manager.set_gp_list(code_datas) if not is_add: # 同步同花顺目标代码 t1 = threading.Thread(target=lambda: sync_target_codes_to_ths()) t1.setDaemon(True) t1.start() elif type == 2: # 涨停代码 dataList, is_add = data_process.parseGPCode(_str) # 设置涨停时间 gpcode_manager.set_limit_up_list(dataList) # 保存到内存中 if dataList: global_data_loader.add_limit_up_codes(dataList) ths_industry_util.set_industry_hot_num(dataList) # 保存涨停时间 gp_list = gpcode_manager.get_gp_list() gp_code_set = set(gp_list) now_str = tool.get_now_time_str() for d in dataList: if d["time"] == "00:00:00" or tool.get_time_as_second(now_str) < tool.get_time_as_second( d["time"]): continue if d["code"] not in gp_code_set: continue # 获取是否有涨停时间 # if limit_up_time_manager.get_limit_up_time(d["code"]) is None: # limit_up_time_manager.save_limit_up_time(d["code"], d["time"]) elif type == 3: # 交易成功信息 dataList = data_process.parseList(_str) try: trade_manager.process_trade_success_data(dataList) except Exception as e: logging.exception(e) trade_manager.save_trade_success_data(dataList) elif type == 5: logger_trade_delegate.debug("接收到委托信息") # 交易委托信息 dataList = data_process.parseList(_str) if self.last_trade_delegate_data != _str: self.last_trade_delegate_data = _str # 保存委托信息 logger_trade_delegate.info(dataList) try: # 设置申报时间 for item in dataList: apply_time = item["apply_time"] if apply_time and len(apply_time) >= 8: code = item["code"] trade_state = trade_manager.get_trade_state(code) # 设置下单状态的代码为已委托 if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: origin_apply_time = apply_time apply_time = apply_time[0:6] apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4], apply_time[4:6]) ms = origin_apply_time[6:9] if int(ms) > 500: # 时间+1s apply_time = tool.trade_time_add_second(apply_time, 1) print(apply_time) l2_data_manager_new.SecondAverageBigNumComputer.set_apply_time(code, apply_time) except Exception as e: logging.exception(e) try: trade_manager.process_trade_delegate_data(dataList) except Exception as e: logging.exception(e) trade_manager.save_trade_delegate_data(dataList) # 刷新交易界面 trade_gui.THSGuiTrade().refresh_data() elif type == 4: # 行业代码信息 dataList = data_process.parseList(_str) ths_industry_util.save_industry_code(dataList) elif type == 6: # 可用金额 datas = data_process.parseData(_str) client = datas["client"] money = datas["money"] # TODO存入缓存文件 trade_manager.set_available_money(client, money) # l2交易队列 elif type == 10: # 可用金额 datas = data_process.parseData(_str) channel = datas["channel"] code = datas["code"] data = datas["data"] buy_time = data["buyTime"] buy_one_price = data["buyOnePrice"] buy_one_volumn = data["buyOneVolumn"] # 保存最近的记录 if self.ths_l2_trade_queue_manager.save_recod(code, data): if buy_time != "00:00:00": logger_l2_trade_queue.info("{}-{}", code, data) self.buy1_price_manager.save(code, buy_one_price) need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time, int(buy_one_volumn), buy_one_price) if need_cancel: l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue") if need_sync: # 同步数据 l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time) # print(buy_time, buy_one_price, buy_one_volumn) # print("L2买卖队列",datas) elif type == 20: # 登录 data = data_process.parse(_str)["data"] try: client_id, _authoritys = authority.login(data["account"], data["pwd"]) return_str = data_process.toJson( {"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}}) except Exception as e: return_str = data_process.toJson({"code": 1, "msg": str(e)}) # 现价更新 elif type == 40: data = data_process.parse(_str)["data"] if data is not None: print("现价数量", len(data)) for item in data: volumn = item["volumn"] volumnUnit = item["volumnUnit"] code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit) juejin.accpt_prices(data) elif type == 50: data = data_process.parse(_str)["data"] if data is not None: index = data["index"] code_name = data["codeName"].replace(" ", "") volumn = data["volumn"] price = data["price"] time_ = data["time"] code = global_util.name_codes.get(code_name) if code is None: global_data_loader.load_name_codes() code = global_util.name_codes.get(code_name) if code is not None: # 记录日志 if self.latest_buy1_volumn_dict.get(code) != "{}-{}".format(volumn, price): # 记录数据 logger_buy_1_volumn_record.info("{}-{}", code, data) self.latest_buy1_volumn_dict[code] = "{}-{}".format(volumn, price) # 保存买1价格 self.buy1_price_manager.save(code, price) # 校正时间 time_ = tool.compute_buy1_real_time(time_) # 保存数据 need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn, price) if need_cancel: l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue") if need_sync: # 同步数据 l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_) elif type == 30: # 心跳信息 data = data_process.parse(_str)["data"] client_id = data["client"] thsDead = data.get("thsDead") logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data))) client_manager.saveClientActive(int(client_id), host, thsDead) if ths_util.is_ths_dead(client_id): # TODO 重启同花顺 # 报警 l2_clients = authority.get_l2_clients() if client_id in l2_clients: alert_util.alarm() elif type == 60: # 心跳信息 data = data_process.parse(_str)["data"] client_id = data["client"] print("L2自启动成功", client_id) now_str = tool.get_now_time_str() ts = tool.get_time_as_second(now_str) # 9点25到9点28之间的自启动就需要批量设置代码 if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00"): # 准备批量设置代码 return_json = {"code": 1, "msg": "等待批量设置代码"} return_str = json.dumps(return_json) # 获取排名前16位的代码 codes = trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16) codes = sorted(codes) if client_id == 2: codes = codes[:8] else: codes = codes[8:] codes_datas = [] for i in range(0, 8): if i >= len(codes): break codes_datas.append((i, codes[i])) # 如果设置失败需要重试2次 for i in range(0, 3): set_success = l2_code_operate.betch_set_client_codes(client_id, codes_datas) if set_success: break else: time.sleep(3) else: return_json = {"code": 0, "msg": "开启在线状态"} return_str = json.dumps(return_json) # print("心跳:", client_id) elif type == 100: # 图像识别 return_str = data_process.toJson({"code": 0, "data": {"datas": []}}) pass sk.send(return_str.encode()) # print("----------handler end ----------") def finish(self): super().finish() # 可以不调用父类的finish(),方法,父类的finish方法什么都没做 # print("--------finish方法被执行---") def send_msg(client_id, data): _ip = client_manager.getActiveClientIP(client_id) print("ip", client_id, _ip) if _ip is None or len(_ip) <= 0: raise Exception("客户端IP为空") socketClient = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socketClient.connect((_ip, 9006)) # 连接socket try: socketClient.send(json.dumps(data).encode()) recv = socketClient.recv(1024) result = str(recv, encoding="gbk") return result finally: socketClient.close() # 客户端心跳机制 def test_client_server(): while True: clients = authority.get_l2_clients() for client in clients: print("心跳", client) try: send_msg(client, {"action": "test"}) except: pass # 矫正客户端代码 l2_code_operate.correct_client_codes() time.sleep(5) # 获取采集客户端的状态 def get_client_env_state(client): result = send_msg(client, {"action": "getEnvState"}) result = json.loads(result) if result["code"] == 0: return json.loads(result["data"]) else: raise Exception(result["msg"]) # 修复采集客户端 def repair_client_env(client): result = send_msg(client, {"action": "repairEnv"}) result = json.loads(result) if result["code"] != 0: raise Exception(result["msg"]) # 同步目标标的到同花顺 def sync_target_codes_to_ths(): codes = gpcode_manager.get_gp_list() code_list = [] for code in codes: code_list.append(code) client = authority._get_client_ids_by_rule("client-industry") result = send_msg(client[0], {"action": "syncTargetCodes", "data": code_list}) return result # 修复同花顺主站 def repair_ths_main_site(client): result = send_msg(client, {"action": "updateTHSSite"}) result = json.loads(result) if result["code"] != 0: raise Exception(result["msg"]) if __name__ == "__main__": try: thsl2tradequeuemanager().test() # repair_ths_main_site(2) except Exception as e: print(str(e))