From fb47d36048e94b9a506d5c153e3dd19a01e37df1 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: Mon, 30 Oct 2023 16:30:27 +0800
Subject: [PATCH] Bug fix
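
Refactor third_data/kpl_data_manager.py:

- Route Redis access through the RedisUtils delegate and switch to the
  mysql_data_delegate / redis_manager_delegate modules.
- Convert KPLDataManager's cache-file helpers to classmethods and record the
  last update time and record count per data type in kpl_data_update_info.
- Drop a leftover debug print of a SQL statement.
- Add run_pull_task(), which starts daemon threads that poll the KPL API
  during trade time for limit-up and call-auction (bidding) data and upload
  it to the local service at http://127.0.0.1:9004/upload_kpl_data
  (industry and jingxuan ranking pullers are included but left disabled).
- Running the module directly now starts the pull task; input() keeps the
  main thread alive since the worker threads are daemons.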
---
third_data/kpl_data_manager.py | 115 ++++++++++++++++++++++++++++++++++++++++++++++++---------
1 file changed, 96 insertions(+), 19 deletions(-)
diff --git a/third_data/kpl_data_manager.py b/third_data/kpl_data_manager.py
index 5b06f51..d90a373 100644
--- a/third_data/kpl_data_manager.py
+++ b/third_data/kpl_data_manager.py
@@ -1,12 +1,18 @@
import json
+import logging
import os
+import threading
+import time
+
+import requests
import constant
+from db.redis_manager_delegate import RedisUtils
from utils import tool
# KaiPanLa (KPL) historical limit-up data management
-from db import mysql_data, redis_manager
-from log_module.log import logger_kpl_limit_up_reason_change
+from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
+from log_module.log import logger_kpl_limit_up_reason_change, logger_debug
from third_data import kpl_util, kpl_api
from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager, CodesHisReasonAndBlocksManager
@@ -21,13 +27,13 @@
return self.__redisManager.getRedis()
def save_reason(self, code, reason):
- self.__get_redis().setex(f"kpl_limitup_reason-{code}", tool.get_expire(), reason)
+ RedisUtils.setex(self.__get_redis(), f"kpl_limitup_reason-{code}", tool.get_expire(), reason)
def list_all(self):
- keys = self.__get_redis().keys("kpl_limitup_reason-*")
+ keys = RedisUtils.keys(self.__get_redis(), "kpl_limitup_reason-*")
dict_ = {}
for k in keys:
- val = self.__get_redis().get(k)
+ val = RedisUtils.get(self.__get_redis(), k)
dict_[k.split("-")[1]] = val
return dict_
@@ -157,7 +163,6 @@
wheres.append(f"hb.`_hot_block_name` != '{b}'")
wheres = " and ".join(wheres)
sql = f"SELECT GROUP_CONCAT(_hot_block_name) FROM (SELECT hb.`_hot_block_name`,hb.`_day` FROM `kpl_limit_up_record` hb WHERE hb.`_code`='{code}' AND {wheres} ORDER BY hb.`_day` DESC LIMIT 2) a GROUP BY a._day ORDER BY a._day DESC LIMIT 1"
- print(sql)
mysqldb = mysql_data.Mysqldb()
return mysqldb.select_one(sql)
@@ -186,14 +191,17 @@
class KPLDataManager:
__latest_datas = {}
+ kpl_data_update_info = {}  # type -> (last update time, record count)
- def __save_in_file(self, key, datas):
+ @classmethod
+ def __save_in_file(cls, key, datas):
name = f"{tool.get_now_date_str()}_{key}.log"
path = f"{constant.CACHE_PATH}/{name}"
with open(path, 'w') as f:
f.write(json.dumps(datas))
- def __get_from_file(self, key, day=tool.get_now_date_str()):
+ @classmethod
+ def __get_from_file(cls, key, day=tool.get_now_date_str()):
name = f"{day}_{key}.log"
path = f"{constant.CACHE_PATH}/{name}"
if not os.path.exists(path):
@@ -204,7 +212,8 @@
return json.loads(lines[0])
return None
- def get_from_file(self, type, day):
+ @classmethod
+ def get_from_file(cls, type, day):
name = f"{day}_{type.value}.log"
path = f"{constant.CACHE_PATH}/{name}"
if not os.path.exists(path):
@@ -215,8 +224,9 @@
return json.loads(lines[0])
return None
+ @classmethod
# Fetch data for the most recent days, returned in descending date order
- def get_latest_from_file(self, type, count):
+ def get_latest_from_file(cls, type, count):
files = os.listdir(constant.CACHE_PATH)
file_name_list = []
for f in files:
@@ -237,17 +247,20 @@
return fresults
- def save_data(self, type, datas):
- self.__latest_datas[type] = datas
- self.__save_in_file(type, datas)
+ @classmethod
+ def save_data(cls, type, datas):
+ cls.kpl_data_update_info[type] = (tool.get_now_time_str(), len(datas))
+ cls.__latest_datas[type] = datas
+ cls.__save_in_file(type, datas)
- def get_data(self, type):
+ @classmethod
+ def get_data(cls, type):
type = type.value
- if type in self.__latest_datas:
- return self.__latest_datas[type]
- result = self.__get_from_file(type)
+ if type in cls.__latest_datas:
+ return cls.__latest_datas[type]
+ result = cls.__get_from_file(type)
if result is not None:
- self.__latest_datas[type] = result
+ cls.__latest_datas[type] = result
return result
@@ -282,6 +295,7 @@
if day in __limit_up_list_records_dict:
datas = __limit_up_list_records_dict[day]
else:
+ logger_debug.info("Load the previous days' real-time limit-up data from files")
datas = KPLDataManager().get_latest_from_file(KPLDataType.LIMIT_UP, 10)
if datas:
# Save the data
@@ -301,5 +315,68 @@
return yesterday_codes
+# Run the data-pull tasks
+def run_pull_task():
+ def __upload_data(type, datas):
+ root_data = {
+ "type": type,
+ "data": datas
+ }
+ requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data))  # send the JSON string as the raw request body
+
+ def get_limit_up():
+ while True:
+ if tool.is_trade_time():
+ try:
+ results = kpl_api.getLimitUpInfo()
+ result = json.loads(results)
+ start_time = time.time()
+ __upload_data("limit_up", result)
+ logger_kpl_limit_up_reason_change.info("Upload elapsed: {}", time.time() - start_time)
+ except Exception as e:
+ logging.exception(e)
+ time.sleep(3)
+
+ def get_bidding_money():
+ # Upload call-auction (bidding) data
+ while True:
+ if int("092600") < int(tool.get_now_time_str().replace(":", "")) < int("092700"):
+ try:
+ results = kpl_api.daBanList(kpl_api.DABAN_TYPE_BIDDING)
+ result = json.loads(results)
+ __upload_data("biddings", result)
+ except Exception as e:
+ pass
+ time.sleep(3)
+
+ def get_market_industry():
+ while True:
+ if tool.is_trade_time():
+ try:
+ results = kpl_api.getMarketIndustryRealRankingInfo()
+ result = json.loads(results)
+ __upload_data("industry_rank", result)
+ except Exception:
+ pass
+ time.sleep(3)
+
+ def get_market_jingxuan():
+ while True:
+ if tool.is_trade_time():
+ try:
+ results = kpl_api.getMarketJingXuanRealRankingInfo()
+ result = json.loads(results)
+ __upload_data("jingxuan_rank", result)
+ except Exception:
+ pass
+ time.sleep(3)
+
+ threading.Thread(target=get_limit_up, daemon=True).start()
+ threading.Thread(target=get_bidding_money, daemon=True).start()
+ # threading.Thread(target=get_market_industry, daemon=True).start()
+ # threading.Thread(target=get_market_jingxuan, daemon=True).start()
+
+
if __name__ == "__main__":
- print(KPLLimitUpDataRecordManager.get_latest_blocks_set("002671"))
+ run_pull_task()
+ input()  # keep the main thread alive so the daemon pull threads keep running
--
Gitblit v1.8.0