From 96dc1a4cc38b588f39387b5a85b9677100e357f1 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期四, 23 三月 2023 23:55:40 +0800
Subject: [PATCH] 首板买入打分机制

---
 server.py |  278 +++++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 207 insertions(+), 71 deletions(-)

diff --git a/server.py b/server.py
index 426bce3..5faf62e 100644
--- a/server.py
+++ b/server.py
@@ -12,6 +12,7 @@
 
 import alert_util
 import client_manager
+import code_nature_analyse
 import code_volumn_manager
 import constant
 import data_process
@@ -21,7 +22,7 @@
 import gpcode_manager
 import authority
 import juejin
-from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log
+from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager
 import l2_data_util
 from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil
 import l2.l2_data_util
@@ -30,14 +31,15 @@
 import ths_util
 import tool
 from third_data import hot_block_data_process
-from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util
+from ths import l2_listen_pos_health_manager
+from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util, deal_big_money_manager
 import l2_code_operate
 from code_data_util import ZYLTGBUtil
 import l2.transaction_progress
 
 from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
     logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_first_code_record, logger_debug
-from trade.trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager
+from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager
 
 
 class MyTCPServer(socketserver.TCPServer):
@@ -61,7 +63,6 @@
     ths_l2_trade_queue_manager = thsl2tradequeuemanager()
 
     latest_buy1_volumn_dict = {}
-    buy1_price_manager = Buy1PriceManager()
     l2_trade_queue_time_dict = {}
     l2_save_time_dict = {}
     l2_trade_buy_queue_dict = {}
@@ -69,6 +70,7 @@
     last_time = {}
     first_tick_datas = []
     latest_oringin_data = {}
+    last_l2_listen_health_time = {}
 
     def setup(self):
         super().setup()  # 鍙互涓嶈皟鐢ㄧ埗绫荤殑setup()鏂规硶锛岀埗绫荤殑setup鏂规硶浠�涔堥兘娌″仛
@@ -111,6 +113,15 @@
                         # level2鐩樺彛鏁版嵁
                         day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = l2.l2_data_util.parseL2Data(
                             _str)
+                        last_health_time = self.last_l2_listen_health_time.get((client, channel))
+                        # --------------------------------璁剧疆L2鍋ュ悍鐘舵��--------------------------------
+                        if last_health_time is None or __start_time - last_health_time > 1000:
+                            self.last_l2_listen_health_time[(client, channel)] = __start_time
+                            # 鏇存柊鐩戝惉浣嶅仴搴风姸鎬�
+                            if origin_datas_count == 0:
+                                l2_listen_pos_health_manager.set_unhealthy(client, channel)
+                            else:
+                                l2_listen_pos_health_manager.set_healthy(client, channel)
 
                         l2_log.threadIds[code] = random.randint(0, 100000)
                         if True:
@@ -136,7 +147,7 @@
                                 l2_data_util.save_l2_latest_data_number(code, origin_datas_count)
                                 # 淇濆瓨l2鏁版嵁鏉℃暟
                                 if not origin_datas:
-                                    #or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)):
+                                    # or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)):
                                     raise Exception("鏃犳柊澧炴暟鎹�")
                                 # 淇濆瓨鏈�杩戠殑鏁版嵁
                                 self.latest_oringin_data[code] = origin_datas
@@ -239,7 +250,7 @@
                         #     limit_up_time_manager.save_limit_up_time(d["code"], d["time"])
                 elif type == 22:
                     try:
-                        if int(tool.get_now_time_str().replace(":", "")) < int("092600"):
+                        if int(tool.get_now_time_str().replace(":", "")) < int("092500"):
                             raise Exception('鏈埌鎺ュ彈鏃堕棿')
                         # 棣栨澘浠g爜
                         dataList, is_add = data_process.parseGPCode(_str)
@@ -251,46 +262,9 @@
                             for data in dataList:
                                 code = data["code"]
                                 codes.append(code)
-                                limit_up_price = gpcode_manager.get_limit_up_price(code)
-                                if limit_up_price is not None:
-                                    limit_up_price_dict[code] = limit_up_price
-                                else:
-                                    temp_codes.append(code)
-                                # data["price"]
-                                tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"],
-                                                   "volumeUnit": data["volumeUnit"]})
+
                         # 淇濆瓨鏈瓫閫夌殑棣栨澘浠g爜
                         new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes)
