"""
|
交易方式模块(总览处理所有渠道的各种交易方法集合)
|
"""
|
import logging
|
import multiprocessing
|
import threading
|
|
from db import redis_manager_delegate as redis_manager
|
from db.redis_manager_delegate import RedisUtils
|
from log_module import async_log_util
|
from strategy import data_cache, account_management
|
import data_server
|
from log_module.log import logger_debug, logger_common, logger_trade
|
from strategy.trade_setting import TradeSetting
|
|
from trade import huaxin_trade_api, huaxin_trade_data_update, middle_api_protocol
|
from utils import huaxin_util, tool
|
|
# 引入日志模块
|
# 获取logger实例
|
logger = logger_common
|
|
|
# 下单买入函数(按金额,以限价买)【按金额买 基础版】
|
def buy_order_by_value(symbol, buy_order_value, sec_name, current_price):
|
if symbol[-6:] in TodayBuyCodeManager().get_buy_codes():
|
logger.info(f"{symbol}已经下过单")
|
return
|
|
# 自动买 开关监听方法
|
if not TradeSetting().get_auto_buy():
|
# 暂停自动买
|
logger.info(f"在交易方法函数处 关闭了 自动买")
|
return
|
# 限制交易标的的单价范围
|
if current_price < 2.5 or current_price > 30:
|
# 当前单价超出预设限制
|
logger.info(f"当前标的个股【{sec_name}】单价超出预设限制!预设值3 < current_price < 30,当前最新价{current_price}")
|
return
|
price = round(float(current_price), 2)
|
volume = (int(buy_order_value / price) // 100) * 100
|
if volume < 100:
|
volume = 100
|
# 调用笼子价计算工具计算下单价格
|
order_price = round(tool.get_buy_max_price(current_price), 2)
|
buy_order = huaxin_trade_api.order(1, symbol[-6:], volume, order_price, blocking=True)
|
logger.info(f"current_price===={current_price} order_price===={order_price}")
|
logger.info(f"buy_order===={buy_order}")
|
orderStatusMsg = buy_order['data'].get('orderStatusMsg', None)
|
orderRef = buy_order['data'].get('orderRef', None)
|
statusMsg = buy_order['data'].get('statusMsg', None)
|
logger.info(f"orderStatusMsg==={orderStatusMsg}")
|
logger.info(f"statusMsg==={statusMsg}")
|
# orderStatusMsg 不在buy_order的下单回调中,那么才认为下单成功
|
if statusMsg is not None and statusMsg == '':
|
TodayBuyCodeManager().place_order(symbol[-6:], orderRef)
|
logger.info(f"买票 下单成功:【{sec_name}】")
|
# 每一次成功下单后要更新一下 缓存 的持仓数据
|
account_management.position_management()
|
logger.info(f"更新的持仓数据data_cache.account_positions_dict=={data_cache.account_positions_dict}")
|
# 调用资金查询函数 查看资金变化
|
account_management.finance_management()
|
logger.info(f"更新的资金数据data_cache.account_finance_dict=={data_cache.account_finance_dict}")
|
if symbol[-6:] in data_cache.account_positions_dict:
|
logger.info(f"该股已经持仓==》{sec_name}")
|
pass
|
|
# 检测持仓信息中有无下单个股且有该个股的当前持仓,只有当前持仓数量不为0时,才认为交易成功
|
for i in data_cache.account_positions_dict:
|
# print(i)
|
if i['securityID'] == symbol[-6:]:
|
# print(i['currentPosition'])
|
if i['currentPosition'] == 0:
|
logger.info(f"【{i['securityName']}】交易失败~")
|
else:
|
# 买票后添加 持仓代码集合
|
data_cache.position_symbols_set.add(symbol[-6:])
|
logger.info(f"【{i['securityName']}】交易成功!")
|
|
|
# 下单买入函数(按可用资金的一定比例,在涨停价买)【按金额买 高级版】
|
def buy_order_by_part_value(part_of_value, symbol, available, today_limit_up_price, sec_name, index):
|
"""
|
:param symbol: 代码
|
:param available: 可用资金
|
:param part_of_value: 计划委买 账户余额的 比例
|
:param today_limit_up_price: 今日涨停价
|
:param sec_name: 公司名称
|
:param index: 持仓对象列表中的个股对应序列号
|
:return: 尝试返回的订单数据
|
"""
|
logger.info(f"当前账户可用资金available==={available}")
|
buy_order_value = round(available * part_of_value, 2)
|
logger.info(f"当前计划比例==={part_of_value},当前委托金额==={buy_order_value}")
|
# 只有持仓数量大于委卖数量才进入买票流程
|
if available >= buy_order_value:
|
sell_order_by_volume(symbol, buy_order_value, sec_name, today_limit_up_price)
|
logger.info(f"【十分之{part_of_value * 10}可用资金】委买完毕")
|
data_cache.available = available - buy_order_value
|
logger.info(f"买票执行成功:【{sec_name}】")
|
logger.info(f"买票后剩余资金:{data_cache.account_finance_dict['usefulMoney']}")
|
else:
|
logger.info(f"【{sec_name}】,持仓:{available}小于计划委托:{part_of_value},委托失败!")
|
|
|
# 下单卖出函数(按持仓数量,在限价卖)【按量卖 基础版】
|
def sell_order_by_volume(symbol, volume, sec_name, current_price):
|
# 自动卖开关监听方法
|
if not TradeSetting().get_auto_sell():
|
# 暂停自动卖
|
logger.info(f"在交易方法函数处 关闭了 自动卖")
|
return
|
# price = round(float(price), 2)
|
# 调用笼子价计算工具计算下单价格
|
order_price = tool.get_buy_min_price(current_price)
|
sell_order = huaxin_trade_api.order(2, symbol[-6:], volume, order_price, blocking=True)
|
logger.info(f"current_price===={current_price} order_price===={order_price}")
|
logger.info(f"sell_order===={sell_order}")
|
orderStatusMsg = sell_order['data'].get('orderStatusMsg', None)
|
statusMsg = sell_order['data'].get('statusMsg', None)
|
logger.info(f"orderStatusMsg==={orderStatusMsg}")
|
logger.info(f"statusMsg==={statusMsg}")
|
# orderStatusMsg 不在buy_order的下单回调中,那么才认为下单成功
|
if statusMsg is not None and statusMsg == '':
|
logger.info(f"卖票 下单成功:【{sec_name}】")
|
# 每一次成功下单后要更新一下 缓存 的持仓数据
|
account_management.position_management()
|
logger.info(f"更新的持仓数据data_cache.account_positions_dict=={data_cache.account_positions_dict}")
|
# 调用资金查询函数 查看资金变化
|
account_management.finance_management()
|
logger.info(f"更新的资金数据data_cache.account_finance_dict=={data_cache.account_finance_dict}")
|
|
|
# 下单卖出函数(按持仓数量的一定比例,在跌停价卖)【按量卖 高级版】
|
def sell_order_by_part_volume(part_of_volume, symbol, position_volume_yesterday, current_price, sec_name,
|
index):
|
"""
|
:param symbol: 代码
|
:param position_volume_yesterday: 可用持仓数量
|
:param part_of_volume: 计划委卖持仓量的比例
|
:param current_price: 当前最新价
|
:param sec_name: 公司名称
|
:param index: 持仓对象列表中的个股对应序列号
|
:return: 尝试返回的订单数据
|
"""
|
|
logger.info(
|
f"当前个股持仓手数【当前函数被调用时传进来的同步数据data_cache中的持仓数据】==={position_volume_yesterday}")
|
# sell_order_volume = int(position_volume_yesterday * part_of_volume)
|
sell_order_volume = round(position_volume_yesterday * part_of_volume / 100) * 100
|
logger.info(f"当前计划比例==={part_of_volume},当前委托量==={sell_order_volume}")
|
|
# 当委托量大于0
|
if sell_order_volume > 0:
|
# 只有持仓数量大于委卖数量才进入买票流程
|
if position_volume_yesterday >= sell_order_volume:
|
sell_order_by_volume(symbol, sell_order_volume, sec_name, current_price)
|
logger.info(f"【十分之 {round(part_of_volume * 10)} 仓】委卖完毕")
|
# 计算并更新剩余可用持仓数量
|
# data_cache.account_positions[index]['volume'] = position_volume_yesterday - sell_order_volume
|
if data_cache.account_positions_dict[index]['currentPosition'] <= 0:
|
logger.info(f"data_cache.account_positions == {data_cache.account_positions_dict}")
|
logger.info(
|
f"下单后,【{sec_name}】的剩余可用持仓数量==={data_cache.account_positions_dict[index]['currentPosition']}")
|
# 本票本次卖票,可用仓位为0或小于0,,移除【可用持仓代码】集合
|
'''
|
全局变量中的可用个股数量,由于只在【集合竞价】阶段用,如果移除会影响进入次数,暂不考虑使用
|
'''
|
# data_cache.available_symbols_set.remove(symbol)
|
logger.info(f"【{sec_name}】当日可用仓位数卖完了")
|
logger.info(f"卖后可用持仓票数:::{len(data_cache.available_symbols_set)}")
|
else:
|
logger.info(
|
f"【{sec_name}】,可用持仓:{position_volume_yesterday}小于计划委托:{sell_order_volume},无法委托,直接平仓!")
|
sell_order_by_volume(symbol, position_volume_yesterday, sec_name, current_price)
|
# 计算并更新剩余可用持仓数量
|
# data_cache.account_positions_dict[index]['currentPosition'] = position_volume_yesterday - position_volume_yesterday
|
else:
|
logger.info(f"委托量小于等于零,委托失败!")
|
logger.info(
|
f"【{sec_name}】,可用持仓:{position_volume_yesterday},计划委托:{sell_order_volume}<=0 ?,无法委托,直接委卖100!")
|
sell_order_by_volume(symbol, 100, sec_name, current_price)
|
|
|
def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r):
|
class MyTradeCallback(huaxin_trade_api.TradeCallback):
|
def on_order(self, order_info):
|
"""
|
订单状态改变回调
|
:param order_info: {'sinfo': 'b_603682_1736312765623', 'securityID': '603682', 'orderLocalID': '8100043081', 'direction': '0', 'orderSysID': '110018100043081', 'insertTime': '13:06:04', 'insertDate': '20250108', 'acceptTime': '13:05:46', 'cancelTime': '', 'limitPrice': 6.45, 'accountID': '00032047', 'orderRef': 130608, 'turnover': 6410.0, 'volume': 1000, 'volumeTraded': 1000, 'orderStatus': '4', 'orderSubmitStatus': '1', 'statusMsg': ''}
|
:return:
|
"""
|
print(f"收到订单回调:{order_info}")
|
async_log_util.info(logger_debug, f"收到订单回调:{order_info}")
|
if huaxin_util.is_deal(order_info['orderStatus']):
|
if order_info["direction"] == '0':
|
# 买入成交
|
TodayBuyCodeManager().add_deal_code(order_info["securityID"], order_info.get("orderRef"))
|
# 成交,需要更新持仓/委托/成交
|
huaxin_trade_data_update.add_position_list()
|
huaxin_trade_data_update.add_delegate_list("成交")
|
huaxin_trade_data_update.add_deal_list()
|
else:
|
huaxin_trade_data_update.add_money_list()
|
huaxin_trade_data_update.add_delegate_list("订单状态变化")
|
# 推送订单数据
|
threading.Thread(target=lambda: middle_api_protocol.push(
|
middle_api_protocol.load_push_msg({"type": "order", "data": order_info})), daemon=True).start()
|
|
huaxin_trade_api.run_trade(queue_strategy_r_trade_w, MyTradeCallback(), queue_strategy_w_trade_r,
|
queue_strategy_w_trade_for_query_r)
|
threading.Thread(target=data_server.run, daemon=True).start()
|
|
|
@tool.singleton
|
class TodayBuyCodeManager:
|
"""
|
今日买入代码管理类
|
"""
|
__db = 0
|
redisManager = redis_manager.RedisManager(0)
|
|
def __init__(self):
|
# 挂单中的代码
|
self.delegating_codes_info = {}
|
self.deal_codes = set()
|
self.__load_data()
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.redisManager.getRedis()
|
|
def __load_data(self):
|
"""
|
加载数据
|
:return:
|
"""
|
codes = RedisUtils.smembers(self.__get_redis(), "buy_deal_codes")
|
if codes:
|
self.deal_codes = set(codes)
|
|
def add_deal_code(self, code, order_ref=None):
|
"""
|
添加买入成交的代码
|
:param order_ref:
|
:param code:
|
:return:
|
"""
|
if code in self.deal_codes:
|
return
|
async_log_util.info(logger_trade, f"买入成交:{code}")
|
self.deal_codes.add(code)
|
if order_ref and order_ref in self.delegating_codes_info:
|
del self.delegating_codes_info[order_ref]
|
RedisUtils.sadd_async(self.__db, "buy_deal_codes", code)
|
RedisUtils.expire_async(self.__db, "buy_deal_codes", tool.get_expire())
|
|
def place_order(self, code, order_ref):
|
"""
|
下单
|
:param code: 代码
|
:param order_ref: 索引
|
:return:
|
"""
|
async_log_util.info(logger_trade, f"下单:{code}-{order_ref}")
|
self.delegating_codes_info[order_ref] = code
|
|
def get_buy_codes(self):
|
"""
|
获取买入的代码:成交代码+委托
|
:return:
|
"""
|
codes = set()
|
if self.deal_codes:
|
codes |= self.deal_codes
|
if self.delegating_codes_info:
|
codes |= set(self.delegating_codes_info.values())
|
return codes
|
|
def buy_fail(self, order_ref):
|
"""
|
买入失败
|
:param order_ref:
|
:return:
|
"""
|
async_log_util.info(logger_trade, f"下单失败:{order_ref}")
|
if order_ref in self.delegating_codes_info:
|
del self.delegating_codes_info[order_ref]
|
|
|
if __name__ == '__main__':
|
# 测试代码
|
print(TodayBuyCodeManager().get_buy_codes())
|