From 6df8d9ac75a041377c01c80e6e970e5c75ce7662 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期五, 06 六月 2025 17:33:41 +0800
Subject: [PATCH] 初始化导入

---
 strategy/strategy_variable_factory.py |  412 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 406 insertions(+), 6 deletions(-)

diff --git a/strategy/strategy_variable_factory.py b/strategy/strategy_variable_factory.py
index 2f366ff..a8a0643 100644
--- a/strategy/strategy_variable_factory.py
+++ b/strategy/strategy_variable_factory.py
@@ -4,11 +4,14 @@
 import datetime
 import json
 import os
+import re
 
+import constant
 from code_attribute import global_data_loader
 from db import mysql_data_delegate
 from strategy.data_analyzer import KTickLineAnalyzer, KPLLimitUpDataAnalyzer, K60SLineAnalyzer
 from strategy.strategy_variable import StockVariables
+from third_data import kpl_api, kpl_util
 from third_data.history_k_data_manager import HistoryKDataManager
 from third_data.history_k_data_util import JueJinLocalApi, HistoryKDatasUtils
 from utils import global_util, tool
@@ -29,11 +32,14 @@
         self.jueJinLocalApi = JueJinLocalApi("41c4f5da-2591-11f0-a9c9-f4b5203f67bf",
                                              "018db265fa34e241dd6198b7ca507ee0a82ad029")
         self.trade_days = self.load_trade_days()
+        self.plate_codes = {}
+        # 浠g爜鐨勭簿閫夋澘鍧� 锛� {"浠g爜":{鏉垮潡}}
+        self.jx_blocks = {}
 
     def load_kline_data(self):
         """
         鍔犺浇鏃绾挎暟鎹�
-        :return: 鏃绾挎暟鎹�
+        :return: {"浠g爜": 鏃绾挎暟鎹畗
         """
         dir_path = os.path.join(self.cache_path, "k_bars")
         day = self.trade_days[0]
@@ -86,7 +92,7 @@
             k_bar_code_data_dict[code] = date_datas
         return k_bar_code_data_dict
 
-    def load_tick_data(self):
+    def load_tick_data(self, target_codes=None):
         """
         鍔犺浇褰撴棩鐨則ick鏁版嵁
         :return:tick鏁版嵁瀛楀吀
@@ -99,6 +105,8 @@
                 if f.find(self.now_day) < 0:
                     continue
                 code = f.split("_")[1][:6]
+                if target_codes and code not in target_codes:
+                    continue
                 tick_path = os.path.join(tick_dir_path, f)
                 with open(tick_path, mode='r') as f:
                     lines = f.readlines()
@@ -124,12 +132,115 @@
     def load_limit_up_data(self):
         """
         鍔犺浇娑ㄥ仠鏁版嵁
