"""Persistence helpers for hx trade data.

Covers order (delegate) records, positions and deal records stored in the
MySQL tables ``hx_trade_delegate_record``, ``hx_trade_position`` and
``hx_trade_deal_record``, plus the account-funds snapshot cached in Redis.

NOTE(review): every SQL statement in this module is assembled by string
interpolation. That is only safe while the incoming values are trusted.
If ``Mysqldb.execute`` / ``select_*`` accept bound parameters, switch to
them -- TODO confirm against the ``db.mysql_data`` API.
"""

import datetime
import json

from db.redis_manager import RedisUtils
from utils import tool
from db import mysql_data, redis_manager
from utils.history_k_data_util import HistoryKDatasUtils

# Format used when datetime columns are rendered into result dicts.
_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
# Format of the trading-day strings used as default query arguments.
_DAY_FORMAT = "%Y%m%d"


def _as_text(value):
    """Render *value* the way an f-string would, for comparing with text columns."""
    return f"{value}"


def _row_to_dict(row, keys):
    """Map a positional DB row onto *keys*, formatting datetimes as strings.

    Columns beyond ``len(keys)`` are ignored instead of raising.
    """
    return {
        key: value.strftime(_DATETIME_FORMAT) if isinstance(value, datetime.datetime) else value
        for key, value in zip(keys, row)
    }


def _insert_sql(table, values):
    """Build a positional ``insert into <table> values(...)`` statement.

    The quoted placeholder list is derived from *values*, so the number of
    placeholders always matches the number of values supplied.
    """
    placeholders = ",".join(["'%s'"] * len(values))
    return f"insert into {table} values({placeholders})" % tuple(values)


def _update_sql(table, changes, row_id):
    """Build ``update <table> set ... where id='<row_id>'`` from *changes*.

    String values are single-quoted; everything else is emitted verbatim.
    """
    assignments = [
        f"{column}='{value}'" if isinstance(value, str) else f"{column}={value}"
        for column, value in changes.items()
    ]
    return f"update {table} set {','.join(assignments)} where id='{row_id}'"


def _collect_changes(row, data, tracked):
    """Return ``{column: new_value}`` for tracked columns whose value changed.

    *tracked* is a sequence of ``(row_index, key, normalize)`` triples.
    ``normalize`` (or ``None``) converts the incoming value to the type the
    stored column is compared as; the un-normalized value is what is kept.
    """
    changes = {}
    for index, key, normalize in tracked:
        incoming = data[key]
        comparable = normalize(incoming) if normalize else incoming
        if row[index] != comparable:
            changes[key] = incoming
    return changes


# Delegate (order) records
class DelegateRecordManager:
    """Stores and queries rows of ``hx_trade_delegate_record``."""

    # Column names in positional order, used to turn rows into dicts.
    key_list = ["id", "orderLocalID", "securityID", "securityName", "direction", "orderSysID", "insertTime",
                "insertDate", "acceptTime", "cancelTime", "limitPrice", "turnover", "volume", "volumeTraded",
                "orderStatus", "orderSubmitStatus", "statusMsg", "createTime", "updateTime", "accountID",
                "orderRef", "sinfo"]

    # (row index, key, normalizer) for the columns refreshed on an existing row.
    _TRACKED_COLUMNS = (
        (5, "orderSysID", None),
        (8, "acceptTime", None),
        (9, "cancelTime", None),
        (11, "turnover", str),
        (13, "volumeTraded", None),
        (14, "orderStatus", int),
        (15, "orderSubmitStatus", int),
        (16, "statusMsg", None),
    )

    @classmethod
    def add(cls, datas):
        """Insert new delegate records or update the changed columns of existing ones.

        Each item of *datas* is a dict; its row id is
        ``"<insertDate>-<orderLocalID>"``. Does nothing when *datas* is empty.
        """
        mysqldb = mysql_data.Mysqldb()
        if not datas:
            return
        for d in datas:
            # Look up whether the record already exists.
            _id = f"{d['insertDate']}-{d['orderLocalID']}"
            result = mysqldb.select_one(
                f"select * from hx_trade_delegate_record where id='{_id}'")
            if not result:
                # New record: resolve the security name, then insert.
                names = HistoryKDatasUtils.get_gp_codes_names([d['securityID']])
                name = names.get(d['securityID'])
                now = tool.get_now_datetime_str()
                values = (
                    _id, d["orderLocalID"], d["securityID"], name, d["direction"], d["orderSysID"],
                    d["insertTime"], d["insertDate"], d["acceptTime"], d["cancelTime"], d["limitPrice"],
                    d["turnover"], d["volume"], d["volumeTraded"], d["orderStatus"],
                    d["orderSubmitStatus"], d["statusMsg"], now, now, d["accountID"], d["orderRef"],
                    d["sinfo"],
                )
                mysqldb.execute(_insert_sql("hx_trade_delegate_record", values))
                continue
            # Existing record: write back only the columns that differ.
            changes = _collect_changes(result, d, cls._TRACKED_COLUMNS)
            if changes:
                changes['updateTime'] = tool.get_now_datetime_str()
                mysqldb.execute(_update_sql("hx_trade_delegate_record", changes, result[0]))

    @classmethod
    def list_by_day(cls, day, min_update_time, orderStatus=None):
        """List the delegate records of *day*, ordered by ``createTime``.

        Args:
            day: value matched against ``insertDate``.
            min_update_time: when truthy, only rows with a later ``updateTime``.
            orderStatus: optional iterable of statuses; rows matching any are kept.

        Returns:
            ``(records, max_update_time)`` where *records* is a list of dicts
            keyed by ``key_list`` and *max_update_time* is the latest
            ``updateTime`` formatted as ``YYYY-mm-dd HH:MM:SS`` (``None``
            when there are no rows).
        """
        mysqldb = mysql_data.Mysqldb()
        where_list = [f"r.insertDate='{day}'"]
        if min_update_time:
            where_list.append(f"updateTime > '{min_update_time}'")
        if orderStatus:
            ss = " or ".join([f"orderStatus = {k}" for k in orderStatus])
            where_list.append(f"({ss})")
        results = mysqldb.select_all(
            f"select * from hx_trade_delegate_record r where {' and '.join(where_list)} order by createTime")
        if not results:
            return [], None
        update_index = cls.key_list.index("updateTime")
        max_update_time = max(r[update_index] for r in results)
        fresults = [_row_to_dict(r, cls.key_list) for r in results]
        return fresults, max_update_time.strftime(_DATETIME_FORMAT) if max_update_time else None

    # Fetch the most recent cancelled orders
    @classmethod
    def list_latest_cancel_records(cls, count, day=None):
        """Return up to *count* records of *day* that have a cancel time, newest first.

        *day* defaults to the current date, resolved on every call rather
        than once at import time.
        """
        if day is None:
            day = tool.get_now_date_str(_DAY_FORMAT)
        mysqldb = mysql_data.Mysqldb()
        where_list = [f"r.insertDate='{day}'", "r.cancelTime!=''"]
        results = mysqldb.select_many(
            f"select * from hx_trade_delegate_record r where {' and '.join(where_list)} order by r.cancelTime desc",
            count)
        if not results:
            return []
        return [_row_to_dict(r, cls.key_list) for r in results]


