"""Socket server that receives the data uploaded by the collection/trading clients.

Every request is a GBK-encoded text message whose numeric ``type`` (extracted by
``data_process.parseType``) selects the handler.  The reply is ``"OK"`` unless the
handler produces its own (usually JSON) response.
"""
import decimal
import json
import logging
import random
import socketserver
import socket
import threading
import time

from utils import alert_util, data_process, global_util, ths_industry_util, tool, import_util
from code_attribute import code_volumn_manager, code_nature_analyse, global_data_loader, gpcode_manager, \
    gpcode_first_screen_manager
import constant
from user import authority
import inited_data
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager
import l2_data_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, LCancelBigNumComputer
import l2.l2_data_util
from output import code_info_output
from third_data import hot_block_data_process, block_info, kpl_api
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager, KPLLimitUpDataRecordManager
from ths import l2_listen_pos_health_manager, l2_code_operate, client_manager
from trade import trade_data_manager, trade_manager, l2_trade_util, deal_big_money_manager, \
    current_price_process_manager, trade_juejin
from code_attribute.code_data_util import ZYLTGBUtil
import l2.transaction_progress
from logs.log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record, logger_debug
from trade.huaxin import huaxin_trade_record_manager
from trade.trade_manager import TradeTargetCodeModeManager
from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager

# Optional modules: the file checks ``trade_gui`` against None before use, so
# ``import_lib`` presumably returns None when the module is unavailable.
ths_util = import_util.import_lib("ths.ths_util")
trade_gui = import_util.import_lib("trade.trade_gui")


def _request_codes(_str):
    """Return the ``data.codes`` list of a JSON request."""
    return json.loads(_str)["data"]["codes"]


def _ensure_code_name(code):
    """Look up and store the name of ``code`` when it is not known yet."""
    if gpcode_manager.get_code_name(code):
        return
    results = HistoryKDatasUtils.get_gp_codes_names([code])
    if results:
        gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])


def _named_codes(codes):
    """Render each code as the ``"name:code"`` string returned to the clients."""
    return [f"{gpcode_manager.get_code_name(code)}:{code}" for code in codes]


class MyTCPServer(socketserver.TCPServer):
    """TCP server that additionally carries the two pipes handed in by the caller."""

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_juejin=None, pipe_ui=None):
        # Extra parameters; set before the base initialiser runs.
        self.pipe_juejin = pipe_juejin
        self.pipe_ui = pipe_ui
        socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass,
                                        bind_and_activate=bind_and_activate)


# Threaded variant of MyTCPServer (one thread per connection).
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    pass


