admin
2025-01-16 9d9323c582bea82196b6813fa7331bea8494b5ea
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import concurrent.futures
import hashlib
import http
import json
import socketserver
from http.server import BaseHTTPRequestHandler
import urllib.parse as urlparse
 
from strategy import data_cache
from trade import huaxin_trade_api, huaxin_trade_data_update
from trade.huaxin_trade_record_manager import DelegateRecordManager, DealRecordManager, MoneyManager, PositionManager
from utils import tool, huaxin_util
 
 
class DataServer(BaseHTTPRequestHandler):
    __data_process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
 
    # 禁用日志输出
    def log_message(self, format, *args):
        pass
 
    def do_GET(self):
 
        def get_params(url):
            return dict([(k, v[0]) for k, v in urlparse.parse_qs(url.query).items()])
 
        path = self.path
        url = urlparse.urlparse(path)
        params_dict = get_params(url)
        response_data = ""
        if url.path == "/get_position_list":
            # 获取持仓列表
            results = PositionManager.get_position_cache()
            response_data = json.dumps({"code": 0, "data": results})
        elif url.path == "/get_money":
            # 获取资金信息
            result = MoneyManager.get_cache()
            response_data = json.dumps({"code": 0, "data": result})
        elif url.path == "/get_deal_list":
            # 获取成交列表
            results = DealRecordManager.list_by_day(tool.get_now_date_str("%Y%m%d"))
            response_data = json.dumps({"code": 0, "data": results})
        elif url.path == "/get_delegate_list":
            # 获取委托列表
            # 是否可撤单,如果不传默认拉取所有
            can_cancel = params_dict.get("can_cancel")
            order_status = []
            if can_cancel is None:
                if can_cancel:
                    order_status = [huaxin_util.TORA_TSTP_OST_Cached, huaxin_util.TORA_TSTP_OST_Unknown,
                                    huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded]
                else:
                    order_status = [huaxin_util.TORA_TSTP_OST_AllTraded, huaxin_util.TORA_TSTP_OST_PartTradeCanceled,
                                    huaxin_util.TORA_TSTP_OST_AllCanceled, huaxin_util.TORA_TSTP_OST_Rejected]
            results = DelegateRecordManager.list_by_day(tool.get_now_date_str("%Y%m%d"), None, orderStatus=order_status)
            response_data = json.dumps({"code": 0, "data": results})
 
        elif url.path == "/refresh_trade_data":
            # 刷新交易数据
            _type = params_dict.get("type")
            if _type == "money":
                huaxin_trade_data_update.add_money_list()
            elif _type == "delegate":
                huaxin_trade_data_update.add_delegate_list("手动刷新")
            elif _type == "deal":
                huaxin_trade_data_update.add_deal_list()
            elif _type == "position":
                huaxin_trade_data_update.add_position_list()
            response_data = json.dumps({"code": 0, "data": {}})
 
        elif url.path == "/get_market_info":
            # 获取市场行情信息
            codes_str = params_dict.get("codes")
            codes = json.loads(codes_str)
            fdatas = []
            for code in codes:
                data = data_cache.latest_code_market_info_dict.get(code)
                if data:
                    fdatas.append(data)
            response_data = json.dumps({"code": 0, "data": fdatas})
        self.send_response(200)
        # 发给请求客户端的响应数据
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(response_data.encode())
 
    @classmethod
    def __is_sign_right(cls, params):
        ps = []
        for k, v in params.items():
            if k == 'sign':
                continue
            ps.append(f"{k}={v}")
        ps.sort()
        source_str = "&".join(ps) + "!@#lowSU*^cTion8888"
        md5_hash = hashlib.md5()
        # 将字符串编码为字节并更新哈希对象
        md5_hash.update(source_str.encode('utf-8'))
        # 获取十六进制表示的哈希值
        md5_hexdigest = md5_hash.hexdigest()
        if md5_hexdigest == params.get("sign"):
            return True
        return False
 
    def do_POST(self):
        result_str = ""
        try:
            path = self.path
            print("接收到POST请求:", str(path))
            url = urlparse.urlparse(path)
            if url.path == "/trade_callback":
                # 接受开盘啦数据
                body = self.__parse_request()
                if type(body) != str:
                    huaxin_trade_api.add_trade_callback_data(json.dumps(body))
                else:
                    huaxin_trade_api.add_trade_callback_data(body)
                result_str = json.dumps({"code": 0})
            elif url.path == "/buy":
                # 签名验证
                params = self.__parse_request()
                if not self.__is_sign_right(params):
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                print("买入", params)
                # 买入
                code = params.get("code")  # 代码
                volume = params.get("volume")  # 量
                price = round(params.get("price"), 2)  # 价格
                result = huaxin_trade_api.order(1, code, volume, price, blocking=True)
                result_str = json.dumps({"code": 0, "data": result})
            elif url.path == "/sell":
                params = self.__parse_request()
                # 签名验证
                if not self.__is_sign_right(params):
                    result_str = json.dumps({"code": 1001, "msg": "签名错误"})
                    return
                # 卖出
                print("卖出", params)
                code = params.get("code")  # 代码
                volume = params.get("volume")  # 量
                price = round(params.get("price"), 2)  # 价格
                result = huaxin_trade_api.order(2, code, volume, price, blocking=True)
                result_str = json.dumps({"code": 0, "data": result})
        except Exception as e:
            result_str = json.dumps({"code": 1, "msg": str(e)})
        finally:
            self.__send_response(result_str)
 
    def __send_response(self, data):
        # 发给请求客户端的响应数据
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())
 
    def __parse_request(self):
        params = {}
        datas = self.rfile.read(int(self.headers['content-length']))
        _str = str(datas, encoding="gbk")
        # print(_str)
        try:
            params = json.loads(_str)
            return params
        except:
            return _str
 
 
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    pass
 
 
def run(addr="0.0.0.0", port=12881):
    handler = DataServer
    try:
        httpd = ThreadedHTTPServer((addr, port), handler)
        print("HTTP server is at: http://%s:%d/" % (addr, port))
        httpd.serve_forever()
    except Exception as e:
        pass