"""
|
华鑫目标代码管理
|
"""
|
import logging
|
import queue
|
import time
|
|
import constant
|
from code_attribute import global_data_loader, code_volumn_manager, first_target_code_data_processor, gpcode_manager
|
from code_attribute.code_data_util import ZYLTGBUtil
|
from db import redis_manager_delegate as redis_manager
|
from log_module import async_log_util
|
from log_module.log import logger_l2_codes_subscript, logger_debug
|
from third_data import kpl_data_manager, kpl_api, history_k_data_manager
|
from trade import current_price_process_manager
|
from utils import tool, global_util, init_data_util
|
|
redisManager = redis_manager.RedisManager(4)
|
l2_codes_queue = queue.Queue()
|
|
|
# 华鑫Level2订阅代码管理
|
class HuaXinL2SubscriptCodesManager:
|
__L2_CODE_KEY = "huaxin_l2_code_list"
|
__SUBSCRIPT_KEY = "huaxin_subscript_codes"
|
__subscript_codes = []
|
|
@classmethod
|
def __get_redis(cls):
|
return redisManager.getRedis()
|
|
@classmethod
|
def clear(cls):
|
# cls.__get_redis().delete(cls.__L2_CODE_KEY)
|
l2_codes_queue.clear()
|
|
@classmethod
|
def push(cls, datas, request_id=None):
|
l2_codes_queue.put_nowait((int(time.time()), datas, request_id))
|
async_log_util.info(logger_l2_codes_subscript, "加入L2代码处理队列:数量-{}", len(datas))
|
# cls.__get_redis().lpush(cls.__L2_CODE_KEY, json.dumps())
|
|
@classmethod
|
def pop(cls):
|
return l2_codes_queue.get()
|
# val = cls.__get_redis().lpop(cls.__L2_CODE_KEY)
|
|
# 设置订阅代码
|
@classmethod
|
def save_subscript_codes(cls, codes):
|
cls.__subscript_codes = codes
|
# cls.__get_redis().setex(cls.__SUBSCRIPT_KEY, tool.get_expire(), json.dumps(codes))
|
|
# 获取订阅的代码
|
@classmethod
|
def get_subscript_codes(cls):
|
return cls.__subscript_codes
|
|
|
# 根据华鑫L2选出目标代码
|
class HuaXinL1TargetCodesManager:
|
__current_price_dict = {}
|
|
@classmethod
|
def set_level_1_codes_datas(cls, datas, request_id=None):
|
async_log_util.info(logger_l2_codes_subscript, f"({request_id})接受到L1的数据,开始预处理")
|
yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
|
fixed_codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()
|
if fixed_codes is None:
|
fixed_codes = set()
|
# 订阅的代码
|
flist = []
|
temp_volumns = []
|
|
# 预加载自由流通股本
|
if not global_util.zyltgb_map:
|
global_data_loader.load_zyltgb()
|
|
# 首先加载涨停价
|
need_get_limit_up_codes = []
|
for d in datas:
|
code = d[0]
|
if not gpcode_manager.get_limit_up_price(code):
|
need_get_limit_up_codes.append(code)
|
if need_get_limit_up_codes:
|
# 加载昨日收盘价
|
async_log_util.info(logger_l2_codes_subscript, f"({request_id})准备加载昨日收盘价")
|
history_k_data_manager.re_set_price_pres(need_get_limit_up_codes, True)
|
async_log_util.info(logger_l2_codes_subscript, f"({request_id})昨日收盘价加载完成")
|
|
for d in datas:
|
code = d[0]
|
if not tool.is_can_buy_code(code):
|
continue
|
# 如果现价是0.0就取买1价
|
price = d[1] if d[1] > 0 else d[5]
|
# 格式 (代码,现价,涨幅,量,更新时间,买1价格,买1量)
|
# 剔除昨日涨停的票且不在固定代码中的票
|
if code in yesterday_codes and code not in fixed_codes:
|
continue
|
# 剔除股价大于40块的票
|
if price > constant.MAX_SUBSCRIPT_CODE_PRICE:
|
continue
|
|
# -------获取自由流通市值-------
|
if code not in global_util.zyltgb_map:
|
# 获取自由流通量
|
zylt = None
|
zylt_volume = global_util.zylt_volume_map.get(code)
|
if zylt_volume and price > 0:
|
zylt = zylt_volume * price
|
if not zylt:
|
try:
|
__start_time = time.time()
|
zylt = kpl_api.getZYLTAmount(code)
|
async_log_util.info(logger_l2_codes_subscript,
|
f"{request_id} {code}获取自由流通市值耗时-{round((time.time() - __start_time) * 1000)}ms")
|
# 保存自由流通股本
|
if zylt:
|
ZYLTGBUtil.save_async(code, zylt, price)
|
except:
|
pass
|
if zylt:
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if not limit_up_price:
|
history_k_data_manager.re_set_price_pres([code], True)
|
limit_up_price = gpcode_manager.get_limit_up_price(code)
|
if limit_up_price:
|
zylt = int(zylt / price * float(limit_up_price))
|
global_util.zyltgb_map[code] = int(zylt)
|
# 保存今日实时量
|
temp_volumns.append((code, d[3]))
|
|
zyltgb = 0
|
if code in global_util.zyltgb_map:
|
zyltgb = global_util.zyltgb_map[code]
|
# 量的单位为手(不是股)
|
# 价格以买1价格处理
|
fitem = {"code": code, "price": price, "volume": d[3] // (10000 * 100), "volumeUnit": 1,
|
"time": "00:00:00",
|
"zyltgb": zyltgb // 10000, "zyltgbUnit": 1}
|
flist.append(fitem)
|
code_volumn_manager.CodeVolumeManager().set_today_volumns(temp_volumns)
|
async_log_util.info(logger_l2_codes_subscript, f"{request_id}接受到L1的数据,预处理完成")
|
try:
|
tick_datas = first_target_code_data_processor.process_first_codes_datas(flist, request_id)
|
current_price_process_manager.accept_prices(tick_datas, request_id)
|
except Exception as e:
|
logger_debug.exception(e)
|
finally:
|
async_log_util.info(logger_l2_codes_subscript, f"({request_id})L1数据整体处理结束")
|
|
|
if __name__ == "__main__":
|
code = "002703"
|
if code not in global_util.zyltgb_map:
|
zylt = kpl_api.getZYLTAmount(code)
|
if zylt:
|
# 保存自由流通股本
|
ZYLTGBUtil.save(code, zylt // 10000, 1)
|
global_util.zyltgb_map[code] = int(zylt)
|
zyltgb = 0
|
if code in global_util.zyltgb_map:
|
zyltgb = global_util.zyltgb_map[code]
|
print(zyltgb)
|