# 买1量管理
|
import decimal
|
import json
|
|
from code_attribute import gpcode_manager
|
from db import redis_manager_delegate as redis_manager
|
from db.redis_manager_delegate import RedisUtils
|
from utils import tool
|
from trade import trade_manager, trade_constant
|
|
|
class THSBuy1VolumnManager:
    """Process-wide singleton that records the buy-1 volume reported for each code.

    Every accepted sample is written to Redis (per-timestamp key, "latest" key and
    "last" key) and the largest volume seen from 09:30 onwards is mirrored in an
    in-memory cache. While a buy order is delegated / being placed, ``save`` also
    decides whether the order should be cancelled.
    """

    # Redis db index handed to the async RedisUtils helpers.
    __db = 1
    __redisManager = redis_manager.RedisManager(1)
    # code -> volume accepted by the previous save() call (used to drop duplicates).
    __last_data = {}
    # code -> {time_str: volume}; save() trims it to the two most recent timestamps.
    __code_time_volumn_dict = {}
    # In-memory mirror of the "max_buy1_volumn-*" Redis keys.
    __max_buy1_volumn_cache = {}
    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it and loading cached data on first use."""
        if cls.__instance is None:
            # object.__new__ rejects extra arguments once __new__ is overridden,
            # so the constructor arguments are accepted but not forwarded.
            cls.__instance = super(THSBuy1VolumnManager, cls).__new__(cls)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        """Return a Redis handle from the class-level manager."""
        return cls.__redisManager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Fill the max-volume cache from every "max_buy1_volumn-*" key in Redis."""
        redis_conn = cls.__get_redis()
        try:
            for redis_key in RedisUtils.keys(redis_conn, "max_buy1_volumn-*"):
                # The code is the part after the last "-" of the key.
                code = redis_key.split("-")[-1]
                stored = RedisUtils.get(redis_conn, redis_key)
                tool.CodeDataCacheUtil.set_cache(cls.__max_buy1_volumn_cache, code, int(stored))
        finally:
            # "realse" is the helper's actual (misspelled) name.
            RedisUtils.realse(redis_conn)

    def __save_max_buy1_volume(self, code, volume):
        """Store the maximum volume in the cache and (asynchronously) in Redis."""
        tool.CodeDataCacheUtil.set_cache(self.__max_buy1_volumn_cache, code, volume)
        key = "max_buy1_volumn-{}".format(code)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), volume)

    def __get_max_buy1_volume(self, code):
        """Read the maximum volume from Redis; return it as int, or None if absent."""
        key = "max_buy1_volumn-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is not None:
            return int(val)
        return None

    def __del_max_buy1_volume(self, code):
        """Drop the maximum volume from the cache and (asynchronously) from Redis."""
        tool.CodeDataCacheUtil.clear_cache(self.__max_buy1_volumn_cache, code)
        key = "max_buy1_volumn-{}".format(code)
        RedisUtils.delete_async(self.__db, key)

    def __save_recod(self, code, time_str, volumn):
        """Persist one sample under its timestamp key and as the latest sample."""
        # Every individual sample.
        key = "buy1_volumn-{}-{}".format(code, time_str)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), volumn)
        # The most recent sample, stored as a JSON [time_str, volume] pair.
        key = "buy1_volumn_latest_info-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((time_str, volumn)))

    def __save_last_recod(self, code, time_str, volumn):
        """Persist the sample preceding the latest one as a JSON [time_str, volume] pair."""
        key = "buy1_volumn_last_info-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((time_str, volumn)))

    def __get_last_record(self, code):
        """Return (time_str, volume) of the previous sample, or (None, None) if absent."""
        key = "buy1_volumn_last_info-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]

    def __get_latest_record(self, code):
        """Return (time_str, volume) of the latest sample, or (None, None) if absent."""
        key = "buy1_volumn_latest_info-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]

    def __add_recod(self, code):
        """Add the code to the "buy1_volumn_codes" set and reset the set's expiry to 10."""
        key = "buy1_volumn_codes"
        RedisUtils.sadd(self.__get_redis(), key, code)
        RedisUtils.expire(self.__get_redis(), key, 10)

    def get_current_codes(self):
        """Return the members of the "buy1_volumn_codes" set (codes currently monitored)."""
        key = "buy1_volumn_codes"
        return RedisUtils.smembers(self.__get_redis(), key)

    def get_buy_1_volumn(self, code, time_str):
        """Return the raw Redis value stored for the code at the given timestamp."""
        key = "buy1_volumn-{}-{}".format(code, time_str)
        return RedisUtils.get(self.__get_redis(), key)

    def save(self, code, time_str, volumn, price):
        """Record one buy-1 sample and decide whether a pending buy should be cancelled.

        :param code: stock code
        :param time_str: sample time; colons are stripped and the rest parsed as an int
        :param volumn: buy-1 volume; forced to 0 when price is not the limit-up price
        :param price: buy-1 price, converted through decimal.Decimal
        :return: tuple (data was updated, order should be cancelled, cancel reason)
        """
        # The client has not loaded real data yet.
        if volumn < 1:
            return False, False, None
        time_int = int(time_str.replace(':', ''))
        # Nothing is processed from 14:55:00 onwards.
        if time_int >= int("145500"):
            return False, False, None

        self.__add_recod(code)
        # A buy-1 price other than the limit-up price counts as zero volume.
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price != tool.to_price(decimal.Decimal(price)):
            volumn = 0
        # None when this is the first sample seen for the code in this process.
        last_volumn = self.__last_data.get(code)
        # Do not store a sample identical to the previous one.
        if code in self.__last_data and last_volumn == volumn:
            return False, False, None
        self.__last_data[code] = volumn

        if time_int >= int("093000"):
            # Track the maximum volume, counting only samples from 09:30 onwards.
            max_volume = self.__get_max_buy1_volume(code)
            if max_volume is None:
                max_volume = 0
            if volumn > max_volume:
                self.__save_max_buy1_volume(code, volumn)

        per_time = self.__code_time_volumn_dict.setdefault(code, {})
        per_time[time_str] = volumn
        # Keep only the two most recent timestamps.
        ordered = sorted(per_time, key=lambda val: int(val.replace(":", "")))
        for stale in ordered[:-2]:
            per_time.pop(stale)
        ordered = ordered[-2:]
        if len(ordered) == 2:
            self.__save_last_recod(code, ordered[0], per_time[ordered[0]])

        self.__save_recod(code, time_str, volumn)

        # Cancellation checks only apply while a buy order is delegated / being placed.
        state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
        if state == trade_constant.TRADE_STATE_BUY_DELEGATED or state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER:
            limit_up_price = gpcode_manager.get_limit_up_price(code)
            # Volume equivalent of an amount of 50,000,000 at the limit-up price.
            threshold_num = 50000000 // (limit_up_price * 100)

            # Without a previous sample there is no trend to evaluate; comparing
            # against None would raise TypeError.
            if last_volumn is not None and volumn < threshold_num and last_volumn < threshold_num:
                # Downward trend: the volume dropped by more than half between two samples.
                if volumn < last_volumn and (last_volumn - volumn) / last_volumn > 0.5:
                    return True, True, "买1主动触发,连续两次封单量降幅达50%以上,时间:{} 封单量:{}-{}".format(
                        time_str, last_volumn, volumn)
            # Cancel when the amount falls below 10,000,000.
            min_num = 10000000 // (limit_up_price * 100)
            if volumn < min_num:
                return True, True, "买1主动触发,最新封单额小于1000万,时间:{} 封单量:{}".format(time_str, volumn)

        return True, False, None

    def get_verify_data(self, code):
        """Return (time_str, volume) of the previous sample, falling back to the latest one."""
        time_str, volumn = self.__get_last_record(code)
        if time_str is not None:
            return time_str, volumn
        return self.__get_latest_record(code)

    def get_max_buy1_volume(self, code):
        """Return the maximum volume read from Redis, or -1 when none is stored."""
        val = self.__get_max_buy1_volume(code)
        if val is None:
            return -1
        return val

    def get_max_buy1_volume_cache(self, code):
        """Return the maximum volume from the in-memory cache, or -1 on a cache miss."""
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__max_buy1_volumn_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return -1

    def clear_max_buy1_volume(self, code):
        """Remove the stored maximum volume for the code."""
        self.__del_max_buy1_volume(code)