-                        for code in new_add_codes:
-                            if (not l2_trade_util.is_in_forbidden_trade_codes(
-                                    code)) and juejin.JueJinManager.get_lowest_price_rate(code, 15) >= 0.3:
-                                l2_trade_util.forbidden_trade(code)
-
-                        if new_add_codes:
-                            gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes))
-                            # 鍔犲叆棣栨澘鍘嗗彶璁板綍
-                            gpcode_manager.FirstCodeManager.add_record(new_add_codes)
-                            logger_first_code_record.info("鏂板棣栨澘锛歿}", new_add_codes)
-                            # 鑾峰彇60澶╂渶澶ц褰�
-                            for code in new_add_codes:
-                                if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None:
-                                    volumes = juejin.get_volumn(code)
-                                    code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1])
-                            # 绉婚櫎浠g爜
-                            listen_codes = gpcode_manager.get_listen_codes()
-                            for lc in listen_codes:
-                                if not gpcode_manager.is_in_gp_pool(lc):
-                                    # 绉婚櫎浠g爜
-                                    l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "浠g爜琚Щ闄�")
-
-                        if temp_codes:
-                            # 鑾峰彇娑ㄥ仠浠�
-                            juejin.re_set_price_pres(temp_codes)
-                            # 閲嶆柊鑾峰彇娑ㄥ仠浠�
-                            for code in temp_codes:
-                                limit_up_price = gpcode_manager.get_limit_up_price(code)
-                                if limit_up_price is not None:
-                                    limit_up_price_dict[code] = limit_up_price
                         # 淇濆瓨鑷敱娴侀�氳偂鏈�
                         zyltgb_list = []
                         for data in dataList:
@@ -303,7 +277,64 @@
                             ZYLTGBUtil.save_list(zyltgb_list)
                             global_data_loader.load_zyltgb()
 
-                            # 淇濆瓨鐜颁环
+                        bad_codes = set()
+
+                        if new_add_codes:
+                            gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes))
+                            # 鍔犲叆棣栨澘鍘嗗彶璁板綍
+                            gpcode_manager.FirstCodeManager.add_record(new_add_codes)
+                            logger_first_code_record.info("鏂板棣栨澘锛歿}", new_add_codes)
+                            # 鑾峰彇60澶╂渶澶ц褰�
+                            for code in codes:
+                                if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None:
+                                    volumes_data = juejin.get_volumns_by_code(code, 150)
+                                    volumes = juejin.parse_max_volume(volumes_data[:60])
+                                    logger_first_code_record.info("{} 鑾峰彇鍒伴鏉�60澶╂渶澶ч噺锛歿}", code, volumes)
+                                    code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1])
+                                    # 鍒ゆ柇K绾垮舰鎬�
+                                    is_has_k_format, msg = code_nature_analyse.is_has_k_format(
+                                        gpcode_manager.get_limit_up_price(code), volumes_data)
+                                    if not is_has_k_format:
+                                        logger_first_code_record.info("{}棣栨澘K绾垮舰鎬佷笉濂�,{}", code, msg)
+                                        # 鑲℃�т笉濂斤紝灏变笉瑕佸姞鍏�
+                                        bad_codes.add(code)
+                                        # 鍔犲叆绂佹浜ゆ槗浠g爜
+                                        l2_trade_util.forbidden_trade(code)
+                                        break
+                                    else:
+                                        code_nature_analyse.set_record_datas(code,
+                                                                             gpcode_manager.get_limit_up_price(code),
+                                                                             volumes_data)
+
+                            # 绉婚櫎浠g爜
+                            listen_codes = gpcode_manager.get_listen_codes()
+                            for lc in listen_codes:
+                                if not gpcode_manager.is_in_gp_pool(lc):
+                                    # 绉婚櫎浠g爜
+                                    l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "浠g爜琚Щ闄�")
+
+                        # 淇濆瓨鐜颁环
+                        if dataList:
+                            for data in dataList:
+                                code = data["code"]
+                                codes.append(code)
+                                limit_up_price = gpcode_manager.get_limit_up_price(code)
+                                if limit_up_price is not None:
+                                    limit_up_price_dict[code] = limit_up_price
+                                else:
+                                    temp_codes.append(code)
+                                tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"],
+                                                   "volumeUnit": data["volumeUnit"]})
+                        # 鑾峰彇娑ㄥ仠浠�
+                        if temp_codes:
+                            # 鑾峰彇娑ㄥ仠浠�
+                            juejin.re_set_price_pres(temp_codes)
+                            # 閲嶆柊鑾峰彇娑ㄥ仠浠�
+                            for code in temp_codes:
+                                limit_up_price = gpcode_manager.get_limit_up_price(code)
+                                if limit_up_price is not None:
+                                    limit_up_price_dict[code] = limit_up_price
+                        # 淇濆瓨鐜颁环
                         self.first_tick_datas.clear()
                         self.first_tick_datas.extend(tick_datas)
 
