import http
|
import json
|
import logging
|
import multiprocessing
|
import queue
|
import socketserver
|
import threading
|
import time
|
from http.server import BaseHTTPRequestHandler
|
|
import psutil
|
import requests
|
|
from api import low_suction_data_pusher
|
from code_attribute import global_data_loader
|
from huaxin_client import l2_client_test, l1_subscript_codes_manager
|
from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system, \
|
logger_local_huaxin_l2_transaction_accurate_big_order
|
from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager
|
from third_data.history_k_data_manager import HistoryKDataManager
|
from third_data.history_k_data_util import HistoryKDatasUtils
|
from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager
|
from utils import tool, middle_api_protocol, global_util
|
import urllib.parse as urlparse
|
from urllib.parse import parse_qs
|
|
|
class DataServer(BaseHTTPRequestHandler):
|
# 禁用日志输出
|
def log_message(self, format, *args):
|
pass
|
|
def do_GET(self):
|
path = self.path
|
url = urlparse.urlparse(path)
|
response_data = ""
|
if url.path == "/get_block_codes_money":
|
# 获取板块对应的代码与该代码的净流入
|
ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
|
block = ps_dict.get('block')
|
try:
|
fdatas = BlockInMoneyRankManager().get_block_codes_money(block)
|
response_data = json.dumps({"code": 0, "data": fdatas})
|
except Exception as e:
|
response_data = json.dumps({"code": 1, "msg": str(e)})
|
elif url.path == "/get_big_order_list":
|
# 获获取代码的大买/卖单列表
|
ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
|
code = ps_dict.get('code')
|
try:
|
buy_datas = CodeInMoneyManager().get_big_buy_money_list(code)
|
if buy_datas is None:
|
buy_datas = []
|
sell_datas = CodeInMoneyManager().get_big_sell_money_list(code)
|
if sell_datas is None:
|
sell_datas = []
|
response_data = json.dumps({"code": 0, "data": {"buy": buy_datas, "sell": sell_datas}})
|
except Exception as e:
|
response_data = json.dumps({"code": 1, "msg": str(e)})
|
elif url.path == "/get_code_money_info":
|
# 获取代码金额
|
ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
|
code = ps_dict.get('code')
|
money_info = CodeInMoneyManager().get_money_info(code)
|
response_data = json.dumps({"code": 0, "data": money_info})
|
elif url.path == "/get_codes_money_info":
|
ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
|
codes_str = ps_dict.get('codes')
|
codes = json.loads(codes_str)
|
fresults = {}
|
for code in codes:
|
money_info = CodeInMoneyManager().get_money_info(code)
|
fresults[code] = money_info
|
response_data = json.dumps({"code": 0, "data": fresults})
|
self.send_response(200)
|
# 发给请求客户端的响应数据
|
self.send_header('Content-type', 'application/json')
|
self.end_headers()
|
self.wfile.write(response_data.encode())
|
|
|
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
|
pass
|
|
|
def __run_server(addr, port):
|
handler = DataServer
|
try:
|
httpd = ThreadedHTTPServer((addr, port), handler)
|
print("L2_IN_MONEY HTTP server is at: http://%s:%d/" % (addr, port))
|
httpd.serve_forever()
|
except Exception as e:
|
logger_system.exception(e)
|
logger_system.error(f"端口服务器:{port} 启动失败")
|
|
|
def __run_upload_big_order_task(_queue: queue.Queue):
|
# 运行上传大单任务
|
while True:
|
try:
|
datas = []
|
while not _queue.empty():
|
datas.append(_queue.get())
|
if datas:
|
# 上传数据
|
requests.post("http://192.168.84.71:12881/upload_deal_big_orders", json.dumps(datas))
|
except:
|
pass
|
finally:
|
time.sleep(1)
|
|
|
def __get_special_codes():
|
"""
|
获取特殊的代码,需要订阅300w以上的大单
|
@return: 代码集合
|
"""
|
try:
|
zylt_volume_map = global_util.zylt_volume_map
|
codes = set()
|
last_trade_day = HistoryKDatasUtils.get_latest_trading_date(1)[0]
|
for code in zylt_volume_map:
|
if code == '601288':
|
print("")
|
volume = zylt_volume_map.get(code)
|
# 今日涨停价要突破昨日最高价
|
k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day)
|
if k_bars and 10e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8:
|
# 自由流通市值在10亿-300亿以上
|
limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2)
|
if limit_up_price > k_bars[0]["high"] or True:
|
# 今日涨停价要突破昨日最高价
|
codes.add(code)
|
# 获取辨识度的票
|
special_codes = BlockSpecialCodesManager().get_origin_code_blocks_dict().keys()
|
if special_codes:
|
codes |= set(special_codes)
|
return codes
|
except Exception as e:
|
logger_system.exception(e)
|
return set()
|
|
|
def __save_accurate_big_order(big_accurate_order_queue):
|
while True:
|
try:
|
datas = []
|
while not big_accurate_order_queue.empty():
|
data = big_accurate_order_queue.get()
|
datas.append(data)
|
low_suction_data_pusher.push_big_order(datas)
|
for data in datas:
|
logger_local_huaxin_l2_transaction_accurate_big_order.info(f"{data}")
|
except:
|
pass
|
|
|
def run():
|
special_codes = __get_special_codes()
|
codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
|
codes = [x.decode() for x in codes_sh]
|
codes.extend([x.decode() for x in codes_sz])
|
codes.sort()
|
# 绑定CPU的核心是0-16
|
cpu_count = 16
|
page_size = int(len(codes) / cpu_count) + 1
|
|
big_order_queue = multiprocessing.Queue(maxsize=1024)
|
big_accurate_order_queue = multiprocessing.Queue(maxsize=1024)
|
# 大单上传队列
|
big_order_upload_queue = queue.Queue(maxsize=1024)
|
|
for i in range(cpu_count):
|
process = multiprocessing.Process(target=l2_client_test.run,
|
args=(
|
codes[i * page_size:(i + 1) * page_size], big_order_queue,
|
big_accurate_order_queue, special_codes,))
|
|
process.start()
|
# 绑核运行
|
psutil.Process(process.pid).cpu_affinity([i])
|
threading.Thread(target=__run_upload_big_order_task, args=(big_order_upload_queue,), daemon=True).start()
|
threading.Thread(target=__save_accurate_big_order, args=(big_accurate_order_queue,), daemon=True).start()
|
|
while True:
|
try:
|
data = big_order_queue.get()
|
CodeInMoneyManager().add_data(data)
|
# 添加上传数据
|
big_order_upload_queue.put_nowait(data)
|
logger_local_huaxin_l2_transaction_big_order.info(f"{data}")
|
except:
|
pass
|
|
|
def __compute_and_upload():
|
def __upload_data(type_, data_):
|
root_data = {
|
"type": type_,
|
"data": data_
|
}
|
requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data))
|
|
def __upload_codes_in_money():
|
"""
|
上传所有代码的流入
|
@param type_:
|
@param data_:
|
@return:
|
"""
|
|
root_data = {
|
"data": json.dumps(CodeInMoneyManager().get_code_money_dict())
|
}
|
requests.post("http://127.0.0.1:9004/upload_codes_in_money", json.dumps(root_data))
|
|
while True:
|
try:
|
if not tool.is_trade_time():
|
continue
|
BlockInMoneyRankManager().compute()
|
in_list = BlockInMoneyRankManager().get_in_list()
|
out_list = BlockInMoneyRankManager().get_out_list()
|
# (代码,名称,强度,主力净额)
|
fins = [(0, x[0], 0, x[1]) for x in in_list[:100]]
|
fouts = [(0, x[0], 0, x[1]) for x in out_list[:50]]
|
# 上传
|
__upload_data("jingxuan_rank", json.dumps(fins))
|
__upload_data("jingxuan_rank_out", json.dumps(fouts))
|
__upload_codes_in_money()
|
try:
|
low_suction_data_pusher.push_block_in(in_list)
|
except:
|
pass
|
except Exception as e:
|
logging.exception(e)
|
finally:
|
time.sleep(3)
|
|
|
def __update_today_limit_up_records():
|
while True:
|
try:
|
BlockInMoneyRankManager().load_today_limit_up_codes()
|
except:
|
pass
|
finally:
|
time.sleep(3)
|
|
|
if __name__ == "__main__":
|
# 载入自由流通量
|
global_data_loader.load_zyltgb_volume_from_db()
|
threading.Thread(target=__compute_and_upload, daemon=True).start()
|
# 启动内部接口服务
|
threading.Thread(target=__run_server, args=("0.0.0.0", 9005,), daemon=True).start()
|
# 启用定时更新当日涨停
|
threading.Thread(target=__update_today_limit_up_records, daemon=True).start()
|
run()
|
while True:
|
time.sleep(2)
|