import http
|
import json
|
import logging
|
import multiprocessing
|
import socketserver
|
import threading
|
import time
|
from http.server import BaseHTTPRequestHandler
|
|
import psutil
|
import requests
|
|
from code_attribute import global_data_loader
|
from huaxin_client import l2_client_test, l1_subscript_codes_manager
|
from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system
|
from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager
|
from utils import tool
|
import urllib.parse as urlparse
|
from urllib.parse import parse_qs
|
|
|
class DataServer(BaseHTTPRequestHandler):
|
# 禁用日志输出
|
def log_message(self, format, *args):
|
pass
|
|
def do_GET(self):
|
path = self.path
|
url = urlparse.urlparse(path)
|
response_data = ""
|
if url.path == "/get_block_codes_money":
|
# 获取板块对应的代码与该代码的净流入
|
ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
|
block = ps_dict.get('block')
|
try:
|
fdatas = BlockInMoneyRankManager().get_block_codes_money(block)
|
response_data = json.dumps({"code": 0, "data": fdatas})
|
except Exception as e:
|
response_data = json.dumps({"code": 1, "msg": str(e)})
|
elif url.path == "/get_big_buy_order_list":
|
# 获获取代码的大买单列表
|
ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
|
code = ps_dict.get('code')
|
try:
|
fdatas = CodeInMoneyManager().get_big_buy_money_list(code)
|
if fdatas is None:
|
fdatas = []
|
response_data = json.dumps({"code": 0, "data": fdatas})
|
except Exception as e:
|
response_data = json.dumps({"code": 1, "msg": str(e)})
|
elif url.path == "/get_code_money_info":
|
# 获取代码金额
|
ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
|
code = ps_dict.get('code')
|
money_info = CodeInMoneyManager().get_money_info(code)
|
response_data = json.dumps({"code": 0, "data": money_info})
|
|
self.send_response(200)
|
# 发给请求客户端的响应数据
|
self.send_header('Content-type', 'application/json')
|
self.end_headers()
|
self.wfile.write(response_data.encode())
|
|
|
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
|
pass
|
|
|
def __run_server(addr, port):
|
handler = DataServer
|
try:
|
httpd = ThreadedHTTPServer((addr, port), handler)
|
print("L2_IN_MONEY HTTP server is at: http://%s:%d/" % (addr, port))
|
httpd.serve_forever()
|
except Exception as e:
|
logger_system.exception(e)
|
logger_system.error(f"端口服务器:{port} 启动失败")
|
|
|
def run():
|
codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
|
codes = [x.decode() for x in codes_sh]
|
codes.extend([x.decode() for x in codes_sz])
|
codes.sort()
|
# 绑定CPU的核心是0-16
|
cpu_count = 16
|
page_size = int(len(codes) / cpu_count) + 1
|
|
big_order_queue = multiprocessing.Queue()
|
|
for i in range(cpu_count):
|
process = multiprocessing.Process(target=l2_client_test.run,
|
args=(codes[i * page_size:(i + 1) * page_size], big_order_queue,))
|
|
process.start()
|
# 绑核运行
|
psutil.Process(process.pid).cpu_affinity([i])
|
while True:
|
try:
|
data = big_order_queue.get()
|
CodeInMoneyManager().add_data(data)
|
logger_local_huaxin_l2_transaction_big_order.info(f"{data}")
|
except:
|
pass
|
|
|
def __compute_and_upload():
|
def __upload_data(type_, data_):
|
root_data = {
|
"type": type_,
|
"data": data_
|
}
|
requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data))
|
|
def __upload_codes_in_money():
|
"""
|
上传所有代码的流入
|
@param type_:
|
@param data_:
|
@return:
|
"""
|
|
root_data = {
|
"data": json.dumps(CodeInMoneyManager().get_code_money_dict())
|
}
|
requests.post("http://127.0.0.1:9004/upload_codes_in_money", json.dumps(root_data))
|
|
while True:
|
try:
|
if not tool.is_trade_time():
|
continue
|
BlockInMoneyRankManager().compute()
|
in_list = BlockInMoneyRankManager().get_in_list()
|
out_list = BlockInMoneyRankManager().get_out_list()
|
# (代码,名称,强度,主力净额)
|
fins = [(0, x[0], 0, x[1]) for x in in_list[:100]]
|
fouts = [(0, x[0], 0, x[1]) for x in out_list[:50]]
|
# 上传
|
__upload_data("jingxuan_rank", json.dumps(fins))
|
__upload_data("jingxuan_rank_out", json.dumps(fouts))
|
__upload_codes_in_money()
|
except Exception as e:
|
logging.exception(e)
|
finally:
|
time.sleep(3)
|
|
|
def __update_today_limit_up_records():
|
while True:
|
try:
|
BlockInMoneyRankManager().load_today_limit_up_codes()
|
except:
|
pass
|
finally:
|
time.sleep(3)
|
|
|
if __name__ == "__main__":
|
# 载入自由流通量
|
global_data_loader.load_zyltgb_volume_from_db()
|
threading.Thread(target=__compute_and_upload, daemon=True).start()
|
# 启动内部接口服务
|
threading.Thread(target=__run_server, args=("0.0.0.0", 9005,), daemon=True).start()
|
# 启用定时更新当日涨停
|
threading.Thread(target=__update_today_limit_up_records, daemon=True).start()
|
run()
|
while True:
|
time.sleep(2)
|