Administrator
2024-11-19 340927061f8be6e308927e76039e9917197a2479
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import json
import logging
import multiprocessing
import threading
import time
import psutil
import requests
 
from huaxin_client import l2_client_test, l1_subscript_codes_manager
from log_module.log import logger_local_huaxin_l2_transaction_big_order
from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager
 
 
def run():
    """
    Start the pinned L2 worker processes and consume the big orders they emit.

    The SH and SZ code lists returned by ``l1_subscript_codes_manager.get_codes()``
    are decoded, merged and sorted, then split into ``cpu_count`` contiguous pages.
    One ``l2_client_test.run`` process is started per page and pinned to the CPU
    core with the same index. The calling thread then blocks forever on the shared
    queue: every item is passed to ``CodeInMoneyManager().add_data`` and written
    to the big-order logger.

    This function never returns normally.
    """
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
    codes = [x.decode() for x in codes_sh]
    codes.extend(x.decode() for x in codes_sz)
    codes.sort()
    # One worker per core; workers are pinned to cores 0-15.
    cpu_count = 16
    # "+ 1" guarantees cpu_count pages are always enough to cover every code.
    page_size = len(codes) // cpu_count + 1

    big_order_queue = multiprocessing.Queue()

    for i in range(cpu_count):
        process = multiprocessing.Process(target=l2_client_test.run,
                                          args=(codes[i * page_size:(i + 1) * page_size], big_order_queue,))

        process.start()
        # Pin the freshly started worker to core i.
        psutil.Process(process.pid).cpu_affinity([i])
    while True:
        try:
            data = big_order_queue.get()
            CodeInMoneyManager().add_data(data)
            logger_local_huaxin_l2_transaction_big_order.info(f"{data}")
        except Exception as e:
            # Keep consuming on a bad item, but record the failure instead of
            # hiding it. KeyboardInterrupt/SystemExit are deliberately not
            # caught so the process can still be stopped.
            logging.exception(e)
 
 
def __compute_and_upload():
    """
    Periodically recompute the block money-flow ranking and upload the results.

    Each cycle calls ``BlockInMoneyRankManager().compute()``, takes the first 20
    entries of the in-list and of the out-list, and posts them, followed by the
    per-code money dict from ``CodeInMoneyManager``, to the local HTTP service on
    port 9004. Any exception in a cycle is logged and the loop continues; the
    loop sleeps 3 seconds after every cycle whether it succeeded or not.

    This function never returns; it is meant to run in a background thread.
    """
    # Upper bound (seconds) for each HTTP upload. Without a timeout a stalled
    # server would block this loop indefinitely.
    request_timeout = 10

    def __upload_data(type_, data_):
        """
        POST one typed payload to the ``upload_kpl_data`` endpoint.
        @param type_: value sent under the "type" key
        @param data_: value sent under the "data" key
        @return:
        """
        root_data = {
            "type": type_,
            "data": data_
        }
        requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data),
                      timeout=request_timeout)

    def __upload_codes_in_money():
        """
        Upload the money inflow of all codes.
        @return:
        """

        root_data = {
            "data": json.dumps(CodeInMoneyManager().get_code_money_dict())
        }
        requests.post("http://127.0.0.1:9004/upload_codes_in_money", json.dumps(root_data),
                      timeout=request_timeout)

    while True:
        try:
            BlockInMoneyRankManager().compute()
            in_list = BlockInMoneyRankManager().get_in_list()
            out_list = BlockInMoneyRankManager().get_out_list()
            # Tuple layout: (code, name, strength, main-force net amount).
            # Code and strength are sent as the placeholder 0.
            fins = [(0, x[0], 0, x[1]) for x in in_list[:20]]
            fouts = [(0, x[0], 0, x[1]) for x in out_list[:20]]
            # Upload
            __upload_data("jingxuan_rank", json.dumps(fins))
            __upload_data("jingxuan_rank_out", json.dumps(fouts))
            __upload_codes_in_money()
        except Exception as e:
            logging.exception(e)
        finally:
            time.sleep(3)
 
 
if __name__ == "__main__":
    # Run the compute-and-upload loop on a daemon thread so it dies with the
    # main process.
    uploader = threading.Thread(target=__compute_and_upload, daemon=True)
    uploader.start()
    # Starts the worker processes and then consumes their queue; run() loops
    # forever, so the keep-alive below only matters if run() ever returns.
    run()
    while True:
        time.sleep(2)