Administrator
2025-06-09 1e16e3fdc6fafc66c4a0ae168d1b4e46b61e5a70
l2_test.py
@@ -1,18 +1,156 @@
import http
import json
import logging
import multiprocessing
import queue
import socketserver
import threading
import time
from http.server import BaseHTTPRequestHandler
import psutil
import requests
from code_attribute import global_data_loader
from huaxin_client import l2_client_test, l1_subscript_codes_manager
from log_module.log import logger_local_huaxin_l2_transaction_big_order
from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system, \
    logger_local_huaxin_l2_transaction_accurate_big_order
from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager
from utils import tool
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils
from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager
from utils import tool, middle_api_protocol, global_util
import urllib.parse as urlparse
from urllib.parse import parse_qs
class DataServer(BaseHTTPRequestHandler):
    """Internal HTTP GET interface that serves money-flow / big-order data as JSON.

    Routes handled by ``do_GET``:

    * ``/get_block_codes_money?block=<block>`` - codes of a block with their net inflow.
    * ``/get_big_order_list?code=<code>`` - big buy / sell order lists of a code.
    * ``/get_code_money_info?code=<code>`` - money info of one code.
    * ``/get_codes_money_info?codes=<JSON list>`` - money info of several codes.

    A successful reply is ``{"code": 0, "data": ...}``. Any exception raised
    while serving a route is reported as ``{"code": 1, "msg": "<error text>"}``.
    """

    # Disable the default per-request log output
    def log_message(self, format, *args):
        pass

    def do_GET(self):
        """Dispatch on the request path and write the JSON reply.

        The HTTP status is always 200; success or failure is carried by the
        ``code`` field of the body. A path that matches no route gets an empty
        body.
        """
        url = urlparse.urlparse(self.path)
        response_data = ""
        try:
            # Only the first value of a repeated query parameter is used.
            params = {k: v[0] for k, v in parse_qs(url.query).items()}
            if url.path == "/get_block_codes_money":
                # Codes belonging to the block together with each code's net inflow
                fdatas = BlockInMoneyRankManager().get_block_codes_money(params.get('block'))
                response_data = json.dumps({"code": 0, "data": fdatas})
            elif url.path == "/get_big_order_list":
                # Big buy / sell order lists of the code
                code = params.get('code')
                buy_datas = CodeInMoneyManager().get_big_buy_money_list(code)
                if buy_datas is None:
                    buy_datas = []
                sell_datas = CodeInMoneyManager().get_big_sell_money_list(code)
                if sell_datas is None:
                    sell_datas = []
                response_data = json.dumps({"code": 0, "data": {"buy": buy_datas, "sell": sell_datas}})
            elif url.path == "/get_code_money_info":
                # Money info of a single code
                money_info = CodeInMoneyManager().get_money_info(params.get('code'))
                response_data = json.dumps({"code": 0, "data": money_info})
            elif url.path == "/get_codes_money_info":
                # `codes` is a JSON-encoded list; a missing or malformed value
                # raises here and is reported through the error reply below.
                codes = json.loads(params.get('codes'))
                fresults = {}
                for code in codes:
                    fresults[code] = CodeInMoneyManager().get_money_info(code)
                response_data = json.dumps({"code": 0, "data": fresults})
        except Exception as e:
            # Every route answers with an error payload instead of dropping
            # the connection without a response.
            response_data = json.dumps({"code": 1, "msg": str(e)})
        self.send_response(200)
        # Response data sent back to the requesting client
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(response_data.encode())
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """``HTTPServer`` combined with ``ThreadingMixIn``; adds no behaviour of its own."""
def __run_server(addr, port):
    """Create a ``ThreadedHTTPServer`` on ``(addr, port)`` and serve ``DataServer`` requests.

    The call blocks inside ``serve_forever()``. Any exception raised while
    creating or running the server is logged through ``logger_system`` and
    not re-raised.

    @param addr: address to bind
    @param port: TCP port to bind
    """
    try:
        server = ThreadedHTTPServer((addr, port), DataServer)
        banner = "L2_IN_MONEY HTTP server is at: http://%s:%d/" % (addr, port)
        print(banner)
        server.serve_forever()
    except Exception as err:
        logger_system.exception(err)
        logger_system.error(f"端口服务器:{port} 启动失败")
def __run_upload_big_order_task(_queue: queue.Queue):
    """Run the big-order upload task: once per second, drain ``_queue`` and POST the batch.

    Everything currently in the queue is collected into one list and sent as
    a JSON string to the upload endpoint. The function never returns. A batch
    whose upload fails is not retried; the failure is logged and the loop
    carries on with whatever arrives next.

    @param _queue: queue of big-order records waiting to be uploaded
    """
    while True:
        try:
            datas = []
            # Drain without blocking; queue.Empty marks the end of this batch.
            while True:
                try:
                    datas.append(_queue.get_nowait())
                except queue.Empty:
                    break
            if datas:
                # Upload the data. The timeout keeps a stalled peer from
                # blocking this thread forever while the bounded queue fills.
                requests.post("http://192.168.84.71:12881/upload_deal_big_orders", json.dumps(datas),
                              timeout=10)
        except Exception as e:
            # Best-effort upload: report the failure in one line and keep looping.
            logger_system.error(f"upload_deal_big_orders failed: {e}")
        finally:
            time.sleep(1)
