"""
|
交易管理器,
|
对一系列的代码交易变量,下单,撤单进行管理
|
"""
|
# 交易管理器
|
import time
|
|
import dask
|
|
from db import mysql_data, redis_manager
|
from trade import trade_data_manager, l2_trade_util
|
from trade.trade_gui import THSBuyWinManagerNew, THSGuiTrade
|
import time as t
|
from l2 import l2_data_manager, l2_data_log
|
|
from log import *
|
|
# Shared Redis manager for this module's trade keys (constructed with argument 2;
# presumably the Redis DB index - confirm against redis_manager.RedisManager).
__redis_manager = redis_manager.RedisManager(2)
|
|
# Trade states stored per code under the Redis key "trade-state-<code>".
# Not traded yet
|
TRADE_STATE_NOT_TRADE = 0
|
# Buy order placed
|
TRADE_STATE_BUY_PLACE_ORDER = 10
|
# Buy order delegated
|
TRADE_STATE_BUY_DELEGATED = 11
|
# Buy cancellation in progress (start_cancel_buy sets this before calling the GUI cancel).
# The original comment read "delegation in progress", which does not match the name.
|
TRADE_STATE_BUY_CANCEL_ING = 13
|
# Buy cancellation succeeded
|
TRADE_STATE_BUY_CANCEL_SUCCESS = 14
|
# Buy succeeded
|
TRADE_STATE_BUY_SUCCESS = 12
|
|
# GUI trading client used below to place and cancel buy orders.
guiTrade = THSGuiTrade()
|
|
|
# 获取交易状态
|
def get_trade_state(code):
    """Return the stored trade state of ``code`` as an int.

    The value is read from the Redis key ``trade-state-<code>``. When the key
    does not exist, ``TRADE_STATE_NOT_TRADE`` is returned.
    """
    stored = __redis_manager.getRedis().get("trade-state-{}".format(code))
    return TRADE_STATE_NOT_TRADE if stored is None else int(stored)
|
|
|
# 设置交易状态
|
def set_trade_state(code, state):
    """Log and store the trade state of ``code``.

    The state is written to the Redis key ``trade-state-<code>`` with the
    expiry returned by ``tool.get_expire()``.
    """
    message = "set_trade_state {}-{}".format(code, state)
    logger_trade.info(message)
    client = __redis_manager.getRedis()
    key = "trade-state-{}".format(code)
    client.setex(key, tool.get_expire(), state)
|
|
|
def get_codes_by_trade_state(state):
    """Return the codes whose stored trade state equals ``state``.

    Every Redis key matching ``trade-state-*`` is read; the code is the key
    with the ``trade-state-`` prefix removed.

    :param state: one of the ``TRADE_STATE_*`` integer constants
    :return: list of codes (possibly empty)
    """
    prefix = "trade-state-"
    redis = __redis_manager.getRedis()
    keys = redis.keys(prefix + "*")
    codes = []
    for key in keys or ():
        value = redis.get(key)
        # State keys are written with an expiry (see set_trade_state), so a key
        # listed by KEYS can be gone by the time it is read; skip it instead of
        # failing on int(None).
        if value is None:
            continue
        if int(value) == state:
            codes.append(key.replace(prefix, ''))
    return codes
|
|
|
# 设置交易账户的可用金额
|
def set_available_money(client_id, money):
    """Store the trade account's available money in Redis.

    The value is written to the key ``trade-account-canuse-money`` without an
    expiry. ``client_id`` is accepted but not used by this function.
    """
    __redis_manager.getRedis().set("trade-account-canuse-money", money)
|
|
|
# 获取交易账户的可用金额
|
def get_available_money():
    """Return the trade account's available money rounded to 2 decimals.

    Reads the Redis key ``trade-account-canuse-money``; returns ``None`` when
    the key has not been set.
    """
    raw = __redis_manager.getRedis().get("trade-account-canuse-money")
    if raw is not None:
        return round(float(raw), 2)
    return None
