| | |
| | | import queue |
| | | import threading |
| | | import time |
| | | import concurrent.futures |
| | | from typing import List |
| | | |
| | | from huaxin_client import command_manager, l2_data_transform_protocol |
| | |
| | | class MyL2ActionCallback(L2ActionCallback): |
| | | |
| | | def OnSetL2Position(self, codes_data): |
| | | print("L2订阅数量:", len(codes_data)) |
| | | logger_l2_codes_subscript.info("华鑫L2代码处理队列获取到数据:数量-{}", len(codes_data)) |
| | | huaxin_l2_log.info(logger_l2_codes_subscript, "华鑫L2代码处理队列获取到数据:数量-{}", len(codes_data)) |
| | | try: |
| | | spi.set_codes_data(codes_data) |
| | | except Exception as e: |
| | |
| | | api.Init() |
| | | |
| | | |
| | | __l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3) |
| | | |
| | | |
| | | def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue): |
| | | logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | |
| | | code = data["data"]["code"] |
| | | spi.set_code_special_watch_volume(code, volume) |
| | | elif _type == "l2_cmd": |
| | | l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) |
| | | # 线程池 |
| | | __l2_cmd_thread_pool.submit( |
| | | lambda: l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)) |
| | | |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | |
| | | def run_process_1(pipe): |
| | | tag = 'l2-000333' |
| | | with contextlib.closing(mmap.mmap(-1, 1000 * 100, tagname=tag, access=mmap.ACCESS_WRITE)) as m: |
| | | for i in range(1, 10001): |
| | | for i in range(1, 100): |
| | | start = time.time() |
| | | m.seek(0) |
| | | m.write((f"msg {i} " * 10000).encode("utf-8")) |
| | | m.write((f"msg {i} " * 1).encode("utf-8")) |
| | | m.flush() |
| | | print("耗时", time.time() - start) |
| | | time.sleep(1) |
| | |
| | | print(len(s), s) |
| | | time.sleep(1) |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | l2_data_manager.test() |
| | | pass |
| | | |
| | | |
| | | if __name__ == '__main__1': |
| | | p1, p2 = multiprocessing.Pipe() |
| | | serverProcess = multiprocessing.Process(target=run_process_1, args=(p1,)) |
| | | jueJinProcess = multiprocessing.Process(target=run_process_2, args=(p2,)) |
| | |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | q = multiprocessing.Queue() |
| | | while True: |
| | | try: |
| | | val = q.get_nowait() |
| | | print(val) |
| | | except: |
| | | time.sleep(0.005) |
| | | q.put_nowait("123123") |
| | | print("出错") |
| | | |
| | | if __name__ == "__main__1": |
| | | q = multiprocessing.Queue() |
| | | p1 = multiprocessing.Process(target=run_process1, args=(q,)) |
| | | p2 = multiprocessing.Process(target=run_process2, args=(q,)) |
| | | p3 = multiprocessing.Process(target=run_process3, args=(q,)) |
| | |
| | | deal_list = [deal_order_system_id_infos[k] for k in deal_order_system_id_infos] |
| | | deal_list.sort(key=lambda x: x[1]) |
| | | |
| | | # TODO 测试 |
| | | deal_list.clear() |
| | | fdata["available"] = fdata["total"] |
| | | |
| | | fdata["sell_orders"] = [k[0] for k in deal_list] |
| | | break |
| | | async_log_util.info(logger_trade_position_api_request, f"{fdata}") |