"""
|
华兴交易记录
|
"""
|
|
# 委托记录
|
import datetime
|
import json
|
|
from db.redis_manager import RedisUtils
|
from utils import tool
|
from db import mysql_data, redis_manager
|
|
# 委托列表
|
from utils.history_k_data_util import HistoryKDatasUtils
|
|
|
class DelegateRecordManager:
    """Store and query delegate (order) records in ``hx_trade_delegate_record``.

    Rows are handled positionally (``insert ... values(...)`` without a column
    list and ``select *``), so ``key_list`` must list the columns in the same
    order the insert statement writes them.
    """

    # Column names in insert order; used to turn a ``select *`` row into a dict.
    key_list = ["id", "orderLocalID", "securityID", "securityName", "direction", "orderSysID", "insertTime",
                "insertDate", "acceptTime", "cancelTime", "limitPrice", "turnover", "volume", "volumeTraded",
                "orderStatus", "orderSubmitStatus", "statusMsg", "createTime", "updateTime", "accountID",
                "orderRef", "sinfo"]

    # (row index, field name, cast) for every field ``add`` keeps in sync.
    # ``cast`` is applied to the incoming value before it is compared with the
    # stored one; ``None`` means the two are compared as-is.
    _SYNC_FIELDS = (
        (5, "orderSysID", None),
        (8, "acceptTime", None),
        (9, "cancelTime", None),
        (11, "turnover", str),
        (13, "volumeTraded", None),
        (14, "orderStatus", int),
        (15, "orderSubmitStatus", int),
        (16, "statusMsg", None),
    )

    _DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    @classmethod
    def _row_to_dict(cls, row):
        """Map one positional row onto ``key_list``.

        ``datetime`` values are rendered as ``YYYY-mm-dd HH:MM:SS`` strings;
        columns beyond the known keys are ignored.
        """
        return {
            key: value.strftime(cls._DATETIME_FORMAT) if isinstance(value, datetime.datetime) else value
            for key, value in zip(cls.key_list, row)
        }

    @staticmethod
    def _quoted_values(values):
        """Render values as a comma-separated list of single-quoted SQL literals."""
        # NOTE(review): values are interpolated into the SQL text unescaped,
        # exactly as before. If Mysqldb.execute supports bound parameters,
        # switching to them would be safer -- confirm against mysql_data.
        return ",".join(f"'{value!s}'" for value in values)

    @staticmethod
    def _set_clause(changes):
        """Build the ``col=value`` list of an UPDATE; only str values are quoted."""
        return ",".join(
            f"{column}='{value}'" if isinstance(value, str) else f"{column}={value}"
            for column, value in changes.items()
        )

    @classmethod
    def _insert(cls, mysqldb, _id, d):
        """Insert one new delegate record, looking up the security name first."""
        name_dict = HistoryKDatasUtils.get_gp_codes_names([d['securityID']])
        name = name_dict.get(d['securityID'])
        # One timestamp for both createTime and updateTime of a fresh row.
        now = tool.get_now_datetime_str()
        values = (
            _id, d["orderLocalID"], d["securityID"], name, d["direction"],
            d["orderSysID"], d["insertTime"], d["insertDate"], d["acceptTime"], d["cancelTime"],
            d["limitPrice"], d["turnover"], d["volume"], d["volumeTraded"], d["orderStatus"],
            d["orderSubmitStatus"], d["statusMsg"], now,
            now, d["accountID"], d["orderRef"], d["sinfo"],
        )
        # The placeholder list is derived from the values so the two can
        # never get out of step.
        mysqldb.execute(
            "insert into hx_trade_delegate_record values(%s)" % cls._quoted_values(values))

    @classmethod
    def _update_changed(cls, mysqldb, stored, d):
        """Write back only the synced fields whose value differs from the stored row."""
        changes = {}
        for index, field, cast in cls._SYNC_FIELDS:
            incoming = d[field]
            comparable = cast(incoming) if cast else incoming
            if stored[index] != comparable:
                changes[field] = incoming
        if not changes:
            return
        changes['updateTime'] = tool.get_now_datetime_str()
        mysqldb.execute(
            f"update hx_trade_delegate_record set {cls._set_clause(changes)} where id='{stored[0]}'")

    @classmethod
    def add(cls, datas):
        """Insert new delegate records and update the ones that changed.

        The record id is ``"<insertDate>-<orderLocalID>"``. A record that is
        not stored yet is inserted; otherwise only the fields listed in
        ``_SYNC_FIELDS`` are compared and, if any differ, updated together
        with ``updateTime``.

        Args:
            datas: iterable of dicts with the keys read here (``insertDate``,
                ``orderLocalID``, ``securityID``, ``direction``,
                ``orderSysID``, ``insertTime``, ``acceptTime``,
                ``cancelTime``, ``limitPrice``, ``turnover``, ``volume``,
                ``volumeTraded``, ``orderStatus``, ``orderSubmitStatus``,
                ``statusMsg``, ``accountID``, ``orderRef``, ``sinfo``).
                Empty or ``None`` is a no-op.
        """
        if not datas:
            return
        mysqldb = mysql_data.Mysqldb()
        for d in datas:
            _id = f"{d['insertDate']}-{d['orderLocalID']}"
            stored = mysqldb.select_one(
                f"select * from hx_trade_delegate_record where id='{_id}'")
            if stored:
                cls._update_changed(mysqldb, stored, d)
            else:
                cls._insert(mysqldb, _id, d)

    @classmethod
    def list_by_day(cls, day, min_update_time, orderStatus=None):
        """List the delegate records of one day, ordered by ``createTime``.

        Args:
            day: value matched against ``insertDate``.
            min_update_time: when truthy, only rows with a later
                ``updateTime`` are returned.
            orderStatus: optional iterable of statuses; a row matches when
                its ``orderStatus`` equals any of them.

        Returns:
            ``(records, max_update_time)`` where ``records`` is a list of
            dicts keyed by ``key_list`` and ``max_update_time`` is the largest
            ``updateTime`` seen, formatted as a string, or ``None`` when there
            are no rows.
        """
        mysqldb = mysql_data.Mysqldb()
        where_list = [f"r.insertDate='{day}'"]
        if min_update_time:
            where_list.append(f"updateTime > '{min_update_time}'")
        if orderStatus:
            ss = " or ".join(f"orderStatus = {k}" for k in orderStatus)
            where_list.append(f"({ss})")
        results = mysqldb.select_all(
            f"select * from hx_trade_delegate_record r where {' and '.join(where_list)} order by createTime")

        fresults = []
        max_update_time = None
        for r in results or ():
            update_time = r[18]  # index of "updateTime" in key_list
            if update_time is not None and (not max_update_time or update_time > max_update_time):
                max_update_time = update_time
            fresults.append(cls._row_to_dict(r))
        return fresults, max_update_time.strftime(cls._DATETIME_FORMAT) if max_update_time else None

    @classmethod
    def list_latest_cancel_records(cls, count, day=None):
        """Return up to ``count`` records of ``day`` that have a cancel time.

        Rows are ordered by ``cancelTime`` descending.

        Args:
            count: passed to ``select_many`` as the row limit.
            day: value matched against ``insertDate``. Defaults to the current
                date (``%Y%m%d``), resolved on every call rather than once at
                import time.

        Returns:
            List of dicts keyed by ``key_list``.
        """
        if day is None:
            day = tool.get_now_date_str("%Y%m%d")
        mysqldb = mysql_data.Mysqldb()
        where_list = [f"r.insertDate='{day}'", "r.cancelTime!=''"]
        results = mysqldb.select_many(
            f"select * from hx_trade_delegate_record r where {' and '.join(where_list)} order by r.cancelTime desc",
            count)
        return [cls._row_to_dict(r) for r in results or ()]