@@ -419,10 +450,13 @@
                     datas = data_process.parseData(_str)
                     channel = datas["channel"]
                     code = datas["code"]
+                    msg = ""
                     try:
+
                         if not gpcode_manager.is_in_gp_pool(code) and not gpcode_manager.is_in_first_gp_codes(code):
                             # 娌″湪鐩爣浠g爜涓笖娌℃湁鍦ㄩ鏉夸粖鏃ュ巻鍙蹭唬鐮佷腑
                             raise Exception("浠g爜娌″湪鐩戝惉涓�")
+
                         data = datas["data"]
                         buy_time = data["buyTime"]
                         buy_one_price = data["buyOnePrice"]
@@ -431,29 +465,38 @@
                         if buy_one_price is None:
                             print('涔�1浠锋病鏈夛紝', code)
                         limit_up_price = gpcode_manager.get_limit_up_price(code)
+
                         if limit_up_price is not None:
+                            code_price_manager.Buy1PriceManager.process(code, buy_one_price, buy_time, limit_up_price)
+                            _start_time = time.time()
+                            msg += "涔�1浠锋牸澶勭悊锛�" + f"{_start_time - __start_time} "
+
                             buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price,
                                                                             buy_time,
                                                                             buy_queue)
+                            msg += "涔伴槦鍒椾繚瀛橈細" + f"{time.time() - _start_time} "
+                            _start_time = time.time()
+
                             if buy_queue_result_list:
-                                raise  Exception("娴嬭瘯涓柇")
+
                                 # 鏈夋暟鎹�
                                 try:
                                     buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                         decimal.Decimal("0.00"))
                                     # 鑾峰彇鎵ц浣嶆椂闂�
-                                    exec_time = None
-                                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data(
+
+                                    buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data(
                                         code)
-                                    if buy_exec_index:
+                                    if True:
                                         # 鍙湁涓嬪崟杩囧悗鎵嶈幏鍙栦氦鏄撹繘搴�
+                                        exec_time = None
                                         try:
-                                            exec_time = \
-                                                l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][
-                                                    "time"]
+                                            if buy_exec_index:
+                                                exec_time = \
+                                                    l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"][
+                                                        "time"]
                                         except:
                                             pass
-
                                         buy_progress_index = self.tradeBuyQueue.compute_traded_index(code,
                                                                                                      buy_one_price_,
                                                                                                      buy_queue_result_list,
@@ -468,8 +511,17 @@
                                             logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}  鏁版嵁-{}", code,
                                                                            buy_progress_index,
                                                                            json.dumps(buy_queue_result_list))
+                                            # 璁$畻澶у崟鎴愪氦棰�
+                                            deal_big_money_manager.set_trade_progress(code, buy_progress_index,
+                                                                                      l2.l2_data_util.local_today_datas.get(
+                                                                                          code),
+                                                                                      l2.l2_data_util.local_today_num_operate_map.get(
+                                                                                          code))
+
                                         else:
                                             raise Exception("鏆傛湭鑾峰彇鍒颁氦鏄撹繘搴�")
+                                        msg += "璁$畻鎴愪氦杩涘害锛�" + f"{time.time() - _start_time} "
+                                        _start_time = time.time()
                                 except Exception as e:
                                     logging.exception(e)
                                     print("涔板叆闃熷垪", code, buy_queue_result_list)
@@ -482,28 +534,31 @@
                             code):
                             self.l2_trade_buy_queue_dict[code] = buy_queue
                             logger_l2_trade_buy_queue.info("{}-{}", code, buy_queue)
