import concurrent.futures
import hashlib
import hmac
import http
import json
import socketserver
import urllib.parse as urlparse
from http.server import BaseHTTPRequestHandler

from strategy import data_cache
from trade import huaxin_trade_api, huaxin_trade_data_update
from trade.huaxin_trade_record_manager import DelegateRecordManager, DealRecordManager, MoneyManager, PositionManager
from utils import tool, huaxin_util
|
|
|
class DataServer(BaseHTTPRequestHandler):
    """HTTP request handler for trade-data queries (GET) and trade actions (POST).

    GET endpoints return JSON built from the record managers / market cache:
    ``/get_position_list``, ``/get_money``, ``/get_deal_list``,
    ``/get_delegate_list``, ``/refresh_trade_data`` and ``/get_market_info``.

    POST endpoints are ``/trade_callback`` (forwards the body to
    ``huaxin_trade_api``) and the signed order endpoints ``/buy`` and ``/sell``.

    Every response is sent with HTTP status 200; success or failure is carried
    in the JSON ``code`` field (0 = ok, 1 = exception, 1001 = bad signature).
    An unrecognised path yields an empty body.
    """

    # NOTE(review): not referenced by any method of this class; kept so the
    # class-level state is unchanged.
    __data_process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    def log_message(self, format, *args):
        """Disable the default per-request log output."""
        pass

    def do_GET(self):
        """Dispatch a GET request and always answer with a JSON body.

        Any exception raised while building the response is reported as
        ``{"code": 1, "msg": ...}`` (same convention as ``do_POST``) instead
        of leaving the client without a response.
        """
        try:
            url = urlparse.urlparse(self.path)
            # parse_qs yields lists; keep only the first value of each key.
            params_dict = {k: v[0] for k, v in urlparse.parse_qs(url.query).items()}
            response_data = self.__handle_get(url.path, params_dict)
        except Exception as e:
            response_data = json.dumps({"code": 1, "msg": str(e)})
        self.__send_response(response_data)

    def __handle_get(self, path, params_dict):
        """Build the JSON response string for one GET endpoint.

        :param path: URL path without the query string.
        :param params_dict: query parameters (first value per key).
        :return: JSON string, or ``""`` when the path is not recognised.
        :raises ValueError: ``/get_market_info`` was called without ``codes``.
        """
        if path == "/get_position_list":
            # Position list
            results = PositionManager.get_position_cache()
            return json.dumps({"code": 0, "data": results})

        if path == "/get_money":
            # Account money information
            result = MoneyManager.get_data()
            return json.dumps({"code": 0, "data": result})

        if path == "/get_deal_list":
            # Today's deal list
            results = DealRecordManager.list_by_day(tool.get_now_date_str("%Y%m%d"))
            return json.dumps({"code": 0, "data": results})

        if path == "/get_delegate_list":
            # Today's delegate (order) list.
            # "can_cancel" selects cancellable vs. non-cancellable orders;
            # when it is not passed, no status filter is applied (fetch all).
            can_cancel = params_dict.get("can_cancel")
            order_status = []
            if can_cancel is not None:
                if self.__flag_enabled(can_cancel):
                    order_status = [huaxin_util.TORA_TSTP_OST_Cached, huaxin_util.TORA_TSTP_OST_Unknown,
                                    huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded]
                else:
                    order_status = [huaxin_util.TORA_TSTP_OST_AllTraded, huaxin_util.TORA_TSTP_OST_PartTradeCanceled,
                                    huaxin_util.TORA_TSTP_OST_AllCanceled, huaxin_util.TORA_TSTP_OST_Rejected]
            results = DelegateRecordManager.list_by_day(tool.get_now_date_str("%Y%m%d"), None,
                                                        orderStatus=order_status)
            return json.dumps({"code": 0, "data": results})

        if path == "/refresh_trade_data":
            # Trigger a refresh of one kind of trade data; unknown types are ignored.
            _type = params_dict.get("type")
            if _type == "money":
                huaxin_trade_data_update.add_money_list()
            elif _type == "delegate":
                huaxin_trade_data_update.add_delegate_list("手动刷新")
            elif _type == "deal":
                huaxin_trade_data_update.add_deal_list()
            elif _type == "position":
                huaxin_trade_data_update.add_position_list()
            return json.dumps({"code": 0, "data": {}})

        if path == "/get_market_info":
            # Latest market info for a JSON-encoded list of codes.
            codes_str = params_dict.get("codes")
            if codes_str is None:
                raise ValueError("missing parameter: codes")
            fdatas = []
            for code in json.loads(codes_str):
                data = data_cache.latest_code_market_info_dict.get(code)
                # Codes without cached data are skipped.
                if data:
                    fdatas.append(data)
            return json.dumps({"code": 0, "data": fdatas})

        return ""

    @staticmethod
    def __flag_enabled(value):
        """Interpret a query-string flag; ``""``, ``"0"`` and ``"false"`` mean off.

        A raw query value is a non-empty string, so plain truthiness would
        treat ``"0"`` as enabled.
        """
        return str(value).strip().lower() not in ("", "0", "false")

    def __is_sign_right(self, params):
        """Verify the ``sign`` field of a request body.

        The expected signature is the MD5 hex digest of all ``key=value``
        pairs except ``sign``, sorted and joined by ``&``, followed by a
        fixed salt.

        :param params: parsed request body.
        :return: True only when ``params`` is a dict whose ``sign`` matches.
        """
        # A body that failed JSON parsing arrives as a str (and valid JSON may
        # be a list/number); none of those can carry a signature.
        if not isinstance(params, dict):
            return False
        sign = params.get("sign")
        if not isinstance(sign, str):
            return False
        ps = sorted(f"{k}={v}" for k, v in params.items() if k != 'sign')
        # NOTE(review): the salt is hard-coded in source and MD5 is a weak
        # digest for authentication - consider a configured secret + HMAC-SHA256.
        source_str = "&".join(ps) + "!@#lowSU*^cTion8888"
        md5_hexdigest = hashlib.md5(source_str.encode('utf-8')).hexdigest()
        # Constant-time comparison; bytes are used so a non-ASCII sign cannot
        # make compare_digest raise.
        return hmac.compare_digest(md5_hexdigest.encode('utf-8'), sign.encode('utf-8', 'replace'))

    def do_POST(self):
        """Dispatch a POST request and always answer with a JSON body.

        Exceptions are reported as ``{"code": 1, "msg": ...}``; the response
        is written in ``finally`` so every path replies exactly once.
        """
        result_str = ""
        try:
            url = urlparse.urlparse(self.path)
            if url.path == "/trade_callback":
                # Trade callback data: forwarded to the trade API as a string.
                body = self.__parse_request()
                if not isinstance(body, str):
                    body = json.dumps(body)
                huaxin_trade_api.add_trade_callback_data(body)
                result_str = json.dumps({"code": 0})
            elif url.path == "/buy":
                result_str = self.__handle_order(1, "买入")
            elif url.path == "/sell":
                result_str = self.__handle_order(2, "卖出")
        except Exception as e:
            result_str = json.dumps({"code": 1, "msg": str(e)})
        finally:
            self.__send_response(result_str)

    def __handle_order(self, direction, label):
        """Validate the signature of an order request and place the order.

        :param direction: first argument passed to ``huaxin_trade_api.order``
            (1 is used for ``/buy``, 2 for ``/sell``).
        :param label: text printed in front of the request parameters.
        :return: JSON response string (code 1001 when the signature is wrong).
        """
        params = self.__parse_request()
        # Signature check
        if not self.__is_sign_right(params):
            return json.dumps({"code": 1001, "msg": "签名错误"})
        print(label, params)
        code = params.get("code")  # security code
        volume = params.get("volume")  # volume
        price = round(params.get("price"), 2)  # price, rounded to 2 decimals
        result = huaxin_trade_api.order(direction, code, volume, price)
        return json.dumps({"code": 0, "data": result})

    def __send_response(self, data):
        """Send ``data`` (a str) to the client as a 200 application/json response."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())

    def __parse_request(self):
        """Read the request body and decode it.

        The body is read using the ``content-length`` header and decoded as GBK.

        :return: the parsed JSON value, or the raw decoded string when the
            body is not valid JSON.
        """
        datas = self.rfile.read(int(self.headers['content-length']))
        _str = str(datas, encoding="gbk")
        try:
            return json.loads(_str)
        except ValueError:
            # json.JSONDecodeError is a ValueError: fall back to the raw text.
            return _str
|
|
|
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTPServer combined with ThreadingMixIn; adds no behavior of its own."""
|
|
|
def run(addr="0.0.0.0", port=12881):
    """Start the threaded HTTP server and serve requests until it stops.

    Blocks the calling thread in ``serve_forever()``. Failures (for example
    the port already being in use) do not propagate to the caller, but they
    are reported instead of being silently discarded.

    :param addr: interface address to bind to.
    :param port: TCP port to listen on.
    """
    handler = DataServer
    try:
        # The context manager closes the listening socket on exit.
        with ThreadedHTTPServer((addr, port), handler) as httpd:
            print("HTTP server is at: http://%s:%d/" % (addr, port))
            httpd.serve_forever()
    except Exception as e:
        # Still best-effort (no re-raise), but leave a trace of what went wrong.
        print("HTTP server at %s:%d stopped with error: %r" % (addr, port, e))
|