"""
|
交易管理器,
|
对一系列的代码交易变量,下单,撤单进行管理
|
"""
|
# 交易管理器
|
import copy
|
import json
|
|
from code_attribute import gpcode_manager
|
from db import redis_manager_delegate as redis_manager
|
from db.mysql_data_delegate import Mysqldb
|
from db.redis_manager_delegate import RedisUtils
|
from log_module import async_log_util
|
from trade import trade_constant
|
|
from log_module.log import *
|
from utils import import_util, tool, huaxin_util
|
|
trade_gui = import_util.import_lib("trade.trade_gui")
|
|
__db = 12
|
__redis_manager = redis_manager.RedisManager(__db)
|
|
guiTrade = None # trade_gui.THSGuiTrade() if trade_gui is not None else None
|
|
latest_trade_delegate_data = []
|
|
|
# 关闭购买入口
|
# 开启购买入口
|
class TradeStateManager:
|
__instance = None
|
__db = 12
|
redisManager = redis_manager.RedisManager(12)
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(TradeStateManager, cls).__new__(cls, *args, **kwargs)
|
cls.__instance.__trade_buy_state_cache = cls.is_can_buy()
|
return cls.__instance
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.redisManager.getRedis()
|
|
def sync(self):
|
self.__trade_buy_state_cache = self.is_can_buy()
|
|
# 开启购买入口
|
def open_buy(self):
|
self.__trade_buy_state_cache = True
|
RedisUtils.setex_async(self.__db, "trade_buy_state", tool.get_expire(), 1)
|
|
# 关闭购买入口
|
def close_buy(self):
|
self.__trade_buy_state_cache = False
|
RedisUtils.setex_async(self.__db, "trade_buy_state", tool.get_expire(), 0)
|
|
# 是否可以下单
|
@classmethod
|
def is_can_buy(cls):
|
# 默认设置为可交易
|
val = RedisUtils.get(cls.__get_redis(), "trade_buy_state")
|
if val is None:
|
return True
|
if int(val) == 1:
|
return True
|
else:
|
return False
|
|
# 是否可以下单
|
|
def is_can_buy_cache(self):
|
# 默认设置为可交易
|
return self.__trade_buy_state_cache
|
|
|
# 代码的交易状态管理
|
class CodesTradeStateManager:
|
__trade_state_cache = {}
|
__db = 12
|
__redis_manager = redis_manager.RedisManager(12)
|
__instance = None
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(CodesTradeStateManager, cls).__new__(cls, *args, **kwargs)
|
cls.__load_datas()
|
|
return cls.__instance
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.__redis_manager.getRedis()
|
|
@classmethod
|
def __load_datas(cls):
|
__redis = cls.__get_redis()
|
try:
|
# 初始化数据
|
keys = RedisUtils.keys(__redis, "trade-state-*", auto_free=False)
|
if keys:
|
for key in keys:
|
code = key.replace("trade-state-", '')
|
cls.__trade_state_cache[code] = int(RedisUtils.get(__redis, key, auto_free=False))
|
finally:
|
RedisUtils.realse(__redis)
|
|
# 获取交易状态
|
def get_trade_state(self, code):
|
state = RedisUtils.get(self.__get_redis(), "trade-state-{}".format(code))
|
if state is None:
|
return trade_constant.TRADE_STATE_NOT_TRADE
|
return int(state)
|
|
def get_trade_state_cache(self, code):
|
cache_result = tool.CodeDataCacheUtil.get_cache(self.__trade_state_cache, code)
|
if cache_result[0]:
|
return cache_result[1]
|
return trade_constant.TRADE_STATE_NOT_TRADE
|
|
def get_trade_state_dict(self):
|
return copy.deepcopy(self.__trade_state_cache)
|
|
# 设置交易状态
|
def set_trade_state(self, code, state):
|
async_log_util.info(logger_trade, "set_trade_state {}-{}".format(code, state))
|
tool.CodeDataCacheUtil.set_cache(self.__trade_state_cache, code, state)
|
RedisUtils.setex_async(self.__db, "trade-state-{}".format(code), tool.get_expire(), state)
|
|
def get_codes_by_trade_state(self, state):
|
redis = self.__get_redis()
|
try:
|
keys = RedisUtils.keys(redis, "trade-state-*", auto_free=False)
|
codes = []
|
if keys is not None:
|
for key in keys:
|
if int(RedisUtils.get(redis, key, auto_free=False)) == state:
|
codes.append(key.replace("trade-state-", ''))
|
return codes
|
finally:
|
RedisUtils.realse(redis)
|
|
def get_codes_by_trade_states(self, states):
|
redis = self.__get_redis()
|
try:
|
keys = RedisUtils.keys(redis, "trade-state-*", auto_free=False)
|
codes = []
|
if keys is not None:
|
for key in keys:
|
if int(RedisUtils.get(redis, key, auto_free=False)) in states:
|
codes.append(key.replace("trade-state-", ''))
|
return codes
|
finally:
|
RedisUtils.realse(redis)
|
|
def get_codes_by_trade_states_cache(self, states):
|
# 获取
|
codes = []
|
for code in self.__trade_state_cache:
|
if self.__trade_state_cache[code] in states:
|
codes.append(code)
|
return codes
|
|
# 设置交易账户的可用金额
|
|
|
@tool.singleton
|
class DealCodesManager:
|
"""
|
成交代码管理
|
"""
|
__db = 12
|
|
# 成交代码的订单信息:{代码:{交易id:(量,价格,系统订单号)}}
|
|
def __init__(self):
|
self.musql = Mysqldb()
|
# 成交得订单信息
|
self.__deal_code_orders_info = {}
|
# 委托得订单信息:{code#order_ref:}
|
self.__delegate_code_orders = {}
|
self.__load_data()
|
|
def __load_data(self):
|
# 不算打板的数据
|
sql = f"select tradeID,securityID, orderSysID,price,volume from hx_trade_deal_record where `direction` = '0' and tradingDay = '{tool.get_now_date_str('%Y%m%d')}'"
|
results = self.musql.select_all(sql)
|
if results:
|
for r in results:
|
self.add_deal_order(r[1], r[4], round(float(r[3]), 2), r[0], r[2])
|
|
def add_deal_order(self, code, volume, price, trade_id, order_sys_id):
|
"""
|
添加成交订单
|
@param code:
|
@param volume:
|
@param price:
|
@param trade_id:
|
@param order_sys_id:
|
@return:
|
"""
|
pre_price = gpcode_manager.CodePrePriceManager().get_price_pre_cache(code)
|
if pre_price and round((price - pre_price) / pre_price, 4) > 0.08 * (tool.get_limit_up_rate(code) - 1) * 10:
|
# 视为打板买入,不处理数据
|
return
|
if code not in self.__deal_code_orders_info:
|
self.__deal_code_orders_info[code] = {}
|
if trade_id in self.__deal_code_orders_info[code]:
|
return
|
self.__deal_code_orders_info[code][trade_id] = (volume, price, order_sys_id)
|
|
def set_order_status(self, code, order_ref, order_sys_id, price, volume, status):
|
"""
|
设置订单状态
|
@param code:
|
@param order_ref:
|
@param order_sys_id:
|
@param price:
|
@param volume:
|
@param status:
|
@return:
|
"""
|
k = f"{code}#{order_ref}"
|
if k not in self.__delegate_code_orders:
|
return
|
# [代码,订单索引,订单号,价格,量,状态,板块集合]
|
data = self.__delegate_code_orders[k]
|
data[2] = order_sys_id
|
data[5] = status
|
data[3] = price
|
data[4] = volume
|
# 如果订单已经取消就需要删除
|
if status == huaxin_util.TORA_TSTP_OST_AllCanceled or status == huaxin_util.TORA_TSTP_OST_Rejected:
|
data = self.__delegate_code_orders.pop(k)
|
if data:
|
PlatePlaceOrderManager().remove_plates_code(data[6], code)
|
|
def get_deal_codes(self):
|
if not self.__deal_code_orders_info:
|
return set()
|
return set(self.__deal_code_orders_info.keys())
|
|
def place_order(self, plates, code, order_ref, price, volume):
|
"""
|
下单
|
@param plates:
|
@param code:
|
@param order_ref:
|
@param price:
|
@param volume:
|
@return:
|
"""
|
# 初始化委托数据 [代码,订单索引,订单号,价格,量,状态,板块集合]
|
data = [code, order_ref, '', price, volume, huaxin_util.TORA_TSTP_OST_Unknown, plates]
|
k = f"{code}#{order_ref}"
|
if k not in self.__delegate_code_orders:
|
self.__delegate_code_orders[k] = data
|
PlatePlaceOrderManager().add_plates_code(plates, code)
|
|
def place_order_fail(self, code, order_ref):
|
"""
|
下单失败了
|
@param code:
|
@param order_ref:
|
@return:
|
"""
|
k = f"{code}#{order_ref}"
|
if k in self.__delegate_code_orders:
|
data = self.__delegate_code_orders.pop(k)
|
if data:
|
PlatePlaceOrderManager().remove_plates_code(data[6], code)
|
|
def get_deal_or_delegated_codes(self):
|
"""
|
获取已经成交或者委托的代码
|
@return:
|
"""
|
codes = set()
|
if self.__delegate_code_orders:
|
for k in self.__delegate_code_orders:
|
codes.add(self.__delegate_code_orders[k][0])
|
|
if self.__deal_code_orders_info:
|
codes |= set(self.__deal_code_orders_info.keys())
|
return codes
|
|
|
@tool.singleton
|
class PlatePlaceOrderManager:
|
"""
|
板块下单管理
|
"""
|
|
def __init__(self):
|
self.__db = 12
|
self.redis_manager = redis_manager.RedisManager(self.__db)
|
# 下过单的板块代码
|
self.__place_order_plate_codes_info = {}
|
self.__load_data()
|
|
def __get_redis(self):
|
return self.redis_manager.getRedis()
|
|
def __load_data(self):
|
val = RedisUtils.get(self.__get_redis(), "place_order_plate_codes_info")
|
if val:
|
self.__place_order_plate_codes_info = json.loads(val)
|
|
def add_plates_code(self, plates, code):
|
"""
|
添加板块下单
|
@param plates:
|
@param code:
|
@return:
|
"""
|
for plate in plates:
|
if plate not in self.__place_order_plate_codes_info:
|
self.__place_order_plate_codes_info[plate] = []
|
if code not in self.__place_order_plate_codes_info[plate]:
|
self.__place_order_plate_codes_info[plate].append(code)
|
self.__sync_plate_place_order_info()
|
|
def __sync_plate_place_order_info(self):
|
"""
|
同步板块下单信息
|
@return:
|
"""
|
RedisUtils.setex_async(self.__db, "place_order_plate_codes_info", tool.get_expire(),
|
json.dumps(self.__place_order_plate_codes_info))
|
|
def remove_plates_code(self, plates, code):
|
"""
|
移除板块下单
|
@param plates:
|
@param code:
|
@return:
|
"""
|
for plate in plates:
|
if plate in self.__place_order_plate_codes_info:
|
if code in self.__place_order_plate_codes_info[plate]:
|
self.__place_order_plate_codes_info[plate].remove(code)
|
self.__sync_plate_place_order_info()
|
|
def get_plate_codes(self):
|
return self.__place_order_plate_codes_info
|
|
|
__CodesTradeStateManager = CodesTradeStateManager()
|
|
if __name__ == "__main__":
|
PlatePlaceOrderManager().add_plates_code({"通信","计算机"}, "000333")
|
place_order_plate_codes = PlatePlaceOrderManager().get_plate_codes()
|
code_sets = [set(lst) for lst in place_order_plate_codes.values()]
|
# 2. 使用 set.union() 求并集
|
union_code_sets = set().union(*code_sets)
|
print(union_code_sets)
|