+                            msg += "淇濆瓨璁板綍鏃ュ織锛�" + f"{time.time() - _start_time} "
+                            _start_time = time.time()
                         # 淇濆瓨鏈�杩戠殑璁板綍
                         if self.ths_l2_trade_queue_manager.save_recod(code, data):
                             if buy_time != "00:00:00":
                                 logger_l2_trade_queue.info("{}-{}", code, data)
-                                self.buy1_price_manager.save(code, buy_one_price)
                                 need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time,
                                                                                                    int(buy_one_volumn),
                                                                                                    buy_one_price)
-                                # if need_cancel:
-                                #    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
-                                if need_sync:
-                                    # 鍚屾鏁版嵁
-                                    L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time)
+                                # if need_sync:
+                                #     # 鍚屾鏁版嵁
+                                #     s =  time.time()
+                                #     L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time)
+                                #     msg += "閲忔牎楠岋細"+f"{time.time()-s} "
                         # print(buy_time, buy_one_price, buy_one_volumn)
 
                         # print("L2涔板崠闃熷垪",datas)
+                        msg += "涔�1澶勭悊锛�" + f"{time.time() - _start_time} "
+                        _start_time = time.time()
                     except:
                         pass
                     finally:
                         space = time.time() - __start_time
                         if space > 0.1:
-                            logger_debug.info("{}鎴愪氦闃熷垪澶勭悊鏃堕棿锛歿}", code, space)
+                            logger_debug.info("{}鎴愪氦闃熷垪澶勭悊鏃堕棿锛歿},{}", code, space, msg)
 
                 elif type == 20:
                     # 鐧诲綍
@@ -517,9 +572,11 @@
                 # 鐜颁环鏇存柊
                 elif type == 40:
                     datas = data_process.parse(_str)["data"]
+                    if datas is None:
+                        datas = []
                     print("浜屾澘鐜颁环")
                     # 鑾峰彇鏆傚瓨鐨勪簩鐗堢幇浠锋暟鎹�
-                    if datas and self.first_tick_datas:
+                    if self.first_tick_datas:
                         datas.extend(self.first_tick_datas)
                     if datas is not None:
                         print("浜屾澘鐜颁环鏁伴噺", len(datas))
@@ -546,8 +603,6 @@
                                 # 璁板綍鏁版嵁
                                 logger_buy_1_volumn_record.info("{}-{}", code, data)
                             self.latest_buy1_volumn_dict[code] = "{}-{}".format(volumn, price)
-                            # 淇濆瓨涔�1浠锋牸
-                            self.buy1_price_manager.save(code, price)
                             # 鏍℃鏃堕棿
                             time_ = tool.compute_buy1_real_time(time_)
                             # 淇濆瓨鏁版嵁
@@ -607,10 +662,51 @@
                         return_str = json.dumps(return_json)
                 elif type == 70:
                     # 閫夎偂瀹濈儹闂ㄦ蹇�
-                    datas = data_process.parse(_str)["data"]
+                    data_json = data_process.parse(_str)
+                    day = data_json["day"]
+                    datas = data_json["data"]
                     if datas:
-                        hot_block_data_process.save_datas(datas)
+                        hot_block_data_process.save_datas(day, datas)
                     print(datas)