# First-board tick-level data
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Request handler that dispatches every received message by its numeric type.

    The class-level containers below are shared by all handler instances (all
    connections).  ``last_trade_delegate_data`` is the exception: it is assigned
    through ``self`` and therefore tracked per connection.
    """

    l2_data_error_dict = {}
    last_trade_delegate_data = None
    buy1_volumn_manager = THSBuy1VolumnManager()
    ths_l2_trade_queue_manager = thsl2tradequeuemanager()
    latest_buy1_volumn_dict = {}
    l2_trade_queue_time_dict = {}
    l2_save_time_dict = {}
    l2_trade_buy_queue_dict = {}
    tradeBuyQueue = l2.transaction_progress.TradeBuyQueue()
    last_time = {}
    first_tick_datas = []
    latest_oringin_data = {}
    last_l2_listen_health_time = {}
    __KPLCodeLimitUpReasonManager = KPLCodeLimitUpReasonManager()
    __CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
    # Current prices collected from the L2 monitor, keyed by code
    __l2_current_price_data = {}

    def setup(self):
        """Prepare the per-connection state."""
        super().setup()
        self.l2CodeOperate = l2_code_operate.L2CodeOperate.get_instance()

    def handle(self):
        """Serve one connection: read messages until the peer closes, reply to each."""
        super().handle()
        sk: socket.socket = self.request
        while True:
            data = sk.recv(1024 * 100)
            if len(data) == 0:
                # The client closed the connection.
                break
            _str = str(data, encoding="gbk")
            if len(_str) > 0:
                msg_type = -1
                try:
                    msg_type = data_process.parseType(_str)
                except Exception as e:
                    if "Unterminated string starting" in str(e):
                        # Raw newlines inside the payload break the parser; retry without them.
                        _str = _str.replace("\n", "")
                        msg_type = data_process.parseType(_str)
                    else:
                        print(_str)
                return_str = self._process_message(msg_type, _str)
                # sendall: socket.send() may transmit only part of the reply.
                sk.sendall(return_str.encode())

    def finish(self):
        """Nothing to release beyond what the base class does."""
        super().finish()

    def _process_message(self, msg_type, _str):
        """Run the handler registered for ``msg_type`` and return the reply text.

        Unknown types (including 601/602, which are placeholders) and handlers
        that return None are answered with ``"OK"``.
        """
        handler = self._MESSAGE_HANDLERS.get(msg_type)
        if handler is None:
            return "OK"
        reply = handler(self, _str)
        return "OK" if reply is None else reply

    # ------------------------------------------------------------------ type 0
    def _on_l2_data(self, _str):
        """Type 0: level-2 order data captured from one listening position."""
        try:
            origin_start_time = round(time.time() * 1000)
            step_start = round(time.time() * 1000)
            day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = \
                l2.l2_data_util.parseL2Data(_str)

            # Refresh the health state of the listening position at most once per second.
            last_health_time = self.last_l2_listen_health_time.get((client, channel))
            if last_health_time is None or step_start - last_health_time > 1000:
                self.last_l2_listen_health_time[(client, channel)] = step_start
                if origin_datas_count == 0:
                    l2_listen_pos_health_manager.set_unhealthy(client, channel)
                else:
                    l2_listen_pos_health_manager.set_healthy(client, channel)

            l2_log.threadIds[code] = random.randint(0, 100000)

            # Log the last L2 record of each code once per second.  The emptiness
            # check must guard both alternatives, otherwise origin_datas[-1]
            # raises for a code that has not been logged yet.
            last_saved = self.l2_save_time_dict.get(code)
            if origin_datas and (last_saved is None or origin_start_time - last_saved >= 1000):
                self.l2_save_time_dict[code] = origin_start_time
                logger_l2_latest_data.info("{}#{}#{}", code, capture_time, origin_datas[-1])

            # Allow 10ms for network transfer.
            capture_timestamp = step_start - process_time - 10
            step_start = l2_data_log.l2_time(code, round(time.time() * 1000) - step_start,
                                             "截图时间:{} 数据解析时间".format(process_time))
            cid, pid = gpcode_manager.get_listen_code_pos(code)
            step_start = l2_data_log.l2_time(code, round(time.time() * 1000) - step_start, "l2获取代码位置耗时")

            # Only accept data uploaded from the position the code is assigned to.
            if cid is None or pid is None or client != int(cid) or channel != int(pid):
                return
            # Remember how many L2 records the client currently holds.
            l2_data_util.save_l2_latest_data_number(code, origin_datas_count)
            if not origin_datas:
                # No new data: nothing to process.
                return
            # Keep the most recent raw data.
            self.latest_oringin_data[code] = origin_datas
            limit_up_price = gpcode_manager.get_limit_up_price(code)
            datas = l2.l2_data_util.L2DataUtil.format_l2_data(origin_datas, code, limit_up_price)
            try:
                # Verify the code against the client position.
                l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                step_start = round(time.time() * 1000)
                if gpcode_manager.is_listen(code):
                    step_start = l2_data_log.l2_time(code, round(time.time() * 1000) - step_start,
                                                     "l2外部数据预处理耗时")
                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                    l2_data_log.l2_time(code, round(time.time() * 1000) - step_start,
                                        "l2数据有效处理外部耗时", False)
            except l2_data_manager.L2DataException as data_error:
                # Price mismatch: drop the listener so that the code gets re-added,
                # at most once every 10 seconds per (client, channel, code).
                if data_error.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                    key = "{}-{}-{}".format(client, channel, code)
                    last_error = self.l2_data_error_dict.get(key)
                    if last_error is None or round(time.time() * 1000) - last_error > 10000:
                        logger_l2_error.warning("code-{} l2单价错误:{}", code, data_error.msg)
                        l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误")
                        # NOTE(review): assumed to belong to the throttle branch — confirm.
                        self.l2_data_error_dict[key] = round(time.time() * 1000)
            except Exception as e:
                print("异常", str(e), code)
                logging.exception(e)
                logger_l2_error.error("出错:{}".format(str(e)))
                logger_l2_error.error("内容:{}".format(_str))
            finally:
                # Only record messages whose total handling took longer than 100ms.
                if round(time.time() * 1000) - origin_start_time > 100:
                    l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time,
                                        "l2数据处理总耗时", True)
        except Exception as e:
            # "no new data" style errors are expected; everything else is logged.
            # (str.find() returns -1 when absent, so it cannot be used as a boolean.)
            if "新增数据" not in str(e):
                logger_l2_error.exception(e)

    # ------------------------------------------------------------------ type 1
    def _on_target_codes(self, _str):
        """Type 1: the client uploads the target code list."""
        data_list, is_add = data_process.parseGPCode(_str)
        ZYLTGBUtil.save_list(data_list)
        code_list = [item["code"] for item in data_list]
        # Fetch the basic information; the result is not used here.
        HistoryKDatasUtils.get_gp_latest_info(code_list)
        if not is_add:
            # Push the target codes to the THS client in the background.
            threading.Thread(target=sync_target_codes_to_ths, daemon=True).start()

    # ------------------------------------------------------------------ type 2
    def _on_limit_up_codes(self, _str):
        """Type 2: codes that are currently limit-up."""
        dataList, _ = data_process.parseGPCode(_str)
        gpcode_manager.set_limit_up_list(dataList)
        # Keep them in memory as well.
        if dataList:
            global_data_loader.add_limit_up_codes(dataList)
        ths_industry_util.set_industry_hot_num(dataList)
        # NOTE(review): the original then walked dataList to persist limit-up
        # times, but the persisting call was commented out, so the loop had no
        # effect and is omitted here.

    # ----------------------------------------------------------------- type 22
    def _on_first_board_ticks(self, _str):
        """Type 22: tick data of the first-board candidate codes."""
        try:
            if int(tool.get_now_time_str().replace(":", "")) < 92500:
                raise Exception('未到接受时间')
            dataList, _ = data_process.parseGPCode(_str)
            if dataList is None:
                # Items may be appended below, so a real list is required.
                dataList = []
            codes = [item["code"] for item in dataList]

            # Want-to-buy codes missing from the upload are forced into the list.
            self._add_missing_want_codes(dataList, codes)
            # Register the not-yet-screened first-board codes.
            new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes)

            # Save the free-float share capital of codes that are not cached yet.
            zyltgb_list = [{"code": item["code"], "zyltgb": item["zyltgb"], "zyltgb_unit": item["zyltgbUnit"]}
                           for item in dataList if item["code"] not in global_util.zyltgb_map]
            if zyltgb_list:
                ZYLTGBUtil.save_list(zyltgb_list)
                global_data_loader.load_zyltgb()

            # A missing limit-up price means the previous close has to be loaded.
            for code in codes:
                if gpcode_manager.get_limit_up_price(code) is None:
                    inited_data.re_set_price_pres([code], True)

            self._prepare_plate_keys(codes)
            self._prepare_volume_history(codes)

            gpcode_manager.FirstCodeManager.add_record(codes)
            if new_add_codes:
                gpcode_manager.set_first_gp_codes_with_data(
                    HistoryKDatasUtils.get_gp_latest_info(codes, fields="symbol,sec_name,sec_type,sec_level"))
                logger_first_code_record.info("新增首板:{}", new_add_codes)
                # Stop listening to codes that dropped out of the pool.
                for lc in gpcode_manager.get_listen_codes():
                    if not gpcode_manager.is_in_gp_pool(lc):
                        l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除")

            # Collect the current prices and the known limit-up prices.
            limit_up_price_dict = {}
            temp_codes = []
            tick_datas = []
            for item in dataList:
                code = item["code"]
                limit_up_price = gpcode_manager.get_limit_up_price(code)
                if limit_up_price is not None:
                    limit_up_price_dict[code] = limit_up_price
                else:
                    temp_codes.append(code)
                tick_datas.append({"code": code, "price": item["price"], "volume": item["volume"],
                                   "volumeUnit": item["volumeUnit"]})
            if temp_codes:
                # Load the missing limit-up prices, then read them again.
                inited_data.re_set_price_pres(temp_codes)
                for code in temp_codes:
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if limit_up_price is not None:
                        limit_up_price_dict[code] = limit_up_price
            # Stash the ticks; they are merged into the next type-40 price update.
            self.first_tick_datas.clear()
            self.first_tick_datas.extend(tick_datas)

            prices = self._build_first_board_prices(dataList, limit_up_price_dict, new_add_codes)
            gpcode_first_screen_manager.process_ticks(prices)
        except Exception as e:
            logging.exception(e)

    def _add_missing_want_codes(self, dataList, codes):
        """Append want-to-buy codes absent from ``codes`` to ``dataList``/``codes`` in place."""
        want_codes = gpcode_manager.WantBuyCodesManager.list_code()
        if not want_codes:
            return
        diff_codes = set(want_codes) - set(codes)
        if not diff_codes:
            return
        zyltgb_list = []
        for code in diff_codes:
            if code in self.__l2_current_price_data:
                # Use the price collected on the L2 monitor.
                item = self.__l2_current_price_data[code]
                codes.append(code)
                dataList.append(item)
                zyltgb_list.append({"code": code, "zyltgb": item["zyltgb"], "zyltgb_unit": item["zyltgbUnit"]})
                continue
            limit_up_price = gpcode_manager.get_limit_up_price(code)
            if not limit_up_price:
                inited_data.re_set_price_pres([code], True)
                limit_up_price = gpcode_manager.get_limit_up_price(code)
            if limit_up_price:
                # The limit-up price is known: build a synthetic tick from it.
                codes.append(code)
                dataList.append({"code": code, "price": f"{limit_up_price}", "volume": "0", "volumeUnit": 0,
                                 "time": "00:00:00", "zyltgb": "100", "zyltgbUnit": 0})
        if zyltgb_list:
            # Force-update the free-float share capital and mirror it in memory.
            ZYLTGBUtil.save_list(zyltgb_list)
            for z in zyltgb_list:
                val = ZYLTGBUtil.get(z["code"])
                if val:
                    global_util.zyltgb_map[z["code"]] = val

    def _prepare_plate_keys(self, codes):
        """Make sure the plate/block keywords of every code are loaded."""
        for code in codes:
            # NOTE(review): this refreshes the history reason only when one already
            # exists; the neighbouring check uses "is None" — confirm the intent.
            if self.__CodesPlateKeysManager.get_history_limit_up_reason(code) is not None:
                self.__CodesPlateKeysManager.set_history_limit_up_reason(
                    code, KPLLimitUpDataRecordManager.get_latest_blocks_set(code))
            if self.__CodesPlateKeysManager.get_blocks(code) is None:
                try:
                    results = kpl_api.getStockIDPlate(code)
                    self.__CodesPlateKeysManager.set_blocks(code, [r[1] for r in results])
                except Exception as e:
                    logging.exception(e)

    @staticmethod
    def _prepare_volume_history(codes):
        """Load max-volume and K-line nature data for codes that lack either."""
        for code in codes:
            if global_util.max60_volumn.get(code) is not None and \
                    code_nature_analyse.CodeNatureRecordManager.get_nature(code) is not None:
                continue
            volumes_data = inited_data.get_volumns_by_code(code, 150)
            volumes = inited_data.parse_max_volume(
                volumes_data[:90],
                code_nature_analyse.is_new_top(gpcode_manager.get_limit_up_price(code), volumes_data[:90]))
            logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes)
            code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2])
            # Judge the K-line shape.
            is_has_k_format, msg = code_nature_analyse.is_has_k_format(
                gpcode_manager.get_limit_up_price(code), volumes_data)
            if not is_has_k_format:
                logger_first_code_record.info("{}首板K线形态不好,{}", code, msg)
                # Poor shape: forbid trading this code.
                l2_trade_util.forbidden_trade(code)
            code_nature_analyse.set_record_datas(code, gpcode_manager.get_limit_up_price(code), volumes_data)

    @staticmethod
    def _build_first_board_prices(dataList, limit_up_price_dict, new_add_codes):
        """Return the per-code price summary handed to ``process_ticks``."""
        prices = []
        for item in dataList:
            code = item["code"]
            price = item["price"]
            limit_up_time = item["time"]
            if limit_up_time == "00:00:00":
                limit_up_time = None
            if code not in limit_up_price_dict:
                continue
            is_limit_up = abs(float(limit_up_price_dict[code]) - float(price)) < 0.01
            # Correct the data: a limit-up code without a time gets the current time.
            if is_limit_up and limit_up_time is None:
                limit_up_time = tool.get_now_time_str()
            if is_limit_up:
                gpcode_manager.FirstCodeManager.add_limited_up_record([code])
            price_pre = gpcode_manager.get_price_pre(code)
            if price_pre is None:
                # Load the previous close and read it again.
                inited_data.re_set_price_pres([code])
                price_pre = gpcode_manager.get_price_pre(code)
            if not price_pre:
                # Without a previous close no rate can be computed; skip this code
                # instead of failing the whole batch.
                continue
            rate = round((float(price) - price_pre) * 100 / price_pre, 1)
            prices.append({"code": code, "time": limit_up_time, "rate": rate, "limit_up": is_limit_up})
            if is_limit_up and new_add_codes and code in new_add_codes:
                if trade_data_manager.placeordercountmanager.get_place_order_count(code) == 0:
                    trade_data_manager.placeordercountmanager.place_order(code)
        return prices

    # -------------------------------------------------------------- types 3-6
    def _on_trade_success(self, _str):
        """Type 3: trade-success records."""
        dataList = data_process.parseList(_str)
        try:
            trade_manager.process_trade_success_data(dataList)
        except Exception as e:
            logging.exception(e)
        trade_manager.save_trade_success_data(dataList)

    def _on_industry_codes(self, _str):
        """Type 4: industry code information."""
        dataList = data_process.parseList(_str)
        # Collect the codes whose name is still unknown.
        codes = []
        for datas in dataList:
            for d in datas:
                name = ths_industry_util.get_name_by_code(d['code'])
                if not name or name == 'None':
                    codes.append(d["code"])
        codes_name = HistoryKDatasUtils.get_gp_codes_names(codes) if codes else {}
        ths_industry_util.save_industry_code(dataList, codes_name)

    def _on_trade_delegate(self, _str):
        """Type 5: trade delegation (order) records."""
        logger_trade_delegate.debug("接收到委托信息")
        dataList = data_process.parseList(_str)
        if self.last_trade_delegate_data != _str:
            self.last_trade_delegate_data = _str
            # Only log payloads that differ from the previous one.
            logger_trade_delegate.info(dataList)
        # NOTE(review): the processing below is assumed to run for every message,
        # not only for changed payloads — confirm.
        try:
            for item in dataList:
                apply_time = item["apply_time"]
                if not apply_time or len(apply_time) < 8:
                    continue
                if trade_manager.get_trade_state(item["code"]) != trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                    continue
                # First six characters are HHMMSS, the next three are read as milliseconds.
                ms = apply_time[6:9]
                apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4], apply_time[4:6])
                if int(ms) > 500:
                    # Round up to the next second.
                    apply_time = tool.trade_time_add_second(apply_time, 1)
                print(apply_time)
        except Exception as e:
            logging.exception(e)
        try:
            trade_manager.process_trade_delegate_data(dataList)
        except Exception as e:
            logging.exception(e)
        trade_manager.save_trade_delegate_data(dataList)
        # Refresh the trading UI.
        if trade_gui is not None:
            trade_gui.THSGuiTrade().refresh_data()

    def _on_available_money(self, _str):
        """Type 6: available funds of a client."""
        datas = data_process.parseData(_str)
        # TODO: persist to the cache file
        trade_manager.set_available_money(datas["client"], datas["money"])

    # ----------------------------------------------------------------- type 10
    def _on_l2_trade_queue(self, _str):
        """Type 10: L2 buy/sell queue snapshot of one code."""
        begin = time.time()
        datas = data_process.parseData(_str)
        code = datas["code"]
        msg = ""
        try:
            if not gpcode_manager.is_in_gp_pool(code) and not gpcode_manager.is_in_first_gp_codes(code):
                # Neither a target code nor one of today's first-board codes.
                return
            data = datas["data"]
            buy_time = data["buyTime"]
            buy_one_price = data["buyOnePrice"]
            buy_one_volumn = data["buyOneVolumn"]
            sell_one_price = data["sellOnePrice"]
            sell_one_volumn = data["sellOneVolumn"]
            buy_queue = data["buyQueue"]
            if buy_one_price is None:
                print('买1价没有,', code)
            limit_up_price = gpcode_manager.get_limit_up_price(code)
            if limit_up_price is None:
                # NOTE(review): everything below is assumed to require a limit-up price — confirm.
                return
            code_price_manager.Buy1PriceManager.process(code, buy_one_price, buy_time, limit_up_price,
                                                        sell_one_price, sell_one_volumn)
            step_start = time.time()
            msg += "买1价格处理:" + f"{step_start - begin} "
            buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price, buy_time,
                                                            buy_queue)
            msg += "买队列保存:" + f"{time.time() - step_start} "
            step_start = time.time()
            if buy_queue_result_list:
                try:
                    self._update_trade_progress(code, buy_time, buy_one_price, buy_queue_result_list)
                    msg += "计算成交进度:" + f"{time.time() - step_start} "
                    step_start = time.time()
                except Exception as e:
                    logging.exception(e)
                    print("买入队列", code, buy_queue_result_list)
                    logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{} 数据-{}", code, str(e),
                                                      json.dumps(buy_queue_result_list))
            # Log the buy queue only when it changed.
            last_queue = self.l2_trade_buy_queue_dict.get(code)
            if last_queue is None or buy_queue != last_queue:
                self.l2_trade_buy_queue_dict[code] = buy_queue
                logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
                msg += "保存记录日志:" + f"{time.time() - step_start} "
                step_start = time.time()
            # Save the latest record.
            if self.ths_l2_trade_queue_manager.save_recod(code, data):
                if buy_time != "00:00:00":
                    logger_l2_trade_queue.info("{}-{}", code, data)
                    self.buy1_volumn_manager.save(code, buy_time, int(buy_one_volumn), buy_one_price)
            msg += "买1处理:" + f"{time.time() - step_start} "
        except Exception as e:
            # Best effort: a bad snapshot must not break the connection.
            logging.exception(e)
        finally:
            space = time.time() - begin
            if space > 0.1:
                logger_debug.info("{}成交队列处理时间:{},{}", code, space, msg)

    def _update_trade_progress(self, code, buy_time, buy_one_price, buy_queue_result_list):
        """Compute the traded index from the buy queue and publish it.

        Raises:
            Exception: when the traded index cannot be determined yet.
        """
        buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(decimal.Decimal("0.00"))
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = \
            l2_data_manager.TradePointManager.get_buy_compute_start_data(code)
        # Time of the record at the execution index, when one is available.
        exec_time = None
        try:
            if buy_exec_index:
                exec_time = l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"]["time"]
        except Exception:
            pass
        buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, buy_one_price_,
                                                                     buy_queue_result_list, exec_time)
        if buy_progress_index is None:
            raise Exception("暂未获取到交易进度")
        today_datas = l2.l2_data_util.local_today_datas.get(code)
        HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index, buy_progress_index,
                                                    today_datas,
                                                    l2.l2_data_util.local_today_num_operate_map.get(code))
        LCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                                 l2.l2_data_util.local_today_datas.get(code))
        logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{} 数据-{}", code, buy_progress_index,
                                       json.dumps(buy_queue_result_list))
        # Update the big-order deal statistics.
        deal_big_money_manager.set_trade_progress(code, buy_progress_index,
                                                  l2.l2_data_util.local_today_datas.get(code),
                                                  l2.l2_data_util.local_today_num_operate_map.get(code))

    # ------------------------------------------------------------ types 20-60
    def _on_login(self, _str):
        """Type 20: client login."""
        data = data_process.parse(_str)["data"]
        try:
            client_id, _authoritys = authority.login(data["account"], data["pwd"])
            return data_process.toJson(
                {"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}})
        except Exception as e:
            return data_process.toJson({"code": 1, "msg": str(e)})

    def _on_heartbeat(self, _str):
        """Type 30: client heartbeat."""
        data = data_process.parse(_str)["data"]
        client_id = data["client"]
        thsDead = data.get("thsDead")
        logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data)))
        client_manager.saveClientActive(int(client_id), self.client_address[0], thsDead)
        if constant.is_windows():
            if ths_util.is_ths_dead(client_id):
                # TODO: restart THS.  For now only raise the alarm for L2 clients.
                if client_id in authority.get_l2_clients():
                    alert_util.alarm()

    def _on_current_prices(self, _str):
        """Type 40: current price update."""
        datas = data_process.parse(_str)["data"]
        if datas is None:
            datas = []
        print("二板现价")
        # Merge the ticks stashed by the last type-22 message.
        if self.first_tick_datas:
            datas.extend(self.first_tick_datas)
        print("二板现价数量", len(datas))
        for item in datas:
            code_volumn_manager.save_today_volumn(item["code"], item["volume"], item["volumeUnit"])
        current_price_process_manager.accept_prices(datas)

    def _on_l2_current_prices(self, _str):
        """Type 41: current prices collected on the L2 monitor."""
        datas = data_process.parse(_str)["data"]
        if datas:
            for d in datas:
                self.__l2_current_price_data[d["code"]] = d

    def _on_buy1_volume(self, _str):
        """Type 50: buy-1 volume of a code identified by its name."""
        data = data_process.parse(_str)["data"]
        if data is None:
            return
        code_name = data["codeName"].replace(" ", "")
        volumn = data["volumn"]
        price = data["price"]
        time_ = data["time"]
        code = global_util.name_codes.get(code_name)
        if code is None:
            # Reload the name table once before giving up.
            global_data_loader.load_name_codes()
            code = global_util.name_codes.get(code_name)
        if code is None:
            return
        # Log only when volume or price changed.
        fingerprint = "{}-{}".format(volumn, price)
        if self.latest_buy1_volumn_dict.get(code) != fingerprint:
            logger_buy_1_volumn_record.info("{}-{}", code, data)
        self.latest_buy1_volumn_dict[code] = fingerprint
        # Correct the time, then save.
        time_ = tool.compute_buy1_real_time(time_)
        need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn, price)
        if need_sync:
            L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)

    def _on_l2_started(self, _str):
        """Type 60: an L2 client reports a successful automatic start."""
        data = data_process.parse(_str)["data"]
        client_id = data["client"]
        print("L2自启动成功", client_id)
        ts = tool.get_time_as_second(tool.get_now_time_str())
        # A start between 09:24:50 and 09:28:00 was meant to trigger a batch code
        # assignment; the trailing "and False" keeps that path disabled.
        if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00") and False:
            return_str = json.dumps({"code": 1, "msg": "等待批量设置代码"})
            # Take the 16 codes with the highest rate.
            codes = sorted(trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16))
            if client_id == 2:
                codes = codes[:constant.L2_CODE_COUNT_PER_DEVICE]
            else:
                codes = codes[constant.L2_CODE_COUNT_PER_DEVICE:]
            codes_datas = list(enumerate(codes[:constant.L2_CODE_COUNT_PER_DEVICE]))
            # Up to three attempts.
            for _ in range(0, 3):
                if l2_code_operate.betch_set_client_codes(client_id, codes_datas):
                    break
                time.sleep(3)
            return return_str
        return json.dumps({"code": 0, "msg": "开启在线状态"})

    # ------------------------------------------------------------ types 70-82
    def _on_hot_blocks(self, _str):
        """Type 70: hot concepts uploaded by the client."""
        data_json = data_process.parse(_str)
        datas = data_json["data"]
        if datas:
            hot_block_data_process.save_datas(data_json["day"], datas)
        print(datas)

    def _on_code_info(self, _str):
        """Type 71: return the rendered information page of a code."""
        code = data_process.parse(_str)["data"]["code"]
        begin = time.time()
        final_data = {'code': code, 'data': code_info_output.get_output_html(code)}
        return_str = json.dumps({"code": 0, "data": final_data})
        print("代码信息获取时间", code, round((time.time() - begin) * 1000))
        return return_str

    def _on_recent_limit_up_codes(self, _str):
        """Type 72: limit-up codes of the two most recent trading days."""
        day = tool.get_now_date_str()
        data_dict = {}
        for _ in range(0, 2):
            day = HistoryKDatasUtils.get_previous_trading_date(day)
            data_list = list(block_info.KPLLimitUpDataRecordManager.list_all(day))
            # Keep field 3 of the records whose field 4 is longer than six characters.
            data_dict[day] = list({d[3] for d in data_list if len(d[4]) > 6})
        return json.dumps({"code": 0, "data": data_dict})

    def _on_cancel_buy(self, _str):
        """Type 80: manually cancel the buy order of a code."""
        code = json.loads(_str)["data"]["code"]
        if not code:
            return json.dumps({"code": 1, "msg": "请上传代码"})
        state = trade_manager.get_trade_state(code)
        if state not in (trade_manager.TRADE_STATE_BUY_PLACE_ORDER, trade_manager.TRADE_STATE_BUY_DELEGATED,
                         trade_manager.TRADE_STATE_BUY_CANCEL_ING):
            return json.dumps({"code": 1, "msg": "未处于可撤单状态"})
        try:
            l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销")
            return json.dumps({"code": 0})
        except Exception as e:
            return json.dumps({"code": 2, "msg": str(e)})

    def _on_delegate_list(self, _str):
        """Type 82: list today's delegation records changed since ``update_time``."""
        update_time = json.loads(_str)["data"]["update_time"]
        results, update_time = huaxin_trade_record_manager.DelegateRecordManager.list_by_day(
            tool.get_now_date_str("%Y%m%d"), update_time)
        return json.dumps(
            {"code": 0, "data": {"list": results, "updateTime": update_time}, "msg": "请上传代码"})

    # ---------------------------------------------------------- types 201-302
    def _on_add_black(self, _str):
        """Type 201: add codes to the blacklist (forbidden trade)."""
        for code in _request_codes(_str):
            l2_trade_util.forbidden_trade(code)
            _ensure_code_name(code)
        return json.dumps({"code": 0})

    def _on_add_white(self, _str):
        """Type 202: add codes to the whitelist."""
        try:
            for code in _request_codes(_str):
                # Codes priced above 30 cannot be whitelisted.
                limit_up_price = gpcode_manager.get_limit_up_price(code)
                if float(limit_up_price) > 30:
                    raise Exception("股价高于30元")
                l2_trade_util.WhiteListCodeManager.add_code(code)
                _ensure_code_name(code)
            return json.dumps({"code": 0})
        except Exception as e:
            return json.dumps({"code": 1, "msg": str(e)})

    def _on_remove_black(self, _str):
        """Type 203: remove codes from the blacklist."""
        for code in _request_codes(_str):
            l2_trade_util.remove_from_forbidden_trade_codes(code)
        return json.dumps({"code": 0})

    def _on_remove_white(self, _str):
        """Type 204: remove codes from the whitelist."""
        for code in _request_codes(_str):
            l2_trade_util.WhiteListCodeManager.remove_code(code)
        return json.dumps({"code": 0})

    def _on_list_black(self, _str):
        """Type 301: list the blacklist."""
        return json.dumps({"code": 0, "data": _named_codes(l2_trade_util.BlackListCodeManager.list_codes())})

    def _on_list_white(self, _str):
        """Type 302: list the whitelist."""
        return json.dumps({"code": 0, "data": _named_codes(l2_trade_util.WhiteListCodeManager.list_codes())})

    # ---------------------------------------------------------- types 401-430
    def _on_add_want(self, _str):
        """Type 401: add codes (optionally with their plates) to the want-to-buy list."""
        data = json.loads(_str)
        codes = data["data"]["codes"]
        for code in codes:
            gpcode_manager.WantBuyCodesManager.add_code(code)
            _ensure_code_name(code)
        if "plates" in data["data"]:
            # Plates are matched to codes by position.
            for code, plate in zip(codes, data["data"]["plates"]):
                self.__KPLCodeLimitUpReasonManager.save_reason(code, plate)
        return json.dumps({"code": 0})

    def _on_remove_want(self, _str):
        """Type 402: remove codes from the want-to-buy list."""
        for code in _request_codes(_str):
            gpcode_manager.WantBuyCodesManager.remove_code(code)
        return json.dumps({"code": 0})

    def _on_list_want(self, _str):
        """Type 403: list the want-to-buy codes, optionally filtered by plate."""
        plate = None
        include_codes = set()
        if _str:
            plate = json.loads(_str).get("plate")
            if plate:
                code_map = self.__KPLCodeLimitUpReasonManager.list_all()
                include_codes = {k for k in code_map if code_map[k] == plate}
        codes = gpcode_manager.WantBuyCodesManager.list_code()
        if plate and plate != '其他':
            codes = [code for code in codes if code in include_codes]
        return json.dumps({"code": 0, "data": _named_codes(codes)})

    def _on_add_pause(self, _str):
        """Type 411: add codes to the pause-buy list."""
        for code in _request_codes(_str):
            gpcode_manager.PauseBuyCodesManager.add_code(code)
            _ensure_code_name(code)
        return json.dumps({"code": 0})

    def _on_remove_pause(self, _str):
        """Type 412: remove codes from the pause-buy list."""
        for code in _request_codes(_str):
            gpcode_manager.PauseBuyCodesManager.remove_code(code)
        return json.dumps({"code": 0})

    def _on_list_pause(self, _str):
        """Type 413: list the pause-buy codes."""
        return json.dumps({"code": 0, "data": _named_codes(gpcode_manager.PauseBuyCodesManager.list_code())})

    def _on_can_cancel(self, _str):
        """Type 420: report whether the first requested code can still be cancelled."""
        code = _request_codes(_str)[0]
        state = trade_manager.get_trade_state(code)
        if state != trade_manager.TRADE_STATE_BUY_CANCEL_SUCCESS and state != trade_manager.TRADE_STATE_BUY_SUCCESS:
            return json.dumps({"code": 0, "msg": "可以取消"})
        return json.dumps({"code": 1, "msg": "不可以取消"})

    def _on_code_attributes(self, _str):
        """Type 430: report which lists (want/white/black/pause) a code is on."""
        code = json.loads(_str)["data"]["code"]
        code_name = gpcode_manager.get_code_name(code)
        want = gpcode_manager.WantBuyCodesManager.is_in(code)
        white = l2_trade_util.WhiteListCodeManager.is_in(code)
        black = l2_trade_util.is_in_forbidden_trade_codes(code)
        pause_buy = gpcode_manager.PauseBuyCodesManager.is_in(code)
        labels = ((want, "【想买单】"), (white, "【白名单】"), (black, "【黑名单】"), (pause_buy, "【暂不买】"))
        desc = "".join(label for flag, label in labels if flag)
        return json.dumps({"code": 0, "data": {"code_info": (code, code_name), "desc": desc}})

    # ---------------------------------------------------------- types 501-504
    def _on_set_buy_switch(self, _str):
        """Type 501: open or close buying."""
        is_open = json.loads(_str)["data"]["open"]
        if is_open:
            trade_manager.TradeStateManager.open_buy()
        else:
            trade_manager.TradeStateManager.close_buy()
        return json.dumps({"code": 0, "msg": ("开启成功" if is_open else "关闭成功")})

    def _on_get_buy_switch(self, _str):
        """Type 502: report whether buying is open."""
        can_buy = trade_manager.TradeStateManager.is_can_buy()
        return json.dumps({"code": 0, "data": {"can_buy": can_buy}})

    def _on_set_target_mode(self, _str):
        """Type 503: set the mode of the trade target codes."""
        mode = json.loads(_str)["data"]["mode"]
        try:
            TradeTargetCodeModeManager.set_mode(mode)
            return json.dumps({"code": 0, "data": {"mode": mode}})
        except Exception as e:
            return json.dumps({"code": 1, "msg": str(e)})

    def _on_get_target_mode(self, _str):
        """Type 504: report the mode of the trade target codes."""
        return json.dumps({"code": 0, "data": {"mode": TradeTargetCodeModeManager.get_mode()}})

    # Message type -> handler.  Types 601 (add to watchlist) and 602 (remove from
    # watchlist) are accepted but not implemented; they fall back to "OK".
    _MESSAGE_HANDLERS = {
        0: _on_l2_data,
        1: _on_target_codes,
        2: _on_limit_up_codes,
        3: _on_trade_success,
        4: _on_industry_codes,
        5: _on_trade_delegate,
        6: _on_available_money,
        10: _on_l2_trade_queue,
        20: _on_login,
        22: _on_first_board_ticks,
        30: _on_heartbeat,
        40: _on_current_prices,
        41: _on_l2_current_prices,
        50: _on_buy1_volume,
        60: _on_l2_started,
        70: _on_hot_blocks,
        71: _on_code_info,
        72: _on_recent_limit_up_codes,
        80: _on_cancel_buy,
        82: _on_delegate_list,
        201: _on_add_black,
        202: _on_add_white,
        203: _on_remove_black,
        204: _on_remove_white,
        301: _on_list_black,
        302: _on_list_white,
        401: _on_add_want,
        402: _on_remove_want,
        403: _on_list_want,
        411: _on_add_pause,
        412: _on_remove_pause,
        413: _on_list_pause,
        420: _on_can_cancel,
        430: _on_code_attributes,
        501: _on_set_buy_switch,
        502: _on_get_buy_switch,
        503: _on_set_target_mode,
        504: _on_get_target_mode,
    }


