import threading
|
|
from third_data import history_k_data_util, kpl_api
|
from db import redis_manager_delegate as redis_manager
|
from db.mysql_data_delegate import Mysqldb
|
from db.redis_manager_delegate import RedisUtils
|
from utils import tool
|
|
# Index passed to RedisManager below (presumably the redis database number — confirm).
__db = 0

# Module-wide redis manager shared by the helpers in this file.
_redisManager = redis_manager.RedisManager(__db)
|
|
|
def update_all_zylt_volumes():
    """
    Refresh the free-float share volume of all target codes.

    Nothing is done on a non-trading day. Otherwise the codes returned by
    ``l1_subscript_codes_manager.get_codes(False)``, minus those already
    updated today, are refreshed in a background daemon thread.

    @return: number of codes scheduled for refresh (0 on a non-trading day
             or when no code is left to update)
    """
    import logging

    logger = logging.getLogger(__name__)

    # Today is a trading day only when the trading day that follows the
    # previous trading day is today itself.
    today = tool.get_now_date_str()
    previous_trading_date = history_k_data_util.HistoryKDatasUtils.get_previous_trading_date(today)
    if history_k_data_util.HistoryKDatasUtils.get_next_trading_date(previous_trading_date) != today:
        # Not a trading day
        return 0

    def update_zylt_volume(codes):
        """Fetch current prices and store amount / price as the volume of each code."""
        price_datas = history_k_data_util.JueJinApi.get_gp_current_info(codes, "symbol,price")
        # Keep the part after the first dot of the symbol (presumably the bare code).
        price_dict = {x['symbol'].split(".")[1]: x['price'] for x in price_datas or []}
        for code, price in price_dict.items():
            if not price:
                # No usable price: the volume cannot be derived by division.
                continue
            try:
                zylt = kpl_api.getZYLTAmount(code)
                if not zylt:
                    continue
                ZYLTGBUtil.save_volume(code, int(zylt / price))
            except Exception:
                # Best effort: one failing code must not abort the others,
                # but the failure is logged instead of being silently dropped.
                logger.exception("failed to update zylt volume for code %s", code)

    # Collect the target codes of both lists.
    # NOTE(review): l1_subscript_codes_manager is not among the imports visible
    # in this file — confirm it is imported, otherwise this line raises NameError.
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False)
    codes = set()
    for code_bytes in (codes_sh, codes_sz):
        # Each list is handled on its own so an empty/None list never hides the other.
        codes.update(code_byte.decode() for code_byte in code_bytes or ())

    # Skip the codes whose volume has already been written today.
    codes -= set(ZYLTGBUtil.get_today_updated_volume_codes())
    if not codes:
        return 0

    threading.Thread(target=update_zylt_volume, args=(codes,), daemon=True).start()
    return len(codes)
|
|
|
# Utility class for free-float share capital
class ZYLTGBUtil:
    # Value handed to RedisUtils.setex_async as its first argument
    # (presumably the redis database index — confirm).
    __db = 0

    # MySQL helper created once, when the class body is executed.
    __mysql = Mysqldb()

    @classmethod
    def save(cls, code, val, unit):
        """
        Store the scaled value of a code in redis under the key "zyltgb-<code>".

        @param code: stock code
        @param val: numeric value expressed in ``unit``
        @param unit: 0 -> val is multiplied by 100000000, otherwise by 10000
        @return:
        """
        multiplier = 100000000 if int(unit) == 0 else 10000
        RedisUtils.setex(_redisManager.getRedis(), "zyltgb-{}".format(code), tool.get_expire(),
                         round(float(val) * multiplier))

    @classmethod
    def save_async(cls, code, val, price):
        """
        Store the free-float share capital asynchronously.

        The value is written to redis as-is; when a positive price is given,
        val / price is additionally saved as the volume of the code.

        @param code: stock code
        @param val: value stored under "zyltgb-<code>"
        @param price: price used to derive the volume; None or <= 0 skips the volume update
        @return:
        """
        RedisUtils.setex_async(cls.__db, "zyltgb-{}".format(code), tool.get_expire(), val)
        # Guard against a missing price so the redis write above is not followed by a TypeError.
        if price is not None and price > 0:
            zylt_volume = int(int(val) / float(price))
            cls.save_volume(code, zylt_volume)

    @classmethod
    def save_volume(cls, code, zylt_volume):
        """
        Insert or update the row of ``code`` in table kpl_zylt_volume.

        @param code: stock code
        @param zylt_volume: volume to store (coerced to int)
        @return:
        """
        # Coerce so that only a plain integer literal is interpolated into the SQL text.
        zylt_volume = int(zylt_volume)
        # The code is quoted in every statement, consistently with the INSERT below;
        # unquoted it would be compared as a number ("000333" -> 333).
        # NOTE(review): values are still interpolated into the SQL string; switch to
        # parameterized queries if Mysqldb supports them.
        result = cls.__mysql.select_one(f"select * from kpl_zylt_volume where code = '{code}'")
        if result:
            cls.__mysql.execute(
                f"update kpl_zylt_volume set zylt_volume = {zylt_volume}, update_time=now() where code = '{code}'")
        else:
            cls.__mysql.execute(
                f"insert into kpl_zylt_volume(code, zylt_volume, create_time, update_time) values ('{code}',{zylt_volume},now(),now())")

    @classmethod
    def get_today_updated_volume_codes(cls):
        """
        Get the codes whose volume has already been updated today.

        @return: list of codes (empty when there is none)
        """
        fresults = cls.__mysql.select_all(
            f"select code from kpl_zylt_volume where update_time >= '{tool.get_now_date_str()}'")
        return [x[0] for x in fresults] if fresults else []

    @classmethod
    def count_today_updated_volume_codes(cls):
        """
        Count the codes whose volume has already been updated today.

        @return: number of rows updated today (0 when the query returns nothing)
        """
        fresults = cls.__mysql.select_one(
            f"select count(code) from kpl_zylt_volume where update_time >= '{tool.get_now_date_str()}'")
        return fresults[0] if fresults else 0

    @classmethod
    def get(cls, code):
        """
        Read the value stored in redis under "zyltgb-<code>".

        @param code: stock code
        @return: the value as int, or None when the key is absent
        """
        val = RedisUtils.get(_redisManager.getRedis(), "zyltgb-{}".format(code))
        return None if val is None else int(val)
|
|
|
if __name__ == "__main__":
    # Manual run: start the refresh, then block on stdin so the process is not
    # torn down while the daemon worker thread started by
    # update_all_zylt_volumes() is still running.
    update_all_zylt_volumes()
    input()
|