import concurrent.futures
import copy
import hashlib
import hmac
import http
import json
import logging
import socketserver
import time
import urllib.parse as urlparse
from http.server import BaseHTTPRequestHandler

import psutil

import constant
from db import redis_manager_delegate as redis_manager, mysql_data_delegate as mysql_data
from db.redis_manager_delegate import RedisUtils
from log_module import log_export, async_log_util
from log_module.log import hx_logger_l2_transaction, logger_debug, logger_request_api, logger_system
from strategy import data_cache
from strategy.forbidden_plates_manager import ForbiddenPlatesManager
from strategy.kpl_data_manager import KPLMarketsSiftPlateLogManager, KPLMarketStockHeatLogManager
from strategy.trade_setting import TradeSetting
from trade import huaxin_trade_api, huaxin_trade_data_update
from trade.huaxin_trade_record_manager import DelegateRecordManager, DealRecordManager, MoneyManager, PositionManager
from utils import tool, huaxin_util, socket_util


class DataServer(BaseHTTPRequestHandler):
    """HTTP handler that serves trade data (GET) and accepts trade commands (POST).

    Every reply is sent with status 200 and a JSON content type; failures are
    reported inside the JSON body through a non-zero ``code``. Paths that are
    not recognised get an empty body.
    """

    __data_process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    # Disable the default per-request log output.
    def log_message(self, format, *args):
        pass

    def do_GET(self):
        """Dispatch a GET request by path and reply with a JSON document.

        Any exception raised while building the reply is logged and reported
        as ``{"code": 1, "msg": ...}`` instead of dropping the connection.
        """
        try:
            url = urlparse.urlparse(self.path)
            # Only the first value of a repeated query parameter is kept.
            params_dict = {k: v[0] for k, v in urlparse.parse_qs(url.query).items()}
            response_data = self.__handle_get(url.path, params_dict)
        except Exception as e:
            logger_debug.exception(e)
            response_data = json.dumps({"code": 1, "msg": str(e)})
        self.__send_response(response_data)

    def __handle_get(self, path, params_dict):
        """Return the JSON text for GET ``path``, or ``""`` for an unknown path."""
        if path == "/get_position_list":
            # Position list; work on a copy so the cached records are not mutated.
            results = copy.deepcopy(PositionManager.get_position_cache())
            for r in results:
                r["auto_sell"] = 1 if r["securityID"] in data_cache.LIMIT_UP_SELL_CODES else 0
            return json.dumps({"code": 0, "data": results})

        if path == "/get_money":
            # Account money information.
            return json.dumps({"code": 0, "data": MoneyManager.get_cache()})

        if path == "/get_deal_list":
            # Today's deal list, each record annotated with the security name.
            results = DealRecordManager.list_by_day(tool.get_now_date_str("%Y%m%d"))
            if results:
                code_name_dict = data_cache.DataCache().code_name_dict
                for d in results:
                    d["securityName"] = code_name_dict.get(tool.get_symbol(d["securityID"]))
            return json.dumps({"code": 0, "data": results})

        if path == "/get_delegate_list":
            # Today's delegate (order) list. ``can_cancel`` selects open (truthy)
            # or closed (0) orders; when it is absent no status filter is given.
            print("参数", params_dict)
            can_cancel = params_dict.get("can_cancel")
            order_status = []
            if can_cancel is not None:
                if int(can_cancel):
                    print("获取未结委托")
                    order_status = [huaxin_util.TORA_TSTP_OST_Cached, huaxin_util.TORA_TSTP_OST_Unknown,
                                    huaxin_util.TORA_TSTP_OST_Accepted, huaxin_util.TORA_TSTP_OST_PartTraded]
                else:
                    print("获取已结委托")
                    order_status = [huaxin_util.TORA_TSTP_OST_AllTraded,
                                    huaxin_util.TORA_TSTP_OST_PartTradeCanceled,
                                    huaxin_util.TORA_TSTP_OST_AllCanceled, huaxin_util.TORA_TSTP_OST_Rejected]
            results = DelegateRecordManager.list_by_day(tool.get_now_date_str("%Y%m%d"), None,
                                                        orderStatus=order_status)
            return json.dumps({"code": 0, "data": results})

        if path == "/refresh_trade_data":
            # Queue a refresh of one kind of trade data; other types are ignored.
            _type = params_dict.get("type")
            if _type == "money":
                huaxin_trade_data_update.add_money_list()
            elif _type == "delegate":
                huaxin_trade_data_update.add_delegate_list("手动刷新")
            elif _type == "deal":
                huaxin_trade_data_update.add_deal_list()
            elif _type == "position":
                huaxin_trade_data_update.add_position_list()
            return json.dumps({"code": 0, "data": {}})

        if path == "/get_market_info":
            # Latest market info for a JSON-encoded list of codes; codes that
            # have no cached data are skipped.
            codes_str = params_dict.get("codes")
            if not codes_str:
                return json.dumps({"code": 1, "msg": "缺少codes参数"})
            fdatas = []
            for code in json.loads(codes_str):
                data = data_cache.latest_code_market_info_dict.get(code)
                if data:
                    fdatas.append(data)
            return json.dumps({"code": 0, "data": fdatas})

        if path == "/get_buy_money":
            # Amount of money used for each buy.
            return json.dumps({"code": 0, "data": {"money": data_cache.BUY_MONEY_PER_CODE}})

        if path == "/get_trade_settings":
            fdata = {"running": TradeSetting().get_running(), "auto_sell": TradeSetting().get_auto_sell(),
                     "auto_buy": TradeSetting().get_auto_buy()}
            return json.dumps({"code": 0, "data": fdata})

        if path == "/get_env":
            return self.__get_env_response(params_dict.get("request_id"))

        if path == "/load_kpl_market_sift_plate":
            # Load the plate-strength log data.
            KPLMarketsSiftPlateLogManager().load_data()
            return json.dumps({"code": 0, "msg": "暂无内容"})

        if path == "/get_kpl_market_sift_plate":
            print("==========get_kpl_market_sift_plate==========")
            return self.__latest_log_response(KPLMarketsSiftPlateLogManager, params_dict)

        if path == "/load_kpl_market_stock_heat":
            # Load the stock-heat log data.
            KPLMarketStockHeatLogManager().load_data()
            return json.dumps({"code": 0, "msg": "暂无内容"})

        if path == "/get_kpl_market_stock_heat":
            print("==========get_kpl_stock_of_markets_plate==========")
            return self.__latest_log_response(KPLMarketStockHeatLogManager, params_dict)

        if path == "/get_kpl_market_strong_records":
            # Market strength records as
            # (seconds since 09:15:00, time, strength). The ``time`` query
            # parameter is accepted but not used for filtering.
            fdatas = [(tool.trade_time_sub(data[0], "09:15:00"), data[0], data[1])
                      for data in log_export.load_kpl_market_strong()]
            return json.dumps({"code": 0, "data": fdatas})

        if path == "/get_place_order_records":
            return json.dumps({"code": 0, "data": data_cache.purchased_stocks_details_list})

        if path == "/get_forbidden_plates":
            datas = ForbiddenPlatesManager().list_plates()
            # "human": plates set manually; "fixed": plates from constant.check_plate_list.
            return json.dumps(
                {"code": 0, "data": {"human": list(datas), "fixed": list(constant.check_plate_list)}})

        if path == "/add_forbidden_plate":
            plate = params_dict.get("plate")
            if not plate:
                return json.dumps({"code": 1, "msg": "缺少plate参数"})
            ForbiddenPlatesManager().add_plate(plate)
            return json.dumps({"code": 0, "data": {}})

        if path == "/remove_forbidden_plate":
            plate = params_dict.get("plate")
            if not plate:
                return json.dumps({"code": 1, "msg": "缺少plate参数"})
            ForbiddenPlatesManager().remove_plate(plate)
            return json.dumps({"code": 0, "data": {}})

        if path == "/get_market_sift_plate_stock_dict":
            return json.dumps({"code": 0, "data": data_cache.market_sift_plate_stock_dict})

        return ""

    def __get_env_response(self, request_id):
        """Probe redis, mysql, the trade channel and host resources.

        Returns the JSON text for ``/get_env``. Each probe records the elapsed
        time since the start; a warning is logged when the total exceeds 10s.
        """
        use_time_list = []
        try:
            start_time = time.time()
            fdata = {}

            try:
                # Check redis.
                RedisUtils.get(redis_manager.RedisManager(0).getRedis(), "test")
                fdata["redis"] = 1
            except Exception:
                fdata["redis"] = 0
            use_time_list.append(("验证redis", time.time() - start_time))

            try:
                # Check mysql.
                mysql_data.Mysqldb().select_one("select 1")
                fdata["mysql"] = 1
            except Exception:
                fdata["mysql"] = 0
            use_time_list.append(("验证mysql", time.time() - start_time))

            try:
                # Number of pending redis async tasks (best effort: the key is
                # omitted from the reply when it cannot be read).
                fdata["redis_async_task_count"] = redis_manager.RedisUtils.get_async_task_count()
            except Exception as e:
                logger_debug.exception(e)
            use_time_list.append(("验证异步任务数量", time.time() - start_time))

            # Check the trade channel.
            try:
                can_access = huaxin_trade_api.test_trade_channel()
                fdata["trade_channel_access"] = 1 if can_access else 0
            except Exception as e:
                logger_debug.exception(e)
                fdata["trade_channel_access"] = 0
            use_time_list.append(("验证交易通道", time.time() - start_time))

            # CPU and memory usage; cpu_percent samples for one second.
            memory_info = psutil.virtual_memory()
            cpu_percent = psutil.cpu_percent(interval=1)
            # The "memery" spelling is part of the response format.
            fdata["device"] = {"cpu": cpu_percent, "memery": memory_info.percent}
            use_time_list.append(("获取设备资源占用", time.time() - start_time))

            return json.dumps({"code": 0, "data": fdata, "msg": ""})
        except Exception as e:
            logger_debug.error(f"环境获取异常:{request_id}")
            logger_debug.exception(e)
            return json.dumps({"code": 1, "msg": str(e)})
        finally:
            if use_time_list and use_time_list[-1][1] > 10:
                logger_debug.warning(f"环境获取时间大于10s({request_id}):{use_time_list}")

    def __latest_log_response(self, manager_cls, params_dict):
        """Return the newest filtered log record at or before the requested time.

        ``manager_cls`` is instantiated and asked for ``get_filter_log_datas()``;
        the first element of each record is compared against the ``time``
        query parameter (defaulting to the current time string).
        """
        try:
            time_str = params_dict.get("time") or tool.get_now_time_str()
            fdatas = manager_cls().get_filter_log_datas()
            # Scan from the newest record backwards.
            for record in reversed(fdatas):
                if record[0] <= time_str:
                    return json.dumps({"code": 0, "data": record})
            return json.dumps({"code": 1, "msg": "暂无内容"})
        except Exception as e:
            logging.exception(e)
            return json.dumps({"code": 1, "msg": str(e)})

    @classmethod
    def __is_sign_right(cls, params):
        """Return True when ``params["sign"]`` matches the expected signature.

        The signature is the MD5 hex digest of the sorted ``k=v`` pairs (all
        keys except ``sign``) joined by ``&`` and followed by a fixed salt.
        Anything that is not a dict carrying a string ``sign`` is rejected.
        """
        if not isinstance(params, dict):
            # __parse_request returns the raw text when the body is not JSON.
            return False
        sign = params.get("sign")
        if not isinstance(sign, str):
            return False
        ps = sorted(f"{k}={v}" for k, v in params.items() if k != 'sign')
        source_str = "&".join(ps) + "!@#lowSU*^cTion8888"
        md5_hexdigest = hashlib.md5(source_str.encode('utf-8')).hexdigest()
        # Constant-time comparison so timing does not leak the expected digest.
        return hmac.compare_digest(md5_hexdigest.encode('utf-8'), sign.encode('utf-8', 'replace'))

    def do_POST(self):
        """Dispatch a POST request by path and reply with a JSON document.

        Exceptions are logged and reported as ``{"code": 1, "msg": ...}``.
        """
        result_str = ""
        try:
            path = self.path
            print("接收到POST请求:", str(path))
            url = urlparse.urlparse(path)
            result_str = self.__handle_post(url.path)
        except Exception as e:
            logger_debug.exception(e)
            result_str = json.dumps({"code": 1, "msg": str(e)})
        finally:
            self.__send_response(result_str)

    def __handle_post(self, path):
        """Return the JSON text for POST ``path``, or ``""`` for an unknown path."""
        if path == "/trade_callback":
            # The callback body is only consumed in simulated-trade mode.
            if constant.IS_SIMULATED_TRADE:
                body = self.__parse_request()
                if not isinstance(body, str):
                    body = json.dumps(body)
                huaxin_trade_api.add_trade_callback_data(body)
            return json.dumps({"code": 0})

        if path == "/upload_deal_big_orders":
            # NOTE(review): this endpoint performs no signature check.
            return self.__upload_deal_big_orders()

        # Every endpoint below requires a valid signature.
        signed_handlers = {
            "/set_trade_settings": self.__set_trade_settings,
            "/buy": self.__buy,
            "/sell": self.__sell,
            "/set_buy_money": self.__set_buy_money,
            "/set_limit_up_sell": self.__set_limit_up_sell,
            "/cancel_order": self.__cancel_order,
        }
        handler = signed_handlers.get(path)
        if handler is None:
            return ""
        params = self.__parse_request()
        if not self.__is_sign_right(params):
            return json.dumps({"code": 1001, "msg": "签名错误"})
        return handler(params)

    def __set_trade_settings(self, params):
        """Apply whichever of running / auto_sell / auto_buy were supplied."""
        logger_debug.info(f"set_trade_settings: {params}")
        running = params.get("running")
        auto_sell = params.get("auto_sell")
        auto_buy = params.get("auto_buy")
        if running is not None:
            TradeSetting().set_running(int(running))
        if auto_sell is not None:
            TradeSetting().set_auto_sell(int(auto_sell))
        if auto_buy is not None:
            TradeSetting().set_auto_buy(int(auto_buy))
        return json.dumps({"code": 0, "data": {}})

    def __buy(self, params):
        """Place a buy order; the price is derived from L1 data when omitted."""
        print("买入", params)
        logger_request_api.info(f"买入:{params}")
        code = params.get("code")
        volume = params.get("volume")
        price = params.get("price")
        if code is None or volume is None:
            return json.dumps({"code": 1, "msg": "上传数据缺失"})
        if not price:
            # No price uploaded: derive it from the latest market data,
            # capped at the limit-up price.
            data = data_cache.latest_code_market_info_dict.get(code)
            if not data:
                raise Exception("没有获取到L1数据")
            pre_price = data[1]
            current_price = data[2] if data[2] else data[5][0][0]
            price = tool.get_buy_max_price(current_price)
            price = min(price, tool.get_limit_up_price(code, pre_price))
        else:
            price = round(float(price), 2)
        result = huaxin_trade_api.order(1, code, volume, price, blocking=True)
        return json.dumps(result)

    def __sell(self, params):
        """Place a sell order; the price is derived from market data when omitted."""
        try:
            print("卖出", params)
            code = params.get("code")
            volume = params.get("volume")
            price = params.get("price")
            if code is None or volume is None:
                return json.dumps({"code": 1, "msg": "上传数据缺失"})
            if not price:
                # No price uploaded: derive it from the latest market data.
                data = data_cache.latest_code_market_info_dict.get(code)
                if not data:
                    raise Exception("没有获取到L1数据")
                pre_price = data[1]
                current_price = data[2] if data[2] else data[5][0][0]
                # Prefer the latest deal price when one is cached.
                latest_deal_price = data_cache.latest_deal_price_dict.get(code)
                if latest_deal_price:
                    current_price = round(float(latest_deal_price), 2)
                    async_log_util.info(logger_debug, f"根据成交价卖出:{code}-{latest_deal_price}")
                # Floor the price at the limit-down price.
                price = tool.get_buy_min_price(current_price)
                price = max(price, tool.get_limit_down_price(code, pre_price))
            else:
                # float() first so a price uploaded as a string works, as in /buy.
                price = round(float(price), 2)
            result = huaxin_trade_api.order(2, code, volume, price, blocking=True)
            return json.dumps(result)
        finally:
            logger_request_api.info(f"卖出:{params}")

    def __set_buy_money(self, params):
        """Set the amount of money used for each buy."""
        print("每次买入的金额", params)
        money = params.get("money")
        if money is None:
            return json.dumps({"code": 1, "msg": "未上传金额"})
        money = int(money)
        logger_debug.info(f"设置开仓金额:{money}")
        data_cache.BUY_MONEY_PER_CODE = money
        return json.dumps({"code": 0})

    def __set_limit_up_sell(self, params):
        """Add a code to, or remove it from, data_cache.LIMIT_UP_SELL_CODES."""
        print("设置涨停卖出", params)
        code = params.get("code")
        enable = params.get("enable")
        if code is None or enable is None:
            return json.dumps({"code": 1, "msg": "上传数据缺失"})
        if int(enable):
            data_cache.LIMIT_UP_SELL_CODES.add(code)
        else:
            data_cache.LIMIT_UP_SELL_CODES.discard(code)
        return json.dumps({"code": 0})

    def __cancel_order(self, params):
        """Cancel the order identified by direction, code and orderSysID."""
        print("撤单", params)
        direction = params.get("direction")
        code = params.get("code")
        orderSysID = params.get("orderSysID")  # system order id
        result = huaxin_trade_api.cancel_order(direction, code, orderSysID, blocking=True)
        return json.dumps(result)

    def __upload_deal_big_orders(self):
        """Store uploaded big-order deals per code and log the raw payload.

        Only entries whose second element equals 0 are stored.
        """
        _str = self.__read_body()
        datas = json.loads(_str)
        for d in datas:
            if d[1] != 0:
                continue
            code = d[0]
            if code not in data_cache.big_order_deal_dict:
                data_cache.big_order_deal_dict[code] = []
            data_cache.big_order_deal_dict[code].append(d)
        # Keep the raw payload in the L2 transaction log.
        hx_logger_l2_transaction.info(_str)
        return json.dumps({"code": 0})

    def __send_response(self, data):
        """Send ``data`` (a str) as the body of a 200 JSON response."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())

    def __read_body(self):
        """Read the request body and decode it as GBK text.

        A missing content-length header is treated as an empty body.
        """
        length = int(self.headers.get('content-length') or 0)
        return str(self.rfile.read(length), encoding="gbk")

    def __parse_request(self):
        """Return the request body parsed as JSON, or the raw text if it is not JSON."""
        _str = self.__read_body()
        try:
            return json.loads(_str)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass.
            return _str


class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTPServer combined with ThreadingMixIn; adds no behaviour of its own."""


def run(addr="0.0.0.0", port=12881):
    """Start the threaded HTTP server and block serving requests.

    Args:
        addr: Interface address to bind to.
        port: TCP port to listen on.

    Exceptions raised while creating or running the server are logged to
    ``logger_system`` rather than propagated.
    """
    try:
        # The context manager closes the listening socket however the serve
        # loop exits.
        with ThreadedHTTPServer((addr, port), DataServer) as httpd:
            print("HTTP server is at: http://%s:%d/" % (addr, port))
            httpd.serve_forever()
    except Exception as e:
        logger_system.exception(e)


# Script entry point: start the server with its default address and port.
if __name__ == "__main__":
    run()