Administrator
2024-01-11 7f026a4d59ee64f80f86cd5ee8601e96f27bd2ad
添加测试
4个文件已修改
38 ■■■■ 已修改文件
huaxin_client/l2_client.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_mmap.py 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_mul_queue.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -6,6 +6,7 @@
import queue
import threading
import time
import concurrent.futures
from typing import List
from huaxin_client import command_manager, l2_data_transform_protocol
@@ -491,8 +492,7 @@
class MyL2ActionCallback(L2ActionCallback):
    def OnSetL2Position(self, codes_data):
        print("L2订阅数量:", len(codes_data))
        logger_l2_codes_subscript.info("华鑫L2代码处理队列获取到数据:数量-{}", len(codes_data))
        huaxin_l2_log.info(logger_l2_codes_subscript, "华鑫L2代码处理队列获取到数据:数量-{}", len(codes_data))
        try:
            spi.set_codes_data(codes_data)
        except Exception as e:
@@ -532,6 +532,9 @@
    api.Init()
__l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)
def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue):
    logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}")
    while True:
@@ -547,7 +550,10 @@
                    code = data["data"]["code"]
                    spi.set_code_special_watch_volume(code, volume)
                elif _type == "l2_cmd":
                    l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
                    # 线程池
                    __l2_cmd_thread_pool.submit(
                        lambda: l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data))
        except Exception as e:
            logging.exception(e)
test/test_mmap.py
@@ -9,10 +9,10 @@
def run_process_1(pipe):
    tag = 'l2-000333'
    with contextlib.closing(mmap.mmap(-1, 1000 * 100, tagname=tag, access=mmap.ACCESS_WRITE)) as m:
        for i in range(1, 10001):
        for i in range(1, 100):
            start = time.time()
            m.seek(0)
            m.write((f"msg {i} " * 10000).encode("utf-8"))
            m.write((f"msg {i} " * 1).encode("utf-8"))
            m.flush()
            print("耗时", time.time() - start)
            time.sleep(1)
@@ -27,12 +27,8 @@
                print(len(s), s)
            time.sleep(1)
if __name__ == '__main__':
    l2_data_manager.test()
    pass
if __name__ == '__main__1':
    p1, p2 = multiprocessing.Pipe()
    serverProcess = multiprocessing.Process(target=run_process_1, args=(p1,))
    jueJinProcess = multiprocessing.Process(target=run_process_2, args=(p2,))
test/test_mul_queue.py
@@ -29,9 +29,19 @@
        except Exception as e:
            logging.exception(e)
if __name__ == "__main__":
    q = multiprocessing.Queue()
    while True:
        try:
            val = q.get_nowait()
            print(val)
        except:
            time.sleep(0.005)
            q.put_nowait("123123")
            print("出错")
if __name__ == "__main__1":
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=run_process1, args=(q,))
    p2 = multiprocessing.Process(target=run_process2, args=(q,))
    p3 = multiprocessing.Process(target=run_process3, args=(q,))
trade/huaxin/huaxin_trade_server.py
@@ -1085,6 +1085,10 @@
                    deal_list = [deal_order_system_id_infos[k] for k in deal_order_system_id_infos]
                    deal_list.sort(key=lambda x: x[1])
                    # TODO 测试
                    deal_list.clear()
                    fdata["available"] = fdata["total"]
                    fdata["sell_orders"] = [k[0] for k in deal_list]
                    break
            async_log_util.info(logger_trade_position_api_request, f"{fdata}")