|
|
|
# 持仓记录
|
class PositionManager:
    """Persist position snapshots in ``hx_trade_position`` and cache available volume in Redis.

    Rows are handled positionally (``insert ... values(...)`` without a column
    list and ``select *``), so ``_KEY_LIST`` must follow the insert order.
    """

    __redisManager = redis_manager.RedisManager(2)

    # Column names in the order ``add`` inserts them. "tradingDay" sits at
    # index 2; leaving it out shifts every later key by one and overruns the
    # list on the last column.
    _KEY_LIST = ("id", "investorID", "tradingDay", "securityName", "securityID", "historyPos",
                 "historyPosFrozen", "todayBSPos", "todayBSPosFrozen", "historyPosPrice", "totalPosCost",
                 "prePosition", "availablePosition", "currentPosition",
                 "openPosCost", "todayCommission", "todayTotalBuyAmount", "todayTotalSellAmount",
                 "createTime", "updateTime")

    # (row index, field name, cast) for every field ``add`` keeps in sync.
    # ``cast`` is applied to the incoming value before comparing it with the
    # stored one; ``None`` means the two are compared as-is.
    _SYNC_FIELDS = (
        (5, "historyPos", None),
        (6, "historyPosFrozen", None),
        (7, "todayBSPos", None),
        (8, "todayBSPosFrozen", None),
        (9, "historyPosPrice", str),
        (10, "totalPosCost", str),
        (11, "prePosition", None),
        (12, "availablePosition", None),
        (13, "currentPosition", None),
        (14, "openPosCost", str),
        (15, "todayCommission", str),
        (16, "todayTotalBuyAmount", str),
        (17, "todayTotalSellAmount", str),
    )

    _DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    @classmethod
    def __get_redis(cls):
        """Return the Redis client from the class-level manager."""
        return cls.__redisManager.getRedis()

    @classmethod
    def __save_code_volume(cls, code, volume):
        """Cache the available volume of ``code`` under ``available_position_<code>``."""
        RedisUtils.setex(cls.__get_redis(), f"available_position_{code}", tool.get_expire(), f"{volume}")

    @classmethod
    def get_code_volume(cls, code):
        """Return the cached available volume of ``code``, or 0 when nothing is cached."""
        val = RedisUtils.get(cls.__get_redis(), f"available_position_{code}")
        if not val:
            return 0
        return int(val)

    @classmethod
    def _row_to_dict(cls, row):
        """Map one positional row onto ``_KEY_LIST``.

        ``datetime`` values are rendered as ``YYYY-mm-dd HH:MM:SS`` strings;
        columns beyond the known keys are ignored.
        """
        return {
            key: value.strftime(cls._DATETIME_FORMAT) if isinstance(value, datetime.datetime) else value
            for key, value in zip(cls._KEY_LIST, row)
        }

    @classmethod
    def _insert(cls, mysqldb, _id, d):
        """Insert one new position row."""
        # One timestamp for both createTime and updateTime of a fresh row.
        now = tool.get_now_datetime_str()
        values = (
            _id, d["investorID"], d["tradingDay"], d["securityName"], d["securityID"],
            d["historyPos"], d["historyPosFrozen"], d["todayBSPos"], d["todayBSPosFrozen"],
            d["historyPosPrice"],
            d["totalPosCost"], d["prePosition"], d["availablePosition"], d["currentPosition"],
            d["openPosCost"],
            d["todayCommission"], d["todayTotalBuyAmount"], d["todayTotalSellAmount"],
            now,
            now,
        )
        # NOTE(review): values are interpolated into the SQL text unescaped,
        # exactly as before; bound parameters would be safer if
        # Mysqldb.execute supports them -- confirm against mysql_data.
        quoted = ",".join(f"'{value!s}'" for value in values)
        mysqldb.execute("insert into hx_trade_position values(%s)" % quoted)

    @classmethod
    def _update_changed(cls, mysqldb, stored, d):
        """Write back only the synced fields whose value differs from the stored row."""
        changes = {}
        for index, field, cast in cls._SYNC_FIELDS:
            incoming = d[field]
            comparable = cast(incoming) if cast else incoming
            if stored[index] != comparable:
                changes[field] = incoming
        if not changes:
            return
        changes['updateTime'] = tool.get_now_datetime_str()
        # Only str values are quoted in the SET clause.
        set_clause = ",".join(
            f"{column}='{value}'" if isinstance(value, str) else f"{column}={value}"
            for column, value in changes.items()
        )
        mysqldb.execute(
            f"update hx_trade_position set {set_clause} where id='{stored[0]}'")

    @classmethod
    def add(cls, datas):
        """Cache available volumes and upsert the given position rows.

        First the ``availablePosition`` of all rows is summed per
        ``securityID`` and cached in Redis. Then each row is inserted, or --
        if a row with id ``"<investorID>-<tradingDay>-<securityID>"`` already
        exists -- the fields in ``_SYNC_FIELDS`` that differ are updated
        together with ``updateTime``.

        Args:
            datas: iterable of position dicts providing the keys read here
                (``investorID``, ``tradingDay``, ``securityName``,
                ``securityID`` and the fields named in ``_SYNC_FIELDS``).
                Empty or ``None`` is a no-op.
        """
        mysqldb = mysql_data.Mysqldb()
        if not datas:
            return

        # Total available volume per security code.
        volume_dict = {}
        for d in datas:
            code = d["securityID"]
            volume_dict[code] = volume_dict.get(code, 0) + d["availablePosition"]
        for code, volume in volume_dict.items():
            cls.__save_code_volume(code, volume)

        for d in datas:
            _id = f"{d['investorID']}-{d['tradingDay']}-{d['securityID']}"
            stored = mysqldb.select_one(
                f"select * from hx_trade_position where id='{_id}'")
            if stored:
                cls._update_changed(mysqldb, stored, d)
            else:
                cls._insert(mysqldb, _id, d)

    @classmethod
    def list_by_day(cls, day):
        """Return the position rows whose ``tradingDay`` equals ``day``.

        Rows are ordered by ``createTime`` and returned as dicts keyed by
        ``_KEY_LIST``.
        """
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(
            f"select * from hx_trade_position r where r.tradingDay='{day}' order by createTime")
        return [cls._row_to_dict(r) for r in results or ()]

    @classmethod
    def get_volume_by_code(cls, code):
        """Return the ``select_one`` result for ``currentPosition`` of ``code``.

        NOTE(review): the query is not restricted to a trading day and the
        raw result of ``select_one`` is returned as-is, not a bare number --
        confirm this is what callers expect.
        """
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_one(f"select currentPosition from hx_trade_position where securityID='{code}'")

    @classmethod
    def get_cost_price(cls, code, day=None):
        """Return the cost price of ``code``: ``totalPosCost / currentPosition``, rounded to 2 places.

        Args:
            code: security code to look up.
            day: trading day to query. Defaults to the current date
                (``%Y%m%d``), resolved on every call rather than once at
                import time.

        Raises:
            Exception: when there is no position row for that day, or when
                the current position is 0.
        """
        if day is None:
            day = tool.get_now_date_str("%Y%m%d")
        mysqldb = mysql_data.Mysqldb()
        result = mysqldb.select_one(
            f"select totalPosCost,currentPosition from hx_trade_position where securityID='{code}' and tradingDay='{day}'")
        if not result:
            raise Exception("尚未持仓")
        if result[1] == 0:
            raise Exception("已经清仓")
        return round(float(result[0]) / result[1], 2)