# Position records
class PositionManager:
    """Stores positions in ``hx_trade_position`` and caches available volume in Redis."""

    __redisManager = redis_manager.RedisManager(2)

    # Column names in positional order, matching the insert in ``add``.
    _KEY_LIST = ["id", "investorID", "tradingDay", "securityName", "securityID", "historyPos",
                 "historyPosFrozen", "todayBSPos", "todayBSPosFrozen", "historyPosPrice", "totalPosCost",
                 "prePosition", "availablePosition", "currentPosition", "openPosCost", "todayCommission",
                 "todayTotalBuyAmount", "todayTotalSellAmount", "createTime", "updateTime"]

    # (row index, key, normalizer) for the columns refreshed on an existing row.
    _TRACKED_COLUMNS = (
        (5, "historyPos", None),
        (6, "historyPosFrozen", None),
        (7, "todayBSPos", None),
        (8, "todayBSPosFrozen", None),
        (9, "historyPosPrice", _as_text),
        (10, "totalPosCost", _as_text),
        (11, "prePosition", None),
        (12, "availablePosition", None),
        (13, "currentPosition", None),
        (14, "openPosCost", _as_text),
        (15, "todayCommission", _as_text),
        (16, "todayTotalBuyAmount", _as_text),
        (17, "todayTotalSellAmount", _as_text),
    )

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    # Cache the available volume of a code
    @classmethod
    def __save_code_volume(cls, code, volume):
        RedisUtils.setex(cls.__get_redis(), f"available_position_{code}", tool.get_expire(), f"{volume}")

    @classmethod
    def get_code_volume(cls, code):
        """Return the cached available volume for *code*, or 0 when nothing is cached."""
        val = RedisUtils.get(cls.__get_redis(), f"available_position_{code}")
        if not val:
            return 0
        return int(val)

    @classmethod
    def add(cls, datas):
        """Cache per-code available volume, then insert or update position rows.

        Each item of *datas* is a dict; its row id is
        ``"<investorID>-<tradingDay>-<securityID>"``. Does nothing when
        *datas* is empty.
        """
        mysqldb = mysql_data.Mysqldb()
        if not datas:
            return
        # Sum the available volume per security and cache it.
        volume_dict = {}
        for d in datas:
            security_id = d["securityID"]
            volume_dict[security_id] = volume_dict.get(security_id, 0) + d["availablePosition"]
        for security_id, volume in volume_dict.items():
            cls.__save_code_volume(security_id, volume)

        for d in datas:
            _id = f"{d['investorID']}-{d['tradingDay']}-{d['securityID']}"
            # Look up whether the record already exists.
            result = mysqldb.select_one(
                f"select * from hx_trade_position where id='{_id}'")
            if not result:
                # New record.
                now = tool.get_now_datetime_str()
                values = (
                    _id, d["investorID"], d["tradingDay"], d["securityName"], d["securityID"],
                    d["historyPos"], d["historyPosFrozen"], d["todayBSPos"], d["todayBSPosFrozen"],
                    d["historyPosPrice"], d["totalPosCost"], d["prePosition"], d["availablePosition"],
                    d["currentPosition"], d["openPosCost"], d["todayCommission"],
                    d["todayTotalBuyAmount"], d["todayTotalSellAmount"], now, now,
                )
                mysqldb.execute(_insert_sql("hx_trade_position", values))
                continue
            # Existing record: write back only the columns that differ.
            changes = _collect_changes(result, d, cls._TRACKED_COLUMNS)
            if changes:
                changes['updateTime'] = tool.get_now_datetime_str()
                mysqldb.execute(_update_sql("hx_trade_position", changes, result[0]))

    @classmethod
    def list_by_day(cls, day):
        """Return the position rows of trading day *day* as dicts, ordered by ``createTime``.

        The key list includes ``tradingDay`` so that keys line up with the
        column order written by ``add``.
        """
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(
            f"select * from hx_trade_position r where r.tradingDay='{day}' order by createTime")
        if not results:
            return []
        return [_row_to_dict(r, cls._KEY_LIST) for r in results]

    @classmethod
    def get_volume_by_code(cls, code):
        """Return the ``select_one`` result for the ``currentPosition`` of *code*."""
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_one(f"select currentPosition from hx_trade_position where securityID='{code}'")

    @classmethod
    def get_cost_price(cls, code, day=None):
        """Return the cost price of *code* on *day*, rounded to 2 decimals.

        Computed as ``totalPosCost / currentPosition``. *day* defaults to
        the current date, resolved on every call rather than at import time.

        Raises:
            Exception: when there is no position row, or the current
                position is 0.
        """
        if day is None:
            day = tool.get_now_date_str(_DAY_FORMAT)
        mysqldb = mysql_data.Mysqldb()
        result = mysqldb.select_one(
            f"select totalPosCost,currentPosition from hx_trade_position where securityID='{code}' and tradingDay='{day}'")
        if not result:
            raise Exception("尚未持仓")
        if result[1] == 0:
            raise Exception("已经清仓")
        return round(float(result[0]) / result[1], 2)


