"""
|
华兴交易记录
|
"""
|
|
# 委托记录
|
import copy
|
import datetime
|
import json
|
|
from db.redis_manager_delegate import RedisUtils, RedisManager
|
from utils import tool, huaxin_util
|
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
|
from third_data.history_k_data_util import HistoryKDatasUtils
|
|
|
# 委托列表
|
class DelegateRecordManager:
    """Singleton that tracks Huaxin delegate (order) records.

    Orders that ``huaxin_util.is_can_cancel`` reports as cancellable are kept
    in an in-memory dict keyed by ``orderSysID``.  Every record handed to
    :meth:`add_one` is additionally inserted into, or updated in, the
    ``hx_trade_delegate_record`` table.
    """

    # Orders currently in a delegated state, keyed by orderSysID.
    __current_delegate_records_dict_cache = {}
    mysqldb = mysql_data.Mysqldb()
    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, loading today's open orders on first use."""
        if not cls.__instance:
            # object.__new__ raises TypeError when it receives extra arguments
            # from a class that overrides __new__, so they are not forwarded.
            cls.__instance = super(DelegateRecordManager, cls).__new__(cls)
            cls.__load_data()
        return cls.__instance

    @classmethod
    def __load_data(cls):
        """Rebuild the in-memory cache from today's Cached/Accepted rows."""
        fresults, _ = cls.list_by_day(tool.get_now_date_str("%Y%m%d"), None,
                                      orderStatus=[huaxin_util.TORA_TSTP_OST_Cached,
                                                   huaxin_util.TORA_TSTP_OST_Accepted])
        cls.__current_delegate_records_dict_cache.clear()
        for d in fresults:
            cls.__current_delegate_records_dict_cache[d['orderSysID']] = d

    def list_current_delegates(self, code=None):
        """List the cached orders that are currently delegated.

        :param code: optional security ID; when given, only orders whose
            ``securityID`` equals it are returned.
        :return: list of order dicts, or ``None`` when the cache is empty.
        """
        cache = self.__current_delegate_records_dict_cache
        if not cache:
            return None
        return [item for item in cache.values() if not code or item["securityID"] == code]

    @classmethod
    def add(cls, datas):
        """Record every order dict in ``datas`` (a falsy value is ignored)."""
        if not datas:
            return
        for d in datas:
            cls.add_one(d)

    @classmethod
    def add_one(cls, d):
        """Record a single order dict in the cache and in the database.

        The order is cached while it is cancellable and evicted otherwise.
        The row ``{insertDate}-{orderLocalID}`` is then inserted if missing,
        or updated with whichever tracked fields differ from the stored row.

        :param d: order dict carrying the keys read below (``orderStatus``,
            ``orderSysID``, ``insertDate``, ``orderLocalID``, ...).
        """
        cache = cls.__current_delegate_records_dict_cache
        if huaxin_util.is_can_cancel(str(d["orderStatus"])):
            cache[d['orderSysID']] = d
        else:
            cache.pop(d['orderSysID'], None)

        # NOTE(review): the SQL below is assembled by string formatting; if
        # Mysqldb supports parameterized statements they would be safer.
        # Check whether the record already exists.
        _id = f"{d['insertDate']}-{d['orderLocalID']}"
        result = cls.mysqldb.select_one(
            f"select * from hx_trade_delegate_record where id='{_id}'")
        if not result:
            # Insert a new row.
            nameDict = HistoryKDatasUtils.get_gp_codes_names([d['securityID']])
            name = nameDict.get(d['securityID'])
            cls.mysqldb.execute(
                "insert into hx_trade_delegate_record values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', '%s', '%s')" % (
                    _id, d["orderLocalID"], d["securityID"], name, d["direction"],
                    d["orderSysID"], d["insertTime"], d["insertDate"], d["acceptTime"], d["cancelTime"],
                    d["limitPrice"], d["turnover"], d["volume"], d["volumeTraded"], d["orderStatus"],
                    d["orderSubmitStatus"], d["statusMsg"], tool.get_now_datetime_str(),
                    tool.get_now_datetime_str(), d["accountID"], d["orderRef"], d["sinfo"]))
            return

        # Update the existing row: (column, stored value, incoming value in
        # the form it is compared against the stored one).
        comparisons = (
            ("orderSysID", result[5], d['orderSysID']),
            ("acceptTime", result[8], d['acceptTime']),
            ("cancelTime", result[9], d['cancelTime']),
            ("turnover", result[11], str(d['turnover'])),
            ("volumeTraded", result[13], d['volumeTraded']),
            ("orderStatus", result[14], int(d['orderStatus'])),
            ("orderSubmitStatus", result[15], int(d['orderSubmitStatus'])),
            ("statusMsg", result[16], d['statusMsg']),
            ("orderRef", result[20], d['orderRef']),
            ("sinfo", result[21], d['sinfo']),
        )
        updateDict = {col: d[col] for col, stored, incoming in comparisons if stored != incoming}
        if not updateDict:
            return
        # Something changed: stamp the update time and write the changed columns.
        updateDict['updateTime'] = tool.get_now_datetime_str()
        assignments = [
            f"{k}='{v}'" if isinstance(v, str) else f"{k}={v}"
            for k, v in updateDict.items()
        ]
        cls.mysqldb.execute(
            f"update hx_trade_delegate_record set {','.join(assignments)} where id='{result[0]}'")

    @classmethod
    def list_by_day(cls, day, min_update_time, orderStatus=None):
        """Query the delegate records inserted on ``day``.

        :param day: value matched against the ``insertDate`` column.
        :param min_update_time: when truthy, only rows with a later
            ``updateTime`` are returned.
        :param orderStatus: optional iterable of status values; when
            non-empty, only rows having one of them are returned.
        :return: ``(records, max_update_time)`` where ``records`` is a list of
            dicts (datetime columns formatted as ``%Y-%m-%d %H:%M:%S``) and
            ``max_update_time`` is the largest ``updateTime`` among them, or
            ``None`` when nothing matched.
        """
        mysqldb = mysql_data.Mysqldb()
        where_list = [f"r.insertDate='{day}'"]
        if min_update_time:
            where_list.append(f"updateTime > '{min_update_time}'")
        if orderStatus:
            ss = " or ".join([f"orderStatus = {k}" for k in orderStatus])
            where_list.append(f"({ss})")
        results = mysqldb.select_all(
            f"select * from hx_trade_delegate_record r where {' and '.join(where_list)} order by createTime")
        # Convert each row to a dict.
        key_list = ["id", "orderLocalID", "securityID", "securityName", "direction", "orderSysID", "insertTime",
                    "insertDate", "acceptTime", "cancelTime", "limitPrice", "turnover", "volume", "volumeTraded",
                    "orderStatus", "orderSubmitStatus", "statusMsg", "createTime", "updateTime", "accountID",
                    "orderRef", "sinfo"]
        fresults = []
        max_update_time = None
        for r in results or ():
            temp = {}
            for key, value in zip(key_list, r):
                if isinstance(value, datetime.datetime):
                    temp[key] = value.strftime("%Y-%m-%d %H:%M:%S")
                else:
                    temp[key] = value
            fresults.append(temp)
            # Compare the normalized value rather than the raw column: the raw
            # column may be a datetime while max_update_time is already a
            # string, and mixing the two raises TypeError.
            update_time = temp["updateTime"]
            if not max_update_time or update_time > max_update_time:
                max_update_time = update_time
        return fresults, max_update_time
