""" 交易方式模块(总览处理所有渠道的各种交易方法集合) """ import logging import multiprocessing import threading from db import redis_manager_delegate as redis_manager from db.redis_manager_delegate import RedisUtils from log_module import async_log_util from strategy import data_cache, account_management import data_server from log_module.log import logger_debug, logger_common, logger_trade from strategy.trade_setting import TradeSetting from trade import huaxin_trade_api, huaxin_trade_data_update, middle_api_protocol from utils import huaxin_util, tool # 引入日志模块 # 获取logger实例 logger = logger_common # 下单买入函数(按金额,以限价买)【按金额买 基础版】 def buy_order_by_value(symbol, buy_order_value, sec_name, current_price): if symbol[-6:] in TodayBuyCodeManager().get_buy_codes(): logger.info(f"{symbol}已经下过单") return # 自动买 开关监听方法 if not TradeSetting().get_auto_buy(): # 暂停自动买 logger.info(f"在交易方法函数处 关闭了 自动买") return # 限制交易标的的单价范围 if current_price < 2.5 or current_price > 30: # 当前单价超出预设限制 logger.info(f"当前标的个股【{sec_name}】单价超出预设限制!预设值3 < current_price < 30,当前最新价{current_price}") return price = round(float(current_price), 2) volume = (int(buy_order_value / price) // 100) * 100 if volume < 100: volume = 100 # 调用笼子价计算工具计算下单价格 order_price = round(tool.get_buy_max_price(current_price), 2) buy_order = huaxin_trade_api.order(1, symbol[-6:], volume, order_price, blocking=True) logger.info(f"current_price===={current_price} order_price===={order_price}") logger.info(f"buy_order===={buy_order}") orderStatusMsg = buy_order['data'].get('orderStatusMsg', None) orderRef = buy_order['data'].get('orderRef', None) statusMsg = buy_order['data'].get('statusMsg', None) logger.info(f"orderStatusMsg==={orderStatusMsg}") logger.info(f"statusMsg==={statusMsg}") # orderStatusMsg 不在buy_order的下单回调中,那么才认为下单成功 if statusMsg is not None and statusMsg == '': TodayBuyCodeManager().place_order(symbol[-6:], orderRef) logger.info(f"买票 下单成功:【{sec_name}】") # 每一次成功下单后要更新一下 缓存 的持仓数据 account_management.position_management() logger.info(f"更新的持仓数据data_cache.account_positions_dict=={data_cache.account_positions_dict}") # 调用资金查询函数 查看资金变化 account_management.finance_management() logger.info(f"更新的资金数据data_cache.account_finance_dict=={data_cache.account_finance_dict}") if symbol[-6:] in data_cache.account_positions_dict: logger.info(f"该股已经持仓==》{sec_name}") pass # 检测持仓信息中有无下单个股且有该个股的当前持仓,只有当前持仓数量不为0时,才认为交易成功 for i in data_cache.account_positions_dict: # print(i) if i['securityID'] == symbol[-6:]: # print(i['currentPosition']) if i['currentPosition'] == 0: logger.info(f"【{i['securityName']}】交易失败~") else: # 买票后添加 持仓代码集合 data_cache.position_symbols_set.add(symbol[-6:]) logger.info(f"【{i['securityName']}】交易成功!") # 下单买入函数(按可用资金的一定比例,在涨停价买)【按金额买 高级版】 def buy_order_by_part_value(part_of_value, symbol, available, today_limit_up_price, sec_name, index): """ :param symbol: 代码 :param available: 可用资金 :param part_of_value: 计划委买 账户余额的 比例 :param today_limit_up_price: 今日涨停价 :param sec_name: 公司名称 :param index: 持仓对象列表中的个股对应序列号 :return: 尝试返回的订单数据 """ logger.info(f"当前账户可用资金available==={available}") buy_order_value = round(available * part_of_value, 2) logger.info(f"当前计划比例==={part_of_value},当前委托金额==={buy_order_value}") # 只有持仓数量大于委卖数量才进入买票流程 if available >= buy_order_value: sell_order_by_volume(symbol, buy_order_value, sec_name, today_limit_up_price) logger.info(f"【十分之{part_of_value * 10}可用资金】委买完毕") data_cache.available = available - buy_order_value logger.info(f"买票执行成功:【{sec_name}】") logger.info(f"买票后剩余资金:{data_cache.account_finance_dict['usefulMoney']}") else: logger.info(f"【{sec_name}】,持仓:{available}小于计划委托:{part_of_value},委托失败!") # 下单卖出函数(按持仓数量,在限价卖)【按量卖 基础版】 def sell_order_by_volume(symbol, volume, sec_name, current_price): # 自动卖开关监听方法 if not TradeSetting().get_auto_sell(): # 暂停自动卖 logger.info(f"在交易方法函数处 关闭了 自动卖") return # price = round(float(price), 2) # 调用笼子价计算工具计算下单价格 order_price = tool.get_buy_min_price(current_price) sell_order = huaxin_trade_api.order(2, symbol[-6:], volume, order_price, blocking=True) logger.info(f"current_price===={current_price} order_price===={order_price}") logger.info(f"sell_order===={sell_order}") orderStatusMsg = sell_order['data'].get('orderStatusMsg', None) statusMsg = sell_order['data'].get('statusMsg', None) logger.info(f"orderStatusMsg==={orderStatusMsg}") logger.info(f"statusMsg==={statusMsg}") # orderStatusMsg 不在buy_order的下单回调中,那么才认为下单成功 if statusMsg is not None and statusMsg == '': logger.info(f"卖票 下单成功:【{sec_name}】") # 每一次成功下单后要更新一下 缓存 的持仓数据 account_management.position_management() logger.info(f"更新的持仓数据data_cache.account_positions_dict=={data_cache.account_positions_dict}") # 调用资金查询函数 查看资金变化 account_management.finance_management() logger.info(f"更新的资金数据data_cache.account_finance_dict=={data_cache.account_finance_dict}") # 下单卖出函数(按持仓数量的一定比例,在跌停价卖)【按量卖 高级版】 def sell_order_by_part_volume(part_of_volume, symbol, position_volume_yesterday, current_price, sec_name, index): """ :param symbol: 代码 :param position_volume_yesterday: 可用持仓数量 :param part_of_volume: 计划委卖持仓量的比例 :param current_price: 当前最新价 :param sec_name: 公司名称 :param index: 持仓对象列表中的个股对应序列号 :return: 尝试返回的订单数据 """ logger.info( f"当前个股持仓手数【当前函数被调用时传进来的同步数据data_cache中的持仓数据】==={position_volume_yesterday}") # sell_order_volume = int(position_volume_yesterday * part_of_volume) sell_order_volume = round(position_volume_yesterday * part_of_volume / 100) * 100 logger.info(f"当前计划比例==={part_of_volume},当前委托量==={sell_order_volume}") # 当委托量大于0 if sell_order_volume > 0: # 只有持仓数量大于委卖数量才进入买票流程 if position_volume_yesterday >= sell_order_volume: sell_order_by_volume(symbol, sell_order_volume, sec_name, current_price) logger.info(f"【十分之 {round(part_of_volume * 10)} 仓】委卖完毕") # 计算并更新剩余可用持仓数量 # data_cache.account_positions[index]['volume'] = position_volume_yesterday - sell_order_volume if data_cache.account_positions_dict[index]['currentPosition'] <= 0: logger.info(f"data_cache.account_positions == {data_cache.account_positions_dict}") logger.info( f"下单后,【{sec_name}】的剩余可用持仓数量==={data_cache.account_positions_dict[index]['currentPosition']}") # 本票本次卖票,可用仓位为0或小于0,,移除【可用持仓代码】集合 ''' 全局变量中的可用个股数量,由于只在【集合竞价】阶段用,如果移除会影响进入次数,暂不考虑使用 ''' # data_cache.available_symbols_set.remove(symbol) logger.info(f"【{sec_name}】当日可用仓位数卖完了") logger.info(f"卖后可用持仓票数:::{len(data_cache.available_symbols_set)}") else: logger.info( f"【{sec_name}】,可用持仓:{position_volume_yesterday}小于计划委托:{sell_order_volume},无法委托,直接平仓!") sell_order_by_volume(symbol, position_volume_yesterday, sec_name, current_price) # 计算并更新剩余可用持仓数量 # data_cache.account_positions_dict[index]['currentPosition'] = position_volume_yesterday - position_volume_yesterday else: logger.info(f"委托量小于等于零,委托失败!") logger.info( f"【{sec_name}】,可用持仓:{position_volume_yesterday},计划委托:{sell_order_volume}<=0 ?,无法委托,直接委卖100!") sell_order_by_volume(symbol, 100, sec_name, current_price) def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r): class MyTradeCallback(huaxin_trade_api.TradeCallback): def on_order(self, order_info): """ 订单状态改变回调 :param order_info: {'sinfo': 'b_603682_1736312765623', 'securityID': '603682', 'orderLocalID': '8100043081', 'direction': '0', 'orderSysID': '110018100043081', 'insertTime': '13:06:04', 'insertDate': '20250108', 'acceptTime': '13:05:46', 'cancelTime': '', 'limitPrice': 6.45, 'accountID': '00032047', 'orderRef': 130608, 'turnover': 6410.0, 'volume': 1000, 'volumeTraded': 1000, 'orderStatus': '4', 'orderSubmitStatus': '1', 'statusMsg': ''} :return: """ print(f"收到订单回调:{order_info}") async_log_util.info(logger_debug, f"收到订单回调:{order_info}") if huaxin_util.is_deal(order_info['orderStatus']): if order_info["direction"] == '0': # 买入成交 TodayBuyCodeManager().add_deal_code(order_info["securityID"], order_info.get("orderRef")) # 成交,需要更新持仓/委托/成交 huaxin_trade_data_update.add_position_list() huaxin_trade_data_update.add_delegate_list("成交") huaxin_trade_data_update.add_deal_list() else: huaxin_trade_data_update.add_money_list() huaxin_trade_data_update.add_delegate_list("订单状态变化") # 推送订单数据 threading.Thread(target=lambda: middle_api_protocol.push( middle_api_protocol.load_push_msg({"type": "order", "data": order_info})), daemon=True).start() huaxin_trade_api.run_trade(queue_strategy_r_trade_w, MyTradeCallback(), queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r) threading.Thread(target=data_server.run, daemon=True).start() @tool.singleton class TodayBuyCodeManager: """ 今日买入代码管理类 """ __db = 0 redisManager = redis_manager.RedisManager(0) def __init__(self): # 挂单中的代码 self.delegating_codes_info = {} self.deal_codes = set() self.__load_data() @classmethod def __get_redis(cls): return cls.redisManager.getRedis() def __load_data(self): """ 加载数据 :return: """ codes = RedisUtils.smembers(self.__get_redis(), "buy_deal_codes") if codes: self.deal_codes = set(codes) def add_deal_code(self, code, order_ref=None): """ 添加买入成交的代码 :param order_ref: :param code: :return: """ if code in self.deal_codes: return async_log_util.info(logger_trade, f"买入成交:{code}") self.deal_codes.add(code) if order_ref and order_ref in self.delegating_codes_info: del self.delegating_codes_info[order_ref] RedisUtils.sadd_async(self.__db, "buy_deal_codes", code) RedisUtils.expire_async(self.__db, "buy_deal_codes", tool.get_expire()) def place_order(self, code, order_ref): """ 下单 :param code: 代码 :param order_ref: 索引 :return: """ async_log_util.info(logger_trade, f"下单:{code}-{order_ref}") self.delegating_codes_info[order_ref] = code def get_buy_codes(self): """ 获取买入的代码:成交代码+委托 :return: """ codes = set() if self.deal_codes: codes |= self.deal_codes if self.delegating_codes_info: codes |= set(self.delegating_codes_info.values()) return codes def buy_fail(self, order_ref): """ 买入失败 :param order_ref: :return: """ async_log_util.info(logger_trade, f"下单失败:{order_ref}") if order_ref in self.delegating_codes_info: del self.delegating_codes_info[order_ref] if __name__ == '__main__': # 测试代码 print(TodayBuyCodeManager().get_buy_codes())