From 68464c679ae5e1ae35e7e67e3b339ba0f939cbd3 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 15 三月 2023 14:46:03 +0800 Subject: [PATCH] 选股宝板块优化 --- server.py | 303 ++++++++++++++++++++++++++++++++------------------ 1 files changed, 195 insertions(+), 108 deletions(-) diff --git a/server.py b/server.py index df38144..426bce3 100644 --- a/server.py +++ b/server.py @@ -13,6 +13,7 @@ import alert_util import client_manager import code_volumn_manager +import constant import data_process import global_data_loader import global_util @@ -28,13 +29,14 @@ import ths_industry_util import ths_util import tool +from third_data import hot_block_data_process from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util import l2_code_operate from code_data_util import ZYLTGBUtil import l2.transaction_progress from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \ - logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record + logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record, logger_debug from trade.trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager @@ -66,6 +68,7 @@ tradeBuyQueue = l2.transaction_progress.TradeBuyQueue() last_time = {} first_tick_datas = [] + latest_oringin_data = {} def setup(self): super().setup() # 鍙互涓嶈皟鐢ㄧ埗绫荤殑setup()鏂规硶锛岀埗绫荤殑setup鏂规硶浠�涔堥兘娌″仛 @@ -106,24 +109,16 @@ __start_time = round(time.time() * 1000) # level2鐩樺彛鏁版嵁 - day, client, channel, code, capture_time, process_time, datas, origin_datas = l2.l2_data_util.parseL2Data( + day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = l2.l2_data_util.parseL2Data( _str) + l2_log.threadIds[code] = random.randint(0, 100000) - - if channel == 0: - now_time = round(time.time() * 1000) - if self.last_time.get(channel) is not None: - # print("鎺ュ彈鍒癓2鐨勬暟鎹�", channel, now_time - self.last_time.get(channel), "瑙f瀽鑰楁椂",now_time - origin_start_time) - pass - - self.last_time[channel] = now_time - if True: # 闂撮殧1s淇濆瓨涓�鏉2鐨勬渶鍚庝竴鏉℃暟鎹� if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[ - code] >= 1000 and len(datas) > 0: + code] >= 1000 and len(origin_datas) > 0: self.l2_save_time_dict[code] = origin_start_time - logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1]) + logger_l2_latest_data.info("{}#{}#{}", code, capture_time, origin_datas[-1]) # 10ms鐨勭綉缁滀紶杈撳欢鏃� capture_timestamp = __start_time - process_time - 10 @@ -137,6 +132,16 @@ "l2鑾峰彇浠g爜浣嶇疆鑰楁椂") # 鍒ゆ柇鐩爣浠g爜浣嶇疆鏄惁涓庝笂浼犳暟鎹綅缃竴鑷� if cid is not None and pid is not None and client == int(cid) and channel == int(pid): + # l2.l2_data_util.set_l2_data_latest_count(code, len(origin_datas)) + l2_data_util.save_l2_latest_data_number(code, origin_datas_count) + # 淇濆瓨l2鏁版嵁鏉℃暟 + if not origin_datas: + #or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)): + raise Exception("鏃犳柊澧炴暟鎹�") + # 淇濆瓨鏈�杩戠殑鏁版嵁 + self.latest_oringin_data[code] = origin_datas + limit_up_price = gpcode_manager.get_limit_up_price(code) + datas = l2.l2_data_util.L2DataUtil.format_l2_data(origin_datas, code, limit_up_price) try: # 鏍¢獙瀹㈡埛绔唬鐮� l2_code_operate.verify_with_l2_data_pos_info(code, client, channel) @@ -151,11 +156,11 @@ "l2鏁版嵁鏈夋晥澶勭悊澶栭儴鑰楁椂", False) # 淇濆瓨鍘熷鏁版嵁鏁伴噺 - l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) - if round(time.time() * 1000) - __start_time > 20: - l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, - "寮傛淇濆瓨鍘熷鏁版嵁鏉℃暟鑰楁椂", - False) + # l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) + # if round(time.time() * 1000) - __start_time > 20: + # l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, + # "寮傛淇濆瓨鍘熷鏁版嵁鏉℃暟鑰楁椂", + # False) except l2_data_manager.L2DataException as l: # 鍗曚环涓嶇 @@ -184,7 +189,10 @@ "l2鏁版嵁澶勭悊鎬昏�楁椂", True) except Exception as e: - logger_l2_error.exception(e) + if str(e).find("鏂板鏁版嵁"): + pass + else: + logger_l2_error.exception(e) elif type == 1: # 璁剧疆鑲$エ浠g爜 @@ -235,7 +243,6 @@ raise Exception('鏈埌鎺ュ彈鏃堕棿') # 棣栨澘浠g爜 dataList, is_add = data_process.parseGPCode(_str) - # {'code': '605300', 'limitUpPercent': '0009.99', 'price': '0020.14', 'time': '10:44:00', 'volume': '44529', 'volumeUnit': 2, 'zyltMoney': '0011.60', 'zyltMoneyUnit': 0} limit_up_price_dict = {} temp_codes = [] codes = [] @@ -250,24 +257,31 @@ else: temp_codes.append(code) # data["price"] - tick_datas.append({"code": code, "price": data["price"], "volumn": data["volume"], - "volumnUnit": data["volumeUnit"]}) + tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"], + "volumeUnit": data["volumeUnit"]}) # 淇濆瓨鏈瓫閫夌殑棣栨澘浠g爜 new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes) for code in new_add_codes: - if (not l2_trade_util.is_in_forbidden_trade_codes(code)) and juejin.JueJinManager.get_lowest_price_rate(code, 15) >= 0.3: + if (not l2_trade_util.is_in_forbidden_trade_codes( + code)) and juejin.JueJinManager.get_lowest_price_rate(code, 15) >= 0.3: l2_trade_util.forbidden_trade(code) if new_add_codes: gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes)) # 鍔犲叆棣栨澘鍘嗗彶璁板綍 gpcode_manager.FirstCodeManager.add_record(new_add_codes) - logger_first_code_record.info("鏂板棣栨澘锛歿}",new_add_codes) + logger_first_code_record.info("鏂板棣栨澘锛歿}", new_add_codes) # 鑾峰彇60澶╂渶澶ц褰� for code in new_add_codes: if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None: volumes = juejin.get_volumn(code) - code_volumn_manager.set_histry_volumn(code,volumes[0],volumes[1]) + code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1]) + # 绉婚櫎浠g爜 + listen_codes = gpcode_manager.get_listen_codes() + for lc in listen_codes: + if not gpcode_manager.is_in_gp_pool(lc): + # 绉婚櫎浠g爜 + l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "浠g爜琚Щ闄�") if temp_codes: # 鑾峰彇娑ㄥ仠浠� @@ -284,7 +298,7 @@ if code in global_util.zyltgb_map: continue zyltgb_list.append( - {"code": code, "zyltgb": data["zyltMoney"], "zyltgb_unit": data["zyltMoneyUnit"]}) + {"code": code, "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgbUnit"]}) if zyltgb_list: ZYLTGBUtil.save_list(zyltgb_list) global_data_loader.load_zyltgb() @@ -307,6 +321,9 @@ # 绾犳鏁版嵁 if is_limit_up and limit_up_time is None: limit_up_time = tool.get_now_time_str() + if is_limit_up: + # 鍔犲叆棣栨澘娑ㄥ仠 + gpcode_manager.FirstCodeManager.add_limited_up_record([code]) pricePre = gpcode_manager.get_price_pre(code) rate = round((float(price) - pricePre) * 100 / pricePre, 1) prices.append( @@ -318,8 +335,6 @@ code) if place_order_count == 0: trade_data_manager.placeordercountmanager.place_order(code) - # 鍔犲叆棣栨澘娑ㄥ仠 - gpcode_manager.FirstCodeManager.add_limited_up_record([code]) gpcode_first_screen_manager.process_ticks(prices) except Exception as e: @@ -400,78 +415,96 @@ # l2浜ゆ槗闃熷垪 elif type == 10: # 鍙敤閲戦 + __start_time = time.time() datas = data_process.parseData(_str) channel = datas["channel"] code = datas["code"] - data = datas["data"] - buy_time = data["buyTime"] - buy_one_price = data["buyOnePrice"] - buy_one_volumn = data["buyOneVolumn"] - buy_queue = data["buyQueue"] - if buy_one_price is None: - print('涔�1浠锋病鏈夛紝', code) - limit_up_price = gpcode_manager.get_limit_up_price(code) - if limit_up_price is not None: - buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price, buy_time, - buy_queue) - if buy_queue_result_list: - # 鏈夋暟鎹� - try: - buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize( - decimal.Decimal("0.00")) - # 鑾峰彇鎵ц浣嶆椂闂� - exec_time = None - buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data( - code) - if buy_exec_index: - try: - exec_time = l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][ - "time"] - except: - pass + try: + if not gpcode_manager.is_in_gp_pool(code) and not gpcode_manager.is_in_first_gp_codes(code): + # 娌″湪鐩爣浠g爜涓笖娌℃湁鍦ㄩ鏉夸粖鏃ュ巻鍙蹭唬鐮佷腑 + raise Exception("浠g爜娌″湪鐩戝惉涓�") + data = datas["data"] + buy_time = data["buyTime"] + buy_one_price = data["buyOnePrice"] + buy_one_volumn = data["buyOneVolumn"] + buy_queue = data["buyQueue"] + if buy_one_price is None: + print('涔�1浠锋病鏈夛紝', code) + limit_up_price = gpcode_manager.get_limit_up_price(code) + if limit_up_price is not None: + buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price, + buy_time, + buy_queue) + if buy_queue_result_list: + raise Exception("娴嬭瘯涓柇") + # 鏈夋暟鎹� + try: + buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize( + decimal.Decimal("0.00")) + # 鑾峰彇鎵ц浣嶆椂闂� + exec_time = None + buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data( + code) + if buy_exec_index: + # 鍙湁涓嬪崟杩囧悗鎵嶈幏鍙栦氦鏄撹繘搴� + try: + exec_time = \ + l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][ + "time"] + except: + pass - buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, buy_one_price_, - buy_queue_result_list, - exec_time) - if buy_progress_index is not None: - HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index, - buy_progress_index, - l2.l2_data_util.local_today_datas.get( - code), - l2.l2_data_util.local_today_num_operate_map.get( - code)) - logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{} 鏁版嵁-{}", code, - buy_progress_index, - json.dumps(buy_queue_result_list)) - else: - raise Exception("鏆傛湭鑾峰彇鍒颁氦鏄撹繘搴�") - except Exception as e: - logging.exception(e) - print("涔板叆闃熷垪", code, buy_queue_result_list) - logger_l2_trade_buy_queue.warning("鑾峰彇鎴愪氦浣嶇疆澶辫触锛� code-{} 鍘熷洜-{} 鏁版嵁-{}", code, str(e), - json.dumps(buy_queue_result_list)) + buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, + buy_one_price_, + buy_queue_result_list, + exec_time) + if buy_progress_index is not None: + HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index, + buy_progress_index, + l2.l2_data_util.local_today_datas.get( + code), + l2.l2_data_util.local_today_num_operate_map.get( + code)) + logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{} 鏁版嵁-{}", code, + buy_progress_index, + json.dumps(buy_queue_result_list)) + else: + raise Exception("鏆傛湭鑾峰彇鍒颁氦鏄撹繘搴�") + except Exception as e: + logging.exception(e) + print("涔板叆闃熷垪", code, buy_queue_result_list) + logger_l2_trade_buy_queue.warning("鑾峰彇鎴愪氦浣嶇疆澶辫触锛� code-{} 鍘熷洜-{} 鏁版嵁-{}", code, str(e), + json.dumps(buy_queue_result_list)) - # buy_queue鏄惁鏈夊彉鍖� - if self.l2_trade_buy_queue_dict.get(code) is None or buy_queue != self.l2_trade_buy_queue_dict.get( + # buy_queue鏄惁鏈夊彉鍖� + if self.l2_trade_buy_queue_dict.get( + code) is None or buy_queue != self.l2_trade_buy_queue_dict.get( code): - self.l2_trade_buy_queue_dict[code] = buy_queue - logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue) - # 淇濆瓨鏈�杩戠殑璁板綍 - if self.ths_l2_trade_queue_manager.save_recod(code, data): - if buy_time != "00:00:00": - logger_l2_trade_queue.info("{}-{}", code, data) - self.buy1_price_manager.save(code, buy_one_price) - need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time, - int(buy_one_volumn), - buy_one_price) - #if need_cancel: - # l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue") - if need_sync: - # 鍚屾鏁版嵁 - L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time) - # print(buy_time, buy_one_price, buy_one_volumn) + self.l2_trade_buy_queue_dict[code] = buy_queue + logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue) + # 淇濆瓨鏈�杩戠殑璁板綍 + if self.ths_l2_trade_queue_manager.save_recod(code, data): + if buy_time != "00:00:00": + logger_l2_trade_queue.info("{}-{}", code, data) + self.buy1_price_manager.save(code, buy_one_price) + need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time, + int(buy_one_volumn), + buy_one_price) + # if need_cancel: + # l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue") + if need_sync: + # 鍚屾鏁版嵁 + L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time) + # print(buy_time, buy_one_price, buy_one_volumn) - # print("L2涔板崠闃熷垪",datas) + # print("L2涔板崠闃熷垪",datas) + except: + pass + finally: + space = time.time() - __start_time + if space > 0.1: + logger_debug.info("{}鎴愪氦闃熷垪澶勭悊鏃堕棿锛歿}", code, space) + elif type == 20: # 鐧诲綍 data = data_process.parse(_str)["data"] @@ -484,14 +517,15 @@ # 鐜颁环鏇存柊 elif type == 40: datas = data_process.parse(_str)["data"] + print("浜屾澘鐜颁环") # 鑾峰彇鏆傚瓨鐨勪簩鐗堢幇浠锋暟鎹� if datas and self.first_tick_datas: datas.extend(self.first_tick_datas) if datas is not None: - print("鐜颁环鏁伴噺", len(datas)) + print("浜屾澘鐜颁环鏁伴噺", len(datas)) for item in datas: - volumn = item["volumn"] - volumnUnit = item["volumnUnit"] + volumn = item["volume"] + volumnUnit = item["volumeUnit"] code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit) juejin.accept_prices(datas) elif type == 50: @@ -519,12 +553,11 @@ # 淇濆瓨鏁版嵁 need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn, price) - #if need_cancel: + # if need_cancel: # l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue") if need_sync: # 鍚屾鏁版嵁 L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_) - elif type == 30: # 蹇冭烦淇℃伅 data = data_process.parse(_str)["data"] @@ -554,11 +587,11 @@ codes = trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16) codes = sorted(codes) if client_id == 2: - codes = codes[:8] + codes = codes[:constant.L2_CODE_COUNT_PER_DEVICE] else: - codes = codes[8:] + codes = codes[constant.L2_CODE_COUNT_PER_DEVICE:] codes_datas = [] - for i in range(0, 8): + for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE): if i >= len(codes): break codes_datas.append((i, codes[i])) @@ -572,12 +605,67 @@ else: return_json = {"code": 0, "msg": "寮�鍚湪绾跨姸鎬�"} return_str = json.dumps(return_json) + elif type == 70: + # 閫夎偂瀹濈儹闂ㄦ蹇� + datas = data_process.parse(_str)["data"] + if datas: + hot_block_data_process.save_datas(datas) + print(datas) elif type == 201: + # 鍔犲叆榛戝悕鍗� data = json.loads(_str) codes = data["data"]["codes"] for code in codes: l2_trade_util.forbidden_trade(code) + name = gpcode_manager.get_code_name(code) + if not name: + results = juejin.JueJinManager.get_gp_codes_names([code]) + if results: + gpcode_manager.CodesNameManager.add_first_code_name(code,results[code]) + return_str = json.dumps({"code": 0}) + elif type == 202: + # 鍔犲叆鐧藉悕鍗� + data = json.loads(_str) + codes = data["data"]["codes"] + for code in codes: + l2_trade_util.WhiteListCodeManager.add_code(code) + name = gpcode_manager.get_code_name(code) + if not name: + results = juejin.JueJinManager.get_gp_codes_names([code]) + if results: + gpcode_manager.CodesNameManager.add_first_code_name(code, results[code]) + return_str = json.dumps({"code": 0}) + elif type == 203: + # 绉婚櫎榛戝悕鍗� + data = json.loads(_str) + codes = data["data"]["codes"] + for code in codes: + l2_trade_util.remove_from_forbidden_trade_codes(code) + return_str = json.dumps({"code": 0}) + elif type == 204: + # 绉婚櫎鐧藉悕鍗� + data = json.loads(_str) + codes = data["data"]["codes"] + for code in codes: + l2_trade_util.WhiteListCodeManager.remove_code(code) + return_str = json.dumps({"code": 0}) + elif type == 301: + # 榛戝悕鍗曞垪琛� + codes = l2_trade_util.BlackListCodeManager.list_codes() + datas = [] + for code in codes: + name = gpcode_manager.get_code_name(code) + datas.append(f"{name}:{code}") + return_str = json.dumps({"code": 0, "data": datas}) + elif type == 302: + # 榛戝悕鍗曞垪琛� + codes = l2_trade_util.WhiteListCodeManager.list_codes() + datas = [] + for code in codes: + name = gpcode_manager.get_code_name(code) + datas.append(f"{name}:{code}") + return_str = json.dumps({"code": 0, "data": datas}) sk.send(return_str.encode()) @@ -644,7 +732,7 @@ code_list = [] for code in codes: code_list.append(code) - client = authority._get_client_ids_by_rule("client-industry") + client = authority._get_client_ids_by_rule("data-maintain") result = send_msg(client[0], {"action": "syncTargetCodes", "data": code_list}) return result @@ -658,9 +746,8 @@ if __name__ == "__main__": - try: - a = round(float("0002.90"), 2) - print(decimal.Decimal(a).quantize(decimal.Decimal("0.00"))) - # repair_ths_main_site(2) - except Exception as e: - print(str(e)) + listen_codes = gpcode_manager.get_listen_codes() + for lc in listen_codes: + if not gpcode_manager.is_in_gp_pool(lc): + # 绉婚櫎浠g爜 + l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "浠g爜琚Щ闄�") -- Gitblit v1.8.0