import json import logging import multiprocessing import threading import time import psutil import requests from huaxin_client import l2_client_test, l1_subscript_codes_manager from log_module.log import logger_local_huaxin_l2_transaction_big_order from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager from utils import tool def run(): codes_sh, codes_sz = l1_subscript_codes_manager.get_codes() codes = [x.decode() for x in codes_sh] codes.extend([x.decode() for x in codes_sz]) codes.sort() # 绑定CPU的核心是0-16 cpu_count = 16 page_size = int(len(codes) / cpu_count) + 1 big_order_queue = multiprocessing.Queue() for i in range(cpu_count): process = multiprocessing.Process(target=l2_client_test.run, args=(codes[i * page_size:(i + 1) * page_size], big_order_queue,)) process.start() # 绑核运行 psutil.Process(process.pid).cpu_affinity([i]) while True: try: data = big_order_queue.get() CodeInMoneyManager().add_data(data) logger_local_huaxin_l2_transaction_big_order.info(f"{data}") except: pass def __compute_and_upload(): def __upload_data(type_, data_): root_data = { "type": type_, "data": data_ } requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data)) def __upload_codes_in_money(): """ 上传所有代码的流入 @param type_: @param data_: @return: """ root_data = { "data": json.dumps(CodeInMoneyManager().get_code_money_dict()) } requests.post("http://127.0.0.1:9004/upload_codes_in_money", json.dumps(root_data)) while True: try: if not tool.is_trade_time(): continue BlockInMoneyRankManager().compute() in_list = BlockInMoneyRankManager().get_in_list() out_list = BlockInMoneyRankManager().get_out_list() fins = [(0, x[0], 0, x[1]) for x in in_list[:20]] fouts = [(0, x[0], 0, x[1]) for x in out_list[:20]] # (代码,名称,强度,主力净额) # 上传 __upload_data("jingxuan_rank", json.dumps(fins)) __upload_data("jingxuan_rank_out", json.dumps(fouts)) __upload_codes_in_money() except Exception as e: logging.exception(e) finally: time.sleep(3) if __name__ == "__main__": threading.Thread(target=__compute_and_upload, daemon=True).start() run() while True: time.sleep(2)