-        :return: 娑ㄥ仠鏁版嵁璁板綍
+        :return: 娑ㄥ仠鏁版嵁璁板綍[(浠g爜, 鏃ユ湡, 鏉垮潡, 鏄惁鐐告澘)]
         """
         mysql = mysql_data_delegate.Mysqldb()
         results = mysql.select_all(
-            f"select _code, _day, _hot_block_name from kpl_limit_up_record where _day>='{self.trade_days[-1]}' and _day<='{self.trade_days[0]}' and _open=0")
+            f"select _code, _day, _hot_block_name, _open, _blocks  from kpl_limit_up_record where _day>='{self.trade_days[-1]}' and _day <='{self.trade_days[0]}'")
+        for r in results:
+            r[2] = kpl_util.filter_block(r[2])
         return results
+
+    def __compute_limit_up_reasons_for_refer(self, block_infos):
+        """
+        璁$畻鍙傝�冪エ鐨勬定鍋滃師鍥�
+        @param block_infos: [(鏉垮潡, 鏃ユ湡)]
+        @return:
+        """
+        # [(鏉垮潡, 鏃ユ湡)]
+        block_infos.sort(key=lambda x: x[1], reverse=True)
+        # {"鏉垮潡":[(鍑虹幇娆℃暟, 鏈�杩戝嚭鐜版棩鏈�)]}
+        temp_dict = {}
+        for b in block_infos:
+            if b[0] in constant.KPL_INVALID_BLOCKS:
+                continue
+            if b[0] not in temp_dict:
+                temp_dict[b[0]] = [0, b[1]]
+            temp_dict[b[0]][0] += 1
+        if not temp_dict:
+            return set()
+        temp_list = [(k, temp_dict[k][0], temp_dict[k][1]) for k in temp_dict]
+        # 鎸夌収娑ㄥ仠娆℃暟涓庢渶杩戞定鍋滄椂闂存帓搴�
+        temp_list.sort(key=lambda x: (x[1], x[2]), reverse=True)
+        # 鍙栨定鍋滄鏁版渶澶氱殑鍜屾渶杩戞定鍋滅殑
+        # 鍙栫浉鍚屾鏁扮殑鍘熷洜
+        if temp_list:
+            _list = [t for t in temp_list if t[1] == temp_list[0][1]]
+            if _list[0][1] == 1:
+                _list = _list[:1]
+            blocks = set([x[0] for x in _list])
+        else:
+            blocks = set()
+
+        blocks -= constant.KPL_INVALID_BLOCKS
+        # 鍘婚櫎渚嬪姒傚康杩欎簺娉涙寚璇�
+        return set([kpl_util.filter_block(x) for x in blocks])
+
+    def load_code_plates_for_refer(self):
+        """
+        鑾峰彇鍙傝�冪エ鐨勬定鍋滃師鍥�
+        @return:
+        """
+        sql = f"SELECT r.`_code`, r.`_day`, r.`_hot_block_name`, r.`_blocks`,  r.`_open` FROM `kpl_limit_up_record` r WHERE r.`_day`>='{tool.date_sub(self.now_day, 365)}' and r.`_day` <'{self.now_day}'"
+        mysql = mysql_data_delegate.Mysqldb()
+        kpl_results = mysql.select_all(sql)
+        # {"浠g爜":[(鏉垮潡, 鏃ユ湡), (鏉垮潡, 鏃ユ湡)]}
+        kpl_block_dict = {}
+        for r in kpl_results:
+            # 褰撴棩鐐告澘鐨勪笉璁$畻鍘熷洜
+            if r[4] == 1:
+                continue
+            code = r[0]
+            if code not in kpl_block_dict:
+                kpl_block_dict[code] = []
+            kpl_block_dict[code].append((r[2], r[1]))  # (鏉垮潡, 鏃ユ湡)
+        reasons_dict = {}
+        for code in kpl_block_dict:
+            block_infos = kpl_block_dict.get(code)
+            reasons_dict[code] = self.__compute_limit_up_reasons_for_refer(block_infos)
+        return reasons_dict
+
+    def load_target_plate_and_codes(self):
+        """
+        鍔犺浇鐩爣鏉垮潡涓庡搴旂殑浠g爜锛�
+        浠庢渶杩�120涓氦鏄撴棩鐨勭湡姝f定鍋滄暟鎹腑
+        @return: {"鏉垮潡":[浠g爜]}
+        """
+        end_date = self.trade_days[:60][-1]
+        start_date = self.trade_days[:60][0]
+        mysql = mysql_data_delegate.Mysqldb()
+        # 鑾峰彇涓婁釜浜ゆ槗鏃ユ定鍋滅殑绁�
+        results = mysql.select_all(
+            f"SELECT r.`_code` FROM `kpl_limit_up_record` r where r._day='{self.trade_days[0]}' and r._open = 0")
+        exclude_codes = set([x[0] for x in results])
+        results = mysql.select_all(
+            f"select r.`_hot_block_name` from `kpl_limit_up_record` r where r.`_open`=0 and r.`_day`>'{end_date}' and r.`_day` <= '{start_date}' group by r.`_hot_block_name`")
+        blocks = set([x[0] for x in results])
+        fresults = {}
+        all_buy_plates_of_codes = self.load_all_buy_plates_of_codes()
+        valid_codes = set(all_buy_plates_of_codes.keys())
+        for b in blocks:
+            sql = f"""
+                    SELECT * FROM
+                    (
+                    SELECT r.`_code`, r.`_code_name`, COUNT(*) AS `count`, MAX(r.`_day`) AS _day FROM `kpl_limit_up_record` r WHERE r.`_open`=0  AND r.`_day`>'{end_date}' AND r.`_day`<='{start_date}' AND r.`_hot_block_name`='{b}' GROUP BY r.`_code`
+                    ) a 
+                    
+                    ORDER BY a.count DESC,a._day  DESC
+            """
+
+            results = mysql.select_all(sql)
+            # 鍙栧墠1/3
+            if results:
+                results = [x for x in results if
+                           (tool.is_can_buy_code(x[0]) and x[0] in valid_codes and x[0] not in exclude_codes)]
+                max_count = len(results) // 3 if len(results) % 3 == 0 else len(results) // 3 + 1
+                results = results[:max_count]
+                # 鍙栧墠10
+                results = results[:10]
+                codes = [x[0] for x in results]
+                fresults[kpl_util.filter_block(b)] = codes
+        return fresults
 
     def load_trade_days(self):
         """
