""" 交易方式模块(总览处理所有渠道的各种交易方法集合) """ import logging import multiprocessing import threading from strategy import data_cache, account_management import data_server from log_module.log import logger_debug, logger_common from trade import huaxin_trade_api, huaxin_trade_data_update, middle_api_protocol from utils import huaxin_util, tool # 引入日志模块 # 获取logger实例 logger = logger_common # 下单买入函数(按金额,以限价买)【按金额买 基础版】 def buy_order_by_value(symbol, buy_order_value, sec_name, current_price): price = round(float(current_price), 2) volume = (int(buy_order_value / price) // 100) * 100 if volume < 100: volume = 100 # 调用笼子价计算工具计算下单价格 order_price = tool.get_buy_max_price(current_price) buy_order = huaxin_trade_api.order(1, symbol[-6:], volume, order_price, blocking=True) logger.info(f"current_price===={current_price} order_price===={order_price}") logger.info(f"buy_order===={buy_order}") orderStatusMsg = buy_order['data'].get('orderStatusMsg', None) statusMsg = buy_order['data'].get('statusMsg', None) logger.info(f"orderStatusMsg==={orderStatusMsg}") logger.info(f"statusMsg==={statusMsg}") # orderStatusMsg 不在buy_order的下单回调中,那么才认为下单成功 if statusMsg is not None and statusMsg == '': logger.info(f"买票 下单成功:【{sec_name}】") # 每一次成功下单后要更新一下 缓存 的持仓数据 account_management.position_management() logger.info(f"更新的持仓数据data_cache.account_positions_dict=={data_cache.account_positions_dict}") # 调用资金查询函数 查看资金变化 account_management.finance_management() logger.info(f"更新的资金数据data_cache.account_finance_dict=={data_cache.account_finance_dict}") # 买票后添加 持仓代码集合 data_cache.position_symbols_set.add(symbol) # 买票后添加 今日新增持仓代码集合 data_cache.addition_position_symbols_set.add(symbol) logger.info(f"当前持仓数量:::{len(data_cache.position_symbols_set)}") logger.info(f"今日新增持仓数量:::{len(data_cache.addition_position_symbols_set)}") # 下单买入函数(按可用资金的一定比例,在涨停价买)【按金额买 高级版】 def buy_order_by_part_value(part_of_value, symbol, available, today_limit_up_price, sec_name, index): """ :param symbol: 代码 :param available: 可用资金 :param part_of_value: 计划委买 账户余额的 比例 :param today_limit_up_price: 今日涨停价 :param sec_name: 公司名称 :param index: 持仓对象列表中的个股对应序列号 :return: 尝试返回的订单数据 """ logger.info(f"当前账户可用资金available==={available}") buy_order_value = round(available * part_of_value, 2) logger.info(f"当前计划比例==={part_of_value},当前委托金额==={buy_order_value}") # 只有持仓数量大于委卖数量才进入买票流程 if available >= buy_order_value: sell_order_by_volume(symbol, buy_order_value, sec_name, today_limit_up_price) logger.info(f"【十分之{part_of_value * 10}可用资金】委买完毕") data_cache.available = available - buy_order_value logger.info(f"买票执行成功:【{sec_name}】") logger.info(f"买票后剩余资金:{data_cache.account_finance_dict['usefulMoney']}") else: logger.info(f"【{sec_name}】,持仓:{available}小于计划委托:{part_of_value},委托失败!") # 下单卖出函数(按持仓数量,在限价卖)【按量卖 基础版】 def sell_order_by_volume(symbol, volume, sec_name, current_price): # price = round(float(price), 2) # 调用笼子价计算工具计算下单价格 order_price = tool.get_buy_min_price(current_price) sell_order = huaxin_trade_api.order(2, symbol[-6:], volume, order_price, blocking=True) logger.info(f"current_price===={current_price} order_price===={order_price}") logger.info(f"sell_order===={sell_order}") orderStatusMsg = sell_order['data'].get('orderStatusMsg', None) statusMsg = sell_order['data'].get('statusMsg', None) logger.info(f"orderStatusMsg==={orderStatusMsg}") logger.info(f"statusMsg==={statusMsg}") # orderStatusMsg 不在buy_order的下单回调中,那么才认为下单成功 if statusMsg is not None and statusMsg == '': logger.info(f"卖票 下单成功:【{sec_name}】") # 每一次成功下单后要更新一下 缓存 的持仓数据 account_management.position_management() logger.info(f"更新的持仓数据data_cache.account_positions_dict=={data_cache.account_positions_dict}") # 调用资金查询函数 查看资金变化 account_management.finance_management() logger.info(f"更新的资金数据data_cache.account_finance_dict=={data_cache.account_finance_dict}") # 下单卖出函数(按持仓数量的一定比例,在跌停价卖)【按量卖 高级版】 def sell_order_by_part_volume(part_of_volume, symbol, position_volume_yesterday, current_price, sec_name, index): """ :param symbol: 代码 :param position_volume_yesterday: 可用持仓数量 :param part_of_volume: 计划委卖持仓量的比例 :param current_price: 当前最新价 :param sec_name: 公司名称 :param index: 持仓对象列表中的个股对应序列号 :return: 尝试返回的订单数据 """ logger.info(f"当前个股持仓手数【当前函数被调用时传进来的同步数据data_cache中的持仓数据】==={position_volume_yesterday}") # sell_order_volume = int(position_volume_yesterday * part_of_volume) sell_order_volume = round(position_volume_yesterday * part_of_volume / 100) * 100 logger.info(f"当前计划比例==={part_of_volume},当前委托量==={sell_order_volume}") # 当委托量大于0 if sell_order_volume > 0: # 只有持仓数量大于委卖数量才进入买票流程 if position_volume_yesterday >= sell_order_volume: sell_order_by_volume(symbol, sell_order_volume, sec_name, current_price) logger.info(f"【十分之 {round(part_of_volume * 10)} 仓】委卖完毕") # todo data_cache.account_positions 数据内容已经变化需重新写,似乎不用计算剩余持仓量,直接请求就行了 # 计算并更新剩余可用持仓数量 # data_cache.account_positions[index]['volume'] = position_volume_yesterday - sell_order_volume if data_cache.account_positions_dict[index]['currentPosition'] <= 0: logger.info(f"data_cache.account_positions == {data_cache.account_positions_dict}") logger.info(f"下单后,【{sec_name}】的剩余可用持仓数量==={data_cache.account_positions_dict[index]['currentPosition']}") # 本票本次卖票,可用仓位为0或小于0,,移除【可用持仓代码】集合 ''' 全局变量中的可用个股数量,由于只在【集合竞价】阶段用,如果移除会影响进入次数,暂不考虑使用 ''' # data_cache.available_symbols_set.remove(symbol) logger.info(f"【{sec_name}】当日可用仓位数卖完了") logger.info(f"卖后可用持仓票数:::{len(data_cache.available_symbols_set)}") else: logger.info( f"【{sec_name}】,可用持仓:{position_volume_yesterday}小于计划委托:{sell_order_volume},无法委托,直接平仓!") sell_order_by_volume(symbol, position_volume_yesterday, sec_name, current_price) # 计算并更新剩余可用持仓数量 # data_cache.account_positions_dict[index]['currentPosition'] = position_volume_yesterday - position_volume_yesterday else: logger.info(f"委托量小于等于零,委托失败!") logger.info( f"【{sec_name}】,可用持仓:{position_volume_yesterday},计划委托:{sell_order_volume}<=0 ?,无法委托,直接委卖100!") sell_order_by_volume(symbol, 100, sec_name, current_price) def run(): class MyTradeCallback(huaxin_trade_api.TradeCallback): def on_order(self, order_info): """ 订单状态改变回调 :param order_info: {'sinfo': 'b_603682_1736312765623', 'securityID': '603682', 'orderLocalID': '8100043081', 'direction': '0', 'orderSysID': '110018100043081', 'insertTime': '13:06:04', 'insertDate': '20250108', 'acceptTime': '13:05:46', 'cancelTime': '', 'limitPrice': 6.45, 'accountID': '00032047', 'orderRef': 130608, 'turnover': 6410.0, 'volume': 1000, 'volumeTraded': 1000, 'orderStatus': '4', 'orderSubmitStatus': '1', 'statusMsg': ''} :return: """ print(f"收到订单回调:{order_info}") logger_debug.info(f"收到订单回调:{order_info}") if huaxin_util.is_deal(order_info['orderStatus']): # 成交,需要更新持仓/委托/成交 huaxin_trade_data_update.add_position_list() huaxin_trade_data_update.add_delegate_list("成交") huaxin_trade_data_update.add_deal_list() else: huaxin_trade_data_update.add_money_list() huaxin_trade_data_update.add_delegate_list("订单状态变化") # 推送订单数据 threading.Thread(target=lambda: middle_api_protocol.push( middle_api_protocol.load_push_msg({"type": "order", "data": order_info})), daemon=True).start() queue = multiprocessing.Queue() huaxin_trade_api.run_trade(queue, MyTradeCallback()) threading.Thread(target=data_server.run, daemon=True).start()