|
|
|
# 保存交易成功的数据
|
def save_trade_success_data(datas):
    """Persist trade-success records and stamp the latest update time.

    Records sharing the same ``trade_num`` are merged (``num`` and ``money``
    are summed), records whose ``time`` is ``"00:00:00"`` are skipped, and
    each remaining record is inserted into ``ths_trade_success_record`` or,
    when a row with the same ``_id`` already exists, updated.

    The dicts in ``datas`` are modified in place: the first record of each
    ``trade_num`` receives the merged ``num``/``money``, and saved records
    gain ``_id``, ``day`` and ``create_time``.

    :param datas: iterable of dicts with keys ``trade_num``, ``num``,
        ``money``, ``time``, ``code``, ``price`` and ``type``
    """
    day = datetime.datetime.now().strftime("%Y%m%d")
    redis = __redis_manager.getRedis()
    time_str = tool.get_now_time_str()
    redis.setex("trade-success-latest-time", tool.get_expire(), time_str)
    mysqldb = mysql_data.Mysqldb()

    # Merge rows that share a contract number, summing quantity and amount
    merged = {}
    for data in datas:
        trade_num = data["trade_num"]
        if trade_num not in merged:
            merged[trade_num] = data
            continue
        first = merged[trade_num]
        first["num"] = int(first["num"]) + int(data["num"])
        first["money"] = round(float(first["money"]) + float(data["money"]), 3)

    for data in merged.values():
        # Filter out erroneous rows
        if data["time"] == "00:00:00":
            continue
        data["_id"] = "{}_{}".format(day, data["trade_num"])
        data["day"] = day
        data["create_time"] = int(round(t.time() * 1000))
        # NOTE(review): this lookup is still string-built because select_one is
        # only ever called with a single argument in this file; bind `_id` as a
        # parameter if select_one supports it.
        counts = mysqldb.select_one(
            "select count(*) from ths_trade_success_record where _id='{}'".format(data["_id"]))
        if counts[0] < 1:
            # Bound parameters, matching the UPDATE below, so values coming
            # from the trade records are never spliced into the SQL text.
            mysqldb.execute(
                "insert into ths_trade_success_record(_id,code,money,num,price,time,trade_num,type,day,create_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                (
                    data["_id"], data["code"], data["money"], data["num"], data["price"], data["time"],
                    data["trade_num"], data["type"], data["day"], data["create_time"]))
        else:
            mysqldb.execute(
                "update ths_trade_success_record set money=%s, num=%s, price=%s,time=%s,trade_num=%s,type=%s where _id=%s",
                (
                    data["money"], data["num"], data["price"], data["time"], data["trade_num"], data["type"],
                    data["_id"]))
|
|
|
# 保存交易委托数据
|
def save_trade_delegate_data(datas):
    """Persist delegate (pending order) records and cache the latest batch.

    Each record gets ``_id`` (``<day>-<code>-<time>``), ``day`` and
    ``create_time`` added in place and is inserted into
    ``ths_trade_delegate_record`` unless a row with that ``_id`` already
    exists. The whole batch (as JSON) and the current time string are then
    stored in Redis with the expiry from ``tool.get_expire()``.
    """
    day = datetime.datetime.now().strftime("%Y%m%d")
    time_str = tool.get_now_time_str()
    mysqldb = mysql_data.Mysqldb()
    insert_sql = "insert into ths_trade_delegate_record(_id,code,num,price,time,trade_num,trade_price,type,day,create_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"

    for record in datas:
        record["_id"] = "{}-{}-{}".format(day, record["code"], record["time"])
        record["day"] = day
        record["create_time"] = int(round(t.time() * 1000))
        existing = mysqldb.select_one(
            "select count(*) from ths_trade_delegate_record where _id='{}'".format(record["_id"]))
        if existing[0] >= 1:
            # Already stored; delegate rows are never updated here
            continue
        row = (
            record["_id"], record["code"], record["num"], record["price"], record["time"],
            record["trade_num"], record["trade_price"], record["type"], record["day"],
            round(t.time() * 1000))
        mysqldb.execute(insert_sql, row)

    # Cache the latest delegate batch and when it was received
    redis = __redis_manager.getRedis()
    redis.setex("trade-delegate-latest", tool.get_expire(), json.dumps(datas))
    redis.setex("trade-delegate-latest-time", tool.get_expire(), time_str)
|
|
|
# 获取交易成功数据
|
def get_trade_success_data():
    """Return today's trade-success rows and the latest update time.

    Rows of ``ths_trade_success_record`` for the current day are converted to
    dicts by column position.

    :return: ``(list_of_dicts, value of Redis key "trade-success-latest-time")``
    """
    redis = __redis_manager.getRedis()
    day = datetime.datetime.now().strftime("%Y%m%d")
    mysqldb = mysql_data.Mysqldb()
    rows = mysqldb.select_all("select * from ths_trade_success_record where day='{}'".format(day))
    # Column names in the positional order the rows are read in
    columns = ("_id", "code", "money", "num", "price", "time", "trade_num", "type", "day", "create_time")
    datas = []
    for row in rows:
        datas.append({name: row[position] for position, name in enumerate(columns)})
    return datas, redis.get("trade-success-latest-time")