@@ -182,6 +293,226 @@
         except Exception as e:
             return set(), None
 
+    def load_plate_codes(self, plate_code, plate_name):
+        """
+        鑾峰彇鏉垮潡鏈夊彲鑳戒拱鐨勭洰鏍囦唬鐮�
+        @param plate_code:
+        @return:[(浠g爜, 棰嗘定娆℃暟, 鏈�澶ч娑ㄦ鏁�)]
+        """
+
+        if not plate_code or plate_name == '鏃�' or plate_name in constant.KPL_INVALID_BLOCKS:
+            return set()
+        if plate_code in self.plate_codes:
+            return self.plate_codes.get(plate_code)
+        dir_path = os.path.join(self.cache_path, "plate_codes_info", self.now_day)
+        if not os.path.exists(dir_path):
+            os.makedirs(dir_path)
+        path_ = os.path.join(dir_path, plate_code + ".text")
+        datas = []
+        if os.path.exists(path_):
+            with open(path_, mode='r', encoding='utf-8') as f:
+                codes_info = []
+                lines = f.readlines()
+                for i in range(len(lines)):
+                    if i == 0:
+                        continue
+                    line = lines[i]
+                    if line:
+                        r = eval(line)
+                        datas.append(r)
+        else:
+            codes_info = []
+            results = self.request_plate_codes(plate_code)
+            with open(path_, mode='w', encoding='utf-8') as f:
+                f.write(plate_name)
+                f.write("\n")
+                f.writelines([f"{x}\n" for x in results])
+            datas = results
+        # 淇濆瓨鍒板唴瀛樹腑
+        if datas:
+            max_data = max(datas, key=lambda x: x[3])
+            for r in datas:
+                if r[3] < 1:
+                    continue
+                if re.match(r"椋庨櫓|绔嬫", r[2]):
+                    continue
+                if r[1].find("ST") >= 0:
+                    continue
+                if r[1].find("S") >= 0:
+                    continue
+                if not tool.is_can_buy_code(r[0]):
+                    continue
+                codes_info.append((r[0], r[3], max_data[3]))
+                # if len(codes_info) >= 10:
+                #     break
+        f_codes_info = codes_info  # [d for d in codes_info if d[1] >= d[2] // 2]
+        self.plate_codes[plate_code] = f_codes_info
+        return self.plate_codes.get(plate_code)
+
+    def load_all_codes_of_plates(self, is_for_buy=False):
+        """
+        鍔犺浇鎵�鏈夋澘鍧楃殑棰嗘定绁�
+        @return:{"鏉垮潡浠g爜":(鏉垮潡鍚嶇О, [(浠g爜,浠g爜鍚嶇О,鏍囩,棰嗘定娆℃暟)])}
+        """
+        dir_path = os.path.join(self.cache_path, "plate_codes_info", self.now_day)
+        if not os.path.exists(dir_path):
+            return None
+        fdata = {}
+        plate_files = os.listdir(dir_path)
+        for plate_file in plate_files:
+            plate_code = plate_file.split(".")[0]
+            path_ = os.path.join(dir_path, plate_file)
+            with open(path_, mode='r', encoding='utf-8') as f:
+                datas = []
+                lines = f.readlines()
+                for i in range(len(lines)):
+                    if i == 0:
+                        continue
+                    line = lines[i]
+                    if line:
+                        r = eval(line)
+                        if not is_for_buy:
+                            if r[3] < 1:
+                                continue
+                            if r[1].find("ST") >= 0:
+                                continue
+                            if r[1].find("S") >= 0:
+                                continue
+                        else:
+                            if re.match(r"椋庨櫓|绔嬫", r[2]):
+                                continue
+                            if r[1].find("ST") >= 0:
+                                continue
+                            if r[1].find("S") >= 0:
+                                continue
+                            if not tool.is_can_buy_code(r[0]):
+                                continue
+                        datas.append(r)
+                        # if len(datas) >= 10:
+                        #     break
+                fdata[plate_code] = (kpl_util.filter_block(lines[0].strip()), datas)
+        return fdata
+
+    def load_all_refer_plates_of_codes(self):
+        """
+        鍔犺浇鎵�鏈夋湁棰嗘定浠g爜鐨勯娑ㄦ澘鍧�
+        @return:
+        """
+        datas = self.load_all_codes_of_plates()
+        fdata = {}
+        for plate_code in datas:
+            plate_name = datas[plate_code][0]
+            codes_info = datas[plate_code][1]
+            for item in codes_info:
+                code, limit_up_count = item[0], item[3]
+                if code not in fdata:
+                    fdata[code] = []
+                fdata[code].append((plate_code, plate_name, limit_up_count))
+        for code in fdata:
+            fdata[code].sort(key=lambda x: x[2], reverse=True)
+            fdata[code] = fdata[code][:3]
+        return fdata
+
+    def load_all_buy_plates_of_codes(self):
+        """
+        鍔犺浇鎵�鏈変唬鐮佺殑棰嗘定鏉垮潡
+        @return: {"浠g爜":{"鏉垮潡鍚嶇О":(浠g爜, 棰嗘定娆℃暟, 鏈�澶ч娑ㄦ鏁�)}}
+        """
+        datas = self.load_all_codes_of_plates(is_for_buy=True)
+        fdata = {}
+        for plate_code in datas:
+            plate_name = datas[plate_code][0]
+            codes_info = datas[plate_code][1]
+            if not codes_info:
+                continue
+            max_count = max(codes_info, key=lambda x: x[3])[3]
+            for item in codes_info:
+                code, limit_up_count = item[0], item[3]
+                if code not in fdata:
+                    fdata[code] = {}
+                fdata[code][plate_name] = (code, limit_up_count, max_count)
+        fdata_dict = {c: [(p, fdata[c][p]) for p in fdata[c]] for c in fdata}
+        for c in fdata_dict:
+            fdata_dict[c].sort(key=lambda x: x[1], reverse=True)
+            fdata_dict[c] = fdata_dict[c][:3]
+
+        fdata = {code: {x[0]: x[1] for x in fdata_dict[code]} for code in fdata_dict}
+
+        return fdata
+
+    def request_plate_codes(self, plate_code):
+        """
+        鑾峰彇鏉垮潡鐨勪唬鐮�
+        @param plate_code:
+        @return:[浠g爜, 鍚嶇О, 椋庨櫓椤�, 棰嗘定娆℃暟]
+        """
+        fresults = []
+        for i in range(1, 10):
+            results = kpl_api.getHistoryCodesByPlateOrderByLZCS(plate_code, self.now_day, "0930", i)
+            results = json.loads(results)["list"]
+            fresults.extend(results)
+            if len(results) < 30:
+                break
+        fdatas = []
+        for result in fresults:
+            d = result[0], result[1], result[2], result[40]
+            fdatas.append(d)
+        return fdatas
+
+    def get_limit_up_reasons_with_plate_code(self):
+        """
+        鑾峰彇娑ㄥ仠鍘熷洜
+        :return: 娑ㄥ仠鏁版嵁璁板綍[(浠g爜, 鏃ユ湡, 鏉垮潡, 鏄惁鐐告澘)]
+        """
+        mysql = mysql_data_delegate.Mysqldb()
+        sql = """
+                SELECT _hot_block_code,_hot_block_name  FROM
+                (
+                SELECT r.`_hot_block_code`, r.`_hot_block_name`, r.`_create_time` FROM
+                  (SELECT  DISTINCT(c.`_hot_block_code`) FROM `kpl_limit_up_record` c WHERE c.`_day`>'鏈�灏忔棩鏈�' and c.`_day`<'浠婃棩鏃ユ湡') a
+                  LEFT JOIN kpl_limit_up_record r ON r.`_hot_block_code` = a._hot_block_code ORDER BY r.`_create_time` DESC
+                 ) b GROUP BY b._hot_block_code HAVING b._hot_block_code IS NOT NULL
+        """
+        sql = sql.replace("鏈�灏忔棩鏈�", self.trade_days[-1]).replace("浠婃棩鏃ユ湡", self.now_day)
+        results = mysql.select_all(sql)
+        return [x for x in results if kpl_util.filter_block(x[1]) not in constant.KPL_INVALID_BLOCKS]
+
+    def load_jx_blocks(self, code):
+        if code in self.jx_blocks:
+            self.jx_blocks.get(code)
+        # 浠庢枃浠朵腑璇诲彇
+        dir_path = os.path.join(self.cache_path, "jx_blocks", self.now_day)
+        path_str = os.path.join(dir_path, f"{code}.txt")
+        if os.path.exists(path_str):
+            with open(path_str, mode='r', encoding='utf-8') as f:
+                lines = f.readlines()
+                blocks = eval(lines[0])
+                self.jx_blocks[code] = blocks
+        if code in self.jx_blocks:
+            return self.jx_blocks.get(code)
+
+        blocks = kpl_api.getCodeJingXuanBlocks(code)
+        blocks = set([kpl_util.filter_block(x[1]) for x in blocks])
+        blocks -= constant.KPL_INVALID_BLOCKS
+        self.jx_blocks[code] = blocks
+        # 淇濆瓨鍒版枃浠�
+        if not os.path.exists(dir_path):
+            os.makedirs(dir_path)
+        with open(os.path.join(dir_path, f"{code}.txt"), mode='w', encoding='utf-8') as f:
+            f.write(f"{blocks}")
+        return blocks
+
+    def load_all_jx_blocks(self):
+        code_blocks = {}
+        # 浠庢枃浠朵腑璇诲彇
+        dir_path = os.path.join(self.cache_path, "jx_blocks", self.now_day)
+        files = os.listdir(dir_path)
+        for file in files:
+            code = file[:6]
+            with open(os.path.join(dir_path, file), mode='r', encoding='utf-8') as f:
+                code_blocks[code] = eval(f.readlines()[0])
+        return code_blocks
+
 
 class StrategyVariableFactory:
     @staticmethod
