import threading from third_data import history_k_data_util, kpl_api from db import redis_manager_delegate as redis_manager from db.mysql_data_delegate import Mysqldb from db.redis_manager_delegate import RedisUtils from utils import tool __db = 0 _redisManager = redis_manager.RedisManager(0) def update_all_zylt_volumes(): """ 更新所有目标代码的自由流通股本 @return: """ # 判断当前是否是交易日 previous_trading_date = history_k_data_util.HistoryKDatasUtils.get_previous_trading_date(tool.get_now_date_str()) if history_k_data_util.HistoryKDatasUtils.get_next_trading_date(previous_trading_date) != tool.get_now_date_str(): # 非交易日 return 0 def update_zylt_volume(codes): # 获取最近的价格 price_datas = history_k_data_util.JueJinApi.get_gp_current_info(codes, "symbol,price") price_dict = {x['symbol'].split(".")[1]: x['price'] for x in price_datas} for code in price_dict: try: zylt = kpl_api.getZYLTAmount(code) ZYLTGBUtil.save_volume(code, int(zylt / price_dict[code])) except: pass # 刷新目标代码的自由流通量 codes_sh, codes_sz = l1_subscript_codes_manager.get_codes(False) codes = set() if codes_sh: for code_byte in codes_sh: codes.add(code_byte.decode()) for code_byte in codes_sz: codes.add(code_byte.decode()) updated_codes = ZYLTGBUtil.get_today_updated_volume_codes() codes = codes - set(updated_codes) threading.Thread(target=lambda: update_zylt_volume(codes), daemon=True).start() return len(codes) # 自由流通股本工具类 class ZYLTGBUtil: __db = 0 __mysql = Mysqldb() @classmethod def save(cls, code, val, unit): RedisUtils.setex(_redisManager.getRedis(), "zyltgb-{}".format(code), tool.get_expire(), round(float(val) * 100000000) if int(unit) == 0 else round( float(val) * 10000)) @classmethod def save_async(cls, code, val, price): """ 异步保存自由流通股本 @param code: @param val: @param price: @return: """ RedisUtils.setex_async(cls.__db, "zyltgb-{}".format(code), tool.get_expire(), val) if price > 0: zylt_volume = int(int(val) / float(price)) cls.save_volume(code, zylt_volume) @classmethod def save_volume(cls, code, zylt_volume): result = cls.__mysql.select_one(f"select * from kpl_zylt_volume where code = {code}") if result: cls.__mysql.execute( f"update kpl_zylt_volume set zylt_volume = {zylt_volume}, update_time=now() where code = {code}") else: cls.__mysql.execute( f"insert into kpl_zylt_volume(code, zylt_volume, create_time, update_time) values ('{code}',{zylt_volume},now(),now())") @classmethod def get_today_updated_volume_codes(cls): """ 获取今日已经更新量的代码 @return: """ fresults = cls.__mysql.select_all( f"select code from kpl_zylt_volume where update_time >= '{tool.get_now_date_str()}'") if fresults: return [x[0] for x in fresults] return [] @classmethod def count_today_updated_volume_codes(cls): """ 查询今日已经更新量的个数 @return: """ fresults = cls.__mysql.select_one( f"select count(code) from kpl_zylt_volume where update_time >= '{tool.get_now_date_str()}'") if fresults: return fresults[0] return 0 @classmethod def get(cls, code): val = RedisUtils.get(_redisManager.getRedis(), "zyltgb-{}".format(code)) if val is not None: return int(val) return None if __name__ == "__main__": # previous_trading_date = history_k_data_util.HistoryKDatasUtils.get_previous_trading_date(tool.get_now_date_str()) # if history_k_data_util.HistoryKDatasUtils.get_next_trading_date(previous_trading_date) != tool.get_now_date_str(): # print("非交易日") # else: # print("交易日") # zylt = kpl_api.getZYLTAmount("000333") # print(zylt) update_all_zylt_volumes() input()