|
|
|
class JueJinBuy1VolumnManager:
    """Records the buy-1 volume per code under the "buy1_volumn_juejin*" Redis keys."""

    __redisManager = redis_manager.RedisManager(1)
    # code -> volume accepted by the previous save() call (used to drop duplicates).
    __last_data = {}

    def __get_redis(self):
        """Return a Redis handle from the class-level manager."""
        return self.__redisManager.getRedis()

    def __save_recod(self, code, time_str, volumn):
        """Persist one sample under its timestamp key and as the latest sample."""
        # Every individual sample.
        key = "buy1_volumn_juejin-{}-{}".format(code, time_str)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), volumn)
        # The latest sample is stored as a JSON [time_str, volume] pair, which is
        # the shape __get_latest_record unpacks.
        key = "buy1_volumn_juejin_latest_info-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((time_str, volumn)))

    def __get_latest_record(self, code):
        """Return (time_str, volume) of the latest sample, or (None, None) if absent."""
        key = "buy1_volumn_juejin_latest_info-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
        if isinstance(val, (list, tuple)):
            return val[0], val[1]
        # Values written before the pair format was used hold only the volume.
        return None, val

    def save(self, code, time_str, volumn, price):
        """Record one buy-1 sample.

        :param code: stock code
        :param time_str: sample time, used in the per-sample Redis key
        :param volumn: buy-1 volume; forced to 0 when price is not the limit-up price
        :param price: buy-1 price, converted through decimal.Decimal
        :return: True when the sample was stored, False when it repeats the previous one
        """
        # A buy-1 price other than the limit-up price counts as zero volume.
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price != tool.to_price(decimal.Decimal(price)):
            volumn = 0
        # Do not store a sample identical to the previous one.
        if code in self.__last_data and self.__last_data[code] == volumn:
            return False
        self.__last_data[code] = volumn
        self.__save_recod(code, time_str, volumn)
        return True

    def get_verify_data(self, code):
        """Return (time_str, volume) of the latest stored sample."""
        return self.__get_latest_record(code)
