| | |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from utils import tool |
| | | |
| | | __db = 0 |
| | | _redisManager = redis_manager.RedisManager(0) |
| | | |
| | | |
| | |
| | | |
| | | # 自由流通股本工具类 |
| | | class ZYLTGBUtil: |
| | | __db = 0 |
| | | |
| | | @classmethod |
| | | def save(cls, code, val, unit): |
| | | RedisUtils.setex(_redisManager.getRedis(), "zyltgb-{}".format(code), tool.get_expire(), |
| | |
| | | float(val) * 10000)) |
| | | |
| | | @classmethod |
| | | def save_async(cls, code, val, unit): |
| | | RedisUtils.setex_async(cls.__db, "zyltgb-{}".format(code), tool.get_expire(), |
| | | round(float(val) * 100000000) if int(unit) == 0 else round( |
| | | float(val) * 10000)) |
| | | |
| | | @classmethod |
| | | def get(cls, code): |
| | | val = RedisUtils.get(_redisManager.getRedis(), "zyltgb-{}".format(code)) |
| | | if val is not None: |
| | |
| | | from db import redis_manager_delegate as redis_manager |
| | | from log_module.log import logger_day_volumn |
| | | |
| | | __db = 0 |
| | | __redis_manager = redis_manager.RedisManager(0) |
| | | |
| | | |
| | |
| | | |
| | | # datas:[(code, volumn)] |
| | | def set_today_volumns(datas): |
| | | pipe = __redis_manager.getRedis().pipeline() |
| | | for d in datas: |
| | | code, volumn = d |
| | | logger_day_volumn.info("code:{} volumn:{}".format(code, volumn)) |
| | |
| | | if code in __today_volumn_cache and volumn - __today_volumn_cache[code] < 100000: |
| | | continue |
| | | __today_volumn_cache[code] = volumn |
| | | RedisUtils.setex(pipe, "volumn_today-{}".format(code), tool.get_expire(), volumn) |
| | | pipe.execute() |
| | | RedisUtils.setex_async(__db, "volumn_today-{}".format(code), tool.get_expire(), volumn) |
| | | |
| | | |
| | | # 获取今日量 |
| | |
| | | SERVER_PORT = 10008 |
| | | TEST = True |
| | | L1_MIN_RATE = 4.0 |
| | | L2_CODES_INFO_PATH = "/home/userzjj/logs/l2_codes.txt" |
| | | |
| | |
| | | type_ = "set_target_codes" |
| | | request_id = f"sb_{int(time.time() * 1000)}" |
| | | fdata = json.dumps( |
| | | {"type": type_, "data": {"data": datas}, "request_id": request_id}) |
| | | {"type": type_, "data": {"data": datas}, "request_id": request_id, "time": round(time.time() * 1000, 0)}) |
| | | if pipe_l2 is not None: |
| | | pipe_l2.send(fdata) |
| | | # 记录新增加的代码 |
| | |
| | | self.__process_codes_data(codes_data) |
| | | except Exception as e: |
| | | logger_l2_codes_subscript.exception(e) |
| | | finally: |
| | | # 保存一份最新的数据 |
| | | self.__set_latest_datas(codes_data) |
| | | |
| | | @classmethod |
| | | def __set_latest_datas(cls, codes_data): |
| | | data_str = json.dumps([tool.get_now_date_str(), codes_data]) |
| | | with open(constant.L2_CODES_INFO_PATH, mode='w') as f: |
| | | f.write(data_str) |
| | | |
| | | @classmethod |
| | | def __get_latest_datas(cls): |
| | | if os.path.exists(constant.L2_CODES_INFO_PATH): |
| | | with open(constant.L2_CODES_INFO_PATH, mode='r') as f: |
| | | str_ = f.readline() |
| | | data_json = json.loads(str_) |
| | | if data_json[0] == tool.get_now_date_str(): |
| | | return data_json[1] |
| | | return [] |
| | | |
| | | def set_code_special_watch_volume(self, code, volume): |
| | | # 有效期为2s |
| | |
| | | if pRspInfo['ErrorID'] == 0: |
| | | print("----L2行情登录成功----") |
| | | self.is_login = True |
| | | # t1 = threading.Thread(target=lambda: self.__set_codes_data(), daemon=True) |
| | | # # 后台运行 |
| | | # t1.start() |
| | | # 初始设置值 |
| | | t1 = threading.Thread(target=lambda: self.__process_codes_data(self.__get_latest_datas()), daemon=True) |
| | | # 后台运行 |
| | | t1.start() |
| | | |
| | | def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | print("OnRspSubMarketData") |
| | |
| | | # try: |
| | | print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"], |
| | | pRspInfo["ErrorMsg"]) |
| | | async_log_util.info(logger_local_huaxin_l2_subscript, f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") |
| | | async_log_util.info(logger_local_huaxin_l2_subscript, |
| | | f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") |
| | | if pRspInfo["ErrorID"] == 0: |
| | | print("订阅成功") |
| | | self.subscripted_codes.add(pSpecificSecurity['SecurityID']) |
| | |
| | | # 订阅的代码 |
| | | flist = [] |
| | | temp_volumns = [] |
| | | |
| | | # 预加载自由流通股本 |
| | | if not global_util.zyltgb_map: |
| | | global_data_loader.load_zyltgb() |
| | | |
| | | for d in datas: |
| | | code = d[0] |
| | | # 格式 (代码,现价,涨幅,量,更新时间) |
| | |
| | | continue |
| | | # 获取自由流通市值 |
| | | if code not in global_util.zyltgb_map: |
| | | zylt = ZYLTGBUtil.get(code) |
| | | if zylt: |
| | | global_util.zyltgb_map[code] = zylt |
| | | if code not in global_util.zyltgb_map: |
| | | zylt = kpl_api.getZYLTAmount(code) |
| | | if zylt: |
| | | # 保存自由流通股本 |
| | | ZYLTGBUtil.save(code, zylt // 10000, 1) |
| | | ZYLTGBUtil.save_async(code, zylt // 10000, 1) |
| | | global_util.zyltgb_map[code] = int(zylt) |
| | | # 保存今日实时量 |
| | | temp_volumns.append((code, d[3])) |
| | |
| | | print("收到来自L1的数据:", val["type"]) |
| | | # 处理数据 |
| | | type_ = val["type"] |
| | | timestamp = val.get("time") |
| | | # 大于10s的数据放弃处理 |
| | | if type_ == "set_target_codes": |
| | | if time.time() * 1000 - timestamp > 10*1000: |
| | | continue |
| | | TradeServerProcessor.set_target_codes(val) |
| | | except Exception as e: |
| | | logging.exception(e) |