def __get_special_codes():
    """
    Get the special codes, i.e. those that need a subscription to big orders above 3 million.

    A code from ``global_util.zylt_volume_map`` is selected when history bars
    exist for the latest trading date and
    ``close * volume * limit_up_rate`` lies within [10e8, 300e8]. The keys of
    ``BlockSpecialCodesManager().get_origin_code_blocks_dict()`` are then
    added to the result.

    @return: set of codes; an empty set when any step raises (the exception is logged)
    """
    try:
        codes = set()
        last_trade_day = HistoryKDatasUtils.get_latest_trading_date(1)[0]
        for code, volume in global_util.zylt_volume_map.items():
            k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day)
            if not k_bars:
                continue
            last_bar = k_bars[0]
            # Looked up once and reused for both the value check and the limit-up price
            limit_up_rate = tool.get_limit_up_rate(code)
            # Free-float market value must be between 1 billion and 30 billion
            if not 10e8 <= last_bar["close"] * volume * limit_up_rate <= 300e8:
                continue
            limit_up_price = round(limit_up_rate * last_bar["close"], 2)
            # Today's limit-up price should break the previous high.
            # NOTE(review): `or True` makes this check always pass, so the filter is
            # effectively switched off - confirm whether that is still intended.
            if limit_up_price > last_bar["high"] or True:
                codes.add(code)
        # Add the distinctive (high-recognition) codes
        special_codes = BlockSpecialCodesManager().get_origin_code_blocks_dict().keys()
        if special_codes:
            codes |= set(special_codes)
        return codes
    except Exception as e:
        logger_system.exception(e)
        return set()
def __save_accurate_big_order(big_accurate_order_queue):
    """Forever take items from ``big_accurate_order_queue`` and write each one to the accurate-big-order log.

    ``get()`` blocks until an item is available. A failure while reading or
    logging an item is reported through ``logger_system`` and the loop
    continues; the function never returns.

    @param big_accurate_order_queue: queue the accurate big-order records are read from
    """
    while True:
        try:
            data = big_accurate_order_queue.get()
            logger_local_huaxin_l2_transaction_accurate_big_order.info(f"{data}")
        except Exception as e:
            logger_system.exception(e)
            # Back off so a queue that keeps failing cannot turn this loop into a busy spin.
            time.sleep(1)
def run():
    special_codes = __get_special_codes()
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
    codes = [x.decode() for x in codes_sh]
    codes.extend([x.decode() for x in codes_sz])
@@ -21,19 +159,29 @@
    cpu_count = 16
    page_size = int(len(codes) / cpu_count) + 1
    big_order_queue = multiprocessing.Queue()
    big_order_queue = multiprocessing.Queue(maxsize=1024)
    big_accurate_order_queue = multiprocessing.Queue(maxsize=1024)
    # 大单上传队列
    big_order_upload_queue = queue.Queue(maxsize=1024)
    for i in range(cpu_count):
        process = multiprocessing.Process(target=l2_client_test.run,
                                          args=(codes[i * page_size:(i + 1) * page_size], big_order_queue,))
                                          args=(
                                              codes[i * page_size:(i + 1) * page_size], big_order_queue,
                                              big_accurate_order_queue, special_codes,))
        process.start()
        # 绑核运行
        psutil.Process(process.pid).cpu_affinity([i])
    threading.Thread(target=__run_upload_big_order_task, args=(big_order_upload_queue,), daemon=True).start()
    threading.Thread(target=__save_accurate_big_order, args=(big_accurate_order_queue,), daemon=True).start()
    while True:
        try:
            data = big_order_queue.get()
            CodeInMoneyManager().add_data(data)
            # 添加上传数据
            big_order_upload_queue.put_nowait(data)
            logger_local_huaxin_l2_transaction_big_order.info(f"{data}")
        except:
            pass
@@ -67,9 +215,9 @@
            BlockInMoneyRankManager().compute()
            in_list = BlockInMoneyRankManager().get_in_list()
            out_list = BlockInMoneyRankManager().get_out_list()
            fins = [(0, x[0], 0, x[1]) for x in in_list[:20]]
            fouts = [(0, x[0], 0, x[1]) for x in out_list[:20]]
            # (代码,名称,强度,主力净额)
            fins = [(0, x[0], 0, x[1]) for x in in_list[:100]]
            fouts = [(0, x[0], 0, x[1]) for x in out_list[:50]]
            # 上传
            __upload_data("jingxuan_rank", json.dumps(fins))
            __upload_data("jingxuan_rank_out", json.dumps(fouts))
@@ -80,8 +228,24 @@
            time.sleep(3)
def __update_today_limit_up_records():
    """Forever call ``BlockInMoneyRankManager().load_today_limit_up_codes()``, pausing 3 seconds after each attempt.

    A failed attempt is logged through ``logger_system`` and does not stop
    the loop; the function never returns.
    """
    while True:
        try:
            BlockInMoneyRankManager().load_today_limit_up_codes()
        except Exception as e:
            # Keep refreshing, but leave a trace of why this attempt failed.
            logger_system.exception(e)
        finally:
            time.sleep(3)
if __name__ == "__main__":
    # Load the free-float volume data
    global_data_loader.load_zyltgb_volume_from_db()
    # Daemon workers, started in this order:
    #   1. compute-and-upload loop
    #   2. internal HTTP interface on 0.0.0.0:9005
    #   3. periodic refresh of today's limit-up records
    background_jobs = (
        (__compute_and_upload, ()),
        (__run_server, ("0.0.0.0", 9005,)),
        (__update_today_limit_up_records, ()),
    )
    for job, job_args in background_jobs:
        threading.Thread(target=job, args=job_args, daemon=True).start()
    run()
    # Keep the main thread alive should run() ever return
    while True:
        time.sleep(2)