From 6df8d9ac75a041377c01c80e6e970e5c75ce7662 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 06 六月 2025 17:33:41 +0800 Subject: [PATCH] 初始化导入 --- strategy/strategy_variable_factory.py | 412 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 406 insertions(+), 6 deletions(-) diff --git a/strategy/strategy_variable_factory.py b/strategy/strategy_variable_factory.py index 2f366ff..a8a0643 100644 --- a/strategy/strategy_variable_factory.py +++ b/strategy/strategy_variable_factory.py @@ -4,11 +4,14 @@ import datetime import json import os +import re +import constant from code_attribute import global_data_loader from db import mysql_data_delegate from strategy.data_analyzer import KTickLineAnalyzer, KPLLimitUpDataAnalyzer, K60SLineAnalyzer from strategy.strategy_variable import StockVariables +from third_data import kpl_api, kpl_util from third_data.history_k_data_manager import HistoryKDataManager from third_data.history_k_data_util import JueJinLocalApi, HistoryKDatasUtils from utils import global_util, tool @@ -29,11 +32,14 @@ self.jueJinLocalApi = JueJinLocalApi("41c4f5da-2591-11f0-a9c9-f4b5203f67bf", "018db265fa34e241dd6198b7ca507ee0a82ad029") self.trade_days = self.load_trade_days() + self.plate_codes = {} + # 浠g爜鐨勭簿閫夋澘鍧� 锛� {"浠g爜":{鏉垮潡}} + self.jx_blocks = {} def load_kline_data(self): """ 鍔犺浇鏃绾挎暟鎹� - :return: 鏃绾挎暟鎹� + :return: {"浠g爜": 鏃绾挎暟鎹畗 """ dir_path = os.path.join(self.cache_path, "k_bars") day = self.trade_days[0] @@ -86,7 +92,7 @@ k_bar_code_data_dict[code] = date_datas return k_bar_code_data_dict - def load_tick_data(self): + def load_tick_data(self, target_codes=None): """ 鍔犺浇褰撴棩鐨則ick鏁版嵁 :return:tick鏁版嵁瀛楀吀 @@ -99,6 +105,8 @@ if f.find(self.now_day) < 0: continue code = f.split("_")[1][:6] + if target_codes and code not in target_codes: + continue tick_path = os.path.join(tick_dir_path, f) with open(tick_path, mode='r') as f: lines = f.readlines() @@ -124,12 +132,115 @@ def load_limit_up_data(self): """ 鍔犺浇娑ㄥ仠鏁版嵁 - :return: 娑ㄥ仠鏁版嵁璁板綍 + :return: 娑ㄥ仠鏁版嵁璁板綍[(浠g爜, 鏃ユ湡, 鏉垮潡, 鏄惁鐐告澘)] """ mysql = mysql_data_delegate.Mysqldb() results = mysql.select_all( - f"select _code, _day, _hot_block_name from kpl_limit_up_record where _day>='{self.trade_days[-1]}' and _day<='{self.trade_days[0]}' and _open=0") + f"select _code, _day, _hot_block_name, _open, _blocks from kpl_limit_up_record where _day>='{self.trade_days[-1]}' and _day <='{self.trade_days[0]}'") + for r in results: + r[2] = kpl_util.filter_block(r[2]) return results + + def __compute_limit_up_reasons_for_refer(self, block_infos): + """ + 璁$畻鍙傝�冪エ鐨勬定鍋滃師鍥� + @param block_infos: [(鏉垮潡, 鏃ユ湡)] + @return: + """ + # [(鏉垮潡, 鏃ユ湡)] + block_infos.sort(key=lambda x: x[1], reverse=True) + # {"鏉垮潡":[(鍑虹幇娆℃暟, 鏈�杩戝嚭鐜版棩鏈�)]} + temp_dict = {} + for b in block_infos: + if b[0] in constant.KPL_INVALID_BLOCKS: + continue + if b[0] not in temp_dict: + temp_dict[b[0]] = [0, b[1]] + temp_dict[b[0]][0] += 1 + if not temp_dict: + return set() + temp_list = [(k, temp_dict[k][0], temp_dict[k][1]) for k in temp_dict] + # 鎸夌収娑ㄥ仠娆℃暟涓庢渶杩戞定鍋滄椂闂存帓搴� + temp_list.sort(key=lambda x: (x[1], x[2]), reverse=True) + # 鍙栨定鍋滄鏁版渶澶氱殑鍜屾渶杩戞定鍋滅殑 + # 鍙栫浉鍚屾鏁扮殑鍘熷洜 + if temp_list: + _list = [t for t in temp_list if t[1] == temp_list[0][1]] + if _list[0][1] == 1: + _list = _list[:1] + blocks = set([x[0] for x in _list]) + else: + blocks = set() + + blocks -= constant.KPL_INVALID_BLOCKS + # 鍘婚櫎渚嬪姒傚康杩欎簺娉涙寚璇� + return set([kpl_util.filter_block(x) for x in blocks]) + + def load_code_plates_for_refer(self): + """ + 鑾峰彇鍙傝�冪エ鐨勬定鍋滃師鍥� + @return: + """ + sql = f"SELECT r.`_code`, r.`_day`, r.`_hot_block_name`, r.`_blocks`, r.`_open` FROM `kpl_limit_up_record` r WHERE r.`_day`>='{tool.date_sub(self.now_day, 365)}' and r.`_day` <'{self.now_day}'" + mysql = mysql_data_delegate.Mysqldb() + kpl_results = mysql.select_all(sql) + # {"浠g爜":[(鏉垮潡, 鏃ユ湡), (鏉垮潡, 鏃ユ湡)]} + kpl_block_dict = {} + for r in kpl_results: + # 褰撴棩鐐告澘鐨勪笉璁$畻鍘熷洜 + if r[4] == 1: + continue + code = r[0] + if code not in kpl_block_dict: + kpl_block_dict[code] = [] + kpl_block_dict[code].append((r[2], r[1])) # (鏉垮潡, 鏃ユ湡) + reasons_dict = {} + for code in kpl_block_dict: + block_infos = kpl_block_dict.get(code) + reasons_dict[code] = self.__compute_limit_up_reasons_for_refer(block_infos) + return reasons_dict + + def load_target_plate_and_codes(self): + """ + 鍔犺浇鐩爣鏉垮潡涓庡搴旂殑浠g爜锛� + 浠庢渶杩�120涓氦鏄撴棩鐨勭湡姝f定鍋滄暟鎹腑 + @return: {"鏉垮潡":[浠g爜]} + """ + end_date = self.trade_days[:60][-1] + start_date = self.trade_days[:60][0] + mysql = mysql_data_delegate.Mysqldb() + # 鑾峰彇涓婁釜浜ゆ槗鏃ユ定鍋滅殑绁� + results = mysql.select_all( + f"SELECT r.`_code` FROM `kpl_limit_up_record` r where r._day='{self.trade_days[0]}' and r._open = 0") + exclude_codes = set([x[0] for x in results]) + results = mysql.select_all( + f"select r.`_hot_block_name` from `kpl_limit_up_record` r where r.`_open`=0 and r.`_day`>'{end_date}' and r.`_day` <= '{start_date}' group by r.`_hot_block_name`") + blocks = set([x[0] for x in results]) + fresults = {} + all_buy_plates_of_codes = self.load_all_buy_plates_of_codes() + valid_codes = set(all_buy_plates_of_codes.keys()) + for b in blocks: + sql = f""" + SELECT * FROM + ( + SELECT r.`_code`, r.`_code_name`, COUNT(*) AS `count`, MAX(r.`_day`) AS _day FROM `kpl_limit_up_record` r WHERE r.`_open`=0 AND r.`_day`>'{end_date}' AND r.`_day`<='{start_date}' AND r.`_hot_block_name`='{b}' GROUP BY r.`_code` + ) a + + ORDER BY a.count DESC,a._day DESC + """ + + results = mysql.select_all(sql) + # 鍙栧墠1/3 + if results: + results = [x for x in results if + (tool.is_can_buy_code(x[0]) and x[0] in valid_codes and x[0] not in exclude_codes)] + max_count = len(results) // 3 if len(results) % 3 == 0 else len(results) // 3 + 1 + results = results[:max_count] + # 鍙栧墠10 + results = results[:10] + codes = [x[0] for x in results] + fresults[kpl_util.filter_block(b)] = codes + return fresults def load_trade_days(self): """ @@ -182,6 +293,226 @@ except Exception as e: return set(), None + def load_plate_codes(self, plate_code, plate_name): + """ + 鑾峰彇鏉垮潡鏈夊彲鑳戒拱鐨勭洰鏍囦唬鐮� + @param plate_code: + @return:[(浠g爜, 棰嗘定娆℃暟, 鏈�澶ч娑ㄦ鏁�)] + """ + + if not plate_code or plate_name == '鏃�' or plate_name in constant.KPL_INVALID_BLOCKS: + return set() + if plate_code in self.plate_codes: + return self.plate_codes.get(plate_code) + dir_path = os.path.join(self.cache_path, "plate_codes_info", self.now_day) + if not os.path.exists(dir_path): + os.makedirs(dir_path) + path_ = os.path.join(dir_path, plate_code + ".text") + datas = [] + if os.path.exists(path_): + with open(path_, mode='r', encoding='utf-8') as f: + codes_info = [] + lines = f.readlines() + for i in range(len(lines)): + if i == 0: + continue + line = lines[i] + if line: + r = eval(line) + datas.append(r) + else: + codes_info = [] + results = self.request_plate_codes(plate_code) + with open(path_, mode='w', encoding='utf-8') as f: + f.write(plate_name) + f.write("\n") + f.writelines([f"{x}\n" for x in results]) + datas = results + # 淇濆瓨鍒板唴瀛樹腑 + if datas: + max_data = max(datas, key=lambda x: x[3]) + for r in datas: + if r[3] < 1: + continue + if re.match(r"椋庨櫓|绔嬫", r[2]): + continue + if r[1].find("ST") >= 0: + continue + if r[1].find("S") >= 0: + continue + if not tool.is_can_buy_code(r[0]): + continue + codes_info.append((r[0], r[3], max_data[3])) + # if len(codes_info) >= 10: + # break + f_codes_info = codes_info # [d for d in codes_info if d[1] >= d[2] // 2] + self.plate_codes[plate_code] = f_codes_info + return self.plate_codes.get(plate_code) + + def load_all_codes_of_plates(self, is_for_buy=False): + """ + 鍔犺浇鎵�鏈夋澘鍧楃殑棰嗘定绁� + @return:{"鏉垮潡浠g爜":(鏉垮潡鍚嶇О, [(浠g爜,浠g爜鍚嶇О,鏍囩,棰嗘定娆℃暟)])} + """ + dir_path = os.path.join(self.cache_path, "plate_codes_info", self.now_day) + if not os.path.exists(dir_path): + return None + fdata = {} + plate_files = os.listdir(dir_path) + for plate_file in plate_files: + plate_code = plate_file.split(".")[0] + path_ = os.path.join(dir_path, plate_file) + with open(path_, mode='r', encoding='utf-8') as f: + datas = [] + lines = f.readlines() + for i in range(len(lines)): + if i == 0: + continue + line = lines[i] + if line: + r = eval(line) + if not is_for_buy: + if r[3] < 1: + continue + if r[1].find("ST") >= 0: + continue + if r[1].find("S") >= 0: + continue + else: + if re.match(r"椋庨櫓|绔嬫", r[2]): + continue + if r[1].find("ST") >= 0: + continue + if r[1].find("S") >= 0: + continue + if not tool.is_can_buy_code(r[0]): + continue + datas.append(r) + # if len(datas) >= 10: + # break + fdata[plate_code] = (kpl_util.filter_block(lines[0].strip()), datas) + return fdata + + def load_all_refer_plates_of_codes(self): + """ + 鍔犺浇鎵�鏈夋湁棰嗘定浠g爜鐨勯娑ㄦ澘鍧� + @return: + """ + datas = self.load_all_codes_of_plates() + fdata = {} + for plate_code in datas: + plate_name = datas[plate_code][0] + codes_info = datas[plate_code][1] + for item in codes_info: + code, limit_up_count = item[0], item[3] + if code not in fdata: + fdata[code] = [] + fdata[code].append((plate_code, plate_name, limit_up_count)) + for code in fdata: + fdata[code].sort(key=lambda x: x[2], reverse=True) + fdata[code] = fdata[code][:3] + return fdata + + def load_all_buy_plates_of_codes(self): + """ + 鍔犺浇鎵�鏈変唬鐮佺殑棰嗘定鏉垮潡 + @return: {"浠g爜":{"鏉垮潡鍚嶇О":(浠g爜, 棰嗘定娆℃暟, 鏈�澶ч娑ㄦ鏁�)}} + """ + datas = self.load_all_codes_of_plates(is_for_buy=True) + fdata = {} + for plate_code in datas: + plate_name = datas[plate_code][0] + codes_info = datas[plate_code][1] + if not codes_info: + continue + max_count = max(codes_info, key=lambda x: x[3])[3] + for item in codes_info: + code, limit_up_count = item[0], item[3] + if code not in fdata: + fdata[code] = {} + fdata[code][plate_name] = (code, limit_up_count, max_count) + fdata_dict = {c: [(p, fdata[c][p]) for p in fdata[c]] for c in fdata} + for c in fdata_dict: + fdata_dict[c].sort(key=lambda x: x[1], reverse=True) + fdata_dict[c] = fdata_dict[c][:3] + + fdata = {code: {x[0]: x[1] for x in fdata_dict[code]} for code in fdata_dict} + + return fdata + + def request_plate_codes(self, plate_code): + """ + 鑾峰彇鏉垮潡鐨勪唬鐮� + @param plate_code: + @return:[浠g爜, 鍚嶇О, 椋庨櫓椤�, 棰嗘定娆℃暟] + """ + fresults = [] + for i in range(1, 10): + results = kpl_api.getHistoryCodesByPlateOrderByLZCS(plate_code, self.now_day, "0930", i) + results = json.loads(results)["list"] + fresults.extend(results) + if len(results) < 30: + break + fdatas = [] + for result in fresults: + d = result[0], result[1], result[2], result[40] + fdatas.append(d) + return fdatas + + def get_limit_up_reasons_with_plate_code(self): + """ + 鑾峰彇娑ㄥ仠鍘熷洜 + :return: 娑ㄥ仠鏁版嵁璁板綍[(浠g爜, 鏃ユ湡, 鏉垮潡, 鏄惁鐐告澘)] + """ + mysql = mysql_data_delegate.Mysqldb() + sql = """ + SELECT _hot_block_code,_hot_block_name FROM + ( + SELECT r.`_hot_block_code`, r.`_hot_block_name`, r.`_create_time` FROM + (SELECT DISTINCT(c.`_hot_block_code`) FROM `kpl_limit_up_record` c WHERE c.`_day`>'鏈�灏忔棩鏈�' and c.`_day`<'浠婃棩鏃ユ湡') a + LEFT JOIN kpl_limit_up_record r ON r.`_hot_block_code` = a._hot_block_code ORDER BY r.`_create_time` DESC + ) b GROUP BY b._hot_block_code HAVING b._hot_block_code IS NOT NULL + """ + sql = sql.replace("鏈�灏忔棩鏈�", self.trade_days[-1]).replace("浠婃棩鏃ユ湡", self.now_day) + results = mysql.select_all(sql) + return [x for x in results if kpl_util.filter_block(x[1]) not in constant.KPL_INVALID_BLOCKS] + + def load_jx_blocks(self, code): + if code in self.jx_blocks: + self.jx_blocks.get(code) + # 浠庢枃浠朵腑璇诲彇 + dir_path = os.path.join(self.cache_path, "jx_blocks", self.now_day) + path_str = os.path.join(dir_path, f"{code}.txt") + if os.path.exists(path_str): + with open(path_str, mode='r', encoding='utf-8') as f: + lines = f.readlines() + blocks = eval(lines[0]) + self.jx_blocks[code] = blocks + if code in self.jx_blocks: + return self.jx_blocks.get(code) + + blocks = kpl_api.getCodeJingXuanBlocks(code) + blocks = set([kpl_util.filter_block(x[1]) for x in blocks]) + blocks -= constant.KPL_INVALID_BLOCKS + self.jx_blocks[code] = blocks + # 淇濆瓨鍒版枃浠� + if not os.path.exists(dir_path): + os.makedirs(dir_path) + with open(os.path.join(dir_path, f"{code}.txt"), mode='w', encoding='utf-8') as f: + f.write(f"{blocks}") + return blocks + + def load_all_jx_blocks(self): + code_blocks = {} + # 浠庢枃浠朵腑璇诲彇 + dir_path = os.path.join(self.cache_path, "jx_blocks", self.now_day) + files = os.listdir(dir_path) + for file in files: + code = file[:6] + with open(os.path.join(dir_path, file), mode='r', encoding='utf-8') as f: + code_blocks[code] = eval(f.readlines()[0]) + return code_blocks + class StrategyVariableFactory: @staticmethod @@ -204,8 +535,11 @@ instance.鏄ㄦ棩闈炴定鍋� = not KTickLineAnalyzer.is_yesterday_limit_up(kline_data_1d) instance.鏄ㄦ棩闈炵偢鏉� = not KTickLineAnalyzer.is_yesterday_exploded(kline_data_1d) instance.鏄ㄦ棩闈炶穼鍋� = not KTickLineAnalyzer.is_yesterday_limit_down(kline_data_1d) + instance.鏄ㄦ棩鏈�浣庝环 = KTickLineAnalyzer.get_yesterday_low_price(kline_data_1d) + instance.鏄ㄦ棩寮�鐩樹环 = KTickLineAnalyzer.get_yesterday_open_price(kline_data_1d) + day_counts = [5, 10, 30, 60, 120] - for day in day_counts: + for day in [3, 5, 10, 30, 60, 120]: instance.__setattr__(f"鏃ユ渶楂樹环_{day}", KTickLineAnalyzer.get_recent_days_high(kline_data_1d, day)) for day in day_counts: instance.__setattr__(f"鏃ユ渶楂橀噺_{day}", KTickLineAnalyzer.get_recent_days_max_volume(kline_data_1d, day)) @@ -238,12 +572,16 @@ for day in day_counts: instance.__setattr__(f"鏃ュぇ绛変簬4娆¤穼鍋滀釜鏁癬{day}", KTickLineAnalyzer.get_fourth_or_more_limit_down_days(kline_data_1d, day)) + for day in [5, 10, 15, 30, 60, 120]: + instance.__setattr__(f"鏃ユ斁鍊嶉噺鏃ユ湡_{day}", + KTickLineAnalyzer.get_recent_days_double_volume_date(kline_data_1d, day)) for day in day_counts: days = trade_days[:day] instance.__setattr__(f"鏃ヤ釜鑲℃渶姝g殑鍘熷洜_{day}", KPLLimitUpDataAnalyzer.get_most_common_reasons(limit_up_data_records, min_day=days[-1], max_day=days[0])) + if kline_data_60s_dict: for day in day_counts: # 鑾峰彇鏃鏈�楂橀噺鐨勪俊鎭� @@ -258,9 +596,71 @@ return instance +def __test_jx_blocks(__DataLoader): + def load_all_codes(): + codes = set() + with open("D:/codes/codes_sh.text") as f: + lines = f.readlines() + codes |= set([x.strip() for x in lines if x.find("30") != 0]) + with open("D:/codes/codes_sz.text") as f: + lines = f.readlines() + codes |= set([x.strip() for x in lines if x.find("30") != 0]) + return codes + + code_blocks = __DataLoader.load_all_jx_blocks() + # codes = load_all_codes() + # for code in codes: + # print(code) + # __DataLoader.load_jx_blocks(code) + codes = ["002639", "002366"] + same_blocks = set() + for c in codes: + blocks = __DataLoader.load_jx_blocks(c) + if not same_blocks: + same_blocks = blocks + same_blocks &= blocks + print("鐩稿悓鏉垮潡", same_blocks) + for code in code_blocks: + if len(code_blocks[code] & same_blocks) == len(same_blocks): + if code in codes: + continue + print(code, code_blocks[code]) + + if __name__ == "__main__": + __DataLoader = DataLoader("2025-06-05") + # __test_jx_blocks(__DataLoader) + # instance = StockVariables() # day = 5 # instance.__setattr__(f"鏃ユ渶楂樹环_{day}", 12.00) # print(instance.鏃ユ渶楂樹环_5) - DataLoader("2025-05-06").load_tick_data() + + # 涓嬭浇鐩爣绁ㄧ殑鏉垮潡 + # fdata = __DataLoader.load_all_refer_plates_of_codes() + # print(fdata.get("000833")) + + # result_dict = __DataLoader.load_code_plates_for_refer() + # print(result_dict["301279"]) + + results = __DataLoader.load_target_plate_and_codes() + plates = ["鏈夎壊閲戝睘"] + print("==========鏂伴鏉�=======") + for p in plates: + print(p, results.get(p)) + + # print("椋熷搧楗枡", results.get("椋熷搧楗枡")) + # print("閿傜數姹�", results.get("閿傜數姹�")) + # print("鏁板瓧缁忔祹", results.get("鏁板瓧缁忔祹")) + # print("鍦颁骇閾�", results.get("鍦颁骇閾�")) + # print("鐗╂祦", results.get("鐗╂祦")) + + # 涓嬭浇娑ㄥ仠鍘熷洜鏉垮潡瀵瑰簲鐨勪唬鐮� + plates = __DataLoader.get_limit_up_reasons_with_plate_code() + for p in plates: + print(p) + __DataLoader.load_plate_codes(p[0], p[1]) + + # DataLoader("2025-05-06").load_tick_data() + # + # print(re.match(r"椋庨櫓|绔嬫", "椋庨櫓123")) -- Gitblit v1.8.0