From 4d443587c7f6539f08eca5d7b1f83dbd13cc8873 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期一, 24 七月 2023 10:53:14 +0800 Subject: [PATCH] 选票板块即将更改 --- third_data/kpl_block_util.py | 134 +++++++++++++++++++++----------------------- 1 files changed, 65 insertions(+), 69 deletions(-) diff --git a/third_data/kpl_block_util.py b/third_data/kpl_block_util.py index 2c75ef0..0938eaf 100644 --- a/third_data/kpl_block_util.py +++ b/third_data/kpl_block_util.py @@ -11,51 +11,6 @@ from utils import tool -# latest_2_day_limit_up_datas:鏈�杩�2澶╂定鍋滄暟鎹� -def is_strong_block(block, current_limit_up_datas, latest_2_day_limit_up_datas): - # 鏄惁寮�1锛屼笖灏氭湭鐐告澘 - if current_limit_up_datas: - # 鑾峰彇浠婃棩9锛�30鐨勬椂闂存埑 - time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00" - timestamp = time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S')) - for k in current_limit_up_datas: - if k[5] == block: - if int(k[2]) < timestamp: - return True, "寮�涓�" - elif k[0].find("30") == 0 or k[0].find("68") == 0: - return True, "20cm娑ㄥ仠" - - # 鏉垮潡鏈変簩鏉夸笖锛�3澶╁唴鏉垮潡鍐呮棤楂樹簬浜屾澘鐨勭エ锛� - has_continue_limit_up = False - for k in current_limit_up_datas: - if k[5] == block: - if k[4] == "2杩炴澘": - has_continue_limit_up = True - break - total_limit_up_datas = list(latest_2_day_limit_up_datas) - total_limit_up_datas.extend(current_limit_up_datas) - for k in total_limit_up_datas: - if k[5] == block: - if k[4].find("杩炴澘") > 0 and int(k[4][:1]) > 2: - has_continue_limit_up = False - break - - if has_continue_limit_up: - return True, "鏉垮潡鏈変簩鏉夸笖3澶╁唴鏉垮潡鍐呮棤楂樹簬浜屾澘" - - has_2 = False - has_3 = False - for k in current_limit_up_datas: - if k[5] == block: - if k[4].find("杩炴澘") > -1 and int(k[4][:1]) > 2: - has_3 = True - elif k[4].find("杩炴澘") > -1 and int(k[4][:1]) == 2: - has_2 = True - if not has_2 and has_3: - return True, f"鏉垮潡涓棤2鏉匡紝浣嗘湁3鏉垮強浠ヤ笂" - return False, "" - - # 鏄惁涓绘澘寮�1 # limit_up_record_datas 浠婃棩鍘嗗彶娑ㄥ仠 def is_shsz_open_limit_up(code, block, limit_up_record_datas, code_block_dict): @@ -83,7 +38,56 @@ # 鏄惁鏄墠鍑犵殑鏉垮潡 -def is_top_block(code, block, limit_up_record_datas, topn): +# 鏉垮潡涓湁涓绘澘娑ㄥ仠鐨勬墠鍙備笌鎺掑簭锛堟帓搴忔椂闂存寜鐓ф澘鍧椾腑鐨勬定鍋滄椂闂存潵鎺掑簭锛� + +def __is_top_block(block, block_codes_infos, topn): + block_limit_up_dict = {} + for b in block_codes_infos: + if b[1] not in block_limit_up_dict: + block_limit_up_dict[b[1]] = [] + block_limit_up_dict[b[1]].append(b) + # 鍓旈櫎鍙湁闈炰富鏉挎定鍋滅殑鏉垮潡 + invalid_blocks = [] + for k in block_limit_up_dict: + has_shsz = False + for b in block_limit_up_dict[k]: + if b[0].find('00') == 0 or b[0].find('60') == 0: + has_shsz = True + break + if not has_shsz: + invalid_blocks.append(k) + for k in invalid_blocks: + block_limit_up_dict.pop(k) + + # 姣忎釜鏉垮潡娑ㄥ仠鏃堕棿鎺掑簭 + invalid_blocks = [] + for k in block_limit_up_dict: + # 鍒犻櫎瀹芥硾姒傚康 + if k in constant.KPL_INVALID_BLOCKS: + invalid_blocks.append(k) + continue + block_limit_up_dict[k].sort(key=lambda x: x[2]) + + for k in invalid_blocks: + block_limit_up_dict.pop(k) + + block_codes_infos = [block_limit_up_dict[k][0] for k in block_limit_up_dict] + block_codes_infos.sort(key=lambda x: x[2]) + # 鍘婚櫎閫氱敤娑ㄥ仠鍘熷洜 + index = 0 + for d in block_codes_infos: + if d[1] == block: + if index + 1 <= topn: + return True, block_codes_infos[:topn] + else: + return False, block_codes_infos[:topn] + index += 1 + if index <= topn: + return True, block_codes_infos[:topn] + return False, block_codes_infos[:topn] + + +def is_record_top_block(code, block, limit_up_record_datas, topn): block_codes_infos = [] limit_up_time = time.time() for k in limit_up_record_datas: @@ -91,31 +95,23 @@ block_codes_infos.append((k[3], k[2], int(k[5]))) else: limit_up_time = int(k[5]) + block_codes_infos.append((code, block, limit_up_time)) + # 鎺掑簭 + return __is_top_block(block, block_codes_infos, topn) + + +def is_current_top_block(code, block, current_limit_up_datas, topn): + block_codes_infos = [] + limit_up_time = time.time() + for k in current_limit_up_datas: + if k[0] != code: + block_codes_infos.append((k[0], k[5], int(k[2]))) + else: + limit_up_time = int(k[2]) # 鎺掑簭 block_codes_infos.append((code, block, limit_up_time)) - block_first_limit_up_dict = {} - for b in block_codes_infos: - if b[1] not in block_first_limit_up_dict: - block_first_limit_up_dict[b[1]] = b - else: - if b[2] < block_first_limit_up_dict[b[1]][2]: - block_first_limit_up_dict[b[1]] = b - block_codes_infos = [block_first_limit_up_dict[k] for k in block_first_limit_up_dict] - block_codes_infos.sort(key=lambda x: x[2]) - # 鍘婚櫎閫氱敤娑ㄥ仠鍘熷洜 - index = 0 - for d in block_codes_infos: - if d[1] in constant.KPL_INVALID_BLOCKS: - continue - if d[1] == block: - if index + 1 <= topn: - return True - else: - return False - index += 1 - if index <= topn: - return True - return False + # 鎺掑簭 + return __is_top_block(block, block_codes_infos, topn) # 鑾峰彇褰撴棩鍘嗗彶韬綅 -- Gitblit v1.8.0