""" 华兴交易记录 """ # 委托记录 import copy import datetime import json from db.redis_manager_delegate import RedisUtils, RedisManager from utils import tool, huaxin_util from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager from third_data.history_k_data_util import HistoryKDatasUtils # 委托列表 class DelegateRecordManager: # 当前处于委托状态的数据 __current_delegate_records_dict_cache = {} mysqldb = mysql_data.Mysqldb() __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(DelegateRecordManager, cls).__new__(cls, *args, **kwargs) cls.__load_data() return cls.__instance @classmethod def __load_data(cls): fresults, max_update_time = cls.list_by_day(tool.get_now_date_str("%Y%m%d"), None, orderStatus=[huaxin_util.TORA_TSTP_OST_Cached, huaxin_util.TORA_TSTP_OST_Accepted]) cls.__current_delegate_records_dict_cache.clear() for d in fresults: cls.__current_delegate_records_dict_cache[d['orderSysID']] = d # 获取当前处于委托状态的订单 def list_current_delegates(self, code=None): if self.__current_delegate_records_dict_cache: fresults = [] for k in self.__current_delegate_records_dict_cache: item = self.__current_delegate_records_dict_cache[k] if code and item["securityID"] != code: continue fresults.append(item) return fresults return None @classmethod def add(cls, datas): try: if datas: for d in datas: cls.add_one(d) finally: pass @classmethod def add_one(cls, d): if huaxin_util.is_can_cancel(str(d["orderStatus"])): cls.__current_delegate_records_dict_cache[d['orderSysID']] = d else: if d['orderSysID'] in cls.__current_delegate_records_dict_cache: cls.__current_delegate_records_dict_cache.pop(d['orderSysID']) # 查询是否有数据 _id = f"{d['insertDate']}-{d['orderLocalID']}" result = cls.mysqldb.select_one( f"select * from hx_trade_delegate_record where id='{_id}'") if not result: # 新增数据 nameDict = HistoryKDatasUtils.get_gp_codes_names([d['securityID']]) name = nameDict.get(d['securityID']) cls.mysqldb.execute( "insert into hx_trade_delegate_record values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s', '%s', '%s')" % ( _id, d["orderLocalID"], d["securityID"], name, d["direction"], d["orderSysID"], d["insertTime"], d["insertDate"], d["acceptTime"], d["cancelTime"], d["limitPrice"], d["turnover"], d["volume"], d["volumeTraded"], d["orderStatus"], d["orderSubmitStatus"], d["statusMsg"], tool.get_now_datetime_str(), tool.get_now_datetime_str(), d["accountID"], d["orderRef"], d["sinfo"])) else: # 修改数据 updateDict = {} if result[5] != d['orderSysID']: updateDict['orderSysID'] = d['orderSysID'] if result[8] != d['acceptTime']: updateDict['acceptTime'] = d['acceptTime'] if result[9] != d['cancelTime']: updateDict['cancelTime'] = d['cancelTime'] if result[11] != str(d['turnover']): updateDict['turnover'] = d['turnover'] if result[13] != d['volumeTraded']: updateDict['volumeTraded'] = d['volumeTraded'] if result[14] != int(d['orderStatus']): updateDict['orderStatus'] = d['orderStatus'] if result[15] != int(d['orderSubmitStatus']): updateDict['orderSubmitStatus'] = d['orderSubmitStatus'] if result[16] != d['statusMsg']: updateDict['statusMsg'] = d['statusMsg'] if result[20] != d['orderRef']: updateDict['orderRef'] = d['orderRef'] if result[21] != d['sinfo']: updateDict['sinfo'] = d['sinfo'] if updateDict: # 有更新数据 updateDict['updateTime'] = tool.get_now_datetime_str() where_list = [] for k in updateDict: if type(updateDict[k]) == str: where_list.append(f"{k}='{updateDict[k]}'") else: where_list.append(f"{k}={updateDict[k]}") cls.mysqldb.execute( f"update hx_trade_delegate_record set {','.join(where_list)} where id='{result[0]}'") @classmethod def list_by_day(cls, day, min_update_time, orderStatus=[]): mysqldb = mysql_data.Mysqldb() try: where_list = [f"r.insertDate='{day}'"] if min_update_time: where_list.append(f"updateTime > '{min_update_time}'") if orderStatus: ss = " or ".join([f"orderStatus = {k}" for k in orderStatus]) where_list.append(f"({ss})") results = mysqldb.select_all( f"select * from hx_trade_delegate_record r where {' and '.join(where_list)} order by createTime") # 转dict key_list = ["id", "orderLocalID", "securityID", "securityName", "direction", "orderSysID", "insertTime", "insertDate", "acceptTime", "cancelTime", "limitPrice", "turnover", "volume", "volumeTraded", "orderStatus", "orderSubmitStatus", "statusMsg", "createTime", "updateTime", "accountID", "orderRef", "sinfo"] fresults = [] max_update_time = None if results: for r in results: temp = {} for i in range(len(r)): if type(r[i]) == datetime.datetime: temp[key_list[i]] = r[i].strftime("%Y-%m-%d %H:%M:%S") else: temp[key_list[i]] = r[i] fresults.append(temp) if not max_update_time: max_update_time = temp["updateTime"] if r[18] > max_update_time: max_update_time = temp["updateTime"] return fresults, max_update_time finally: pass # 持仓记录 class PositionManager: __redisManager = redis_manager.RedisManager(2) latest_positions = [] @classmethod def __get_redis(cls): return cls.__redisManager.getRedis() # 保存代码的量 @classmethod def __save_code_volume(cls, code, volume): RedisUtils.setex(cls.__get_redis(), f"available_position_{code}", tool.get_expire(), f"{volume}") @classmethod def get_code_volume(cls, code): val = RedisUtils.get(cls.__get_redis(), f"available_position_{code}") if not val: return 0 return int(val) @classmethod def cache(cls, datas): cls.latest_positions = copy.deepcopy(datas) @classmethod def add(cls, datas): mysqldb = mysql_data.Mysqldb() try: if datas: # 统计可用量 volume_dict = {} for d in datas: if d["securityID"] not in volume_dict: volume_dict[d["securityID"]] = 0 volume_dict[d["securityID"]] = volume_dict[d["securityID"]] + d["availablePosition"] for k in volume_dict: cls.__save_code_volume(k, volume_dict[k]) for d in datas: _id = f"{d['investorID']}-{d['tradingDay']}-{d['securityID']}" # 查询是否有数据 result = mysqldb.select_one( f"select * from hx_trade_position where id='{_id}'") if not result: # 新增数据 mysqldb.execute( "insert into hx_trade_position values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')" % ( _id, d["investorID"], d["tradingDay"], d["securityName"], d["securityID"], d["historyPos"], d["historyPosFrozen"], d["todayBSPos"], d["todayBSPosFrozen"], d["historyPosPrice"], d["totalPosCost"], d["prePosition"], d["availablePosition"], d["currentPosition"], d["openPosCost"], d["todayCommission"], d["todayTotalBuyAmount"], d["todayTotalSellAmount"], tool.get_now_datetime_str(), tool.get_now_datetime_str())) else: # 修改数据 updateDict = {} if result[5] != d['historyPos']: updateDict['historyPos'] = d['historyPos'] if result[6] != d['historyPosFrozen']: updateDict['historyPosFrozen'] = d['historyPosFrozen'] if result[7] != d['todayBSPos']: updateDict['todayBSPos'] = d['todayBSPos'] if result[8] != d['todayBSPosFrozen']: updateDict['todayBSPosFrozen'] = d['todayBSPosFrozen'] if result[9] != f"{d['historyPosPrice']}": updateDict['historyPosPrice'] = d['historyPosPrice'] if result[10] != f"{d['totalPosCost']}": updateDict['totalPosCost'] = d['totalPosCost'] if result[11] != d['prePosition']: updateDict['prePosition'] = d['prePosition'] if result[12] != d['availablePosition']: updateDict['availablePosition'] = d['availablePosition'] if result[13] != d['currentPosition']: updateDict['currentPosition'] = d['currentPosition'] if result[14] != f"{d['openPosCost']}": updateDict['openPosCost'] = d['openPosCost'] if result[15] != f"{d['todayCommission']}": updateDict['todayCommission'] = d['todayCommission'] if result[16] != f"{d['todayTotalBuyAmount']}": updateDict['todayTotalBuyAmount'] = d['todayTotalBuyAmount'] if result[17] != f"{d['todayTotalSellAmount']}": updateDict['todayTotalSellAmount'] = d['todayTotalSellAmount'] if updateDict: # 有更新数据 updateDict['updateTime'] = tool.get_now_datetime_str() where_list = [] for k in updateDict: if type(updateDict[k]) == str: where_list.append(f"{k}='{updateDict[k]}'") else: where_list.append(f"{k}={updateDict[k]}") mysqldb.execute( f"update hx_trade_position set {','.join(where_list)} where id='{result[0]}'") finally: pass @classmethod def list_by_day(cls, day): mysqldb = mysql_data.Mysqldb() try: results = mysqldb.select_all( f"select * from hx_trade_position r where r.tradingDay='{day}' order by createTime") # 转dict key_list = ["id", "investorID", "securityName", "securityID", "historyPos", "historyPosFrozen", "todayBSPos", "todayBSPosFrozen", "historyPosPrice", "totalPosCost", "prePosition", "availablePosition", "currentPosition", "openPosCost", "todayCommission", "todayTotalBuyAmount", "todayTotalSellAmount", "createTime", "updateTime"] fresults = [] if results: for r in results: temp = {} for i in range(len(r)): if type(r[i]) == datetime.datetime: temp[key_list[i]] = r[i].strftime("%Y-%m-%d %H:%M:%S") else: temp[key_list[i]] = r[i] fresults.append(temp) return fresults finally: pass @classmethod def get_volume_by_code(cls, code): mysqldb = mysql_data.Mysqldb() mysqldb.select_one(f"select currentPosition from hx_trade_position where securityID='{code}'") # 获取持仓代码 @classmethod def get_position_codes(cls): codes = [] if cls.latest_positions: for d in cls.latest_positions: if d["prePosition"] <= 0: continue codes.append(d["securityID"]) return codes # 成交记录 class DealRecordManager: __latest_deal_trade_id_dict = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(DealRecordManager, cls).__new__(cls, *args, **kwargs) cls.__load_data() return cls.__instance @classmethod def __load_data(cls): fresults = cls.list_by_day(tool.get_now_date_str("%Y%m%d")) if fresults: for r in fresults: cls.__latest_deal_trade_id_dict[r["tradeID"]] = r def list_sell_by_code_cache(self, code): fresults = [] for k in self.__latest_deal_trade_id_dict: d = self.__latest_deal_trade_id_dict[k] if d["securityID"] != code: continue if int(d["direction"]) != 1: continue fresults.append(d) return fresults @classmethod def add(cls, datas): mysqldb = mysql_data.Mysqldb() try: if datas: for d in datas: cls.__latest_deal_trade_id_dict[d['tradeID']] = d # 查询是否有数据 result = mysqldb.select_one( f"select * from hx_trade_deal_record where tradeID='{d['tradeID']}'") if not result: # 新增数据 mysqldb.execute( "insert into hx_trade_deal_record values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')" % ( d["tradeID"], d["securityID"], d["orderLocalID"], d["direction"], d["orderSysID"], round(d["price"], 2), d["tradeTime"], d["volume"], d["tradeDate"], d["tradingDay"], d["pbuID"], d["accountID"], tool.get_now_datetime_str(), tool.get_now_datetime_str())) finally: pass @classmethod def list_by_day(cls, day): mysqldb = mysql_data.Mysqldb() try: results = mysqldb.select_all( f"select * from hx_trade_deal_record r where r.tradeDate='{day}' order by createTime") # 转dict key_list = ["tradeID", "securityID", "orderLocalID", "direction", "orderSysID", "price", "tradeTime", "volume", "tradeDate", "tradingDay", "pbuID", "accountID", "createTime", "updateTime"] fresults = [] if results: for r in results: temp = {} for i in range(len(r)): if type(r[i]) == datetime.datetime: temp[key_list[i]] = r[i].strftime("%Y-%m-%d %H:%M:%S") else: temp[key_list[i]] = r[i] fresults.append(temp) return fresults finally: pass return [] # 资金管理 class MoneyManager: __redisMananger = redis_manager.RedisManager(2) @classmethod def get_redis(cls): return cls.__redisMananger.getRedis() @classmethod def save_data(cls, data): RedisUtils.setex(cls.get_redis(), "huaxin_money", tool.get_expire(), json.dumps(data)) @classmethod def get_data(cls): val = RedisUtils.get(cls.get_redis(), "huaxin_money") if not val: return None return json.loads(val) # 交易订单号管理 class TradeOrderIdManager: __db = 2 __redisManager = RedisManager(2) __instance = None __huaxin_order_id_cache = {} __huaxin_order_ref_cache = {} def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(TradeOrderIdManager, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __get_redis(cls): return cls.__redisManager.getRedis() @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: keys = RedisUtils.keys(__redis, "huaxin_order_id-*") for k in keys: code = k.split("-")[-1] vals = RedisUtils.smembers(__redis, k) tool.CodeDataCacheUtil.set_cache(cls.__huaxin_order_id_cache, code, vals) keys = RedisUtils.keys(__redis, "huaxin_order_ref-*") for k in keys: code = k.split("-")[-1] vals = RedisUtils.smembers(__redis, k) tool.CodeDataCacheUtil.set_cache(cls.__huaxin_order_ref_cache, code, vals) finally: RedisUtils.realse(__redis) # 添加订单ID def add_order_ref(self, code, order_ref): self.add_order_refs(code, [order_ref]) def add_order_refs(self, code, order_ref_list): if code not in self.__huaxin_order_ref_cache: self.__huaxin_order_ref_cache[code] = set() for val in order_ref_list: self.__huaxin_order_ref_cache[code].add(val) RedisUtils.sadd_async(self.__db, f"huaxin_order_ref-{code}", val) RedisUtils.expire_async(self.__db, f"huaxin_order_ref-{code}", tool.get_expire()) # 删除订单ID def remove_order_ref(self, code, order_ref): val = order_ref if code in self.__huaxin_order_ref_cache: self.__huaxin_order_ref_cache[code].discard(val) RedisUtils.srem_async(self.__db, f"huaxin_order_ref-{code}", val) # 查询所有的订单号 def list_order_refs(self, code): return RedisUtils.smembers(self.__get_redis(), f"huaxin_order_ref-{code}") def list_order_refs_cache(self, code): if code in self.__huaxin_order_ref_cache: return self.__huaxin_order_ref_cache[code] return set() # 添加订单ID def add_order_id(self, code, account_id, sys_order_id): val = json.dumps((account_id, sys_order_id)) if code not in self.__huaxin_order_id_cache: self.__huaxin_order_id_cache[code] = set() self.__huaxin_order_id_cache[code].add(val) RedisUtils.sadd_async(self.__db, f"huaxin_order_id-{code}", val) RedisUtils.expire_async(self.__db, f"huaxin_order_id-{code}", tool.get_expire()) # 删除订单ID def remove_order_id(self, code, account_id, order_id): val = json.dumps((account_id, order_id)) if code in self.__huaxin_order_id_cache: self.__huaxin_order_id_cache[code].discard(val) RedisUtils.srem_async(self.__db, f"huaxin_order_id-{code}", val) # 查询所有的订单号 def list_order_ids(self, code): return RedisUtils.smembers(self.__get_redis(), f"huaxin_order_id-{code}") def list_order_ids_cache(self, code): if code in self.__huaxin_order_id_cache: return self.__huaxin_order_id_cache[code] return set() if __name__ == "__main__": print(DelegateRecordManager().list_current_delegates("600239"))