admin
4 天以前 40258b39a5d2a7883e735a2327d3f79f67dec0cf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# -*- coding: utf-8 -*-
import logging
import marshal
import queue
import threading
import time
 
from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager
from log_module.async_log_util import huaxin_l2_log
import collections
 
from log_module.log import logger_local_huaxin_contact_debug
 
# Module-level state. Only target_codes and target_codes_add_time are used by
# the functions visible in this file; the remaining containers are presumably
# read or written by other modules -- TODO confirm before removing any of them.
order_detail_upload_active_time_dict = {}
transaction_upload_active_time_dict = {}
# Temporary data
# NOTE(review): "tmep" looks like a typo for "temp"; the names are kept because
# other modules may import them.
tmep_order_detail_queue_dict = {}
tmep_transaction_queue_dict = {}
# Codes registered through add_target_code() / removed by del_target_code().
target_codes = set()
# code -> time.time() value recorded when the code was added to target_codes.
target_codes_add_time = {}
common_queue = queue.Queue()
 
 
# L2上传数据管理器
class L2DataUploadManager:
    """Buffer per-code L2 data and forward it to the distributed callbacks.

    For every code passed to :meth:`distribute_upload_queue` the manager keeps
    one deque of order details, one deque of transactions, and three daemon
    worker threads: one drains the order deque, one drains the transaction
    deque, and one watches the tracked buy-1 volume. The workers stop once the
    code is released with :meth:`release_distributed_upload_queue`.
    """

    # The annotation is quoted so it is not evaluated at class-definition time.
    def __init__(self, data_callback_distribute_manager: "CodeDataCallbackDistributeManager"):
        self.data_callback_distribute_manager = data_callback_distribute_manager
        # Per-code buffers, created by distribute_upload_queue().
        self.temp_order_queue_dict = {}
        self.temp_transaction_queue_dict = {}
        self.temp_log_queue_dict = {}

        self.filter_order_condition_dict = {}
        # code -> (order worker, transaction worker, buy-1 worker)
        self.upload_l2_data_task_dict = {}
        # Codes whose order / transaction worker is alive and currently idle.
        self.l2_order_codes = set()
        self.l2_transaction_codes = set()
        # code -> [timestamp, buy-1 price, original buy-1 volume, computed buy-1 volume]
        self.__real_time_buy1_data = {}

    def __filter_order(self, item):
        """Return ``item`` unless ``item[1] * item[2]`` is below 500000.

        In the tuples built by :meth:`add_l2_order_detail`, index 1 is the
        price and index 2 is the volume. Filtered-out orders yield ``None``.
        """
        if item[1] * item[2] < 500000:
            return None
        return item

    def __filter_transaction(self, item):
        """Return ``item`` unchanged (transactions are currently not filtered)."""
        return item

    def add_l2_order_detail(self, data, start_time=0, istransaction=False):
        """Track the buy-1 volume and queue one order detail for upload.

        Args:
            data: Mapping with the keys ``SecurityID``, ``Price``, ``Volume``,
                ``Side``, ``OrderType``, ``OrderTime``, ``MainSeq``, ``SubSeq``,
                ``OrderNO`` and ``OrderStatus``.
            start_time: Opaque value stored as the last element of the queued
                tuple.
            istransaction: Unused; kept for interface compatibility.

        If no queue is allocated for the code (it was never distributed or has
        already been released) the order is dropped instead of raising.
        """
        code = data["SecurityID"]
        buy1 = self.__real_time_buy1_data.get(code)
        # Only buy-side orders at the tracked buy-1 price change the volume.
        if buy1 is not None and buy1[1] == data["Price"] and data["Side"] == '1':
            if data["OrderStatus"] == 'D':
                # Buy order cancelled.
                buy1[3] -= data["Volume"]
            else:
                # New buy order.
                buy1[3] += data["Volume"]
        q = self.temp_order_queue_dict.get(code)
        if q is None:
            return
        q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
                  data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time))

    def add_transaction_detail(self, data):
        """Track the buy-1 volume and queue one transaction for upload.

        Args:
            data: Mapping with the keys ``SecurityID``, ``TradePrice``,
                ``TradeVolume``, ``OrderTime``, ``MainSeq``, ``SubSeq``,
                ``BuyNo``, ``SellNo`` and ``ExecType``.

        If no queue is allocated for the code the transaction is dropped
        instead of raising.
        """
        code = data["SecurityID"]
        buy1 = self.__real_time_buy1_data.get(code)
        if buy1 is not None and buy1[1] == data["TradePrice"]:
            # A trade at the buy-1 price consumes buy-1 volume.
            buy1[3] -= data["TradeVolume"]
        q = self.temp_transaction_queue_dict.get(code)
        if q is None:
            return
        q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                  data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                  data['SellNo'], data['ExecType']))

    def add_market_data(self, data):
        """Reset the tracked buy-1 state from a snapshot and forward the snapshot.

        Reads ``data["securityID"]``, ``data["dataTimeStamp"]`` and the first
        entry of ``data["buy"]`` (price, volume), then passes ``[data]`` to the
        code's ``OnMarketData`` callback.
        """
        code = data["securityID"]
        best_bid = data["buy"][0]
        # [timestamp, buy-1 price, original buy-1 volume, computed buy-1 volume]
        self.__real_time_buy1_data[code] = [data["dataTimeStamp"], best_bid[0], best_bid[1], best_bid[1]]

        self.data_callback_distribute_manager.get_distributed_callback(code).OnMarketData(code, [data])

    def distribute_upload_queue(self, code):
        """Allocate the callback, buffers and worker threads for ``code``.

        Calling it again for an already distributed code is a no-op.
        """
        if not self.data_callback_distribute_manager.get_distributed_callback(code):
            self.data_callback_distribute_manager.distribute_callback(code)

        if code not in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code] = collections.deque()
        if code not in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code] = collections.deque()
        if code not in self.temp_log_queue_dict:
            self.temp_log_queue_dict[code] = queue.Queue()
        if code not in self.upload_l2_data_task_dict:
            tasks = tuple(
                threading.Thread(target=worker, args=(code,), daemon=True)
                for worker in (
                    self.__run_upload_order_task,
                    self.__run_upload_transaction_task,
                    self.__run_upload_real_time_buy1_task,
                )
            )
            # Register before starting: the buy-1 worker exits as soon as it
            # is no longer the registered task for the code.
            self.upload_l2_data_task_dict[code] = tasks
            for task in tasks:
                task.start()

    def release_distributed_upload_queue(self, code):
        """Release the callback and buffers allocated for ``code``.

        Removing the buffers and the task registration is what tells the
        worker threads of ``code`` to stop.
        """
        self.data_callback_distribute_manager.release_distribute_callback(code)
        if code in self.temp_order_queue_dict:
            self.temp_order_queue_dict[code].clear()
            self.temp_order_queue_dict.pop(code)
        if code in self.temp_transaction_queue_dict:
            self.temp_transaction_queue_dict[code].clear()
            self.temp_transaction_queue_dict.pop(code)
        self.temp_log_queue_dict.pop(code, None)
        self.upload_l2_data_task_dict.pop(code, None)

    def __upload_l2_data(self, code, _queue, datas):
        """Put ``[code, datas, time.time()]``, marshal-encoded, on ``_queue`` without blocking."""
        _queue.put_nowait(marshal.dumps([code, datas, time.time()]))

    def __run_upload_order_task(self, code):
        """Worker loop: drain the order deque of ``code`` and deliver batches.

        Filtered orders are passed to the code's ``OnL2Order`` callback. The
        loop ends once the deque it was started with is no longer the one
        registered for ``code``. A batch whose delivery raised is kept and
        retried after a short pause.
        """
        q = self.temp_order_queue_dict.get(code)
        if q is None:
            # Released before the worker got to run: nothing to drain.
            self.l2_order_codes.discard(code)
            return
        temp_list = []
        while True:
            idle = False
            try:
                while q:
                    # Pre-processing: drop orders that are not worth uploading.
                    data = self.__filter_order(q.popleft())
                    if data:
                        temp_list.append(data)

                if temp_list:
                    start = time.time()
                    batch, temp_list = temp_list, []
                    last_data = batch[-1]
                    try:
                        self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(
                            code, batch, time.time())
                    except Exception:
                        # Keep the batch so it is retried on the next round.
                        temp_list = batch
                        raise
                    use_time = time.time() - start
                    if use_time > 0.01:
                        # Record deliveries that took longer than 10ms.
                        huaxin_l2_log.info(logger_local_huaxin_contact_debug, f"耗时:{use_time}s  结束数据:{last_data}")
                    continue
                idle = True
            except Exception as e:
                logging.exception(e)

            # Reached when idle or after a failure. The identity check also
            # stops a stale worker after a quick release / re-distribute.
            if self.temp_order_queue_dict.get(code) is not q:
                self.l2_order_codes.discard(code)
                break
            if idle:
                self.l2_order_codes.add(code)
            time.sleep(0.001)

    def __run_upload_transaction_task(self, code):
        """Worker loop: drain the transaction deque of ``code`` and deliver batches.

        Batches are passed to the code's ``OnL2Transaction`` callback. The loop
        ends once the deque it was started with is no longer the one registered
        for ``code``. A batch whose delivery raised is kept and retried after a
        short pause.
        """
        q = self.temp_transaction_queue_dict.get(code)
        if q is None:
            # Released before the worker got to run: nothing to drain.
            self.l2_transaction_codes.discard(code)
            return
        temp_list = []
        while True:
            idle = False
            try:
                while q:
                    data = self.__filter_transaction(q.popleft())
                    if data:
                        temp_list.append(data)
                if temp_list:
                    self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(
                        code, temp_list)
                    temp_list = []
                    continue
                idle = True
            except Exception as e:
                logging.exception(e)

            if self.temp_transaction_queue_dict.get(code) is not q:
                self.l2_transaction_codes.discard(code)
                break
            if idle:
                self.l2_transaction_codes.add(code)
            time.sleep(0.001)

    def __run_upload_real_time_buy1_task(self, code):
        """Worker loop: report the buy-1 state of ``code`` every 0.1s when it shrinks.

        ``OnRealTimeBuy1Info`` is called while the computed buy-1 volume is at
        most half of the original one. The loop ends once the running thread is
        no longer one of the tasks registered for ``code``.
        """
        while threading.current_thread() in self.upload_l2_data_task_dict.get(code, ()):
            try:
                data = self.__real_time_buy1_data.get(code)
                # Start uploading once the latest buy-1 volume is at most 1/2 of the original.
                if data is not None and data[2] > 0 and data[3] / data[2] <= 0.5:
                    self.data_callback_distribute_manager.get_distributed_callback(code).OnRealTimeBuy1Info(code, data)
            except Exception as e:
                logging.exception(e)
            finally:
                time.sleep(0.1)
 
 
def add_target_code(code):
    """Add ``code`` to ``target_codes`` and record the time it was added."""
    target_codes.add(code)
    # Remember when the code joined the target set.
    added_at = time.time()
    target_codes_add_time[code] = added_at
 
 
def del_target_code(code):
    """Remove ``code`` from ``target_codes`` and forget its add time, if any."""
    target_codes.discard(code)
    # pop() with a default is a no-op when the code was never recorded.
    target_codes_add_time.pop(code, None)
 
 
def add_subscript_codes(codes):
    """Placeholder hook that accepts ``codes`` and does nothing."""
    return None
 
 
# Script entry point: calls add_subscript_codes with a single sample code.
if __name__ == "__main__":
    add_subscript_codes(["000333"])