|
|
|
class thsl2tradequeuemanager:
    """Stores the latest trade-queue payload per code in Redis (manager index 0)."""

    __redisManager = redis_manager.RedisManager(0)
    # code -> JSON text of the last payload stored (used to drop duplicates).
    __filter_dict = {}

    def __get_redis(self):
        """Return a Redis handle from the class-level manager."""
        return self.__redisManager.getRedis()

    def __save_latest_recod(self, code, info):
        """Persist the payload as JSON under "ths_l2_latest_trade_info-<code>"."""
        key = "ths_l2_latest_trade_info-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps(info))

    def __get_latest_record(self, code):
        """Return the decoded latest payload for the code, or None if absent."""
        key = "ths_l2_latest_trade_info-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None
        return json.loads(val)

    def __add_buy1_code(self, code):
        """Add the code to the "buy1_volumn_codes" set and reset the set's expiry to 10."""
        key = "buy1_volumn_codes"
        RedisUtils.sadd(self.__get_redis(), key, code)
        RedisUtils.expire(self.__get_redis(), key, 10)

    def get_current_codes(self):
        """Return the members of the "buy1_volumn_codes" set (codes currently monitored)."""
        key = "buy1_volumn_codes"
        return RedisUtils.smembers(self.__get_redis(), key)

    def save_recod(self, code, data):
        """Store the payload unless it is identical to the previous one for the code.

        :param code: stock code
        :param data: JSON-serialisable payload; get_sell1_info later reads its
            "sellTime", "sellOnePrice" and "sellOneVolumn" entries
        :return: True when the payload was stored, False when it was a duplicate
        """
        serialized = json.dumps(data)
        if self.__filter_dict.get(code) == serialized:
            return False
        # Register the code as one with buy-1 data.
        self.__add_buy1_code(code)
        self.__filter_dict[code] = serialized
        self.__save_latest_recod(code, data)
        return True

    def get_sell1_info(self, code):
        """Return (sell_time, sell_one_price, sell_one_volume as int) from the latest payload.

        Returns (None, None, None) when nothing is stored for the code.
        """
        data = self.__get_latest_record(code)
        if data is None:
            return None, None, None
        return data["sellTime"], data["sellOnePrice"], int(data["sellOneVolumn"])
|
|
|
if __name__ == '__main__':
    # This guard used to call thsl2tradequeuemanager().test(), but the class
    # defines no `test` method, so running the module directly raised
    # AttributeError. The entry point is kept as a no-op.
    pass
|