+                elif type == 71:
+                    # 鏍规嵁浠g爜鑾峰彇閫夎偂瀹濈儹闂ㄦ蹇�
+                    day = tool.get_now_date_str()
+                    code = data_process.parse(_str)["data"]["code"]
+                    todays = hot_block_data_process.XGBHotBlockDataManager.list_by_code(code, day)
+                    today_datas = []
+                    if todays:
+                        for data in todays:
+                            block = data[2]
+                            block_datas = hot_block_data_process.XGBHotBlockDataManager.list_by_block(block, day)
+                            block_datas = list(block_datas)
+                            # 鏍规嵁娑ㄥ仠鏃堕棿鎺掑簭
+                            block_datas.sort(key=lambda d: (d[4] if len(d[4]) > 6 else '15:00:00'))
+                            for i in range(len(block_datas)):
+                                if block_datas[i][3] == code:
+                                    today_datas.append(
+                                        {"block_name": block, "block_size": len(block_datas), "index": i,
+                                         "price": block_datas[i][5], "rate": block_datas[i][6]})
+                                    break
+                    # 鑾峰彇鍓嶄竴涓氦鏄撴棩
+                    last_day = juejin.JueJinManager.get_previous_trading_date(day)
+                    lasts = hot_block_data_process.XGBHotBlockDataManager.list_by_code(code, last_day)
+                    last_datas = []
+                    if todays:
+                        for data in lasts:
+                            block = data[2]
+                            block_datas = hot_block_data_process.XGBHotBlockDataManager.list_by_block(block, last_day)
+                            block_datas = list(block_datas)
+                            # 鏍规嵁娑ㄥ仠鏃堕棿鎺掑簭
+                            block_datas.sort(key=lambda d: (d[4] if len(d[4]) > 6 else '15:00:00'))
+                            for i in range(len(block_datas)):
+                                if block_datas[i][3] == code:
+                                    last_datas.append(
+                                        {"block_name": block, "block_size": len(block_datas), "index": i,
+                                         "price": block_datas[i][5], "rate": block_datas[i][6]})
+                                    break
+                    final_data = {'code': code, 'today': today_datas, 'last_day': last_datas}
+                    return_str = json.dumps({"code": 0, "data": final_data})
+                    pass
                 elif type == 201:
                     # 鍔犲叆榛戝悕鍗�
                     data = json.loads(_str)
@@ -621,7 +717,7 @@
                         if not name:
                             results = juejin.JueJinManager.get_gp_codes_names([code])
                             if results:
-                                gpcode_manager.CodesNameManager.add_first_code_name(code,results[code])
+                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
 
                     return_str = json.dumps({"code": 0})
                 elif type == 202:
@@ -661,6 +757,30 @@
                 elif type == 302:
                     # 榛戝悕鍗曞垪琛�
                     codes = l2_trade_util.WhiteListCodeManager.list_codes()
+                    datas = []
+                    for code in codes:
+                        name = gpcode_manager.get_code_name(code)
+                        datas.append(f"{name}:{code}")
+                    return_str = json.dumps({"code": 0, "data": datas})
+                elif type == 401:
+                    # 鍔犲叆鎯宠涔�
+                    data = json.loads(_str)
+                    codes = data["data"]["codes"]
+                    for code in codes:
+                        gpcode_manager.WantBuyCodesManager.add_code(code)
+                        name = gpcode_manager.get_code_name(code)
+                        if not name:
+                            results = juejin.JueJinManager.get_gp_codes_names([code])
+                            if results:
+                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
+                elif type == 402:
+                    data = json.loads(_str)
+                    codes = data["data"]["codes"]
+                    for code in codes:
+                        gpcode_manager.WantBuyCodesManager.remove_code(code)
+                    return_str = json.dumps({"code": 0})
+                elif type == 403:
+                    codes = gpcode_manager.WantBuyCodesManager.list_code()
                     datas = []
                     for code in codes:
                         name = gpcode_manager.get_code_name(code)
@@ -743,11 +863,27 @@
     result = json.loads(result)
     if result["code"] != 0:
         raise Exception(result["msg"])
+    else:
+        # 娴嬮�熸垚鍔�
+        client_infos = []
+        for index in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
+            client_infos.append((client, index))
+        l2_listen_pos_health_manager.init_all(client_infos)
 
 
 if __name__ == "__main__":
-    listen_codes = gpcode_manager.get_listen_codes()
-    for lc in listen_codes:
-        if not gpcode_manager.is_in_gp_pool(lc):
-            # 绉婚櫎浠g爜
-            l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "浠g爜琚Щ闄�")
+    codes = ["601698"]
+    for code in codes:
+        volumes_data = juejin.get_volumns_by_code(code, 150)
+        volumes_data = volumes_data[1:]
+        global_data_loader.load_zyltgb()
+        limit_up_price = float(gpcode_manager.get_limit_up_price(code))
+        # 鍒ゆ柇鑲℃��
+        # is_k_format, msg = code_nature_analyse.is_has_k_format(float(limit_up_price), volumes_data)
+        # print(code, is_k_format, msg)
+
+        code_nature_analyse.set_record_datas(code,
+                                             limit_up_price,
+                                             volumes_data)
+
+        print(code_nature_analyse.get_k_format(float(limit_up_price), volumes_data))

--
Gitblit v1.8.0