Administrator
2025-06-12 ab662be5c523b75c1bd28fc6bfcab2872b9623b3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""
华鑫交易数据更新
"""
import logging
import queue
import threading
import time
 
from log_module import async_log_util
from log_module.log import hx_logger_trade_debug, logger_system, logger_debug
from trade import trade_manager, trade_data_manager
from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager
 
from utils import huaxin_util
import concurrent.futures
 
# Pending refresh requests. Each item is a dict with a "type" key and an
# optional "delay" key (see the add_* functions below). Bounded at 1000 entries.
trade_data_request_queue = queue.Queue(maxsize=1000)
# Thread pool (at most 10 workers) used to run record-manager `add` calls
# off the queue-reader thread.
__process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
 
 
# Actively refresh trade data
def __refresh_delegate_list():
    """Fetch the delegate list and persist it on the thread pool."""
    response = huaxin_trade_api.get_delegate_list(can_cancel=False, timeout=10)
    if response["code"] == 0:
        __process_thread_pool.submit(huaxin_trade_record_manager.DelegateRecordManager.add, response["data"])


def __refresh_money():
    """Fetch the account money data, save it and update the cached account values."""
    response = huaxin_trade_api.get_money()
    if response["code"] != 0:
        return
    records = response["data"]
    huaxin_trade_record_manager.MoneyManager.save_data(records)
    if records:
        # Only the first record is used for the account-level values.
        usefulMoney = records[0]["usefulMoney"]
        commission = records[0]["commission"]
        # Set the available money and the commission
        trade_data_manager.AccountMoneyManager().set_available_money(0, usefulMoney)
        trade_data_manager.AccountMoneyManager().set_commission(commission)


def __refresh_deal_list():
    """Fetch the deal list, persist it and register every buy deal."""
    response = huaxin_trade_api.get_deal_list(timeout=10)
    # NOTE(review): debug output on stdout kept as-is; consider routing it through a logger.
    print("成交响应:", response)
    if response["code"] != 0:
        return
    deals = response["data"]
    if deals is None:
        deals = []
    huaxin_trade_record_manager.DealRecordManager.add(deals)
    # Loop-invariant: string form of the "buy" direction constant.
    buy_direction = str(huaxin_util.TORA_TSTP_D_Buy)
    for deal in deals:
        if str(deal['direction']) != buy_direction:
            continue
        trade_manager.DealCodesManager().add_deal_order(deal['securityID'], deal['volume'],
                                                        deal['price'], deal["tradeID"],
                                                        deal["orderSysID"])


def __refresh_position_list():
    """Fetch the position list, cache it and persist it on the thread pool."""
    response = huaxin_trade_api.get_position_list()
    if response["code"] == 0:
        positions = response["data"]
        huaxin_trade_record_manager.PositionManager.cache(positions)
        __process_thread_pool.submit(huaxin_trade_record_manager.PositionManager.add, positions)


# Maps the "type" field of a queued request to the function that serves it.
__REFRESH_HANDLERS = {
    "delegate_list": __refresh_delegate_list,
    "money": __refresh_money,
    "deal_list": __refresh_deal_list,
    "position_list": __refresh_position_list,
}


def __read_update_task_queue():
    """
    Consume refresh requests from ``trade_data_request_queue`` forever.

    Each request is a dict with a "type" key ("delegate_list", "money",
    "deal_list" or "position_list") and an optional "delay" key (seconds to
    sleep before serving the request). The delay is served on this thread,
    so later requests wait behind it. Requests with an unknown type are
    ignored. Any exception raised while serving a request is logged to
    ``hx_logger_trade_debug`` and the loop continues.
    @return: never returns
    """
    logger_system.info("启动读取交易数据更新队列")
    while True:
        try:
            request = trade_data_request_queue.get()
            if not request:
                continue
            type_ = request["type"]
            delay = request.get("delay")
            if delay and delay > 0:
                time.sleep(delay)
            async_log_util.info(hx_logger_trade_debug, f"获取交易数据开始:{type_}")
            handler = __REFRESH_HANDLERS.get(type_)
            if handler is not None:
                handler()
            # NOTE(review): this "success" line is also written when the API
            # answered with a non-zero code or the type is unknown; only an
            # exception suppresses it.
            async_log_util.info(hx_logger_trade_debug, f"获取交易数据成功:{type_}")
        except Exception as e:
            hx_logger_trade_debug.exception(e)
        finally:
            # Pause 0.01 s (10 ms) between requests
            time.sleep(0.01)
 
 
def get_request_queue_size():
    """
    Report how many refresh requests are currently waiting in the queue.
    @return: the value of ``trade_data_request_queue.qsize()``
    """
    pending = trade_data_request_queue.qsize()
    return pending
 
 
def repaire_task():
    """
    Repair the update task: when at least two requests are waiting in the
    queue, start one more daemon thread running the queue-reader loop.
    @return:
    """
    # NOTE(review): every call made while the backlog is >= 2 adds another
    # reader thread; nothing here stops earlier ones.
    if get_request_queue_size() >= 2:
        reader = threading.Thread(target=lambda: __read_update_task_queue(), daemon=True)
        reader.start()
 
 
def __add_data(data):
    """
    Enqueue a refresh request without blocking.
    @param data: request dict to put on ``trade_data_request_queue``
    @raise queue.Full: if the queue has no free slot
    """
    trade_data_request_queue.put(data, block=False)
 
 
def add_delegate_list(source, delay=0):
    """
    Queue a delegate-list refresh and log where the request came from.
    @param source: caller description, written to the log only
    @param delay: value stored under the request's "delay" key
    """
    request = dict(type="delegate_list", delay=delay)
    __add_data(request)
    async_log_util.info(hx_logger_trade_debug, f"请求委托列表,来源:{source}")
 
 
def add_deal_list():
    """Queue a deal-list refresh request."""
    request = dict(type="deal_list")
    __add_data(request)
 
 
def add_money_list(delay=0):
    """
    Queue an account-money refresh request.
    @param delay: value stored under the request's "delay" key
    """
    request = dict(type="money", delay=delay)
    __add_data(request)
 
 
def add_position_list():
    """Queue a position-list refresh request."""
    request = dict(type="position_list")
    __add_data(request)
 
 
# Run
def run():
    """Start the queue-reader loop on a daemon thread and return immediately."""
    threading.Thread(target=lambda: __read_update_task_queue(), daemon=True).start()