1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""
华鑫交易数据更新
"""
import json
import logging
import queue
import threading
import time
 
from huaxin_client import constant as huaxin_client_constant
from log_module import async_log_util
from log_module.log import hx_logger_trade_debug, logger_system
from trade import huaxin_trade_api, huaxin_trade_record_manager
 
from utils import huaxin_util, tool
import concurrent.futures
 
trade_data_request_queue = queue.Queue()
__process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
 
 
# 主动更新数据
def __read_update_task_queue():
    logger_system.info("启动读取交易数据更新队列")
    while True:
        try:
            data = trade_data_request_queue.get()
            if data:
                type_ = data["type"]
                delay = data.get("delay")
                if delay and delay > 0:
                    time.sleep(delay)
                async_log_util.info(hx_logger_trade_debug, f"获取交易数据开始:{type_}")
                try:
                    if type_ == "delegate_list":
                        dataJSON = huaxin_trade_api.get_delegate_list(can_cancel=False, timeout=10)
                        if dataJSON["code"] == 0:
                            data = dataJSON["data"]
                            huaxin_trade_record_manager.DelegateRecordManager.add(data)
                    elif type_ == "money":
                        dataJSON = huaxin_trade_api.get_money()
                        if dataJSON["code"] == 0:
                            data = dataJSON["data"]
                            huaxin_trade_record_manager.MoneyManager.save_data(data)
                            # if data:
                            #     usefulMoney = data[0]["usefulMoney"]
                            #     # 设置可用资金
                            #     trade_manager.AccountAvailableMoneyManager().set_available_money(0, usefulMoney)
                            # 设置可用资金
                    elif type_ == "deal_list":
                        dataJSON = huaxin_trade_api.get_deal_list(timeout=10)
                        print("成交响应:", dataJSON)
                        if dataJSON["code"] == 0:
                            datas = dataJSON["data"]
                            # TODO 今日成交了的票就不下单了
                            # for d in datas:
                            #     if d["volume"] > 0 and int(d["direction"]) == 0:
                            #         CodeTradeStateManager().set_trade_state(d["securityID"], 0,
                            #                                                 CodeTradeStateManager.TRADE_STATE_ALREADY_BUY)
                            huaxin_trade_record_manager.DealRecordManager.add(datas)
                    # 持仓股
                    elif type_ == "position_list":
                        dataJSON = huaxin_trade_api.get_position_list()
                        if dataJSON["code"] == 0:
                            datas = dataJSON["data"]
                            __process_thread_pool.submit(huaxin_trade_record_manager.PositionManager.add, datas)
                    async_log_util.info(hx_logger_trade_debug, f"获取交易数据成功:{type_}")
                except Exception as e1:
                    print("出错:", str(e1))
                    logging.exception(e1)
                    # if str(e1).find("超时") >= 0:
                    #     # 读取结果超时需要重新请求
                    #     trade_data_request_queue.put_nowait({"type": type_})
                    raise e1
        except Exception as e:
            hx_logger_trade_debug.exception(e)
        finally:
            # 有0.1s的间隔
            time.sleep(0.01)
 
 
def __add_data(data):
    trade_data_request_queue.put_nowait(data)
 
 
def add_delegate_list(source, delay=0):
    __add_data({"type": "delegate_list", "delay": delay})
    # async_log_util.info(hx_logger_trade_debug, f"请求委托列表,来源:{source}")
 
 
def add_deal_list():
    __add_data({"type": "deal_list"})
 
 
def add_money_list(delay=0):
    __add_data({"type": "money", "delay": delay})
 
 
def add_position_list():
    __add_data({"type": "position_list"})
 
 
# 运行
def run():
    """
    运行读取接收命令的队列
    :return:
    """
    t1 = threading.Thread(target=lambda: __read_update_task_queue(), daemon=True)
    t1.start()