1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
import concurrent.futures
import copy
import hashlib
import http
import json
import logging
import socketserver
import time
from http.server import BaseHTTPRequestHandler
import urllib.parse as urlparse
 
import psutil
 
import constant
from db import redis_manager_delegate as redis_manager, mysql_data_delegate as mysql_data
from db.redis_manager_delegate import RedisUtils
from log_module import log_export, async_log_util
from log_module.log import hx_logger_l2_transaction, logger_debug, logger_request_api
from strategy import data_cache
from strategy.kpl_data_manager import KPLStockOfMarketsPlateLogManager, KPLMarketStockHeatLogManager
from strategy.trade_setting import TradeSetting
from trade import huaxin_trade_api, huaxin_trade_data_update
from trade.huaxin_trade_record_manager import DelegateRecordManager, DealRecordManager, MoneyManager, PositionManager
from utils import tool, huaxin_util, socket_util
 
 
class DataServer(BaseHTTPRequestHandler):
    __data_process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
 
    # Disable the default per-request log output
    def log_message(self, format, *args):
        pass
 
    def do_GET(self):
 
        def get_params(url):
            return dict([(k, v[0]) for k, v in urlparse.parse_qs(url.query).items()])
 
        path = self.path
        url = urlparse.urlparse(path)
        params_dict = get_params(url)
        response_data = ""
        if url.path == "/get_position_list":
            # 获取持仓列表
            results = PositionManager.get_position_cache()
            results = copy.deepcopy(results)
            for r in results:
                r["auto_sell"] = 1 if r["securityID"] in data_cache.LIMIT_UP_SELL_CODES else 0
            response_data = json.dumps({"code": 0, "data": results})
        elif url.path == "/get_money":
            # 获取资金信息
            result = MoneyManager.get_cache()
            response_data = json.dumps({"code": 0, "data": result})
        elif url.path == "/get_deal_list":
            # 获取成交列表
            results = DealRecordManager.list_by_day(tool.get_now_date_str("%Y%m%d"))
            if results:
                for d in results:
                    d["securityName"] = data_cache.DataCache().code_name_dict.get(tool.get_symbol(d["securityID"]))
 
            response_data = json.dumps({"code": 0, "data": results})
        elif url.path == "/get_delegate_list":
            # 获取委托列表
            # 是否可撤单,如果不传默认拉取所有
            print("参数", params_dict)
            can_cancel = params_dict.get("can_cancel")
            order_status = []
            if can_cancel is not None:
                if int(can_cancel):
                    print("获取未结委托")
                    order_status = [huaxin_util.TORA_TSTP_OST_Cached, huaxin_util.TORA_TSTP_OST_Unknown,
                                    huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded]
                else:
                    print("获取已结委托")
                    order_status = [huaxin_util.TORA_TSTP_OST_AllTraded, huaxin_util.TORA_TSTP_OST_PartTradeCanceled,
                                    huaxin_util.TORA_TSTP_OST_AllCanceled, huaxin_util.TORA_TSTP_OST_Rejected]
            results = DelegateRecordManager.list_by_day(tool.get_now_date_str("%Y%m%d"), None, orderStatus=order_status)
            response_data = json.dumps({"code": 0, "data": results})
 
        elif url.path == "/refresh_trade_data":
            # 刷新交易数据
            _type = params_dict.get("type")
            if _type == "money":
                huaxin_trade_data_update.add_money_list()
            elif _type == "delegate":
                huaxin_trade_data_update.add_delegate_list("手动刷新")
            elif _type == "deal":
                huaxin_trade_data_update.add_deal_list()
            elif _type == "position":
                huaxin_trade_data_update.add_position_list()
            response_data = json.dumps({"code": 0, "data": {}})
 
        elif url.path == "/get_market_info":
            # 获取市场行情信息
            codes_str = params_dict.get("codes")
            codes = json.loads(codes_str)
            fdatas = []
            for code in codes:
                data = data_cache.latest_code_market_info_dict.get(code)
                # logger_debug.info(f"获取L1行情接口:{code}-{data}")
                if data:
                    fdatas.append(data)
            response_data = json.dumps({"code": 0, "data": fdatas})
        elif url.path == "/get_buy_money":
            # 获取每次买入的金额
            money = data_cache.BUY_MONEY_PER_CODE
            response_data = json.dumps({"code": 0, "data": {"money": money}})
        elif url.path == "/get_trade_settings":
            fdata = {"running": TradeSetting().get_running(), "auto_sell": TradeSetting().get_auto_sell(),
                     "auto_buy": TradeSetting().get_auto_buy()}
            response_data = json.dumps({"code": 0, "data": fdata})
 
        elif url.path == "/set_trade_settings":
            running = params_dict.get("running")
            auto_sell = params_dict.get("auto_sell")
            auto_buy = params_dict.get("auto_buy")
            if running is not None:
                TradeSetting().set_running(int(running))
            if auto_sell is not None:
                TradeSetting().set_auto_sell(int(auto_sell))
            if auto_buy is not None:
                TradeSetting().set_auto_buy(int(auto_buy))
            response_data = json.dumps({"code": 0, "data": {}})
 
        elif url.path == "/get_env":
            request_id = params_dict.get("request_id")
            use_time_list = []
            try:
                __start_time = time.time()
                fdata = {}
                # try:
                #     date = HistoryKDatasUtils.get_trading_dates(tool.date_sub(tool.get_now_date_str(), 10),
                #                                                 tool.get_now_date_str())
                #     if date:
                #         fdata["juejin"] = 1
                # except Exception as e:
                #     fdata["juejin"] = 0
                # fdata["kpl"] = {}
                # # 获取开盘啦数据
                # kpl_types = [KPLDataType.LIMIT_UP.value, KPLDataType.JINGXUAN_RANK.value,
                #              KPLDataType.INDUSTRY_RANK.value]
                # for kpl_type in kpl_types:
                #     if kpl_type in KPLDataManager.kpl_data_update_info:
                #         fdata["kpl"][kpl_type] = KPLDataManager.kpl_data_update_info.get(kpl_type)
 
                try:
                    # 验证redis
                    RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test")
                    fdata["redis"] = 1
                except:
                    fdata["redis"] = 0
                use_time_list.append(("验证redis", time.time() - __start_time))
 
                try:
                    # 验证mysql
                    mysql_data.Mysqldb().select_one("select 1")
                    fdata["mysql"] = 1
                except:
                    fdata["mysql"] = 0
                use_time_list.append(("验证mysql", time.time() - __start_time))
 
                try:
                    # redis异步任务数量
                    fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count()
                except:
                    pass
                use_time_list.append(("验证异步任务数量", time.time() - __start_time))
 
                # 获取交易通道
                try:
                    can_access = huaxin_trade_api.test_trade_channel()
                    fdata["trade_channel_access"] = 1 if can_access else 0
                except Exception as e:
                    logger_debug.exception(e)
                    fdata["trade_channel_access"] = 0
                use_time_list.append(("验证交易通道", time.time() - __start_time))
 
                # 获取CPU与内存适用情况
                memory_info = psutil.virtual_memory()
                cpu_percent = psutil.cpu_percent(interval=1)
                fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent}
 
                use_time_list.append(("获取设备资源占用", time.time() - __start_time))
                # 获取交易通道
                result = {"code": 0, "data": fdata, "msg": ""}
                # print("OnGetEnvInfo 成功")
                response_data = json.dumps(result)
            except Exception as e:
                response_data = json.dumps({"code": 1, "msg": str(e)})
                logger_debug.error(f"环境获取异常:{request_id}")
                logger_debug.exception(e)
            finally:
                if use_time_list and use_time_list[-1][1] > 10:
                    logger_debug.warning(f"环境获取时间大于10s({request_id}):{use_time_list}")
        # 获取板块强度数据
        elif url.path == "/load_get_kpl_market_sift_plate":
            # 加载数据
            KPLStockOfMarketsPlateLogManager().load_data()
            response_data = json.dumps({"code": 0, "msg": "暂无内容"})
        elif url.path == "/get_kpl_market_sift_plate":
            # 获取开盘啦流入板块详细信息
            print("==========get_kpl_market_sift_plate==========")
            try:
                time_str = params_dict.get("time")
                if not time_str:
                    time_str = tool.get_now_time_str()
                fdatas = KPLStockOfMarketsPlateLogManager().get_filter_log_datas()
                response_data = json.dumps({"code": 1, "msg": "暂无内容"})
                for i in range(len(fdatas) - 1, -1, -1):
                    if fdatas[i][0] <= time_str:
                        response_data = json.dumps({"code": 0, "data": fdatas[i]})
                        break
            except Exception as e:
                logging.exception(e)
                response_data = json.dumps({"code": 1, "msg": str(e)})
 
        # 获取个股强度数据
        elif url.path == "/load_get_kpl_market_stock_heat":
            # 加载数据
            KPLMarketStockHeatLogManager().load_data()
            response_data = json.dumps({"code": 0, "msg": "暂无内容"})
        elif url.path == "/get_get_kpl_market_stock_heat":
            # 获取开盘啦流入板块详细信息
            print("==========get_kpl_stock_of_markets_plate==========")
            try:
                time_str = params_dict.get("time")
                if not time_str:
                    time_str = tool.get_now_time_str()
                fdatas = KPLMarketStockHeatLogManager().get_filter_log_datas()
                response_data = json.dumps({"code": 1, "msg": "暂无内容"})
                for i in range(len(fdatas) - 1, -1, -1):
                    if fdatas[i][0] <= time_str:
                        response_data = json.dumps({"code": 0, "data": fdatas[i]})
                        break
            except Exception as e:
                logging.exception(e)
                response_data = json.dumps({"code": 1, "msg": str(e)})
        elif url.path == "/get_kpl_market_strong_records":
            # 获取开盘啦市场强度记录
            time_str = params_dict.get("time")
            if not time_str:
                time_str = tool.get_now_time_str()
            datas = log_export.load_kpl_market_strong()
            fdatas = []
            for data in datas:
                # (距离09:15:00的秒数, 时间, 强度)
                fdatas.append((tool.trade_time_sub(data[0], "09:15:00"), data[0], data[1]))
            response_data = json.dumps({"code": 0, "data": fdatas})
 
        self.send_response(200)
        # 发给请求客户端的响应数据
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(response_data.encode())
 
    @classmethod
    def __is_sign_right(cls, params):
        ps = []
        for k, v in params.items():
            if k == 'sign':
                continue
            ps.append(f"{k}={v}")
        ps.sort()
        source_str = "&".join(ps) + "!@#lowSU*^cTion8888"
        md5_hash = hashlib.md5()
        # 将字符串编码为字节并更新哈希对象
        md5_hash.update(source_str.encode('utf-8'))
        # 获取十六进制表示的哈希值
        md5_hexdigest = md5_hash.hexdigest()
        if md5_hexdigest == params.get("sign"):
            return True
        return False
 
    def do_POST(self):
        result_str = ""
        try:
            path = self.path
            print("接收到POST请求:", str(path))
            url = urlparse.urlparse(path)
            if url.path == "/trade_callback":
                if constant.IS_SIMULATED_TRADE:
                    # 接受开盘啦数据
                    body = self.__parse_request()
                    if type(body) != str:
                        huaxin_trade_api.add_trade_callback_data(json.dumps(body))
                    else:
                        huaxin_trade_api.add_trade_callback_data(body)
                result_str = json.dumps({"code": 0})
            elif url.path == "/buy":
                # 签名验证
                params = self.__parse_request()
                if not self.__is_sign_right(params):
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                print("买入", params)
                logger_request_api.info(f"买入:{params}")
                # 买入
                code = params.get("code")  # 代码
                volume = params.get("volume")  # 量
                price = params.get("price")
                if not price:
                    # 没有上传价格,就需要获取最近的价格进行买入
                    data = data_cache.latest_code_market_info_dict.get(code)
                    if not data:
                        raise Exception("没有获取到L1数据")
                    pre_price = data[1]
                    current_price = data[2] if data[2] else data[5][0][0]
                    price = tool.get_buy_max_price(current_price)
                    price = min(price, tool.get_limit_up_price(code, pre_price))
                else:
                    price = round(float(params.get("price")), 2)  # 价格
                result = huaxin_trade_api.order(1, code, volume, price, blocking=True)
                result_str = json.dumps(result)
            elif url.path == "/sell":
                params = self.__parse_request()
                # 签名验证
                if not self.__is_sign_right(params):
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                # 卖出
                try:
                    print("卖出", params)
                    code = params.get("code")  # 代码
                    volume = params.get("volume")  # 量
                    price = params.get("price")
                    if not price:
                        # 没有上传价格,就需要获取最近的价格进行买入
                        data = data_cache.latest_code_market_info_dict.get(code)
                        if not data:
                            raise Exception("没有获取到L1数据")
                        pre_price = data[1]
                        current_price = data[2] if data[2] else data[5][0][0]
                        # 获取最新成交价格
                        latest_deal_price = data_cache.latest_deal_price_dict.get(code)
                        if latest_deal_price:
                            current_price = round(float(latest_deal_price), 2)
                            async_log_util.info(logger_debug, f"根据成交价卖出:{code}-{latest_deal_price}")
 
                        price = tool.get_buy_min_price(current_price)
                        price = max(price, tool.get_limit_down_price(code, pre_price))
                    else:
                        price = round(params.get("price"), 2)  # 价格
                    result = huaxin_trade_api.order(2, code, volume, price, blocking=True)
                    result_str = json.dumps(result)
                finally:
                    logger_request_api.info(f"卖出:{params}")
 
            elif url.path == "/set_buy_money":
                # 设置每次买入的金额
                params = self.__parse_request()
                # 签名验证
                if not self.__is_sign_right(params):
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                # 卖出
                print("每次买入的金额", params)
                money = params.get("money")  # 金额
                if money is None:
                    result_str = json.dumps({"code": 1, "msg": "未上传金额"})
                    return
                money = int(money)
 
                logger_debug.info(f"设置开仓金额:{money}")
                data_cache.BUY_MONEY_PER_CODE = money
                result_str = json.dumps({"code": 0})
 
            elif url.path == "/set_limit_up_sell":
                # 设置每次买入的金额
                params = self.__parse_request()
                # 签名验证
                if not self.__is_sign_right(params):
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                # 卖出
                print("每次买入的金额", params)
                code = params.get("code")  #代码
                enable = params.get("enable")  # 是否开启
                if code is None or enable is None:
                    result_str = json.dumps({"code": 1, "msg": "上传数据缺失"})
                    return
                enable = int(enable)
                if enable:
                    data_cache.LIMIT_UP_SELL_CODES.add(code)
                else:
                    data_cache.LIMIT_UP_SELL_CODES.discard(code)
                result_str = json.dumps({"code": 0})
 
            elif url.path == "/cancel_order":
                params = self.__parse_request()
                # 签名验证
                if not self.__is_sign_right(params):
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                # 卖出
                print("撤单", params)
                direction = params.get("direction")
                code = params.get("code")  # 代码
                orderSysID = params.get("orderSysID")  # 系统订单编号
                result = huaxin_trade_api.cancel_order(direction, code, orderSysID, blocking=True)
                result_str = json.dumps(result)
            elif url.path == "/upload_deal_big_orders":
                # 成交大单传递
                datas = self.rfile.read(int(self.headers['content-length']))
                _str = str(datas, encoding="gbk")
                datas = json.loads(_str)
                for d in datas:
                    if d[1] != 0:
                        continue
                    code, data = d[0], d[2]
                    if code not in data_cache.big_order_deal_dict:
                        data_cache.big_order_deal_dict[code] = []
                    data_cache.big_order_deal_dict[code].append(d)
                    # 获取买大单数量
                    len(data_cache.big_order_deal_dict.get(code, []))
                hx_logger_l2_transaction.info(_str)
                # 记录日志
                result_str = json.dumps({"code": 0})
        except Exception as e:
            result_str = json.dumps({"code": 1, "msg": str(e)})
        finally:
            self.__send_response(result_str)
 
    def __send_response(self, data):
        # 发给请求客户端的响应数据
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())
 
    def __parse_request(self):
        params = {}
        datas = self.rfile.read(int(self.headers['content-length']))
        _str = str(datas, encoding="gbk")
        # print(_str)
        try:
            params = json.loads(_str)
            return params
        except:
            return _str
 
 
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTPServer combined with ThreadingMixIn; adds no behavior of its own."""
 
 
def run(addr="0.0.0.0", port=12881):
    """Start the threaded HTTP data server and serve requests until stopped.

    Failures while creating or running the server (for example the port
    already being in use) are logged rather than raised, so the call still
    returns normally as before.

    :param addr: interface address to bind
    :param port: TCP port to listen on
    """
    handler = DataServer
    try:
        # The context manager closes the listening socket on exit.
        with ThreadedHTTPServer((addr, port), handler) as httpd:
            print("HTTP server is at: http://%s:%d/" % (addr, port))
            httpd.serve_forever()
    except Exception as e:
        # Previously swallowed silently, which hid startup failures.
        logging.exception(e)
 
 
# Start the server with the default address and port when run as a script.
if __name__ == "__main__":
    run()