|
|
|
# 获取交易委托数据
|
def get_trade_delegate_data():
    """Return the cached delegate batch and the time it was stored.

    :return: ``(records, time_str)`` where ``records`` is the JSON-decoded
        value of ``trade-delegate-latest`` (an empty list when the key is
        missing) and ``time_str`` is the value of
        ``trade-delegate-latest-time``.
    """
    client = __redis_manager.getRedis()
    raw = client.get("trade-delegate-latest")
    time_str = client.get("trade-delegate-latest-time")
    records = [] if raw is None else json.loads(raw)
    return records, time_str
|
|
|
# 开始交易
|
def start_buy(code, capture_timestamp, last_data, last_data_index):
    """Check that ``code`` may be bought, then trigger the buy.

    Three checks are built with ``dask.delayed`` and computed together:

    * the code is not in the forbidden-trade list,
    * its trade state is ``NOT_TRADE``, ``BUY_CANCEL_SUCCESS`` or
      ``BUY_CANCEL_ING``,
    * the available money covers 100 shares at the limit-up price.

    When all pass, the state is set to ``TRADE_STATE_BUY_PLACE_ORDER`` and
    ``__buy`` is called with the limit-up price and the previous state.

    :param code: stock code to buy
    :param capture_timestamp: passed through to ``__buy``
    :param last_data: passed through to ``__buy``
    :param last_data_index: passed through to ``__buy``
    :raises Exception: with the message of the first failing check
    """

    # Each check returns an ``(error_or_None, value)`` pair so that can_trade()
    # can treat them uniformly.
    @dask.delayed
    def is_forbidden(code):
        if l2_trade_util.is_in_forbidden_trade_codes(code):
            # Must be a pair: can_trade() indexes [0], and a bare Exception is
            # not subscriptable (that surfaced as a TypeError, not this error).
            return Exception("禁止交易"), None
        return None, None

    @dask.delayed
    def is_state_right(code):
        trade_state = get_trade_state(code)
        if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING:
            return Exception("代码处于不可交易状态"), trade_state
        return None, trade_state

    @dask.delayed
    def is_money_enough(code):
        money = get_available_money()
        if money is None:
            return Exception("未获取到账户可用资金"), None
        # NOTE(review): gpcode_manager is not imported by name in the visible
        # imports - confirm it is in scope (e.g. via `from log import *`).
        price = gpcode_manager.get_limit_up_price(code)
        if price is None:
            return Exception("尚未获取到涨停价"), None
        # Is there enough money for one lot (100 shares)?
        if price * 100 > money:
            return Exception("账户可用资金不足"), price
        return None, price

    @dask.delayed
    def can_trade(*args):
        # args are the results of (is_forbidden, is_state_right, is_money_enough)
        for arg in args:
            if arg[0] is not None:
                return arg[0], None, None
        return None, args[1][1], args[2][1]

    _start_time = tool.get_now_timestamp()

    f1 = is_forbidden(code)
    f2 = is_state_right(code)
    f3 = is_money_enough(code)
    dask_result = can_trade(f1, f2, f3)
    ex, trade_state, price = dask_result.compute()
    if ex is not None:
        raise ex

    print("开始买入")
    logger_trade.info("{}开始买入".format(code))
    set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER)
    _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "买入判断时间", force=True)
    __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index)
    l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "异步买入时间", force=True)
|
|
|
# 中断买入
|
def break_buy(code, reason):
    """Abort a buy by removing the stored buy-position info for ``code``.

    ``reason`` is accepted but not used by this function.
    """
    buy_data = trade_data_manager.TradeBuyDataManager
    buy_data.remove_buy_position_info(code)
|
|
|
# 购买
|
@tool.async_call
def __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index):
    """Place the buy order through the GUI client and record the outcome.

    If placing the order fails, the trade state is restored to ``trade_state``
    (the state before ``start_buy`` changed it) and the error is re-raised.
    If the order was placed but the follow-up bookkeeping fails, the error is
    logged and re-raised without restoring the state: the order already
    exists, and rolling the state back would let the same code be bought
    again while hiding the live order from the cancel path.

    :param code: stock code
    :param price: order price
    :param trade_state: trade state to restore when the order cannot be placed
    :param capture_timestamp: forwarded to ``__place_order_success``
    :param last_data: forwarded to ``__place_order_success``
    :param last_data_index: forwarded to ``__place_order_success``
    """
    try:
        guiTrade.buy(code, price)
    except Exception as e:
        __place_order_fail(code, trade_state)
        logger_trade.error("{}买入异常{}".format(code, str(e)))
        raise

    try:
        __place_order_success(code, capture_timestamp, last_data, last_data_index)
    except Exception as e:
        # The order is already placed; keep the PLACE_ORDER state.
        logger_trade.error("{}买入异常{}".format(code, str(e)))
        raise