@@ -204,8 +535,11 @@
         instance.鏄ㄦ棩闈炴定鍋� = not KTickLineAnalyzer.is_yesterday_limit_up(kline_data_1d)
         instance.鏄ㄦ棩闈炵偢鏉� = not KTickLineAnalyzer.is_yesterday_exploded(kline_data_1d)
         instance.鏄ㄦ棩闈炶穼鍋� = not KTickLineAnalyzer.is_yesterday_limit_down(kline_data_1d)
+        instance.鏄ㄦ棩鏈�浣庝环 = KTickLineAnalyzer.get_yesterday_low_price(kline_data_1d)
+        instance.鏄ㄦ棩寮�鐩樹环 = KTickLineAnalyzer.get_yesterday_open_price(kline_data_1d)
+
         day_counts = [5, 10, 30, 60, 120]
-        for day in day_counts:
+        for day in [3, 5, 10, 30, 60, 120]:
             instance.__setattr__(f"鏃ユ渶楂樹环_{day}", KTickLineAnalyzer.get_recent_days_high(kline_data_1d, day))
         for day in day_counts:
             instance.__setattr__(f"鏃ユ渶楂橀噺_{day}", KTickLineAnalyzer.get_recent_days_max_volume(kline_data_1d, day))
@@ -238,12 +572,16 @@
         for day in day_counts:
             instance.__setattr__(f"鏃ュぇ绛変簬4娆¤穼鍋滀釜鏁癬{day}",
                                  KTickLineAnalyzer.get_fourth_or_more_limit_down_days(kline_data_1d, day))