def send_msg(client_id, data):
    """Send ``data`` as JSON to the client's port 9006 and return its GBK-decoded reply.

    Raises:
        Exception: when no active IP is known for ``client_id``.
        OSError: when connecting, sending or receiving fails.
    """
    _ip = client_manager.getActiveClientIP(client_id)
    if _ip is None or len(_ip) <= 0:
        raise Exception("客户端IP为空")
    # The with-block closes the socket even when connect() fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        client_socket.connect((_ip, 9006))
        client_socket.sendall(json.dumps(data).encode())
        return str(client_socket.recv(1024), encoding="gbk")


# Client heartbeat
def test_client_server():
    """Ping every L2 client and correct the client codes, every five seconds, forever."""
    while True:
        for client in authority.get_l2_clients():
            try:
                send_msg(client, {"action": "test"})
            except Exception:
                # An unreachable client must not stop the loop.
                pass
        # Correct the codes assigned to the clients.
        l2_code_operate.correct_client_codes()
        time.sleep(5)


# State of a collection client
def get_client_env_state(client):
    """Return the environment state reported by ``client``; raise with its message on failure."""
    result = json.loads(send_msg(client, {"action": "getEnvState"}))
    if result["code"] == 0:
        return json.loads(result["data"])
    raise Exception(result["msg"])


