"""
|
L2的数据处理
|
"""
|
import json
|
|
from db import redis_manager_delegate as redis_manager
|
from db.redis_manager_delegate import RedisUtils
|
from utils import tool
|
from utils.tool import CodeDataCacheUtil
|
|
_db = 1
|
_redisManager = redis_manager.RedisManager(1)
|
|
|
# 下单临时信息
|
class OrderBeginPosInfo(object):
|
MODE_NORMAL = 0
|
MODE_FAST = 1
|
MODE_ACTIVE = 2
|
MODE_RADICAL = 3
|
|
# mode: 0-普通交易 1-快速交易
|
def __init__(self, buy_single_index=None, buy_exec_index=-1, buy_compute_index=None, num=0, count=0,
|
max_num_set=None, buy_volume_rate=None, sell_info=None, threshold_money=None, mode=0, mode_desc=None,
|
at_limit_up=False):
|
self.buy_single_index = buy_single_index
|
self.buy_exec_index = buy_exec_index
|
self.buy_compute_index = buy_compute_index
|
self.num = num
|
self.count = count
|
self.threshold_money = threshold_money
|
if max_num_set:
|
self.max_num_set = list(max_num_set)
|
else:
|
self.max_num_set = []
|
self.buy_volume_rate = buy_volume_rate
|
self.sell_info = sell_info
|
self.mode = mode
|
self.mode_desc = mode_desc
|
# 是否是板上买
|
self.at_limit_up = at_limit_up
|
|
def get_max_num_set(self):
|
if self.max_num_set:
|
return set(self.max_num_set)
|
return None
|
|
def to_json_str(self):
|
return json.dumps(vars(self))
|
|
def to_dict(self):
|
return vars(self)
|
|
@classmethod
|
def to_object(cls, json_str: str):
|
d = json.loads(json_str)
|
return OrderBeginPosInfo(**d)
|
|
|
class L2DataException(Exception):
|
# 价格不匹配
|
CODE_PRICE_ERROR = 1
|
# 无收盘价
|
CODE_NO_CLOSE_PRICE = 2
|
|
def __init__(self, code, msg):
|
super().__init__(self)
|
self.code = code
|
self.msg = msg
|
|
def __str__(self):
|
return self.msg
|
|
def get_code(self):
|
return self.code
|
|
|
# 交易点管理器,用于管理买入点;买撤点;距离买入点的净买入数据;距离买撤点的买撤数据
|
class TradePointManager:
|
__db = 1
|
__redisManager = redis_manager.RedisManager(1)
|
__buy_compute_index_info_cache = {}
|
__buy_cancel_single_pos_cache = {}
|
__instance = None
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(TradePointManager, cls).__new__(cls, *args, **kwargs)
|
cls.load_data()
|
|
return cls.__instance
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.__redisManager.getRedis()
|
|
@classmethod
|
def load_data(cls):
|
redis_ = cls.__get_redis()
|
keys = RedisUtils.keys(redis_, "buy_compute_index_info-*")
|
for k in keys:
|
code = k.split("-")[-1]
|
val = RedisUtils.get(redis_, k)
|
order = OrderBeginPosInfo.to_object(val)
|
CodeDataCacheUtil.set_cache(cls.__buy_compute_index_info_cache, code, order)
|
|
keys = RedisUtils.keys(redis_, "buy_cancel_single_pos-*")
|
for k in keys:
|
code = k.split("-")[-1]
|
val = RedisUtils.get(redis_, k)
|
CodeDataCacheUtil.set_cache(cls.__buy_cancel_single_pos_cache, code, int(val))
|
|
# 删除买入点数据
|
def delete_buy_point(self, code):
|
CodeDataCacheUtil.clear_cache(self.__buy_compute_index_info_cache, code)
|
RedisUtils.delete_async(self.__db, "buy_compute_index_info-{}".format(code))
|
|
# 获取买入点信息
|
# 返回数据为:买入点 累计纯买额 已经计算的数据索引
|
|
def get_buy_compute_start_data(self, code):
|
_key = "buy_compute_index_info-{}".format(code)
|
_data_json = RedisUtils.get(self.__get_redis(), _key)
|
if _data_json is None:
|
return None, None, None, 0, 0, [], 0
|
_data = json.loads(_data_json)
|
return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5], _data[6]
|
|
def get_buy_compute_start_data_cache(self, code) -> OrderBeginPosInfo:
|
cache_result = CodeDataCacheUtil.get_cache(self.__buy_compute_index_info_cache, code)
|
if cache_result[0]:
|
return cache_result[1]
|
|
return OrderBeginPosInfo()
|
|
# 设置买入点的值
|
# buy_single_index 买入信号位
|
# buy_exec_index 买入执行位
|
# compute_index 计算位置
|
# nums 累计纯买额
|
|
def set_buy_compute_start_data_v2(self, code, order: OrderBeginPosInfo):
|
expire = tool.get_expire()
|
_key = "buy_compute_index_info-{}".format(code)
|
data_ = None
|
if order.buy_single_index is not None:
|
data_ = order
|
else:
|
old_order = self.get_buy_compute_start_data_cache(code)
|
order.buy_single_index = old_order.buy_single_index
|
data_ = order
|
CodeDataCacheUtil.set_cache(self.__buy_compute_index_info_cache, code, data_)
|
RedisUtils.setex_async(self.__db, _key, expire, data_.to_json_str())
|
|
# 是否已经下单
|
@classmethod
|
def is_placed_order(cls, order_begin_pos: OrderBeginPosInfo):
|
return order_begin_pos and order_begin_pos.buy_exec_index is not None and order_begin_pos.buy_exec_index > -1
|
|
|
# 清除l2数据
|
def clear_l2_data(code):
|
redis_l2 = redis_manager.RedisManager(1).getRedis()
|
try:
|
keys = RedisUtils.keys(redis_l2, "l2-{}-*".format(code), auto_free=False)
|
for k in keys:
|
RedisUtils.delete(redis_l2, k, auto_free=False)
|
|
RedisUtils.delete(redis_l2, "l2-data-latest-{}".format(code), auto_free=False)
|
finally:
|
RedisUtils.realse(redis_l2)
|
|
|
second_930 = 9 * 3600 + 30 * 60 + 0
|
|
|
# 初始化l2固定代码库
|
def init_l2_fixed_codes():
|
key = "l2-fixed-codes"
|
redis = _redisManager.getRedis()
|
try:
|
count = RedisUtils.scard(redis, key, auto_free=False)
|
if count > 0:
|
RedisUtils.delete(redis, key, auto_free=False)
|
RedisUtils.sadd(redis, key, "000000", auto_free=False)
|
RedisUtils.expire(redis, key, tool.get_expire(), auto_free=False)
|
finally:
|
RedisUtils.realse(redis)
|
|
|
# 移除l2固定监控代码
|
def remove_from_l2_fixed_codes(code):
|
key = "l2-fixed-codes"
|
RedisUtils.srem(_redisManager.getRedis(), key, code)
|
|
|
# 添加代码到L2固定监控
|
def add_to_l2_fixed_codes(code):
|
key = "l2-fixed-codes"
|
RedisUtils.sadd(_redisManager.getRedis(), key, code)
|
RedisUtils.expire(_redisManager.getRedis(), key, tool.get_expire())
|
|
|
# 是否在l2固定监控代码中
|
def is_in_l2_fixed_codes(code):
|
key = "l2-fixed-codes"
|
return RedisUtils.sismember(_redisManager.getRedis(), key, code)
|
|
|
if __name__ == "__main__":
|
code = "002886"
|
TradePointManager().set_buy_compute_start_data_v2(code, OrderBeginPosInfo(buy_single_index=10,
|
buy_exec_index=30,
|
buy_compute_index=40,
|
num=20000, count=10,
|
buy_volume_rate=0.6,
|
mode=OrderBeginPosInfo.MODE_NORMAL,
|
))
|
print(TradePointManager().get_buy_compute_start_data_cache(code).max_num_set)
|
RedisUtils.run_loop()
|