import json
|
import logging
|
import multiprocessing
|
import threading
|
import time
|
import psutil
|
import requests
|
|
from huaxin_client import l2_client_test, l1_subscript_codes_manager
|
from log_module.log import logger_local_huaxin_l2_transaction_big_order
|
from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager
|
from utils import tool
|
|
|
def run():
|
codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
|
codes = [x.decode() for x in codes_sh]
|
codes.extend([x.decode() for x in codes_sz])
|
codes.sort()
|
# 绑定CPU的核心是0-16
|
cpu_count = 16
|
page_size = int(len(codes) / cpu_count) + 1
|
|
big_order_queue = multiprocessing.Queue()
|
|
for i in range(cpu_count):
|
process = multiprocessing.Process(target=l2_client_test.run,
|
args=(codes[i * page_size:(i + 1) * page_size], big_order_queue,))
|
|
process.start()
|
# 绑核运行
|
psutil.Process(process.pid).cpu_affinity([i])
|
while True:
|
try:
|
data = big_order_queue.get()
|
CodeInMoneyManager().add_data(data)
|
logger_local_huaxin_l2_transaction_big_order.info(f"{data}")
|
except:
|
pass
|
|
|
def __compute_and_upload():
|
def __upload_data(type_, data_):
|
root_data = {
|
"type": type_,
|
"data": data_
|
}
|
requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data))
|
|
def __upload_codes_in_money():
|
"""
|
上传所有代码的流入
|
@param type_:
|
@param data_:
|
@return:
|
"""
|
|
root_data = {
|
"data": json.dumps(CodeInMoneyManager().get_code_money_dict())
|
}
|
requests.post("http://127.0.0.1:9004/upload_codes_in_money", json.dumps(root_data))
|
|
while True:
|
try:
|
if not tool.is_trade_time():
|
continue
|
BlockInMoneyRankManager().compute()
|
in_list = BlockInMoneyRankManager().get_in_list()
|
out_list = BlockInMoneyRankManager().get_out_list()
|
fins = [(0, x[0], 0, x[1]) for x in in_list[:20]]
|
fouts = [(0, x[0], 0, x[1]) for x in out_list[:20]]
|
# (代码,名称,强度,主力净额)
|
# 上传
|
__upload_data("jingxuan_rank", json.dumps(fins))
|
__upload_data("jingxuan_rank_out", json.dumps(fouts))
|
__upload_codes_in_money()
|
except Exception as e:
|
logging.exception(e)
|
finally:
|
time.sleep(3)
|
|
|
if __name__ == "__main__":
|
threading.Thread(target=__compute_and_upload, daemon=True).start()
|
run()
|
while True:
|
time.sleep(2)
|