"""
|
交易数据股那里器
|
用于对交易临时数据(交易状态,代码状态等)进行管理
|
"""
|
import json
|
import time
|
|
# 交易撤销数据管理器
|
import constant
|
from db.redis_manager_delegate import RedisUtils
|
from utils import global_util, tool
|
import l2_data_util
|
from db import redis_manager_delegate as redis_manager
|
from log_module.log import logger_trade
|
|
|
class TradeCancelDataManager:
|
capture_time_dict = {}
|
|
# 保存截图时间
|
@classmethod
|
def save_l2_capture_time(cls, client_id, pos, code, capture_time):
|
cls.capture_time_dict["{}-{}-{}".format(client_id, pos, code)] = {"create_time": round(time.time() * 1000),
|
"capture_time": capture_time}
|
|
# 获取最近一次的截图时间
|
@classmethod
|
def get_latest_l2_capture_time(cls, client_id, pos, code):
|
val = cls.capture_time_dict.get("{}-{}-{}".format(client_id, pos, code))
|
if val is None:
|
return -1
|
# 间隔时间不能大于1s
|
if round(time.time() * 1000) - val["create_time"] > 1000:
|
return -1
|
return val["capture_time"]
|
|
# 获取l2数据的增长速度
|
@classmethod
|
def get_l2_data_grow_speed(cls, client_id, pos, code, add_datas, capture_time):
|
count = 0
|
for data in add_datas:
|
count += data["re"]
|
lastest_capture_time = cls.get_latest_l2_capture_time(client_id, pos, code)
|
if lastest_capture_time < 0:
|
raise Exception("获取上次l2数据截图时间出错")
|
return count / (capture_time - lastest_capture_time)
|
|
# 获取买入确认点的位置
|
@classmethod
|
def get_buy_sure_position(cls, index, speed, trade_time):
|
return index + round(speed * trade_time)
|
|
|
class TradeBuyDataManager:
|
__db = 0
|
redisManager = redis_manager.RedisManager(0)
|
buy_sure_position_dict = {}
|
__buy_position_info_cache = {}
|
|
__instance = None
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(TradeBuyDataManager, cls).__new__(cls, *args, **kwargs)
|
cls.__load_datas()
|
return cls.__instance
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.redisManager.getRedis()
|
|
@classmethod
|
def __load_datas(cls):
|
__redis = cls.__get_redis()
|
try:
|
keys = RedisUtils.keys(__redis, "buy_position_info-*")
|
for k in keys:
|
code = k.split("-")[-1]
|
val = RedisUtils.get(__redis, k)
|
val = json.loads(val)
|
tool.CodeDataCacheUtil.set_cache(cls.__buy_position_info_cache, code, val)
|
finally:
|
RedisUtils.realse(__redis)
|
|
# 设置买入点的信息
|
# trade_time: 买入点截图时间与下单提交时间差值
|
# capture_time: 买入点截图时间
|
# last_data: 买入点最后一条数据
|
|
def set_buy_position_info(self, code, capture_time, trade_time, last_data, last_data_index):
|
val = (capture_time, trade_time, last_data, last_data_index)
|
tool.CodeDataCacheUtil.set_cache(self.__buy_position_info_cache, code, val)
|
RedisUtils.setex_async(self.__db, "buy_position_info-{}".format(code), tool.get_expire(),
|
json.dumps(val))
|
|
# 获取买入点信息
|
|
def get_buy_position_info(self, code):
|
val_str = RedisUtils.get(self.redisManager.getRedis(), "buy_position_info-{}".format(code))
|
if val_str is None:
|
return None, None, None, None
|
else:
|
val = json.loads(val_str)
|
return val[0], val[1], val[2], val[3]
|
|
def get_buy_position_info_cache(self, code):
|
cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy_position_info_cache, code)
|
if cache_result[0]:
|
return cache_result[1]
|
return None, None, None, None
|
|
# 删除买入点信息
|
|
def remove_buy_position_info(self, code):
|
tool.CodeDataCacheUtil.clear_cache(self.__buy_position_info_cache, code)
|
RedisUtils.delete_async(self.__db, "buy_position_info-{}".format(code))
|
|
# 设置买入确认点信息
|
|
def __set_buy_sure_position(self, code, index, data):
|
logger_trade.debug("买入确认点信息: code:{} index:{} data:{}", code, index, data)
|
key = "buy_sure_position-{}".format(code)
|
RedisUtils.setex(self.redisManager.getRedis(), key, tool.get_expire(), json.dumps((index, data)))
|
self.buy_sure_position_dict[code] = (index, data)
|
# 移除下单信号的详细信息
|
self.remove_buy_position_info(code)
|
|
# 清除买入确认点信息
|
|
def __clear_buy_sure_position(self, code):
|
key = "buy_sure_position-{}".format(code)
|
RedisUtils.delete(self.redisManager.getRedis(), key)
|
if code in self.buy_sure_position_dict:
|
self.buy_sure_position_dict.pop(code)
|
|
# 获取买入确认点信息
|
|
def get_buy_sure_position(self, code):
|
temp = self.buy_sure_position_dict.get(code)
|
if temp is not None:
|
return temp[0], temp[1]
|
|
key = "buy_sure_position-{}".format(code)
|
val = RedisUtils.get(self.redisManager.getRedis(), key)
|
if val is None:
|
return None, None
|
else:
|
val = json.loads(val)
|
self.buy_sure_position_dict[code] = (val[0], val[1])
|
return val[0], val[1]
|
|
# 处理买入确认点信息
|
|
def process_buy_sure_position_info(self, code, capture_time, l2_today_datas, l2_latest_data, l2_add_datas):
|
buy_capture_time, trade_time, l2_data, l2_data_index = self.get_buy_position_info_cache(code)
|
if buy_capture_time is None:
|
# 没有购买者信息
|
return None
|
if capture_time - buy_capture_time < trade_time:
|
# 时间未等待足够
|
return None
|
# 时间差是否相差2s及以上
|
old_time = l2_data["val"]["time"]
|
new_time = l2_latest_data["val"]["time"]
|
old_time_int = l2_data_util.get_time_as_seconds(old_time)
|
new_time_int = l2_data_util.get_time_as_seconds(new_time)
|
if new_time_int - old_time_int >= 2:
|
# 间隔2s及其以上表示数据异常
|
# 间隔2s以上的就以下单时间下一秒末尾作为确认点
|
start_index = l2_data_index
|
if len(l2_today_datas) - 1 > start_index:
|
for i in range(start_index + 1, len(l2_today_datas)):
|
_time = l2_today_datas[i]["val"]["time"]
|
if l2_data_util.get_time_as_seconds(_time) - old_time_int >= 2:
|
index = i - 1
|
data = l2_today_datas[index]
|
self.__set_buy_sure_position(code, index, data)
|
break
|
else:
|
self.__set_buy_sure_position(code, l2_data_index, l2_data)
|
elif new_time_int - old_time_int >= 0:
|
# 间隔2s内表示数据正常,将其位置设置为新增数据的中间位置
|
index = len(l2_today_datas) - 1 - (len(l2_add_datas)) // 2
|
data = l2_today_datas[index]
|
self.__set_buy_sure_position(code, index, data)
|
else:
|
# 间隔时间小于0 ,一般产生原因是数据回溯产生,故不做处理
|
logger_trade.warning("预估委托位置错误:数据间隔时间小于0 code-{}", code)
|
pass
|
|
|
# 代码实时价格管理器
|
class CodeActualPriceProcessor:
|
__under_water_last_time_cache = {}
|
__code_current_rate_cache = {}
|
__code_current_rate_latest = {}
|
__db = 0
|
__redisManager = redis_manager.RedisManager(0)
|
__instance = None
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(CodeActualPriceProcessor, cls).__new__(cls, *args, **kwargs)
|
cls.__load_datas()
|
return cls.__instance
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.__redisManager.getRedis()
|
|
@classmethod
|
def __load_datas(cls):
|
__redis = cls.__get_redis()
|
try:
|
keys = RedisUtils.keys(__redis, "under_water_last_time-*")
|
for k in keys:
|
code = k.split("-")[-1]
|
val = RedisUtils.get(__redis, k)
|
tool.CodeDataCacheUtil.set_cache(cls.__under_water_last_time_cache, code, val)
|
|
keys = RedisUtils.keys(__redis, "code_current_rate-*")
|
for k in keys:
|
code = k.split("-")[-1]
|
val = RedisUtils.get(__redis, k)
|
tool.CodeDataCacheUtil.set_cache(cls.__code_current_rate_cache, code, float(val))
|
finally:
|
RedisUtils.realse(__redis)
|
|
# 保存跌价的时间
|
def __save_down_price_time(self, code, time_str):
|
key = "under_water_last_time-{}".format(code)
|
tool.CodeDataCacheUtil.set_cache(self.__under_water_last_time_cache, code, time_str)
|
RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), time_str)
|
|
def __remove_down_price_time(self, code):
|
key = "under_water_last_time-{}".format(code)
|
tool.CodeDataCacheUtil.clear_cache(self.__under_water_last_time_cache, code)
|
RedisUtils.delete(self.__get_redis(), key)
|
|
def __get_last_down_price_time(self, code):
|
key = "under_water_last_time-{}".format(code)
|
return RedisUtils.get(self.__get_redis(), key)
|
|
def __get_last_down_price_time_cache(self, code):
|
cache_result = tool.CodeDataCacheUtil.get_cache(self.__under_water_last_time_cache, code)
|
if cache_result[0]:
|
return cache_result[1]
|
return None
|
|
def __increment_down_price_time(self, code, seconds):
|
key = "under_water_seconds-{}".format(code)
|
RedisUtils.incrby(
|
self.__get_redis(), key, seconds)
|
# 设置个失效时间
|
RedisUtils.expire(self.__get_redis(), key, tool.get_expire())
|
|
def __get_down_price_time_as_seconds(self, code):
|
key = "under_water_seconds-{}".format(code)
|
val = RedisUtils.get(self.__get_redis(), key)
|
if val is None:
|
return None
|
else:
|
return int(val)
|
|
# 清除所有的水下捞数据
|
def clear_under_water_data(self):
|
key = "under_water_*"
|
keys = RedisUtils.keys(self.__get_redis(), key)
|
for k in keys:
|
RedisUtils.delete(self.__get_redis(), k)
|
|
def __save_current_price_codes_count(self, count):
|
key = "current_price_codes_count"
|
RedisUtils.setex(self.__get_redis(), key, 10, count)
|
|
def __get_current_price_codes_count(self):
|
key = "current_price_codes_count"
|
count = RedisUtils.get(self.__get_redis(), key)
|
return 0 if count is None else count
|
|
# 保存当前涨幅
|
def __save_current_rate(self, code, rate):
|
# 变化之后才会持久化
|
if self.__code_current_rate_latest.get(code) == rate:
|
return
|
self.__code_current_rate_latest[code] = rate
|
tool.CodeDataCacheUtil.set_cache(self.__code_current_rate_cache, code, rate)
|
key = "code_current_rate-{}".format(code)
|
RedisUtils.setex_async(self.__db, key, tool.get_expire(), rate)
|
|
# 批量保存
|
def __save_current_rates(self, datas):
|
# 变化之后才会持久化
|
pipe = self.__get_redis().pipeline()
|
for d in datas:
|
if self.__code_current_rate_latest.get(d[0]) == d[1]:
|
continue
|
self.__code_current_rate_latest[d[0]] = d[1]
|
tool.CodeDataCacheUtil.set_cache(self.__code_current_rate_cache, d[0], d[1])
|
key = "code_current_rate-{}".format(d[0])
|
RedisUtils.setex(pipe, key, tool.get_expire(), d[1])
|
pipe.execute()
|
|
# 获取当前涨幅
|
def __get_current_rate(self, code):
|
key = "code_current_rate-{}".format(code)
|
rate = RedisUtils.get(self.__get_redis(), key)
|
if rate is not None:
|
return float(rate)
|
return None
|
|
def get_current_rate(self, code):
|
cache_result = tool.CodeDataCacheUtil.get_cache(self.__code_current_rate_cache, code)
|
if cache_result[0]:
|
return cache_result[1]
|
return None
|
|
def process_rate(self, code, rate, time_str):
|
# 保存目前的代码涨幅
|
self.__save_current_rate(code, rate)
|
# 9点半之前的数据不处理
|
if int(time_str.replace(":", "")) < int("093000"):
|
return
|
# now_str = tool.get_now_time_str()
|
if rate >= 0:
|
down_start_time = self.__get_last_down_price_time_cache(code)
|
if down_start_time is None:
|
return
|
else:
|
# 累计增加时间
|
time_second = tool.trade_time_sub(time_str, down_start_time)
|
self.__increment_down_price_time(code, time_second)
|
# 删除起始时间
|
self.__remove_down_price_time(code)
|
else:
|
# 记录开始值
|
if self.__get_last_down_price_time_cache(code) is None:
|
self.__save_down_price_time(code, time_str)
|
|
# datas:[(代码,比例)]
|
def process_rates(self, datas, time_str):
|
# 9点半之前的数据不处理
|
if int(time_str.replace(":", "")) < int("093000"):
|
return
|
# 保存目前的代码涨幅
|
self.__save_current_rates(datas)
|
|
# now_str = tool.get_now_time_str()
|
for d in datas:
|
code, rate = d[0], d[1]
|
if rate >= 0:
|
down_start_time = self.__get_last_down_price_time_cache(code)
|
if down_start_time is None:
|
continue
|
else:
|
# 累计增加时间
|
time_second = tool.trade_time_sub(time_str, down_start_time)
|
self.__increment_down_price_time(code, time_second)
|
# 删除起始时间
|
self.__remove_down_price_time(code)
|
else:
|
# 记录开始值
|
if self.__get_last_down_price_time_cache(code) is None:
|
self.__save_down_price_time(code, time_str)
|
|
# 保存现价
|
def save_current_price(self, code, price, is_limit_up):
|
global_util.cuurent_prices[code] = (price, is_limit_up, round(time.time()))
|
pass
|
|
# 获取现价
|
def get_current_price(self, code):
|
return global_util.cuurent_prices.get(code)
|
|
# 现价代码数量
|
def save_current_price_codes_count(self, count):
|
self.__save_current_price_codes_count(count)
|
|
def get_current_price_codes_count(self):
|
return self.__get_current_price_codes_count()
|
|
# 是否为水下捞
|
def is_under_water(self, code, now_time=None):
|
time_seconds = self.__get_down_price_time_as_seconds(code)
|
if time_seconds is None:
|
return False
|
else:
|
if time_seconds >= constant.UNDER_WATER_PRICE_TIME_AS_SECONDS:
|
if now_time is None:
|
now_time = tool.get_now_time_str()
|
space = tool.trade_time_sub(now_time, "09:30:00")
|
if space > 0 and time_seconds / space >= 0.2:
|
return True
|
return False
|
|
# 当前代码是否涨停
|
def current_is_limit_up(self, code):
|
data = self.get_current_price(code)
|
if data is None:
|
return None
|
return data[1]
|
|
# 获取涨幅前几的代码
|
def get_top_rate_codes(self, top_n):
|
keys = "code_current_rate-*"
|
keys = RedisUtils.keys(self.__get_redis(), keys)
|
infos = []
|
for k in keys:
|
code = k.split("-")[1]
|
rate = self.get_current_rate(code)
|
infos.append((code, rate))
|
# 排序信息
|
sorted_infos = sorted(infos, key=lambda tup: tup[1], reverse=True)
|
sorted_infos = sorted_infos[:top_n]
|
codes = []
|
for data in sorted_infos:
|
codes.append(data[0])
|
return codes
|
|
|
# 涨停次数管理
|
class PlaceOrderCountManager:
|
__db = 0
|
__redisManager = redis_manager.RedisManager(0)
|
__place_order_count_cache = {}
|
__instance = None
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(PlaceOrderCountManager, cls).__new__(cls, *args, **kwargs)
|
cls.__load_datas()
|
return cls.__instance
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.__redisManager.getRedis()
|
|
@classmethod
|
def __load_datas(cls):
|
redis_ = cls.__get_redis()
|
try:
|
keys = RedisUtils.keys(redis_, "place_order_count-*")
|
for k in keys:
|
code = k.split("-")[-1]
|
count = RedisUtils.get(redis_, k)
|
cls.__place_order_count_cache[code] = int(count)
|
finally:
|
RedisUtils.realse(redis_)
|
|
def __incre_place_order_count(self, code):
|
if code not in self.__place_order_count_cache:
|
self.__place_order_count_cache[code] = 0
|
self.__place_order_count_cache[code] += 1
|
|
key = "place_order_count-{}".format(code)
|
RedisUtils.incrby_async(self.__db, key, 1)
|
RedisUtils.expire_async(self.__db, key, tool.get_expire())
|
|
def __get_place_order_count(self, code):
|
key = "place_order_count-{}".format(code)
|
count = RedisUtils.get(self.__get_redis(), key)
|
if count is not None:
|
return int(count)
|
return 0
|
|
def __get_place_order_count_cache(self, code):
|
cache_result = tool.CodeDataCacheUtil.get_cache(self.__place_order_count_cache, code)
|
if cache_result[0]:
|
return cache_result[1]
|
return 0
|
|
def place_order(self, code):
|
self.__incre_place_order_count(code)
|
|
def get_place_order_count(self, code):
|
return self.__get_place_order_count_cache(code)
|
|
def clear_place_order_count(self, code):
|
self.__place_order_count_cache[code] = 0
|
key = "place_order_count-{}".format(code)
|
RedisUtils.delete_async(self.__db, key)
|
|
|
if __name__ == "__main__":
|
processor = CodeActualPriceProcessor()
|
print(processor.get_top_rate_codes(30))
|