|
|
|
# 持仓记录
|
class PositionManager:
    """Position records: Redis-cached available volume plus MySQL persistence.

    Rows live in the ``hx_trade_position`` table under the id
    ``{investorID}-{tradingDay}-{securityID}``.
    """

    __redisManager = redis_manager.RedisManager(2)
    # Most recent positions handed to cache(); stored as a deep copy.
    latest_positions = []

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    @classmethod
    def __save_code_volume(cls, code, volume):
        """Store the available volume of ``code`` in Redis."""
        RedisUtils.setex(cls.__get_redis(), f"available_position_{code}", tool.get_expire(), f"{volume}")

    @classmethod
    def get_code_volume(cls, code):
        """Return the available volume stored for ``code``, or 0 when absent."""
        val = RedisUtils.get(cls.__get_redis(), f"available_position_{code}")
        if not val:
            return 0
        return int(val)

    @classmethod
    def cache(cls, datas):
        """Keep a deep copy of ``datas`` as the latest known positions."""
        cls.latest_positions = copy.deepcopy(datas)

    @classmethod
    def add(cls, datas):
        """Persist position dicts and refresh the per-code available volume.

        ``availablePosition`` is summed per ``securityID`` and written to
        Redis; each position is then inserted into ``hx_trade_position`` or,
        when a row with the same id exists, updated with the changed columns.

        :param datas: iterable of position dicts; a falsy value is ignored.
        """
        mysqldb = mysql_data.Mysqldb()
        if not datas:
            return

        # Total the available volume per security.
        volume_dict = {}
        for d in datas:
            volume_dict[d["securityID"]] = volume_dict.get(d["securityID"], 0) + d["availablePosition"]
        for k in volume_dict:
            cls.__save_code_volume(k, volume_dict[k])

        # NOTE(review): the SQL below is assembled by string formatting; if
        # Mysqldb supports parameterized statements they would be safer.
        for d in datas:
            _id = f"{d['investorID']}-{d['tradingDay']}-{d['securityID']}"
            # Check whether the record already exists.
            result = mysqldb.select_one(
                f"select * from hx_trade_position where id='{_id}'")
            if not result:
                # Insert a new row.
                mysqldb.execute(
                    "insert into hx_trade_position values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')" % (
                        _id, d["investorID"], d["tradingDay"], d["securityName"], d["securityID"],
                        d["historyPos"], d["historyPosFrozen"], d["todayBSPos"], d["todayBSPosFrozen"],
                        d["historyPosPrice"],
                        d["totalPosCost"], d["prePosition"], d["availablePosition"], d["currentPosition"],
                        d["openPosCost"],
                        d["todayCommission"], d["todayTotalBuyAmount"], d["todayTotalSellAmount"],
                        tool.get_now_datetime_str(),
                        tool.get_now_datetime_str()))
                continue

            # Update the existing row: (column, stored value, incoming value
            # in the form it is compared against the stored one).
            comparisons = (
                ("historyPos", result[5], d['historyPos']),
                ("historyPosFrozen", result[6], d['historyPosFrozen']),
                ("todayBSPos", result[7], d['todayBSPos']),
                ("todayBSPosFrozen", result[8], d['todayBSPosFrozen']),
                ("historyPosPrice", result[9], f"{d['historyPosPrice']}"),
                ("totalPosCost", result[10], f"{d['totalPosCost']}"),
                ("prePosition", result[11], d['prePosition']),
                ("availablePosition", result[12], d['availablePosition']),
                ("currentPosition", result[13], d['currentPosition']),
                ("openPosCost", result[14], f"{d['openPosCost']}"),
                ("todayCommission", result[15], f"{d['todayCommission']}"),
                ("todayTotalBuyAmount", result[16], f"{d['todayTotalBuyAmount']}"),
                ("todayTotalSellAmount", result[17], f"{d['todayTotalSellAmount']}"),
            )
            updateDict = {col: d[col] for col, stored, incoming in comparisons if stored != incoming}
            if not updateDict:
                continue
            # Something changed: stamp the update time and write the changed columns.
            updateDict['updateTime'] = tool.get_now_datetime_str()
            assignments = [
                f"{k}='{v}'" if isinstance(v, str) else f"{k}={v}"
                for k, v in updateDict.items()
            ]
            mysqldb.execute(
                f"update hx_trade_position set {','.join(assignments)} where id='{result[0]}'")

    @classmethod
    def list_by_day(cls, day):
        """Return the position rows of trading day ``day`` as dicts.

        Datetime columns are formatted as ``%Y-%m-%d %H:%M:%S``.
        """
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(
            f"select * from hx_trade_position r where r.tradingDay='{day}' order by createTime")
        # Convert each row to a dict.  The key order follows the column order
        # used by the insert in add(), which places tradingDay right after
        # investorID.
        key_list = ["id", "investorID", "tradingDay", "securityName", "securityID", "historyPos",
                    "historyPosFrozen", "todayBSPos", "todayBSPosFrozen", "historyPosPrice", "totalPosCost",
                    "prePosition", "availablePosition", "currentPosition",
                    "openPosCost", "todayCommission", "todayTotalBuyAmount", "todayTotalSellAmount", "createTime",
                    "updateTime"]
        fresults = []
        for r in results or ():
            temp = {}
            for key, value in zip(key_list, r):
                if isinstance(value, datetime.datetime):
                    temp[key] = value.strftime("%Y-%m-%d %H:%M:%S")
                else:
                    temp[key] = value
            fresults.append(temp)
        return fresults

    @classmethod
    def get_volume_by_code(cls, code):
        """Return ``currentPosition`` of a stored row for ``code``.

        :return: the first column of the row returned by the query, or
            ``None`` when no row matches.
        """
        mysqldb = mysql_data.Mysqldb()
        # NOTE(review): the query is not restricted to a trading day, so which
        # row is returned when several days are stored is up to the database.
        row = mysqldb.select_one(f"select currentPosition from hx_trade_position where securityID='{code}'")
        return row[0] if row else None

    @classmethod
    def get_position_codes(cls):
        """Return the security IDs of cached positions whose ``prePosition`` is positive."""
        return [d["securityID"] for d in cls.latest_positions or () if d["prePosition"] > 0]