|
|
|
# 成交记录
|
class DealRecordManager:
    """Store and query deal (trade) records in ``hx_trade_deal_record``.

    Rows are handled positionally (``insert ... values(...)`` without a column
    list and ``select *``), so ``_KEY_LIST`` must follow the insert order.
    """

    # Column names in the order ``add`` inserts them.
    _KEY_LIST = ("tradeID", "securityID", "orderLocalID", "direction", "orderSysID", "price",
                 "tradeTime", "volume", "tradeDate", "tradingDay", "pbuID",
                 "accountID", "createTime",
                 "updateTime")

    _DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    @classmethod
    def _row_to_dict(cls, row):
        """Map one positional row onto ``_KEY_LIST``.

        ``datetime`` values are rendered as ``YYYY-mm-dd HH:MM:SS`` strings;
        columns beyond the known keys are ignored.
        """
        return {
            key: value.strftime(cls._DATETIME_FORMAT) if isinstance(value, datetime.datetime) else value
            for key, value in zip(cls._KEY_LIST, row)
        }

    @classmethod
    def add(cls, datas):
        """Insert every deal whose ``tradeID`` is not stored yet.

        Existing deals are left untouched. The price is rounded to 2 decimal
        places before it is written.

        Args:
            datas: iterable of deal dicts providing ``tradeID``,
                ``securityID``, ``orderLocalID``, ``direction``,
                ``orderSysID``, ``price``, ``tradeTime``, ``volume``,
                ``tradeDate``, ``tradingDay``, ``pbuID`` and ``accountID``.
                Empty or ``None`` is a no-op.
        """
        if not datas:
            return
        mysqldb = mysql_data.Mysqldb()
        for d in datas:
            existing = mysqldb.select_one(
                f"select * from hx_trade_deal_record where tradeID='{d['tradeID']}'")
            if existing:
                continue
            # One timestamp for both createTime and updateTime of a fresh row.
            now = tool.get_now_datetime_str()
            values = (
                d["tradeID"], d["securityID"], d["orderLocalID"], d["direction"],
                d["orderSysID"], round(d["price"], 2), d["tradeTime"], d["volume"],
                d["tradeDate"],
                d["tradingDay"], d["pbuID"], d["accountID"],
                now,
                now,
            )
            # NOTE(review): values are interpolated into the SQL text
            # unescaped, exactly as before; bound parameters would be safer
            # if Mysqldb.execute supports them -- confirm against mysql_data.
            quoted = ",".join(f"'{value!s}'" for value in values)
            mysqldb.execute("insert into hx_trade_deal_record values(%s)" % quoted)

    @classmethod
    def list_by_day(cls, day):
        """Return the deals whose ``tradeDate`` equals ``day``.

        Rows are ordered by ``createTime`` and returned as dicts keyed by
        ``_KEY_LIST``; the list is empty when nothing matches.
        """
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(
            f"select * from hx_trade_deal_record r where r.tradeDate='{day}' order by createTime")
        return [cls._row_to_dict(r) for r in results or ()]
|
|
|
# 资金管理
|
class MoneyManager:
    """Cache the account money data in Redis under the ``huaxin_money`` key."""

    # Shared manager, built once when the class is defined.
    # NOTE(review): the argument 2 is presumably the Redis db index -- confirm.
    __redis_manager = redis_manager.RedisManager(2)

    @classmethod
    def get_redis(cls):
        """Return the Redis client handed out by the shared manager."""
        client = cls.__redis_manager.getRedis()
        return client

    @classmethod
    def save_data(cls, data):
        """Store ``data`` as JSON, expiring after ``tool.get_expire()``."""
        client = cls.get_redis()
        ttl = tool.get_expire()
        payload = json.dumps(data)
        RedisUtils.setex(client, "huaxin_money", ttl, payload)

    @classmethod
    def get_data(cls):
        """Return the cached money data decoded from JSON, or ``None`` when absent."""
        raw = RedisUtils.get(cls.get_redis(), "huaxin_money")
        return json.loads(raw) if raw else None
|
|
|
if __name__ == "__main__":
    # Manual check: print up to 10 of the latest cancel records for the default day.
    latest_cancels = DelegateRecordManager.list_latest_cancel_records(10)
    print(latest_cancels)
|