1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
"""
 交易方式模块(总览处理所有渠道的各种交易方法集合)
"""
import logging
import multiprocessing
import threading
 
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util
from strategy import data_cache, account_management
import data_server
from log_module.log import logger_debug, logger_common, logger_trade
from strategy.trade_setting import TradeSetting
 
from trade import huaxin_trade_api, huaxin_trade_data_update, middle_api_protocol
from utils import huaxin_util, tool
 
# 引入日志模块
# 获取logger实例
logger = logger_common
 
 
# 下单买入函数(按金额,以限价买)【按金额买 基础版】
def buy_order_by_value(symbol, buy_order_value, sec_name, current_price):
    if symbol[-6:] in TodayBuyCodeManager().get_buy_codes():
        logger.info(f"{symbol}已经下过单")
        return
 
    # 自动买 开关监听方法
    if not TradeSetting().get_auto_buy():
        # 暂停自动买
        logger.info(f"在交易方法函数处 关闭了 自动买")
        return
    # 限制交易标的的单价范围
    if current_price < 3 or current_price > 30:
        # 当前单价超出预设限制
        logger.info(f"当前标的个股【{sec_name}】单价超出预设限制!预设值3 < current_price < 30,当前最新价{current_price}")
        return
    price = round(float(current_price), 2)
    volume = (int(buy_order_value / price) // 100) * 100
    if volume < 100:
        volume = 100
    # 调用笼子价计算工具计算下单价格
    order_price = round(tool.get_buy_max_price(current_price), 2)
    buy_order = huaxin_trade_api.order(1, symbol[-6:], volume, order_price, blocking=True)
    logger.info(f"current_price===={current_price}    order_price===={order_price}")
    logger.info(f"buy_order===={buy_order}")
    orderStatusMsg = buy_order['data'].get('orderStatusMsg', None)
    orderRef = buy_order['data'].get('orderRef', None)
    statusMsg = buy_order['data'].get('statusMsg', None)
    logger.info(f"orderStatusMsg==={orderStatusMsg}")
    logger.info(f"statusMsg==={statusMsg}")
    # orderStatusMsg 不在buy_order的下单回调中,那么才认为下单成功
    if statusMsg is not None and statusMsg == '':
        TodayBuyCodeManager().place_order(symbol[-6:], orderRef)
        logger.info(f"买票 下单成功:【{sec_name}】")
        # 每一次成功下单后要更新一下 缓存 的持仓数据
        account_management.position_management()
        logger.info(f"更新的持仓数据data_cache.account_positions_dict=={data_cache.account_positions_dict}")
        # 调用资金查询函数 查看资金变化
        account_management.finance_management()
        logger.info(f"更新的资金数据data_cache.account_finance_dict=={data_cache.account_finance_dict}")
        if symbol[-6:] in data_cache.account_positions_dict:
            logger.info(f"该股已经持仓==》{sec_name}")
            pass
 
        # 检测持仓信息中有无下单个股且有该个股的当前持仓,只有当前持仓数量不为0时,才认为交易成功
        for i in data_cache.account_positions_dict:
            # print(i)
            if i['securityID'] == symbol[-6:]:
                # print(i['currentPosition'])
                if i['currentPosition'] == 0:
                    logger.info(f"【{i['securityName']}】交易失败~")
                else:
                    # 买票后添加 持仓代码集合
                    data_cache.position_symbols_set.add(symbol[-6:])
                    logger.info(f"【{i['securityName']}】交易成功!")
 
 
# 下单买入函数(按可用资金的一定比例,在涨停价买)【按金额买 高级版】
def buy_order_by_part_value(part_of_value, symbol, available, today_limit_up_price, sec_name, index):
    """
    :param symbol: 代码
    :param available: 可用资金
    :param part_of_value: 计划委买 账户余额的 比例
    :param today_limit_up_price: 今日涨停价
    :param sec_name: 公司名称
    :param index:  持仓对象列表中的个股对应序列号
    :return:  尝试返回的订单数据
    """
    logger.info(f"当前账户可用资金available==={available}")
    buy_order_value = round(available * part_of_value, 2)
    logger.info(f"当前计划比例==={part_of_value},当前委托金额==={buy_order_value}")
    # 只有持仓数量大于委卖数量才进入买票流程
    if available >= buy_order_value:
        sell_order_by_volume(symbol, buy_order_value, sec_name, today_limit_up_price)
        logger.info(f"【十分之{part_of_value * 10}可用资金】委买完毕")
        data_cache.available = available - buy_order_value
        logger.info(f"买票执行成功:【{sec_name}】")
        logger.info(f"买票后剩余资金:{data_cache.account_finance_dict['usefulMoney']}")
    else:
        logger.info(f"【{sec_name}】,持仓:{available}小于计划委托:{part_of_value},委托失败!")
 
 
# 下单卖出函数(按持仓数量,在限价卖)【按量卖 基础版】
def sell_order_by_volume(symbol, volume, sec_name, current_price):
    # 自动卖开关监听方法
    if not TradeSetting().get_auto_sell():
        # 暂停自动卖
        logger.info(f"在交易方法函数处 关闭了 自动卖")
        return
    # price = round(float(price), 2)
    # 调用笼子价计算工具计算下单价格
    order_price = tool.get_buy_min_price(current_price)
    sell_order = huaxin_trade_api.order(2, symbol[-6:], volume, order_price, blocking=True)
    logger.info(f"current_price===={current_price}    order_price===={order_price}")
    logger.info(f"sell_order===={sell_order}")
    orderStatusMsg = sell_order['data'].get('orderStatusMsg', None)
    statusMsg = sell_order['data'].get('statusMsg', None)
    logger.info(f"orderStatusMsg==={orderStatusMsg}")
    logger.info(f"statusMsg==={statusMsg}")
    # orderStatusMsg 不在buy_order的下单回调中,那么才认为下单成功
    if statusMsg is not None and statusMsg == '':
        logger.info(f"卖票 下单成功:【{sec_name}】")
        # 每一次成功下单后要更新一下 缓存 的持仓数据
        account_management.position_management()
        logger.info(f"更新的持仓数据data_cache.account_positions_dict=={data_cache.account_positions_dict}")
        # 调用资金查询函数 查看资金变化
        account_management.finance_management()
        logger.info(f"更新的资金数据data_cache.account_finance_dict=={data_cache.account_finance_dict}")
 
 
# 下单卖出函数(按持仓数量的一定比例,在跌停价卖)【按量卖 高级版】
def sell_order_by_part_volume(part_of_volume, symbol, position_volume_yesterday, current_price, sec_name,
                              index):
    """
    :param symbol: 代码
    :param position_volume_yesterday: 可用持仓数量
    :param part_of_volume: 计划委卖持仓量的比例
    :param current_price: 当前最新价
    :param sec_name: 公司名称
    :param index:  持仓对象列表中的个股对应序列号
    :return:  尝试返回的订单数据
    """
 
    logger.info(
        f"当前个股持仓手数【当前函数被调用时传进来的同步数据data_cache中的持仓数据】==={position_volume_yesterday}")
    # sell_order_volume = int(position_volume_yesterday * part_of_volume)
    sell_order_volume = round(position_volume_yesterday * part_of_volume / 100) * 100
    logger.info(f"当前计划比例==={part_of_volume},当前委托量==={sell_order_volume}")
 
    # 当委托量大于0
    if sell_order_volume > 0:
        # 只有持仓数量大于委卖数量才进入买票流程
        if position_volume_yesterday >= sell_order_volume:
            sell_order_by_volume(symbol, sell_order_volume, sec_name, current_price)
            logger.info(f"【十分之 {round(part_of_volume * 10)} 仓】委卖完毕")
            # 计算并更新剩余可用持仓数量
            # data_cache.account_positions[index]['volume'] = position_volume_yesterday - sell_order_volume
            if data_cache.account_positions_dict[index]['currentPosition'] <= 0:
                logger.info(f"data_cache.account_positions == {data_cache.account_positions_dict}")
                logger.info(
                    f"下单后,【{sec_name}】的剩余可用持仓数量==={data_cache.account_positions_dict[index]['currentPosition']}")
                # 本票本次卖票,可用仓位为0或小于0,,移除【可用持仓代码】集合
                '''
                全局变量中的可用个股数量,由于只在【集合竞价】阶段用,如果移除会影响进入次数,暂不考虑使用
                '''
                # data_cache.available_symbols_set.remove(symbol)
                logger.info(f"【{sec_name}】当日可用仓位数卖完了")
                logger.info(f"卖后可用持仓票数:::{len(data_cache.available_symbols_set)}")
        else:
            logger.info(
                f"【{sec_name}】,可用持仓:{position_volume_yesterday}小于计划委托:{sell_order_volume},无法委托,直接平仓!")
            sell_order_by_volume(symbol, position_volume_yesterday, sec_name, current_price)
            # 计算并更新剩余可用持仓数量
            # data_cache.account_positions_dict[index]['currentPosition'] = position_volume_yesterday - position_volume_yesterday
    else:
        logger.info(f"委托量小于等于零,委托失败!")
        logger.info(
            f"【{sec_name}】,可用持仓:{position_volume_yesterday},计划委托:{sell_order_volume}<=0 ?,无法委托,直接委卖100!")
        sell_order_by_volume(symbol, 100, sec_name, current_price)
 
 
def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r):
    class MyTradeCallback(huaxin_trade_api.TradeCallback):
        def on_order(self, order_info):
            """
            订单状态改变回调
            :param order_info: {'sinfo': 'b_603682_1736312765623', 'securityID': '603682', 'orderLocalID': '8100043081', 'direction': '0', 'orderSysID': '110018100043081', 'insertTime': '13:06:04', 'insertDate': '20250108', 'acceptTime': '13:05:46', 'cancelTime': '', 'limitPrice': 6.45, 'accountID': '00032047', 'orderRef': 130608, 'turnover': 6410.0, 'volume': 1000, 'volumeTraded': 1000, 'orderStatus': '4', 'orderSubmitStatus': '1', 'statusMsg': ''}
            :return:
            """
            print(f"收到订单回调:{order_info}")
            async_log_util.info(logger_debug, f"收到订单回调:{order_info}")
            if huaxin_util.is_deal(order_info['orderStatus']):
                if order_info["direction"] == '0':
                    # 买入成交
                    TodayBuyCodeManager().add_deal_code(order_info["securityID"], order_info.get("orderRef"))
                # 成交,需要更新持仓/委托/成交
                huaxin_trade_data_update.add_position_list()
                huaxin_trade_data_update.add_delegate_list("成交")
                huaxin_trade_data_update.add_deal_list()
            else:
                huaxin_trade_data_update.add_money_list()
                huaxin_trade_data_update.add_delegate_list("订单状态变化")
            # 推送订单数据
            threading.Thread(target=lambda: middle_api_protocol.push(
                middle_api_protocol.load_push_msg({"type": "order", "data": order_info})), daemon=True).start()
 
    huaxin_trade_api.run_trade(queue_strategy_r_trade_w, MyTradeCallback(), queue_strategy_w_trade_r,
                               queue_strategy_w_trade_for_query_r)
    threading.Thread(target=data_server.run, daemon=True).start()
 
 