# Repair a collection client
def repair_client_env(client):
    """Ask ``client`` to repair its environment; raise with its message on failure."""
    result = json.loads(send_msg(client, {"action": "repairEnv"}))
    if result["code"] != 0:
        raise Exception(result["msg"])


# Sync the target codes to THS
def sync_target_codes_to_ths():
    """Send the second-board code list to the first "data-maintain" client and return its reply."""
    code_list = list(gpcode_manager.get_second_gp_list())
    client = authority._get_client_ids_by_rule("data-maintain")
    return send_msg(client[0], {"action": "syncTargetCodes", "data": code_list})


# Repair the THS main site
def repair_ths_main_site(client):
    """Ask ``client`` to update its THS site; on success re-initialise its listening positions."""
    result = json.loads(send_msg(client, {"action": "updateTHSSite"}))
    if result["code"] != 0:
        raise Exception(result["msg"])
    client_infos = [(client, index) for index in range(0, constant.L2_CODE_COUNT_PER_DEVICE)]
    l2_listen_pos_health_manager.init_all(client_infos)


if __name__ == "__main__":
    # Fallback for when trade-success data cannot be read; disabled via "while False".
    while False:
        try:
            datas = trade_juejin.get_execution_reports()
            # Convert to the upload format.
            fdatas = [{"code": d[0], "money": d[4], "num": d[2], "price": d[3], "time": d[7],
                       "trade_num": d[5], "type": d[1] - 1} for d in datas]
            print(fdatas)
            if fdatas:
                try:
                    trade_manager.process_trade_success_data(fdatas)
                except Exception as e:
                    logging.exception(e)
                trade_manager.save_trade_success_data(fdatas)
        except Exception:
            pass
        time.sleep(1.5)

if __name__ == "__main__1":
    # Manual maintenance script; never runs because the module name cannot be "__main__1".
    codes = gpcode_manager.get_first_gp_codes()
    for code in codes:
        try:
            global_data_loader.load_zyltgb()
            limit_up_price = float(gpcode_manager.get_limit_up_price(code))
            volumes_data = inited_data.get_volumns_by_code(code, 150)
            volumes_data = volumes_data[1:]
            volumes = inited_data.parse_max_volume(volumes_data[:60],
                                                   code_nature_analyse.is_new_top(limit_up_price,
                                                                                  volumes_data[:60]))
            logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes)
            code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2])
            # Judge the K-line shape.
            k_format = code_nature_analyse.get_k_format(limit_up_price, volumes_data)
            print(k_format)
            code_nature_analyse.set_record_datas(code, gpcode_manager.get_limit_up_price(code), volumes_data)
        except Exception:
            pass