|
|
|
# 下单成功
|
def __place_order_success(code, capture_timestamp, last_data, last_data_index):
    """Record a successfully placed buy order for ``code``.

    Logs the elapsed milliseconds since ``capture_timestamp``, adds the code
    to the L2 fixed codes and stores the buy-position info.
    """
    # End point of the buy: elapsed time since the capture timestamp (ms)
    finished_at = round(time.time() * 1000)
    use_time = finished_at - capture_timestamp
    logger_trade.info("{}-从截图到下单成功总费时:{}".format(code, use_time))

    # Order placed: add the code to the L2 fixed codes
    l2_data_manager.add_to_l2_fixed_codes(code)

    # Remember the capture time of the triggering frame and the time it took
    buy_data = trade_data_manager.TradeBuyDataManager
    buy_data.set_buy_position_info(code, capture_timestamp, use_time, last_data, last_data_index)

    print("买入结束")
    logger_trade.info("{}买入成功".format(code))
|
|
|
# 下单失败
|
def __place_order_fail(code, trade_state):
    """Handle a failed order placement by restoring the previous trade state."""
    print("买入异常")
    # Roll the state back to what it was before the buy attempt
    set_trade_state(code=code, state=trade_state)
|
|
|
# 开始取消买入
|
def start_cancel_buy(code, force=False):
    """Cancel the buy order of ``code`` through the GUI client.

    Nothing is done (``None`` is returned) when the code is already bought,
    or - unless ``force`` is true - when its state is neither
    ``BUY_PLACE_ORDER`` nor ``BUY_DELEGATED``. Otherwise the state is set to
    ``BUY_CANCEL_ING``, the order is cancelled, the cancel bookkeeping runs
    and a follow-up cancel is started.

    :param code: stock code
    :param force: skip the placed/delegated state check
    :raises Exception: re-raised from the cancel after the previous state has
        been restored
    """
    trade_state = get_trade_state(code)
    if trade_state == TRADE_STATE_BUY_SUCCESS:
        return None
    if not force:
        if trade_state != TRADE_STATE_BUY_PLACE_ORDER and trade_state != TRADE_STATE_BUY_DELEGATED:
            return None
    try:
        logger_trade.info("{}开始撤单".format(code))
        set_trade_state(code, TRADE_STATE_BUY_CANCEL_ING)
        logger_trade.info("{}撤单方法开始".format(code))
        guiTrade.cancel_buy(code)
        logger_trade.info("{}撤单方法结束".format(code))
        __cancel_success(code)
        try:
            cancel_buy_again(code)
        except Exception as e1:
            # The follow-up cancel is best-effort and must not undo the cancel
            # above, but a failure to start it should be visible in the log.
            logger_trade.error("{}再次撤单异常:{}".format(code, str(e1)))
    except Exception as e:
        # Restore the previous state
        set_trade_state(code, trade_state)
        logger_trade.error("{}撤单异常:{}".format(code, str(e)))
        raise
    logger_trade.info("{}撤单完毕".format(code))
|
|
|
# 再次撤单,防止没有撤掉
|
@tool.async_call
def cancel_buy_again(code):
    """Re-issue the cancel for ``code`` in case the first one did not take.

    After a short delay, up to 5 attempts are made. The loop stops as soon as
    an attempt succeeds, or returns when the trade state is no longer
    ``BUY_CANCEL_ING`` / ``BUY_CANCEL_SUCCESS``. A failed attempt is logged
    and followed by a pause that grows with the attempt number.
    """
    time.sleep(0.02)
    for i in range(0, 5):
        # Only retry while the code is still being cancelled or was cancelled
        trade_state = get_trade_state(code)
        if trade_state != TRADE_STATE_BUY_CANCEL_ING and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS:
            return
        try:
            # Format eagerly, like every other log call in this file: a "{}"
            # placeholder with `code` passed as a logging argument is not
            # rendered by %-style loggers such as the stdlib logging module.
            logger_trade.info("{}:开始再次撤单".format(code))
            guiTrade.cancel_buy_again(code)
            logger_trade.info("{}:再次撤单成功".format(code))
            break
        except Exception as e:
            logger_trade.error("{}再次撤单异常:{}".format(code, str(e)))
            time.sleep(0.1 + 0.05 * i)
