Administrator
2023-08-31 d47fbd65ab8197348ad293b7948fcdd2f8995594
L2数据上传任务守护线程
3个文件已修改
67 ■■■■■ 已修改文件
huaxin_client/l2_client.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -123,9 +123,9 @@
        del_codes = self.subscripted_codes - codes
        print("add del codes", add_codes, del_codes)
        for c in codes:
            l2_data_manager.target_codes.add(c)
            l2_data_manager.add_target_code(c)
        for c in del_codes:
            l2_data_manager.target_codes.discard(c)
            l2_data_manager.del_target_code(c)
        for c in add_codes:
            l2_data_manager.run_upload_task(c, l2_data_callback)
        self.__subscribe(add_codes)
@@ -592,6 +592,7 @@
        l2_data_manager.run_upload_common(l2_data_callback)
        l2_data_manager.run_upload_trading_canceled(l2_data_callback)
        l2_data_manager.run_log()
        l2_data_manager.run_upload_daemon(l2_data_callback)
        # l2_data_manager.run_test(l2_data_callback)
        global l2CommandManager
        l2CommandManager = command_manager.L2CommandManager()
huaxin_client/l2_data_manager.py
@@ -23,6 +23,7 @@
tmep_order_detail_queue_dict = {}
tmep_transaction_queue_dict = {}
target_codes = set()
target_codes_add_time = {}
common_queue = queue.Queue()
trading_canceled_queue = queue.Queue()
log_buy_no_queue = queue.Queue()
@@ -30,6 +31,18 @@
buy_order_nos_dict = {}
# 最近的大单成交单号
latest_big_order_transaction_orders_dict = {}
def add_target_code(code):
    target_codes.add(code)
    # 记录代码加入时间
    target_codes_add_time[code] = time.time()
def del_target_code(code):
    target_codes.discard(code)
    if code in target_codes_add_time:
        target_codes_add_time.pop(code)
# 获取最近的大单成交订单号
@@ -257,8 +270,13 @@
            pass
__upload_order_threads = {}
__upload_trans_threads = {}
# 运行上传任务
def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None:
    try:
    # 如果代码没有在目标代码中就不需要运行
    if code not in target_codes:
        return
@@ -266,10 +284,14 @@
    if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2:
        t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True)
        t.start()
            __upload_order_threads[code] = t
    if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2:
        t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True)
        t.start()
            __upload_trans_threads[code] = t
    finally:
        pass
def run_upload_common(l2_data_callback: L2DataCallBack):
@@ -290,6 +312,29 @@
    t.start()
# 运行守护线程
def run_upload_daemon(_l2_data_callback):
    def upload_daemon():
        while True:
            try:
                for code in target_codes_add_time:
                    # 目标代码加入2s之后启动守护
                    if time.time() - target_codes_add_time[code] > 2:
                        if code not in __upload_order_threads or not __upload_order_threads[code].is_alive():
                            t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True)
                            t.start()
                            __upload_order_threads[code] = t
                        if code not in __upload_trans_threads or not __upload_trans_threads[code].is_alive():
                            t = threading.Thread(target=lambda: __run_upload_trans(code, _l2_data_callback), daemon=True)
                            t.start()
                            __upload_trans_threads[code] = t
                    time.sleep(3)
            except:
                pass
    t = threading.Thread(target=lambda: upload_daemon(), daemon=True)
    t.start()
def __test(_l2_data_callback):
    code = "002073"
    if code not in tmep_order_detail_queue_dict:
test/test.py
@@ -1,4 +1,5 @@
import multiprocessing
import queue
import threading
import time
@@ -6,22 +7,32 @@
from log_module import async_log_util
from log_module.log import logger_debug
__queue = queue.Queue()
def read(pipe):
    while True:
        val = pipe.recv()
        if not __queue.empty():
            val = __queue.get(block=False)
        if val:
            print("read:", val)
def write(pipe):
    while True:
        pipe.send("test")
        __queue.put_nowait("123")
        time.sleep(1)
        break
if __name__ == "__main__":
    p1, p2 = multiprocessing.Pipe()
    threading.Thread(target=lambda: write(p1), daemon=True).start()
    t1 = threading.Thread(target=lambda: write(p1), daemon=True)
    t1.start()
    print("是否alive:", t1.is_alive())
    threading.Thread(target=lambda: read(p2), daemon=True).start()
    while True:
        print("是否alive:", t1.is_alive())
        time.sleep(1)
    input()