+        for day in [5, 10, 15, 30, 60, 120]:
+            instance.__setattr__(f"鏃ユ斁鍊嶉噺鏃ユ湡_{day}",
+                                 KTickLineAnalyzer.get_recent_days_double_volume_date(kline_data_1d, day))
 
         for day in day_counts:
             days = trade_days[:day]
             instance.__setattr__(f"鏃ヤ釜鑲℃渶姝g殑鍘熷洜_{day}",
                                  KPLLimitUpDataAnalyzer.get_most_common_reasons(limit_up_data_records, min_day=days[-1],
                                                                                 max_day=days[0]))
+
         if kline_data_60s_dict:
             for day in day_counts:
                 # 鑾峰彇鏃鏈�楂橀噺鐨勪俊鎭�
@@ -258,9 +596,71 @@
         return instance
 
 
+def __test_jx_blocks(__DataLoader):
+    def load_all_codes():
+        codes = set()
+        with open("D:/codes/codes_sh.text") as f:
+            lines = f.readlines()
+            codes |= set([x.strip() for x in lines if x.find("30") != 0])
+        with open("D:/codes/codes_sz.text") as f:
+            lines = f.readlines()
+            codes |= set([x.strip() for x in lines if x.find("30") != 0])
+        return codes
+
+    code_blocks = __DataLoader.load_all_jx_blocks()
+    # codes = load_all_codes()
+    # for code in codes:
+    #     print(code)
+    #     __DataLoader.load_jx_blocks(code)
+    codes = ["002639", "002366"]
+    same_blocks = set()
+    for c in codes:
+        blocks = __DataLoader.load_jx_blocks(c)
+        if not same_blocks:
+            same_blocks = blocks
+        same_blocks &= blocks
+    print("鐩稿悓鏉垮潡", same_blocks)
+    for code in code_blocks:
+        if len(code_blocks[code] & same_blocks) == len(same_blocks):
+            if code in codes:
+                continue
+            print(code, code_blocks[code])
+
+
 if __name__ == "__main__":
