From 96dc1a4cc38b588f39387b5a85b9677100e357f1 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期四, 23 三月 2023 23:55:40 +0800 Subject: [PATCH] 首板买入打分机制 --- server.py | 278 +++++++++++++++++++++++++++++++++++++++++-------------- 1 files changed, 207 insertions(+), 71 deletions(-) diff --git a/server.py b/server.py index 426bce3..5faf62e 100644 --- a/server.py +++ b/server.py @@ -12,6 +12,7 @@ import alert_util import client_manager +import code_nature_analyse import code_volumn_manager import constant import data_process @@ -21,7 +22,7 @@ import gpcode_manager import authority import juejin -from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log +from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager import l2_data_util from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil import l2.l2_data_util @@ -30,14 +31,15 @@ import ths_util import tool from third_data import hot_block_data_process -from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util +from ths import l2_listen_pos_health_manager +from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util, deal_big_money_manager import l2_code_operate from code_data_util import ZYLTGBUtil import l2.transaction_progress from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \ logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record, logger_debug -from trade.trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager +from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager class MyTCPServer(socketserver.TCPServer): @@ -61,7 +63,6 @@ ths_l2_trade_queue_manager = thsl2tradequeuemanager() latest_buy1_volumn_dict = {} - buy1_price_manager = Buy1PriceManager() l2_trade_queue_time_dict = {} l2_save_time_dict = {} l2_trade_buy_queue_dict = {} @@ -69,6 +70,7 @@ last_time = {} first_tick_datas = [] latest_oringin_data = {} + last_l2_listen_health_time = {} def setup(self): super().setup() # 鍙互涓嶈皟鐢ㄧ埗绫荤殑setup()鏂规硶锛岀埗绫荤殑setup鏂规硶浠�涔堥兘娌″仛 @@ -111,6 +113,15 @@ # level2鐩樺彛鏁版嵁 day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = l2.l2_data_util.parseL2Data( _str) + last_health_time = self.last_l2_listen_health_time.get((client, channel)) + # --------------------------------璁剧疆L2鍋ュ悍鐘舵��-------------------------------- + if last_health_time is None or __start_time - last_health_time > 1000: + self.last_l2_listen_health_time[(client, channel)] = __start_time + # 鏇存柊鐩戝惉浣嶅仴搴风姸鎬� + if origin_datas_count == 0: + l2_listen_pos_health_manager.set_unhealthy(client, channel) + else: + l2_listen_pos_health_manager.set_healthy(client, channel) l2_log.threadIds[code] = random.randint(0, 100000) if True: @@ -136,7 +147,7 @@ l2_data_util.save_l2_latest_data_number(code, origin_datas_count) # 淇濆瓨l2鏁版嵁鏉℃暟 if not origin_datas: - #or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)): + # or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)): raise Exception("鏃犳柊澧炴暟鎹�") # 淇濆瓨鏈�杩戠殑鏁版嵁 self.latest_oringin_data[code] = origin_datas @@ -239,7 +250,7 @@ # limit_up_time_manager.save_limit_up_time(d["code"], d["time"]) elif type == 22: try: - if int(tool.get_now_time_str().replace(":", "")) < int("092600"): + if int(tool.get_now_time_str().replace(":", "")) < int("092500"): raise Exception('鏈埌鎺ュ彈鏃堕棿') # 棣栨澘浠g爜 dataList, is_add = data_process.parseGPCode(_str) @@ -251,46 +262,9 @@ for data in dataList: code = data["code"] codes.append(code) - limit_up_price = gpcode_manager.get_limit_up_price(code) - if limit_up_price is not None: - limit_up_price_dict[code] = limit_up_price - else: - temp_codes.append(code) - # data["price"] - tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"], - "volumeUnit": data["volumeUnit"]}) + # 淇濆瓨鏈瓫閫夌殑棣栨澘浠g爜 new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes) - for code in new_add_codes: - if (not l2_trade_util.is_in_forbidden_trade_codes( - code)) and juejin.JueJinManager.get_lowest_price_rate(code, 15) >= 0.3: - l2_trade_util.forbidden_trade(code) - - if new_add_codes: - gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes)) - # 鍔犲叆棣栨澘鍘嗗彶璁板綍 - gpcode_manager.FirstCodeManager.add_record(new_add_codes) - logger_first_code_record.info("鏂板棣栨澘锛歿}", new_add_codes) - # 鑾峰彇60澶╂渶澶ц褰� - for code in new_add_codes: - if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None: - volumes = juejin.get_volumn(code) - code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1]) - # 绉婚櫎浠g爜 - listen_codes = gpcode_manager.get_listen_codes() - for lc in listen_codes: - if not gpcode_manager.is_in_gp_pool(lc): - # 绉婚櫎浠g爜 - l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "浠g爜琚Щ闄�") - - if temp_codes: - # 鑾峰彇娑ㄥ仠浠� - juejin.re_set_price_pres(temp_codes) - # 閲嶆柊鑾峰彇娑ㄥ仠浠� - for code in temp_codes: - limit_up_price = gpcode_manager.get_limit_up_price(code) - if limit_up_price is not None: - limit_up_price_dict[code] = limit_up_price # 淇濆瓨鑷敱娴侀�氳偂鏈� zyltgb_list = [] for data in dataList: @@ -303,7 +277,64 @@ ZYLTGBUtil.save_list(zyltgb_list) global_data_loader.load_zyltgb() - # 淇濆瓨鐜颁环 + bad_codes = set() + + if new_add_codes: + gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes)) + # 鍔犲叆棣栨澘鍘嗗彶璁板綍 + gpcode_manager.FirstCodeManager.add_record(new_add_codes) + logger_first_code_record.info("鏂板棣栨澘锛歿}", new_add_codes) + # 鑾峰彇60澶╂渶澶ц褰� + for code in codes: + if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None: + volumes_data = juejin.get_volumns_by_code(code, 150) + volumes = juejin.parse_max_volume(volumes_data[:60]) + logger_first_code_record.info("{} 鑾峰彇鍒伴鏉�60澶╂渶澶ч噺锛歿}", code, volumes) + code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1]) + # 鍒ゆ柇K绾垮舰鎬� + is_has_k_format, msg = code_nature_analyse.is_has_k_format( + gpcode_manager.get_limit_up_price(code), volumes_data) + if not is_has_k_format: + logger_first_code_record.info("{}棣栨澘K绾垮舰鎬佷笉濂�,{}", code, msg) + # 鑲℃�т笉濂斤紝灏变笉瑕佸姞鍏� + bad_codes.add(code) + # 鍔犲叆绂佹浜ゆ槗浠g爜 + l2_trade_util.forbidden_trade(code) + break + else: + code_nature_analyse.set_record_datas(code, + gpcode_manager.get_limit_up_price(code), + volumes_data) + + # 绉婚櫎浠g爜 + listen_codes = gpcode_manager.get_listen_codes() + for lc in listen_codes: + if not gpcode_manager.is_in_gp_pool(lc): + # 绉婚櫎浠g爜 + l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "浠g爜琚Щ闄�") + + # 淇濆瓨鐜颁环 + if dataList: + for data in dataList: + code = data["code"] + codes.append(code) + limit_up_price = gpcode_manager.get_limit_up_price(code) + if limit_up_price is not None: + limit_up_price_dict[code] = limit_up_price + else: + temp_codes.append(code) + tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"], + "volumeUnit": data["volumeUnit"]}) + # 鑾峰彇娑ㄥ仠浠� + if temp_codes: + # 鑾峰彇娑ㄥ仠浠� + juejin.re_set_price_pres(temp_codes) + # 閲嶆柊鑾峰彇娑ㄥ仠浠� + for code in temp_codes: + limit_up_price = gpcode_manager.get_limit_up_price(code) + if limit_up_price is not None: + limit_up_price_dict[code] = limit_up_price + # 淇濆瓨鐜颁环 self.first_tick_datas.clear() self.first_tick_datas.extend(tick_datas) @@ -419,10 +450,13 @@ datas = data_process.parseData(_str) channel = datas["channel"] code = datas["code"] + msg = "" try: + if not gpcode_manager.is_in_gp_pool(code) and not gpcode_manager.is_in_first_gp_codes(code): # 娌″湪鐩爣浠g爜涓笖娌℃湁鍦ㄩ鏉夸粖鏃ュ巻鍙蹭唬鐮佷腑 raise Exception("浠g爜娌″湪鐩戝惉涓�") + data = datas["data"] buy_time = data["buyTime"] buy_one_price = data["buyOnePrice"] @@ -431,29 +465,38 @@ if buy_one_price is None: print('涔�1浠锋病鏈夛紝', code) limit_up_price = gpcode_manager.get_limit_up_price(code) + if limit_up_price is not None: + code_price_manager.Buy1PriceManager.process(code, buy_one_price, buy_time, limit_up_price) + _start_time = time.time() + msg += "涔�1浠锋牸澶勭悊锛�" + f"{_start_time - __start_time} " + buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price, buy_time, buy_queue) + msg += "涔伴槦鍒椾繚瀛橈細" + f"{time.time() - _start_time} " + _start_time = time.time() + if buy_queue_result_list: - raise Exception("娴嬭瘯涓柇") + # 鏈夋暟鎹� try: buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize( decimal.Decimal("0.00")) # 鑾峰彇鎵ц浣嶆椂闂� - exec_time = None - buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data( + + buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data( code) - if buy_exec_index: + if True: # 鍙湁涓嬪崟杩囧悗鎵嶈幏鍙栦氦鏄撹繘搴� + exec_time = None try: - exec_time = \ - l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][ - "time"] + if buy_exec_index: + exec_time = \ + l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][ + "time"] except: pass - buy_progress_index = self.tradeBuyQueue.compute_traded_index(code, buy_one_price_, buy_queue_result_list, @@ -468,8 +511,17 @@ logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{} 鏁版嵁-{}", code, buy_progress_index, json.dumps(buy_queue_result_list)) + # 璁$畻澶у崟鎴愪氦棰� + deal_big_money_manager.set_trade_progress(code, buy_progress_index, + l2.l2_data_util.local_today_datas.get( + code), + l2.l2_data_util.local_today_num_operate_map.get( + code)) + else: raise Exception("鏆傛湭鑾峰彇鍒颁氦鏄撹繘搴�") + msg += "璁$畻鎴愪氦杩涘害锛�" + f"{time.time() - _start_time} " + _start_time = time.time() except Exception as e: logging.exception(e) print("涔板叆闃熷垪", code, buy_queue_result_list) @@ -482,28 +534,31 @@ code): self.l2_trade_buy_queue_dict[code] = buy_queue logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue) + msg += "淇濆瓨璁板綍鏃ュ織锛�" + f"{time.time() - _start_time} " + _start_time = time.time() # 淇濆瓨鏈�杩戠殑璁板綍 if self.ths_l2_trade_queue_manager.save_recod(code, data): if buy_time != "00:00:00": logger_l2_trade_queue.info("{}-{}", code, data) - self.buy1_price_manager.save(code, buy_one_price) need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time, int(buy_one_volumn), buy_one_price) - # if need_cancel: - # l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue") - if need_sync: - # 鍚屾鏁版嵁 - L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time) + # if need_sync: + # # 鍚屾鏁版嵁 + # s = time.time() + # L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time) + # msg += "閲忔牎楠岋細"+f"{time.time()-s} " # print(buy_time, buy_one_price, buy_one_volumn) # print("L2涔板崠闃熷垪",datas) + msg += "涔�1澶勭悊锛�" + f"{time.time() - _start_time} " + _start_time = time.time() except: pass finally: space = time.time() - __start_time if space > 0.1: - logger_debug.info("{}鎴愪氦闃熷垪澶勭悊鏃堕棿锛歿}", code, space) + logger_debug.info("{}鎴愪氦闃熷垪澶勭悊鏃堕棿锛歿},{}", code, space, msg) elif type == 20: # 鐧诲綍 @@ -517,9 +572,11 @@ # 鐜颁环鏇存柊 elif type == 40: datas = data_process.parse(_str)["data"] + if datas is None: + datas = [] print("浜屾澘鐜颁环") # 鑾峰彇鏆傚瓨鐨勪簩鐗堢幇浠锋暟鎹� - if datas and self.first_tick_datas: + if self.first_tick_datas: datas.extend(self.first_tick_datas) if datas is not None: print("浜屾澘鐜颁环鏁伴噺", len(datas)) @@ -546,8 +603,6 @@ # 璁板綍鏁版嵁 logger_buy_1_volumn_record.info("{}-{}", code, data) self.latest_buy1_volumn_dict[code] = "{}-{}".format(volumn, price) - # 淇濆瓨涔�1浠锋牸 - self.buy1_price_manager.save(code, price) # 鏍℃鏃堕棿 time_ = tool.compute_buy1_real_time(time_) # 淇濆瓨鏁版嵁 @@ -607,10 +662,51 @@ return_str = json.dumps(return_json) elif type == 70: # 閫夎偂瀹濈儹闂ㄦ蹇� - datas = data_process.parse(_str)["data"] + data_json = data_process.parse(_str) + day = data_json["day"] + datas = data_json["data"] if datas: - hot_block_data_process.save_datas(datas) + hot_block_data_process.save_datas(day, datas) print(datas) + elif type == 71: + # 鏍规嵁浠g爜鑾峰彇閫夎偂瀹濈儹闂ㄦ蹇� + day = tool.get_now_date_str() + code = data_process.parse(_str)["data"]["code"] + todays = hot_block_data_process.XGBHotBlockDataManager.list_by_code(code, day) + today_datas = [] + if todays: + for data in todays: + block = data[2] + block_datas = hot_block_data_process.XGBHotBlockDataManager.list_by_block(block, day) + block_datas = list(block_datas) + # 鏍规嵁娑ㄥ仠鏃堕棿鎺掑簭 + block_datas.sort(key=lambda d: (d[4] if len(d[4]) > 6 else '15:00:00')) + for i in range(len(block_datas)): + if block_datas[i][3] == code: + today_datas.append( + {"block_name": block, "block_size": len(block_datas), "index": i, + "price": block_datas[i][5], "rate": block_datas[i][6]}) + break + # 鑾峰彇鍓嶄竴涓氦鏄撴棩 + last_day = juejin.JueJinManager.get_previous_trading_date(day) + lasts = hot_block_data_process.XGBHotBlockDataManager.list_by_code(code, last_day) + last_datas = [] + if todays: + for data in lasts: + block = data[2] + block_datas = hot_block_data_process.XGBHotBlockDataManager.list_by_block(block, last_day) + block_datas = list(block_datas) + # 鏍规嵁娑ㄥ仠鏃堕棿鎺掑簭 + block_datas.sort(key=lambda d: (d[4] if len(d[4]) > 6 else '15:00:00')) + for i in range(len(block_datas)): + if block_datas[i][3] == code: + last_datas.append( + {"block_name": block, "block_size": len(block_datas), "index": i, + "price": block_datas[i][5], "rate": block_datas[i][6]}) + break + final_data = {'code': code, 'today': today_datas, 'last_day': last_datas} + return_str = json.dumps({"code": 0, "data": final_data}) + pass elif type == 201: # 鍔犲叆榛戝悕鍗� data = json.loads(_str) @@ -621,7 +717,7 @@ if not name: results = juejin.JueJinManager.get_gp_codes_names([code]) if results: - gpcode_manager.CodesNameManager.add_first_code_name(code,results[code]) + gpcode_manager.CodesNameManager.add_first_code_name(code, results[code]) return_str = json.dumps({"code": 0}) elif type == 202: @@ -661,6 +757,30 @@ elif type == 302: # 榛戝悕鍗曞垪琛� codes = l2_trade_util.WhiteListCodeManager.list_codes() + datas = [] + for code in codes: + name = gpcode_manager.get_code_name(code) + datas.append(f"{name}:{code}") + return_str = json.dumps({"code": 0, "data": datas}) + elif type == 401: + # 鍔犲叆鎯宠涔� + data = json.loads(_str) + codes = data["data"]["codes"] + for code in codes: + gpcode_manager.WantBuyCodesManager.add_code(code) + name = gpcode_manager.get_code_name(code) + if not name: + results = juejin.JueJinManager.get_gp_codes_names([code]) + if results: + gpcode_manager.CodesNameManager.add_first_code_name(code, results[code]) + elif type == 402: + data = json.loads(_str) + codes = data["data"]["codes"] + for code in codes: + gpcode_manager.WantBuyCodesManager.remove_code(code) + return_str = json.dumps({"code": 0}) + elif type == 403: + codes = gpcode_manager.WantBuyCodesManager.list_code() datas = [] for code in codes: name = gpcode_manager.get_code_name(code) @@ -743,11 +863,27 @@ result = json.loads(result) if result["code"] != 0: raise Exception(result["msg"]) + else: + # 娴嬮�熸垚鍔� + client_infos = [] + for index in range(0, constant.L2_CODE_COUNT_PER_DEVICE): + client_infos.append((client, index)) + l2_listen_pos_health_manager.init_all(client_infos) if __name__ == "__main__": - listen_codes = gpcode_manager.get_listen_codes() - for lc in listen_codes: - if not gpcode_manager.is_in_gp_pool(lc): - # 绉婚櫎浠g爜 - l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "浠g爜琚Щ闄�") + codes = ["601698"] + for code in codes: + volumes_data = juejin.get_volumns_by_code(code, 150) + volumes_data = volumes_data[1:] + global_data_loader.load_zyltgb() + limit_up_price = float(gpcode_manager.get_limit_up_price(code)) + # 鍒ゆ柇鑲℃�� + # is_k_format, msg = code_nature_analyse.is_has_k_format(float(limit_up_price), volumes_data) + # print(code, is_k_format, msg) + + code_nature_analyse.set_record_datas(code, + limit_up_price, + volumes_data) + + print(code_nature_analyse.get_k_format(float(limit_up_price), volumes_data)) -- Gitblit v1.8.0