Administrator
2024-10-31 b855b811e3753ffcb35f145c985bb32f4b550038
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""
华鑫目标代码管理
"""
import logging
import queue
import time
 
import constant
from code_attribute import global_data_loader, code_volumn_manager, first_target_code_data_processor, gpcode_manager
from code_attribute.code_data_util import ZYLTGBUtil
from db import redis_manager_delegate as redis_manager
from log_module import async_log_util
from log_module.log import logger_l2_codes_subscript, logger_debug
from third_data import kpl_data_manager, kpl_api, history_k_data_manager
from trade import current_price_process_manager
from utils import tool, global_util, init_data_util
 
# Shared Redis manager, constructed with argument 4
# (presumably the Redis DB index — TODO confirm against redis_manager_delegate).
redisManager = redis_manager.RedisManager(4)
# In-process FIFO queue of pending L2 code batches; filled and consumed by
# HuaXinL2SubscriptCodesManager.push / pop.
l2_codes_queue = queue.Queue()
 
 
# Huaxin Level-2 subscription code management
class HuaXinL2SubscriptCodesManager:
    """Manage pending L2 code batches and the currently subscribed codes.

    Pending batches live in the module-level ``l2_codes_queue``; the
    subscribed codes are kept in a class attribute. Both are in-process
    only: nothing in this class writes to Redis.
    """
    # Redis key names; not used by any method in this class.
    __L2_CODE_KEY = "huaxin_l2_code_list"
    __SUBSCRIPT_KEY = "huaxin_subscript_codes"
    # Most recently saved list of subscribed codes.
    __subscript_codes = []

    @classmethod
    def __get_redis(cls):
        """Return the Redis client from the shared ``redisManager``."""
        return redisManager.getRedis()

    @classmethod
    def clear(cls):
        """Discard every batch currently waiting in ``l2_codes_queue``."""
        # queue.Queue has no clear() method, so drain it item by item;
        # get_nowait() raises queue.Empty once nothing is left.
        while True:
            try:
                l2_codes_queue.get_nowait()
            except queue.Empty:
                break

    @classmethod
    def push(cls, datas, request_id=None):
        """Enqueue a batch of L2 codes without blocking.

        The queued item is ``(enqueue_unix_seconds, datas, request_id)``.

        :param datas: batch of codes to process; must support ``len()``.
        :param request_id: optional identifier carried along with the batch.
        """
        l2_codes_queue.put_nowait((int(time.time()), datas, request_id))
        async_log_util.info(logger_l2_codes_subscript, "加入L2代码处理队列:数量-{}", len(datas))

    @classmethod
    def pop(cls):
        """Block until a batch is available and return it.

        :return: the ``(enqueue_unix_seconds, datas, request_id)`` tuple
            stored by :meth:`push`.
        """
        return l2_codes_queue.get()

    # Set the subscribed codes
    @classmethod
    def save_subscript_codes(cls, codes):
        """Remember ``codes`` as the current subscription list.

        The object is stored as-is (no copy is made).
        """
        cls.__subscript_codes = codes

    # Get the subscribed codes
    @classmethod
    def get_subscript_codes(cls):
        """Return the object last passed to :meth:`save_subscript_codes`."""
        return cls.__subscript_codes
 
 
# Select target codes from the Huaxin market data
class HuaXinL1TargetCodesManager:
    """Pre-process Huaxin L1 snapshots and hand them to the downstream processors."""
    __current_price_dict = {}

    @classmethod
    def set_level_1_codes_datas(cls, datas, request_id=None):
        """Filter a batch of L1 snapshots, enrich them and forward the result.

        Steps, in order:

        1. Make sure limit-up prices are loaded for every code in the batch.
        2. Drop codes that are not buyable, that hit limit-up yesterday
           (unless they are in the fixed code set) or whose price exceeds
           ``constant.MAX_SUBSCRIPT_CODE_PRICE``.
        3. Resolve the free-float market value for codes missing from
           ``global_util.zyltgb_map`` and cache it there.
        4. Store today's volumes, then pass the built items through
           ``first_target_code_data_processor`` and
           ``current_price_process_manager``.

        :param datas: iterable of records laid out as
            ``(code, current price, change pct, volume, update time,
            bid-1 price, bid-1 volume)``.
        :param request_id: optional identifier used only in log messages
            and passed on to the downstream processors.
        """
        async_log_util.info(logger_l2_codes_subscript, f"({request_id})接受到L1的数据,开始预处理")
        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
        fixed_codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()
        if fixed_codes is None:
            fixed_codes = set()
        # Items that will be forwarded downstream
        flist = []
        temp_volumns = []

        # Preload the free-float share capital map
        if not global_util.zyltgb_map:
            global_data_loader.load_zyltgb()

        # Load limit-up prices first for every code that does not have one yet
        need_get_limit_up_codes = [d[0] for d in datas if not gpcode_manager.get_limit_up_price(d[0])]
        if need_get_limit_up_codes:
            # Load yesterday's closing prices
            async_log_util.info(logger_l2_codes_subscript, f"({request_id})准备加载昨日收盘价")
            history_k_data_manager.re_set_price_pres(need_get_limit_up_codes, True)
            async_log_util.info(logger_l2_codes_subscript, f"({request_id})昨日收盘价加载完成")

        for d in datas:
            code = d[0]
            if not tool.is_can_buy_code(code):
                continue
            # Fall back to the bid-1 price when the current price is 0.0
            price = d[1] if d[1] > 0 else d[5]
            # Skip codes that hit limit-up yesterday unless they are in the fixed set
            if code in yesterday_codes and code not in fixed_codes:
                continue
            # Skip codes priced above the configured maximum
            if price > constant.MAX_SUBSCRIPT_CODE_PRICE:
                continue

            # ------- resolve the free-float market value -------
            if code not in global_util.zyltgb_map:
                zylt = None
                # Prefer the locally known free-float volume times the price
                zylt_volume = global_util.zylt_volume_map.get(code)
                if zylt_volume and price > 0:
                    zylt = zylt_volume * price
                if not zylt:
                    # Best-effort remote lookup: a failure must not abort the
                    # batch, but it is logged instead of silently discarded.
                    try:
                        start_time = time.time()
                        zylt = kpl_api.getZYLTAmount(code)
                        async_log_util.info(logger_l2_codes_subscript,
                                            f"{request_id} {code}获取自由流通市值耗时-{round((time.time() - start_time) * 1000)}ms")
                        # Persist the free-float value
                        if zylt:
                            ZYLTGBUtil.save_async(code, zylt, price)
                    except Exception as e:
                        logger_debug.exception(e)
                if zylt:
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if not limit_up_price:
                        history_k_data_manager.re_set_price_pres([code], True)
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                    # Rescale the value from the current price to the limit-up
                    # price. Both the last price and the bid-1 price can be 0,
                    # in which case the division would raise ZeroDivisionError
                    # and abort the whole batch; keep the unscaled value then.
                    if limit_up_price and price > 0:
                        zylt = int(zylt / price * float(limit_up_price))
                    global_util.zyltgb_map[code] = int(zylt)
            # Record today's real-time volume
            temp_volumns.append((code, d[3]))

            zyltgb = global_util.zyltgb_map[code] if code in global_util.zyltgb_map else 0
            # Volume unit is lots ("hands"), not shares
            # Price is treated as the bid-1 price
            fitem = {"code": code, "price": price, "volume": d[3] // (10000 * 100), "volumeUnit": 1,
                     "time": "00:00:00",
                     "zyltgb": zyltgb // 10000, "zyltgbUnit": 1}
            flist.append(fitem)
        code_volumn_manager.CodeVolumeManager().set_today_volumns(temp_volumns)
        async_log_util.info(logger_l2_codes_subscript, f"{request_id}接受到L1的数据,预处理完成")
        try:
            tick_datas = first_target_code_data_processor.process_first_codes_datas(flist, request_id)
            current_price_process_manager.accept_prices(tick_datas, request_id)
        except Exception as e:
            # Downstream failures are logged and not propagated to the caller
            logger_debug.exception(e)
        finally:
            async_log_util.info(logger_l2_codes_subscript, f"({request_id})L1数据整体处理结束")
 
 
if __name__ == "__main__":
    # Manual check: make sure the free-float value of one sample code is
    # cached in global_util.zyltgb_map, then print the cached value (0 if absent).
    sample_code = "002703"
    already_cached = sample_code in global_util.zyltgb_map
    if not already_cached:
        fetched = kpl_api.getZYLTAmount(sample_code)
        if fetched:
            # Persist the free-float value before caching it in memory
            ZYLTGBUtil.save(sample_code, fetched // 10000, 1)
            global_util.zyltgb_map[sample_code] = int(fetched)
    print(global_util.zyltgb_map[sample_code] if sample_code in global_util.zyltgb_map else 0)