+    __DataLoader = DataLoader("2025-06-05")
+    # __test_jx_blocks(__DataLoader)
+
     # instance = StockVariables()
     # day = 5
     # instance.__setattr__(f"鏃ユ渶楂樹环_{day}", 12.00)
     # print(instance.鏃ユ渶楂樹环_5)
-    DataLoader("2025-05-06").load_tick_data()
+
+    # 涓嬭浇鐩爣绁ㄧ殑鏉垮潡
+    # fdata = __DataLoader.load_all_refer_plates_of_codes()
+    # print(fdata.get("000833"))
+
+    # result_dict = __DataLoader.load_code_plates_for_refer()
+    # print(result_dict["301279"])
+
+    results = __DataLoader.load_target_plate_and_codes()
+    plates = ["鏈夎壊閲戝睘"]
+    print("==========鏂伴鏉�=======")
+    for p in plates:
+        print(p, results.get(p))
+
+    # print("椋熷搧楗枡", results.get("椋熷搧楗枡"))
+    # print("閿傜數姹�", results.get("閿傜數姹�"))
+    # print("鏁板瓧缁忔祹", results.get("鏁板瓧缁忔祹"))
+    # print("鍦颁骇閾�", results.get("鍦颁骇閾�"))
+    # print("鐗╂祦", results.get("鐗╂祦"))
+
+    # 涓嬭浇娑ㄥ仠鍘熷洜鏉垮潡瀵瑰簲鐨勪唬鐮�
+    plates = __DataLoader.get_limit_up_reasons_with_plate_code()
+    for p in plates:
+        print(p)
+        __DataLoader.load_plate_codes(p[0], p[1])
+
+    # DataLoader("2025-05-06").load_tick_data()
+    #
+    # print(re.match(r"椋庨櫓|绔嬫", "椋庨櫓123"))

--
Gitblit v1.8.0