import http import json import logging import multiprocessing import queue import socketserver import threading import time from http.server import BaseHTTPRequestHandler import psutil import requests from code_attribute import global_data_loader from huaxin_client import l2_client_test, l1_subscript_codes_manager from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system, \ logger_local_huaxin_l2_transaction_accurate_big_order from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager from third_data.history_k_data_manager import HistoryKDataManager from third_data.history_k_data_util import HistoryKDatasUtils from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager from utils import tool, middle_api_protocol, global_util import urllib.parse as urlparse from urllib.parse import parse_qs class DataServer(BaseHTTPRequestHandler): # 禁用日志输出 def log_message(self, format, *args): pass def do_GET(self): path = self.path url = urlparse.urlparse(path) response_data = "" if url.path == "/get_block_codes_money": # 获取板块对应的代码与该代码的净流入 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) block = ps_dict.get('block') try: fdatas = BlockInMoneyRankManager().get_block_codes_money(block) response_data = json.dumps({"code": 0, "data": fdatas}) except Exception as e: response_data = json.dumps({"code": 1, "msg": str(e)}) elif url.path == "/get_big_order_list": # 获获取代码的大买/卖单列表 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict.get('code') try: buy_datas = CodeInMoneyManager().get_big_buy_money_list(code) if buy_datas is None: buy_datas = [] sell_datas = CodeInMoneyManager().get_big_sell_money_list(code) if sell_datas is None: sell_datas = [] response_data = json.dumps({"code": 0, "data": {"buy": buy_datas, "sell": sell_datas}}) except Exception as e: response_data = json.dumps({"code": 1, "msg": str(e)}) elif url.path == "/get_code_money_info": # 获取代码金额 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict.get('code') money_info = CodeInMoneyManager().get_money_info(code) response_data = json.dumps({"code": 0, "data": money_info}) elif url.path == "/get_codes_money_info": ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) codes_str = ps_dict.get('codes') codes = json.loads(codes_str) fresults = {} for code in codes: money_info = CodeInMoneyManager().get_money_info(code) fresults[code] = money_info response_data = json.dumps({"code": 0, "data": fresults}) self.send_response(200) # 发给请求客户端的响应数据 self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(response_data.encode()) class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer): pass def __run_server(addr, port): handler = DataServer try: httpd = ThreadedHTTPServer((addr, port), handler) print("L2_IN_MONEY HTTP server is at: http://%s:%d/" % (addr, port)) httpd.serve_forever() except Exception as e: logger_system.exception(e) logger_system.error(f"端口服务器:{port} 启动失败") def __run_upload_big_order_task(_queue: queue.Queue): # 运行上传大单任务 while True: try: datas = [] while not _queue.empty(): datas.append(_queue.get()) if datas: # 上传数据 requests.post("http://192.168.84.71:12881/upload_deal_big_orders", json.dumps(datas)) except: pass finally: time.sleep(1) def __get_special_codes(): """ 获取特殊的代码,需要订阅300w以上的大单 @return: 代码集合 """ try: zylt_volume_map = global_util.zylt_volume_map codes = set() last_trade_day = HistoryKDatasUtils.get_latest_trading_date(1)[0] for code in zylt_volume_map: if code == '601288': print("") volume = zylt_volume_map.get(code) # 今日涨停价要突破昨日最高价 k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day) if k_bars and 30e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8: # 自由流通市值在30亿-300亿以上 limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2) if limit_up_price > k_bars[0]["high"]: # 今日涨停价要突破昨日最高价 codes.add(code) # 获取辨识度的票 special_codes = BlockSpecialCodesManager().get_origin_code_blocks_dict().keys() if special_codes: codes |= set(special_codes) return codes except Exception as e: logger_system.exception(e) return set() def __save_accurate_big_order(big_accurate_order_queue): while True: try: data = big_accurate_order_queue.get() logger_local_huaxin_l2_transaction_accurate_big_order.info(f"{data}") except: pass def run(): special_codes = __get_special_codes() codes_sh, codes_sz = l1_subscript_codes_manager.get_codes() codes = [x.decode() for x in codes_sh] codes.extend([x.decode() for x in codes_sz]) codes.sort() # 绑定CPU的核心是0-16 cpu_count = 16 page_size = int(len(codes) / cpu_count) + 1 big_order_queue = multiprocessing.Queue(maxsize=1024) big_accurate_order_queue = multiprocessing.Queue(maxsize=1024) # 大单上传队列 big_order_upload_queue = queue.Queue(maxsize=1024) for i in range(cpu_count): process = multiprocessing.Process(target=l2_client_test.run, args=( codes[i * page_size:(i + 1) * page_size], big_order_queue, big_accurate_order_queue, special_codes,)) process.start() # 绑核运行 psutil.Process(process.pid).cpu_affinity([i]) threading.Thread(target=__run_upload_big_order_task, args=(big_order_upload_queue,), daemon=True).start() threading.Thread(target=__save_accurate_big_order, args=(big_accurate_order_queue,), daemon=True).start() while True: try: data = big_order_queue.get() CodeInMoneyManager().add_data(data) # 添加上传数据 big_order_upload_queue.put_nowait(data) logger_local_huaxin_l2_transaction_big_order.info(f"{data}") except: pass def __compute_and_upload(): def __upload_data(type_, data_): root_data = { "type": type_, "data": data_ } requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data)) def __upload_codes_in_money(): """ 上传所有代码的流入 @param type_: @param data_: @return: """ root_data = { "data": json.dumps(CodeInMoneyManager().get_code_money_dict()) } requests.post("http://127.0.0.1:9004/upload_codes_in_money", json.dumps(root_data)) while True: try: if not tool.is_trade_time(): continue BlockInMoneyRankManager().compute() in_list = BlockInMoneyRankManager().get_in_list() out_list = BlockInMoneyRankManager().get_out_list() # (代码,名称,强度,主力净额) fins = [(0, x[0], 0, x[1]) for x in in_list[:100]] fouts = [(0, x[0], 0, x[1]) for x in out_list[:50]] # 上传 __upload_data("jingxuan_rank", json.dumps(fins)) __upload_data("jingxuan_rank_out", json.dumps(fouts)) __upload_codes_in_money() except Exception as e: logging.exception(e) finally: time.sleep(3) def __update_today_limit_up_records(): while True: try: BlockInMoneyRankManager().load_today_limit_up_codes() except: pass finally: time.sleep(3) if __name__ == "__main__": # 载入自由流通量 global_data_loader.load_zyltgb_volume_from_db() threading.Thread(target=__compute_and_upload, daemon=True).start() # 启动内部接口服务 threading.Thread(target=__run_server, args=("0.0.0.0", 9005,), daemon=True).start() # 启用定时更新当日涨停 threading.Thread(target=__update_today_limit_up_records, daemon=True).start() run() while True: time.sleep(2)