Administrator
2024-07-25 9d39b293bde97f31f522010373aad1dd3f654c07
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
华鑫交易数据更新
"""
import json
import logging
import queue
import threading
import time
 
from code_attribute import gpcode_manager
from huaxin_client import constant as huaxin_client_constant
from log_module import async_log_util
from log_module.log import hx_logger_trade_debug, logger_system
from records import huaxin_trade_record_manager
from trade import huaxin_trade_api
from trade.trade_manager import CodeTradeStateManager
 
from utils import huaxin_util, tool
import concurrent.futures
 
# Pending refresh requests, consumed by __read_update_task_queue. Each item put
# here by the add_* helpers is a dict with a "type" key and, for some request
# kinds, a "delay" key (seconds to sleep before the request is served).
trade_data_request_queue = queue.Queue()
# Worker pool that __read_update_task_queue uses to run the record managers'
# add() calls (delegate and position records) off the consumer thread.
__process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
 
 
# 主动更新数据
def __read_update_task_queue():
    """
    Serve trade-data refresh requests from ``trade_data_request_queue`` forever.

    Each request is a dict with a ``"type"`` key (``"delegate_list"``,
    ``"money"``, ``"deal_list"`` or ``"position_list"``) and an optional
    ``"delay"`` key. A positive delay is slept on this consumer thread before
    the request is served, so requests queued behind it wait as well.

    Any exception raised while serving a request is logged through
    ``hx_logger_trade_debug`` and the loop carries on with the next request.
    This function never returns.
    """
    logger_system.info("启动读取交易数据更新队列")
    while True:
        try:
            request = trade_data_request_queue.get()
            if not request:
                # Nothing to serve; the ``finally`` pause below still runs.
                continue

            type_ = request["type"]
            delay = request.get("delay")
            if delay and delay > 0:
                time.sleep(delay)

            async_log_util.info(hx_logger_trade_debug, f"获取交易数据开始:{type_}")
            if type_ == "delegate_list":
                __refresh_delegate_list()
            elif type_ == "money":
                __refresh_money()
            elif type_ == "deal_list":
                __refresh_deal_list()
            elif type_ == "position_list":
                __refresh_position_list()
            # Logged whenever no exception was raised, which includes unknown
            # request types and responses carrying a non-zero code.
            async_log_util.info(hx_logger_trade_debug, f"获取交易数据成功:{type_}")
        except Exception as e:
            # NOTE: a failed (e.g. timed-out) request is not re-queued.
            hx_logger_trade_debug.exception(e)
        finally:
            # Pause 0.01s between requests.
            time.sleep(0.01)


def __refresh_delegate_list():
    """Fetch the delegate (order) list and pass it to ``DelegateRecordManager``."""
    response = huaxin_trade_api.get_delegate_list(can_cancel=False, timeout=10)
    if response["code"] != 0:
        return

    orders = response["data"]
    __process_thread_pool.submit(huaxin_trade_record_manager.DelegateRecordManager.add, orders)
    if not orders:
        return

    # Codes of non-shadow orders whose status can still be cancelled. The list
    # is collected but nothing acts on it at the moment.
    cancellable_codes = []
    for order in orders:
        code = order["securityID"]
        order_status = order["orderStatus"]
        # These two values are not used; the lookups stay so that a record
        # lacking either key raises KeyError.
        _order_ref = order["orderRef"]
        _account_id = order["accountID"]
        limit_price = order.get("limitPrice")
        volume = order.get("volume")

        # An order counts as a shadow order when its volume equals
        # SHADOW_ORDER_VOLUME and its price is at least 0.01 away from the
        # cached limit-up price; such orders are skipped.
        limit_up_price = gpcode_manager.get_limit_up_price_cache(code)
        if (
            limit_up_price
            and volume == huaxin_client_constant.SHADOW_ORDER_VOLUME
            and abs(float(limit_price) - float(limit_up_price)) >= 0.01
        ):
            continue

        if huaxin_util.is_can_cancel(order_status):
            cancellable_codes.append(code)


def __refresh_money():
    """Fetch the account money data and save it through ``MoneyManager``."""
    response = huaxin_trade_api.get_money()
    if response["code"] == 0:
        huaxin_trade_record_manager.MoneyManager.save_data(response["data"])


def __refresh_deal_list():
    """Fetch the deal list, flag traded codes and store the deal records."""
    response = huaxin_trade_api.get_deal_list(timeout=10)
    print("成交响应:", response)
    if response["code"] != 0:
        return

    deals = response["data"]
    # Codes that already traded today must not be ordered again: every deal
    # with a positive volume and direction 0 marks its code as already bought.
    for deal in deals:
        if deal["volume"] > 0 and int(deal["direction"]) == 0:
            CodeTradeStateManager().set_trade_state(
                deal["securityID"], 0, CodeTradeStateManager.TRADE_STATE_ALREADY_BUY
            )
    huaxin_trade_record_manager.DealRecordManager.add(deals)


def __refresh_position_list():
    """Fetch the position list, cache it, then store it via the worker pool."""
    response = huaxin_trade_api.get_position_list()
    if response["code"] == 0:
        positions = response["data"]
        huaxin_trade_record_manager.PositionManager.cache(positions)
        __process_thread_pool.submit(huaxin_trade_record_manager.PositionManager.add, positions)
 
 
def __add_data(data):
    """
    Put one refresh request on ``trade_data_request_queue`` without blocking.

    :param data: request dict; the consumer reads its "type" and optional
        "delay" keys
    """
    trade_data_request_queue.put(data, block=False)
 
 
def add_delegate_list(source, delay=0):
    """
    Queue a request to refresh the delegate (order) list.

    :param source: label of the caller, written to the trade debug log
    :param delay: value stored under the request's "delay" key; the consumer
        sleeps that many seconds before serving the request when it is positive
    """
    request = {"type": "delegate_list", "delay": delay}
    __add_data(request)
    async_log_util.info(hx_logger_trade_debug, f"请求委托列表,来源:{source}")
 
 
def add_deal_list():
    """Queue a request to refresh the deal list (no delay key is set)."""
    request = {"type": "deal_list"}
    __add_data(request)
 
 
def add_money_list(delay=0):
    """
    Queue a request to refresh the account money data.

    :param delay: value stored under the request's "delay" key; the consumer
        sleeps that many seconds before serving the request when it is positive
    """
    request = {"type": "money", "delay": delay}
    __add_data(request)
 
 
def add_position_list():
    """Queue a request to refresh the position list (no delay key is set)."""
    request = {"type": "position_list"}
    __add_data(request)
 
 
# 运行
def run():
    """
    Start the background consumer of the trade-data refresh queue.

    Spawns a daemon thread that runs ``__read_update_task_queue``. Being a
    daemon, the thread does not keep the interpreter alive at exit. Every call
    starts an additional consumer thread.

    :return: None
    """
    consumer = threading.Thread(target=__read_update_task_queue, daemon=True)
    consumer.start()