# Deal records
class DealRecordManager:
    """Stores and queries rows of ``hx_trade_deal_record``."""

    # Column names in positional order, matching the insert in ``add``.
    _KEY_LIST = ["tradeID", "securityID", "orderLocalID", "direction", "orderSysID", "price", "tradeTime",
                 "volume", "tradeDate", "tradingDay", "pbuID", "accountID", "createTime", "updateTime"]

    @classmethod
    def add(cls, datas):
        """Insert the deal records of *datas* whose ``tradeID`` is not stored yet.

        Existing rows are left untouched. Does nothing when *datas* is empty.
        """
        mysqldb = mysql_data.Mysqldb()
        if not datas:
            return
        for d in datas:
            # Look up whether the record already exists.
            result = mysqldb.select_one(
                f"select * from hx_trade_deal_record where tradeID='{d['tradeID']}'")
            if result:
                continue
            # New record.
            now = tool.get_now_datetime_str()
            values = (
                d["tradeID"], d["securityID"], d["orderLocalID"], d["direction"], d["orderSysID"],
                round(d["price"], 2), d["tradeTime"], d["volume"], d["tradeDate"], d["tradingDay"],
                d["pbuID"], d["accountID"], now, now,
            )
            mysqldb.execute(_insert_sql("hx_trade_deal_record", values))

    @classmethod
    def list_by_day(cls, day):
        """Return the deal rows with ``tradeDate`` equal to *day* as dicts, ordered by ``createTime``."""
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(
            f"select * from hx_trade_deal_record r where r.tradeDate='{day}' order by createTime")
        if not results:
            return []
        return [_row_to_dict(r, cls._KEY_LIST) for r in results]


# Funds management
class MoneyManager:
    """Caches the account-funds payload in Redis under ``huaxin_money``."""

    __redisMananger = redis_manager.RedisManager(2)

    @classmethod
    def get_redis(cls):
        """Return the Redis client used by this manager."""
        return cls.__redisMananger.getRedis()

    @classmethod
    def save_data(cls, data):
        """Store *data* as JSON with the expiry given by ``tool.get_expire()``."""
        RedisUtils.setex(cls.get_redis(), "huaxin_money", tool.get_expire(), json.dumps(data))

    @classmethod
    def get_data(cls):
        """Return the cached funds payload decoded from JSON, or ``None`` when absent."""
        val = RedisUtils.get(cls.get_redis(), "huaxin_money")
        if not val:
            return None
        return json.loads(val)


if __name__ == "__main__":
    results = DelegateRecordManager.list_latest_cancel_records(10)
    print(results)