Administrator
2023-09-15 f324426b82216d30ffb6f3e57e82bb37efc2b985
订阅优化
7个文件已修改
62 ■■■■ 已修改文件
code_attribute/code_data_util.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/code_volumn_manager.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/constant.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py 29 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_target_codes_manager.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/code_data_util.py
@@ -11,6 +11,7 @@
from db.redis_manager_delegate import RedisUtils
from utils import tool
__db = 0
_redisManager = redis_manager.RedisManager(0)
@@ -29,6 +30,8 @@
# 自由流通股本工具类
class ZYLTGBUtil:
    __db = 0
    @classmethod
    def save(cls, code, val, unit):
        RedisUtils.setex(_redisManager.getRedis(), "zyltgb-{}".format(code), tool.get_expire(),
@@ -36,6 +39,12 @@
                             float(val) * 10000))
    @classmethod
    def save_async(cls, code, val, unit):
        RedisUtils.setex_async(cls.__db, "zyltgb-{}".format(code), tool.get_expire(),
                               round(float(val) * 100000000) if int(unit) == 0 else round(
                                   float(val) * 10000))
    @classmethod
    def get(cls, code):
        val = RedisUtils.get(_redisManager.getRedis(), "zyltgb-{}".format(code))
        if val is not None:
code_attribute/code_volumn_manager.py
@@ -12,6 +12,7 @@
from db import redis_manager_delegate as redis_manager
from log_module.log import logger_day_volumn
__db = 0
__redis_manager = redis_manager.RedisManager(0)
@@ -63,7 +64,6 @@
# datas:[(code, volumn)]
def set_today_volumns(datas):
    pipe = __redis_manager.getRedis().pipeline()
    for d in datas:
        code, volumn = d
        logger_day_volumn.info("code:{} volumn:{}".format(code, volumn))
@@ -72,8 +72,7 @@
        if code in __today_volumn_cache and volumn - __today_volumn_cache[code] < 100000:
            continue
        __today_volumn_cache[code] = volumn
        RedisUtils.setex(pipe, "volumn_today-{}".format(code), tool.get_expire(), volumn)
    pipe.execute()
        RedisUtils.setex_async(__db, "volumn_today-{}".format(code), tool.get_expire(), volumn)
# 获取今日量
huaxin_client/constant.py
@@ -5,3 +5,5 @@
SERVER_PORT = 10008
TEST = True
L1_MIN_RATE = 4.0
L2_CODES_INFO_PATH = "/home/userzjj/logs/l2_codes.txt"
huaxin_client/l1_client.py
@@ -127,7 +127,7 @@
    type_ = "set_target_codes"
    request_id = f"sb_{int(time.time() * 1000)}"
    fdata = json.dumps(
        {"type": type_, "data": {"data": datas}, "request_id": request_id})
        {"type": type_, "data": {"data": datas}, "request_id": request_id, "time": round(time.time() * 1000, 0)})
    if pipe_l2 is not None:
        pipe_l2.send(fdata)
    # 记录新增加的代码
huaxin_client/l2_client.py
@@ -146,6 +146,25 @@
            self.__process_codes_data(codes_data)
        except Exception as e:
            logger_l2_codes_subscript.exception(e)
        finally:
            # 保存一份最新的数据
            self.__set_latest_datas(codes_data)
    @classmethod
    def __set_latest_datas(cls, codes_data):
        data_str = json.dumps([tool.get_now_date_str(), codes_data])
        with open(constant.L2_CODES_INFO_PATH, mode='w') as f:
            f.write(data_str)
    @classmethod
    def __get_latest_datas(cls):
        if os.path.exists(constant.L2_CODES_INFO_PATH):
            with open(constant.L2_CODES_INFO_PATH, mode='r') as f:
                str_ = f.readline()
                data_json = json.loads(str_)
                if data_json[0] == tool.get_now_date_str():
                    return data_json[1]
        return []
    def set_code_special_watch_volume(self, code, volume):
        # 有效期为2s
@@ -168,9 +187,10 @@
        if pRspInfo['ErrorID'] == 0:
            print("----L2行情登录成功----")
            self.is_login = True
            # t1 = threading.Thread(target=lambda: self.__set_codes_data(), daemon=True)
            # # 后台运行
            # t1.start()
            # 初始设置值
            t1 = threading.Thread(target=lambda: self.__process_codes_data(self.__get_latest_datas()), daemon=True)
            # 后台运行
            t1.start()
    def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
        print("OnRspSubMarketData")
@@ -186,7 +206,8 @@
        # try:
        print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"],
              pRspInfo["ErrorMsg"])
        async_log_util.info(logger_local_huaxin_l2_subscript, f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        async_log_util.info(logger_local_huaxin_l2_subscript,
                            f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
        if pRspInfo["ErrorID"] == 0:
            print("订阅成功")
            self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
l2/huaxin/huaxin_target_codes_manager.py
@@ -68,6 +68,11 @@
        # 订阅的代码
        flist = []
        temp_volumns = []
        # 预加载自由流通股本
        if not global_util.zyltgb_map:
            global_data_loader.load_zyltgb()
        for d in datas:
            code = d[0]
            # 格式 (代码,现价,涨幅,量,更新时间)
@@ -79,14 +84,10 @@
                continue
            # 获取自由流通市值
            if code not in global_util.zyltgb_map:
                zylt = ZYLTGBUtil.get(code)
                if zylt:
                    global_util.zyltgb_map[code] = zylt
            if code not in global_util.zyltgb_map:
                zylt = kpl_api.getZYLTAmount(code)
                if zylt:
                    # 保存自由流通股本
                    ZYLTGBUtil.save(code, zylt // 10000, 1)
                    ZYLTGBUtil.save_async(code, zylt // 10000, 1)
                    global_util.zyltgb_map[code] = int(zylt)
            # 保存今日实时量
            temp_volumns.append((code, d[3]))
trade/huaxin/trade_server.py
@@ -454,7 +454,11 @@
                    print("收到来自L1的数据:", val["type"])
                    # 处理数据
                    type_ = val["type"]
                    timestamp = val.get("time")
                    # 大于10s的数据放弃处理
                    if type_ == "set_target_codes":
                        if time.time() * 1000 - timestamp > 10*1000:
                            continue
                        TradeServerProcessor.set_target_codes(val)
            except Exception as e:
                logging.exception(e)