|
|
|
# 取消委托成功
|
def __cancel_success(code):
    """Run the bookkeeping for a cancelled buy order of ``code``."""
    buy_data = trade_data_manager.TradeBuyDataManager
    buy_data.remove_buy_position_info(code)
    # Order cancelled: take the code out of the L2 fixed codes
    l2_data_manager.remove_from_l2_fixed_codes(code)
    message = "{}撤单成功".format(code)
    logger_trade.info(message)
|
|
|
# 处理交易成功数据
|
def process_trade_success_data(datas):
# Process trade-success records; each record is read via its "code", "time"
# and "type" keys. A None input is ignored (None is returned).
# NOTE(review): this copy of the file has lost its indentation, so which of the
# statements after the `if` checks below belong to which branch cannot be
# confirmed from here - verify against the properly indented source.
|
if datas is None:
|
return None
|
for data in datas:
|
code = data["code"]
|
_time = data["time"]
|
# Skip records whose time is "00:00:00"
if _time == "00:00:00":
|
continue
|
# Records with a code and type 0 - presumably buy-side records; confirm
# the meaning of "type" against the data source
if code is not None and int(data["type"]) == 0:
|
# Forbid further trading of this code
l2_trade_util.forbidden_trade(code)
|
state = get_trade_state(code)
|
# Mark the code as bought unless it already is
if state != TRADE_STATE_BUY_SUCCESS:
|
set_trade_state(code, TRADE_STATE_BUY_SUCCESS)
|
# Delete the temporary information kept for buy/cancel decisions
|
l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
|
l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
|
l2_data_manager.TradePointManager.delete_buy_point(code)
|
# Remove the trade-window allocation for this code
|
THSBuyWinManagerNew.cancel_distribute_win_for_code(code)
|
# TODO: remove from L2 once the order is completely filled
|
|
|
# 处理委托成功数据
|
def process_trade_delegate_data(datas):
# Process delegate (pending order) records; each record is read via its "code"
# key. A None input is ignored (None is returned).
# NOTE(review): this copy of the file has lost its indentation, so the exact
# nesting of the statements below (in particular the final
# remove_from_l2_fixed_codes call) cannot be confirmed from here.
|
if datas is None:
|
return None
|
# Codes that appear in the delegate records
codes = []
|
for data in datas:
|
code = data["code"]
|
if code is not None:
|
codes.append(code)
|
trade_state = get_trade_state(code)
|
# Move codes in the "order placed" state to "delegated"
|
if trade_state == TRADE_STATE_BUY_PLACE_ORDER:
|
set_trade_state(code, TRADE_STATE_BUY_DELEGATED)
|
# Codes whose stored state is "cancellation in progress"
ing_codes = get_codes_by_trade_state(TRADE_STATE_BUY_CANCEL_ING)
|
if ing_codes is not None:
|
for code in ing_codes:
|
if code in codes:
|
# Still present in the delegate records: force another cancel
|
start_cancel_buy(code, True)
|
else:
|
# Not present in the delegate records: mark the cancel as successful
set_trade_state(code, TRADE_STATE_BUY_CANCEL_SUCCESS)
|
l2_data_manager.remove_from_l2_fixed_codes(code)
|
|
|
def __clear_data(code):
    """Delete the Redis data kept for ``code``.

    * Redis manager 1: every key containing ``code`` is deleted.
    * Redis manager 2: the ``trade-state-<code>`` key is deleted.
    * Redis manager 0: every key containing ``code`` is deleted, except keys
      that contain ``"pre"`` or ``"zyltgb"``.
    """
    pattern = "*{}*".format(code)

    redis_l2 = redis_manager.RedisManager(1).getRedis()
    for k in redis_l2.keys(pattern):
        redis_l2.delete(k)

    redis_trade = redis_manager.RedisManager(2).getRedis()
    redis_trade.delete("trade-state-{}".format(code))

    redis_info = redis_manager.RedisManager(0).getRedis()
    for k in redis_info.keys(pattern):
        # str.find() returns -1 (never None) when the text is absent, so the
        # former `find(...) is not None` test skipped every key and nothing was
        # ever deleted here. Use a substring test to keep only the intended keys.
        if "pre" in k or "zyltgb" in k:
            continue
        redis_info.delete(k)
|
|
|
def __clear_big_data():
    """Delete every key matching ``big_data-*`` from Redis manager 1."""
    client = redis_manager.RedisManager(1).getRedis()
    for key in client.keys("big_data-*"):
        client.delete(key)
|
|
|
# Manual entry point: runs cancel_buy_again for the hard-coded code "000637".
if __name__ == "__main__":
|
cancel_buy_again("000637")
|