@tool.singleton
class TodayBuyCodeManager:
    """
    今日买入代码管理类
    """
    __db = 0
    redisManager = redis_manager.RedisManager(0)
 
    def __init__(self):
        # 挂单中的代码
        self.delegating_codes_info = {}
        self.deal_codes = set()
        self.__load_data()
 
    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()
 
    def __load_data(self):
        """
        加载数据
        :return:
        """
        codes = RedisUtils.smembers(self.__get_redis(), "buy_deal_codes")
        if codes:
            self.deal_codes = set(codes)
 
    def add_deal_code(self, code, order_ref=None):
        """
        添加买入成交的代码
        :param order_ref:
        :param code:
        :return:
        """
        if code in self.deal_codes:
            return
        async_log_util.info(logger_trade, f"买入成交:{code}")
        self.deal_codes.add(code)
        if order_ref and order_ref in self.delegating_codes_info:
            del self.delegating_codes_info[order_ref]
        RedisUtils.sadd_async(self.__db, "buy_deal_codes", code)
        RedisUtils.expire_async(self.__db, "buy_deal_codes", tool.get_expire())
 
    def place_order(self, code, order_ref):
        """
        下单
        :param code: 代码
        :param order_ref: 索引
        :return:
        """
        async_log_util.info(logger_trade, f"下单:{code}-{order_ref}")
        self.delegating_codes_info[order_ref] = code
 
    def get_buy_codes(self):
        """
        获取买入的代码:成交代码+委托
        :return:
        """
        codes = set()
        if self.deal_codes:
            codes |= self.deal_codes
        if self.delegating_codes_info:
            codes |= set(self.delegating_codes_info.values())
        return codes
 
    def buy_fail(self, order_ref):
        """
        买入失败
        :param order_ref:
        :return:
        """
        async_log_util.info(logger_trade, f"下单失败:{order_ref}")
        if order_ref in self.delegating_codes_info:
            del self.delegating_codes_info[order_ref]
 
 
if __name__ == '__main__':
    # 测试代码
    print(TodayBuyCodeManager().get_buy_codes())