|
|
|
# 成交记录
|
class DealRecordManager:
    """Singleton holding deal (trade fill) records.

    Deals are cached in memory keyed by ``tradeID`` and persisted to the
    ``hx_trade_deal_record`` table.
    """

    # Deals seen so far, keyed by tradeID.
    __latest_deal_trade_id_dict = {}

    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, loading today's deals on first use."""
        if not cls.__instance:
            # object.__new__ raises TypeError when it receives extra arguments
            # from a class that overrides __new__, so they are not forwarded.
            cls.__instance = super(DealRecordManager, cls).__new__(cls)
            cls.__load_data()
        return cls.__instance

    @classmethod
    def __load_data(cls):
        """Fill the cache with the deals stored for today's date."""
        fresults = cls.list_by_day(tool.get_now_date_str("%Y%m%d"))
        for r in fresults or ():
            cls.__latest_deal_trade_id_dict[r["tradeID"]] = r

    def list_sell_by_code_cache(self, code):
        """Return the cached deals of ``code`` whose ``direction`` equals 1.

        :param code: security ID compared against each deal's ``securityID``.
        :return: list of matching deal dicts (possibly empty).
        """
        return [
            d for d in self.__latest_deal_trade_id_dict.values()
            if d["securityID"] == code and int(d["direction"]) == 1
        ]

    @classmethod
    def add(cls, datas):
        """Cache every deal in ``datas`` and insert the ones not yet stored.

        :param datas: iterable of deal dicts; a falsy value is ignored.
        """
        mysqldb = mysql_data.Mysqldb()
        if not datas:
            return
        # NOTE(review): the SQL below is assembled by string formatting; if
        # Mysqldb supports parameterized statements they would be safer.
        for d in datas:
            cls.__latest_deal_trade_id_dict[d['tradeID']] = d
            # Check whether the record already exists.
            result = mysqldb.select_one(
                f"select * from hx_trade_deal_record where tradeID='{d['tradeID']}'")
            if result:
                continue
            # Insert a new row.
            mysqldb.execute(
                "insert into hx_trade_deal_record values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')" % (
                    d["tradeID"], d["securityID"], d["orderLocalID"], d["direction"],
                    d["orderSysID"], round(d["price"], 2), d["tradeTime"], d["volume"],
                    d["tradeDate"],
                    d["tradingDay"], d["pbuID"], d["accountID"],
                    tool.get_now_datetime_str(),
                    tool.get_now_datetime_str()))

    @classmethod
    def list_by_day(cls, day):
        """Return the deals whose ``tradeDate`` equals ``day`` as dicts.

        Datetime columns are formatted as ``%Y-%m-%d %H:%M:%S``.  An empty
        list is returned when nothing matches.
        """
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(
            f"select * from hx_trade_deal_record r where r.tradeDate='{day}' order by createTime")
        # Convert each row to a dict.
        key_list = ["tradeID", "securityID", "orderLocalID", "direction", "orderSysID", "price",
                    "tradeTime", "volume", "tradeDate", "tradingDay", "pbuID",
                    "accountID", "createTime",
                    "updateTime"]
        fresults = []
        for r in results or ():
            temp = {}
            for key, value in zip(key_list, r):
                if isinstance(value, datetime.datetime):
                    temp[key] = value.strftime("%Y-%m-%d %H:%M:%S")
                else:
                    temp[key] = value
            fresults.append(temp)
        return fresults
