Administrator
2025-06-12 4e5eed2226fae6a057c454155565211dbc9349e9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""
Huaxin trade data updater.
"""
import json
import queue
import threading
import time
 
import constant
from code_atrribute import gpcode_manager
from code_atrribute.history_k_data_util import HistoryKDatasUtils
from code_atrribute.position_code_data_manager import PositionCodeDataManager
from log_module import async_log_util
from log_module.log import hx_logger_trade_debug, logger_system, printlog
from trade import huaxin_trade_api, huaxin_trade_record_manager
 
import concurrent.futures
 
from utils import tool
 
# Unbounded FIFO of pending trade-data refresh requests; each item is a dict
# carrying a "type" key and, optionally, a "delay" key.
trade_data_request_queue = queue.Queue(maxsize=0)
# Worker pool that the queue consumer hands follow-up work to.
__process_thread_pool = concurrent.futures.ThreadPoolExecutor(10)
 
 
# Actively refresh trade data (queue consumer)
def __sync_position_list(type_):
    """Fetch the position list and propagate it to caches and subscribers.

    Calls ``huaxin_trade_api.get_position_list()``; when the response code is
    0 it caches the positions, collects the codes that must be subscribed,
    requests missing previous-close prices, publishes the code lists on the
    two inter-process queues and finally schedules ``PositionManager.add``.

    :param type_: request type, only used in the log messages.
    """
    dataJSON = huaxin_trade_api.get_position_list()
    async_log_util.info(hx_logger_trade_debug, f"获取交易数据成功:{type_}")
    if dataJSON["code"] != 0:
        return

    datas = dataJSON["data"]
    huaxin_trade_record_manager.PositionManager.cache(datas)

    position_codes = set()
    need_update_codes = []
    for d in datas:
        # Subscribe to earlier holdings that are sellable and to today's buys.
        held_before = d["prePosition"] > 0 and tool.is_can_sell_code(d["securityID"])
        if not (held_before or d["todayBSPos"] > 0):
            continue
        code = d["securityID"]
        position_codes.add(code)
        # Previous close price is not cached yet: queue it for a batch request.
        if not gpcode_manager.get_price_pre_cache(code):
            need_update_codes.append(code)
        try:
            PositionCodeDataManager.request_pre_volume(code)
        except Exception as e:
            # Best-effort: a failure for one code must not abort the refresh,
            # but it should be visible in the log instead of being swallowed.
            hx_logger_trade_debug.exception(e)

    if need_update_codes:
        gpcode_manager.request_price_pre(need_update_codes)
        async_log_util.info(hx_logger_trade_debug, f"获取收盘价:{type_}")

    queue_l1_trade_r_strategy_w.put_nowait(
        {"type": "set_target_codes", "data": list(position_codes)})
    constant.SUBSCRIPT_L2_CODES |= position_codes
    queue_strategy_w_l2_r.put_nowait(json.dumps(
        {"type": "l2_cmd", "data": list(constant.SUBSCRIPT_L2_CODES)}))
    # Original note: held codes need to be subscribed before 09:25.
    __process_thread_pool.submit(huaxin_trade_record_manager.PositionManager.add, datas)


def __read_update_task_queue():
    """Consume ``trade_data_request_queue`` forever and serve each request.

    Each request is a dict with a ``"type"`` key and an optional ``"delay"``
    (seconds to sleep before handling). Only the ``"position_list"`` type is
    acted on here; other types are merely logged as started/finished.
    Any exception raised while handling a request is logged and the loop
    continues with the next request.
    """
    logger_system.info("启动读取交易数据更新队列")
    while True:
        try:
            data = trade_data_request_queue.get()
            if data:
                type_ = data["type"]
                delay = data.get("delay")
                if delay and delay > 0:
                    time.sleep(delay)
                async_log_util.info(hx_logger_trade_debug, f"获取交易数据开始:{type_}")
                # NOTE: a retry that re-enqueued the request when the error
                # text contained "超时" (timeout) was sketched here earlier but
                # is disabled; failures are only logged by the handler below.
                if type_ == "position_list":
                    __sync_position_list(type_)
                async_log_util.info(hx_logger_trade_debug, f"获取交易数据结束:{type_}")
        except Exception as e:
            hx_logger_trade_debug.exception(e)
        finally:
            # Pause 10 ms between requests.
            time.sleep(0.01)
 
 
def __add_data(data):
    """Enqueue one refresh request on ``trade_data_request_queue`` without blocking."""
    trade_data_request_queue.put(data, block=False)
 
 
def add_delegate_list(source, delay=0):
    """Queue a "delegate_list" refresh request and log who asked for it.

    :param source: label of the caller, written to the debug log only.
    :param delay: value stored under the request's "delay" key.
    """
    request = {"type": "delegate_list", "delay": delay}
    __add_data(request)
    async_log_util.info(hx_logger_trade_debug, f"请求委托列表,来源:{source}")
 
 
def add_deal_list():
    """Queue a "deal_list" refresh request."""
    request = {"type": "deal_list"}
    __add_data(request)
 
 
def add_money_list(delay=0):
    """Queue a "money" refresh request.

    :param delay: value stored under the request's "delay" key.
    """
    request = {"type": "money", "delay": delay}
    __add_data(request)
 
 
def add_position_list():
    """Queue a "position_list" refresh request."""
    request = {"type": "position_list"}
    __add_data(request)
 
 
def test():
    """Debug helper: wait 2 s, then publish a fixed one-code target list.

    Relies on ``queue_l1_trade_r_strategy_w`` having been set by ``run``.
    """
    time.sleep(2)
    message = {"type": "set_target_codes", "data": ["000333"]}
    queue_l1_trade_r_strategy_w.put_nowait(message)
 
 
# Run: store the shared queues and start the consumer thread
def run(queue_l1_trade_r_strategy_w_, queue_strategy_w_l2_r_):
    """Store the two outbound queues as module globals and start the consumer.

    :param queue_l1_trade_r_strategy_w_: queue that receives the
        "set_target_codes" messages.
    :param queue_strategy_w_l2_r_: queue that receives the JSON-encoded
        "l2_cmd" messages.
    """
    global queue_l1_trade_r_strategy_w, queue_strategy_w_l2_r
    queue_l1_trade_r_strategy_w, queue_strategy_w_l2_r = (
        queue_l1_trade_r_strategy_w_,
        queue_strategy_w_l2_r_,
    )
    # Daemon thread: it will not keep the process alive on shutdown.
    consumer = threading.Thread(target=lambda: __read_update_task_queue(), daemon=True)
    consumer.start()
    # threading.Thread(target=lambda: test(), daemon=True).start()
 
 
if __name__ == "__main__":
    # Manual smoke check: print the latest info returned for one sample code.
    latest_info = HistoryKDatasUtils.get_gp_latest_info(["000333"])
    printlog(latest_info)