'''
Trade execution progress (buy-queue tracking).
'''
|
|
# 买入队列
|
import itertools
|
import json
|
|
import constant
|
from db import redis_manager
|
from db.redis_manager import RedisUtils
|
from utils import tool
|
import l2.l2_data_util
|
from log_module.log import logger_l2_trade_buy_queue, logger_l2_trade_buy_progress
|
|
|
class TradeBuyQueue:
    """Track the buy-1 queue and the traded-progress index for each stock code.

    Queue snapshots, the traded-progress index and the latest time at which
    buy-1 was not at the limit-up price are stored in Redis under per-code
    keys, each written with the expiry returned by ``tool.get_expire()``.
    """

    # Shared Redis manager (class-level, name-mangled).
    # NOTE(review): the argument 0 presumably selects the Redis DB - confirm.
    __redis_manager = redis_manager.RedisManager(0)

    def __init__(self):
        # code -> last raw queue handed to save(); used to skip unchanged data.
        self.last_buy_queue_data = {}

    def __getRedis(self):
        """Return the Redis client from the shared manager."""
        return self.__redis_manager.getRedis()

    def __save_buy_queue_data(self, code, num_list):
        """Store ``(num_list, current time string)`` as JSON for ``code``."""
        key = "trade_buy_queue_data-{}".format(code)
        payload = json.dumps((num_list, tool.get_now_time_str()))
        RedisUtils.setex(self.__getRedis(), key, tool.get_expire(), payload)

    def __get_buy_queue_data(self, code):
        """Return ``(num_list, update_time)`` for ``code``.

        Returns ``(None, None)`` when nothing is stored.
        """
        key = "trade_buy_queue_data-{}".format(code)
        val = self.__getRedis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        # Second element is the stored update time (previously the literal
        # list ``[1]`` was returned here by mistake).
        return val[0], val[1]

    def __save_buy_progress_index(self, code, index, is_default):
        """Store ``(index, is_default)`` as JSON for ``code``."""
        key = "trade_buy_progress_index-{}".format(code)
        RedisUtils.setex(self.__getRedis(), key, tool.get_expire(), json.dumps((index, is_default)))

    def __get_buy_progress_index(self, code):
        """Return ``(index, is_default)`` for ``code``.

        Returns ``(None, True)`` when nothing is stored.
        """
        key = "trade_buy_progress_index-{}".format(code)
        val = self.__getRedis().get(key)
        if val is None:
            return None, True
        val = json.loads(val)
        return int(val[0]), bool(val[1])

    def __save_latest_not_limit_up_time(self, code, time_str):
        """Store the latest time at which buy-1 was not at the limit-up price."""
        key = "latest_not_limit_up_time-{}".format(code)
        RedisUtils.setex(self.__getRedis(), key, tool.get_expire(), time_str)

    def __get_latest_not_limit_up_time(self, code):
        """Return the stored non-limit-up time for ``code``.

        Returns ``None`` without touching Redis when ``constant.TEST`` is set.
        """
        key = "latest_not_limit_up_time-{}".format(code)
        if not constant.TEST:
            return self.__getRedis().get(key)
        return None

    def save(self, code, limit_up_price, buy_1_price, buy_1_time, queues):
        """Persist the large orders of a buy-1 queue snapshot.

        Args:
            code: stock code used to build the Redis keys.
            limit_up_price: limit-up price (number or numeric string).
            buy_1_price: current buy-1 price (number or numeric string).
            buy_1_time: time of the snapshot; stored when buy-1 is not at
                the limit-up price.
            queues: order quantities in the buy-1 queue; the first element
                is ignored.

        Returns:
            The list that was saved (at most 4 quantities greater than the
            minimum size), or ``None`` when nothing was saved: fewer than two
            entries, data unchanged since the previous call, or buy-1 not at
            the limit-up price.
        """
        # Fewer than 2 entries carries no usable information.
        if not queues or len(queues) < 2:
            return None

        old_queues = self.last_buy_queue_data.get(code)
        if old_queues and len(old_queues) == len(queues):
            # Identical content (first element excluded) needs no reprocessing.
            old_str = ",".join(str(k) for k in old_queues[1:])
            new_str = ",".join(str(k) for k in queues[1:])
            if old_str == new_str:
                return None
        self.last_buy_queue_data[code] = queues

        # Convert once so the threshold below also works for numeric strings.
        limit_up = float(limit_up_price)
        if abs(float(buy_1_price) - limit_up) >= 0.01:
            # Buy-1 is not at the limit-up price: only remember when that was.
            self.__save_latest_not_limit_up_time(code, buy_1_time)
            return None

        # Minimum quantity that counts as a large order.
        # NOTE(review): the factor 100 presumably converts lots to shares - confirm.
        min_num = round(constant.L2_MIN_MONEY / (limit_up * 100))
        # Skip the first entry and keep the first 4 quantities above the minimum.
        large_orders = (num for num in queues[1:] if num > min_num)
        num_list = list(itertools.islice(large_orders, 4))
        self.__save_buy_queue_data(code, num_list)
        return num_list

    def compute_traded_index(self, code, buy1_price, buyQueueBig, exec_time=None):
        """Locate the traded-progress index matching the given large-order queue.

        The queue is tried in its original order first; when it holds 3 or 4
        entries, every other ordering of it is tried as well. The result is
        only logged and returned, not stored - use ``set_traded_index``.

        Args:
            code: stock code.
            buy1_price: buy-1 price forwarded to the lookup helper.
            buyQueueBig: large-order quantities to match.
            exec_time: optional execution time; a match whose data time is
                more than 5 after it (as measured by ``tool.trade_time_sub``)
                is rejected.

        Returns:
            The matched index, or ``None`` when no candidate matched.
        """
        total_datas = l2.l2_data_util.local_today_datas.get(code)
        today_num_operate_map = l2.l2_data_util.local_today_num_operate_map.get(code)
        index = None

        # Start searching from the last explicitly set index, else from 0.
        last_index, is_default = self.get_traded_index(code)
        c_last_index = 0
        if not is_default and last_index is not None:
            c_last_index = last_index
        latest_not_limit_up_time = self.__get_latest_not_limit_up_time(code)

        # Original order first, then the remaining orderings for 3-4 entries.
        candidates = [buyQueueBig]
        if 3 <= len(buyQueueBig) <= 4:
            candidates.extend(
                perm
                for perm in itertools.permutations(buyQueueBig, len(buyQueueBig))
                if list(perm) != buyQueueBig
            )

        for temp in candidates:
            try:
                index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(
                    code,
                    buy1_price,
                    total_datas,
                    today_num_operate_map,
                    temp,
                    c_last_index,
                    latest_not_limit_up_time,
                )
                if index is not None:
                    # Reject a position too far after the execution time.
                    # NOTE(review): the original comment said 2s but the
                    # threshold compared against is 5 - confirm which is intended.
                    if exec_time and tool.trade_time_sub(total_datas[index]["val"]["time"], exec_time) > 5:
                        index = None
                        continue
                    # Reject a match made with fewer than half of the entries.
                    # Every candidate has the same length as buyQueueBig, so
                    # this cannot currently trigger; kept as a safeguard.
                    if len(temp) * 2 < len(buyQueueBig):
                        index = None
                    break
            except Exception as e:
                # A failed lookup or validation must not be reported as a
                # match: drop the index, record the error, try the next one.
                index = None
                logger_l2_trade_buy_progress.warning(
                    f"compute_traded_index error: code-{code} queues:{temp} error-{e!r}")

        if index is not None:
            logger_l2_trade_buy_queue.info(f"确定交易进度:code-{code} index-{index}")
            logger_l2_trade_buy_progress.info(
                f"确定交易进度成功:code-{code} index-{index} queues:{buyQueueBig} last_index-{c_last_index} latest_not_limit_up_time-{latest_not_limit_up_time} exec_time-{exec_time}")
        else:
            logger_l2_trade_buy_progress.warning(
                f"确定交易进度失败:code-{code} queues:{buyQueueBig} last_index-{c_last_index} latest_not_limit_up_time-{latest_not_limit_up_time} exec_time-{exec_time}")
        return index

    def get_traded_index(self, code):
        """Return ``(index, is_default)``; ``(None, True)`` when unset."""
        index, is_default = self.__get_buy_progress_index(code)
        return index, is_default

    def set_traded_index(self, code, index):
        """Store ``index`` as the non-default traded-progress index for ``code``."""
        self.__save_buy_progress_index(code, index, False)
|
|
if __name__ == '__main__':
    # Ad-hoc check of the comma-joined comparison used to detect unchanged
    # queues: lists of different length must not compare equal.
    a = [1, 2, 3, 4]
    b = [1, 2, 3]
    result = ",".join(str(k) for k in a) == ",".join(str(k) for k in b)
    print(result)
|