|
|
|
# 资金管理
|
class MoneyManager:
    """Stores and retrieves the account money snapshot under the Redis key ``huaxin_money``."""

    __redisMananger = redis_manager.RedisManager(2)

    @classmethod
    def get_redis(cls):
        """Return a Redis handle obtained from the class-level manager."""
        manager = cls.__redisMananger
        return manager.getRedis()

    @classmethod
    def save_data(cls, data):
        """Serialize ``data`` as JSON and store it with the expiry from ``tool.get_expire()``."""
        conn = cls.get_redis()
        ttl = tool.get_expire()
        payload = json.dumps(data)
        RedisUtils.setex(conn, "huaxin_money", ttl, payload)

    @classmethod
    def get_data(cls):
        """Return the decoded money snapshot, or ``None`` when nothing is stored."""
        raw = RedisUtils.get(cls.get_redis(), "huaxin_money")
        return json.loads(raw) if raw else None
|
|
|
# 交易订单号管理
|
class TradeOrderIdManager:
    """Singleton tracking order refs and order ids per security code.

    Each collection is mirrored in a Redis set (``huaxin_order_ref-{code}``
    and ``huaxin_order_id-{code}``) and in an in-memory dict keyed by code.
    """

    __db = 2
    __redisManager = RedisManager(2)
    __instance = None
    __huaxin_order_id_cache = {}
    __huaxin_order_ref_cache = {}

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, loading the Redis sets on first use."""
        if not cls.__instance:
            # object.__new__ raises TypeError when it receives extra arguments
            # from a class that overrides __new__, so they are not forwarded.
            cls.__instance = super(TradeOrderIdManager, cls).__new__(cls)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Populate both in-memory caches from the matching Redis sets."""
        redis_conn = cls.__get_redis()
        try:
            sources = (
                ("huaxin_order_id-*", cls.__huaxin_order_id_cache),
                ("huaxin_order_ref-*", cls.__huaxin_order_ref_cache),
            )
            for pattern, cache in sources:
                for k in RedisUtils.keys(redis_conn, pattern):
                    # The code is the part of the key after the last "-".
                    code = k.split("-")[-1]
                    vals = RedisUtils.smembers(redis_conn, k)
                    tool.CodeDataCacheUtil.set_cache(cache, code, vals)
        finally:
            RedisUtils.realse(redis_conn)

    def add_order_ref(self, code, order_ref):
        """Add a single order ref for ``code``."""
        self.add_order_refs(code, [order_ref])

    def add_order_refs(self, code, order_ref_list):
        """Add several order refs for ``code`` to the cache and to Redis.

        :param code: security code the refs belong to.
        :param order_ref_list: iterable of order refs.
        """
        refs = self.__huaxin_order_ref_cache.setdefault(code, set())
        for val in order_ref_list:
            refs.add(val)
            RedisUtils.sadd_async(self.__db, f"huaxin_order_ref-{code}", val)
        # Refresh the expiry once, after the members have been queued.
        RedisUtils.expire_async(self.__db, f"huaxin_order_ref-{code}", tool.get_expire())

    def remove_order_ref(self, code, order_ref):
        """Remove an order ref of ``code`` from the cache and from Redis."""
        val = order_ref
        if code in self.__huaxin_order_ref_cache:
            self.__huaxin_order_ref_cache[code].discard(val)
        RedisUtils.srem_async(self.__db, f"huaxin_order_ref-{code}", val)

    def list_order_refs(self, code):
        """Read the order refs of ``code`` directly from Redis."""
        # NOTE(review): unlike __load_datas, the handle obtained here is not
        # passed to RedisUtils.realse - confirm whether that is required.
        return RedisUtils.smembers(self.__get_redis(), f"huaxin_order_ref-{code}")

    def list_order_refs_cache(self, code):
        """Return the cached order refs of ``code`` (an empty set when unknown)."""
        if code in self.__huaxin_order_ref_cache:
            return self.__huaxin_order_ref_cache[code]
        return set()

    def add_order_id(self, code, account_id, sys_order_id):
        """Add an order id for ``code`` to the cache and to Redis.

        The stored member is the JSON encoding of ``(account_id, sys_order_id)``.
        """
        val = json.dumps((account_id, sys_order_id))
        self.__huaxin_order_id_cache.setdefault(code, set()).add(val)
        RedisUtils.sadd_async(self.__db, f"huaxin_order_id-{code}", val)
        RedisUtils.expire_async(self.__db, f"huaxin_order_id-{code}", tool.get_expire())

    def remove_order_id(self, code, account_id, order_id):
        """Remove the ``(account_id, order_id)`` member of ``code`` from the cache and Redis."""
        val = json.dumps((account_id, order_id))
        if code in self.__huaxin_order_id_cache:
            self.__huaxin_order_id_cache[code].discard(val)
        RedisUtils.srem_async(self.__db, f"huaxin_order_id-{code}", val)

    def list_order_ids(self, code):
        """Read the order ids of ``code`` directly from Redis."""
        # NOTE(review): unlike __load_datas, the handle obtained here is not
        # passed to RedisUtils.realse - confirm whether that is required.
        return RedisUtils.smembers(self.__get_redis(), f"huaxin_order_id-{code}")

    def list_order_ids_cache(self, code):
        """Return the cached order ids of ``code`` (an empty set when unknown)."""
        if code in self.__huaxin_order_id_cache:
            return self.__huaxin_order_id_cache[code]
        return set()
|
|
|
if __name__ == "__main__":
    # Manual check: print the currently delegated orders of one security.
    manager = DelegateRecordManager()
    current_delegates = manager.list_current_delegates("600239")
    print(current_delegates)
|