| | |
| | | import random |
| | | import threading |
| | | import time |
| | | from huaxin_client import socket_util, l2_data_transaction_protocol |
| | | from huaxin_client import socket_util, l2_data_transform_protocol |
| | | |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | |
| | | # 活动时间 |
| | | from huaxin_client.l2_data_transform_protocol import L2DataCallBack |
| | | from log_module import log_export |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \ |
| | | logger_local_huaxin_g_cancel, hx_logger_contact_debug |
| | |
| | | |
| | | |
| | | # 循环读取上传数据 |
| | | def __run_upload_order(code, pipe): |
| | | def __run_upload_order(code: str, l2_data_callback: L2DataCallBack) -> None: |
| | | if code not in tmep_order_detail_queue_dict: |
| | | tmep_order_detail_queue_dict[code] = queue.Queue() |
| | | tag = l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code) |
| | | # with contextlib.closing( |
| | | # mmap.mmap(-1, 1000 * 100, tag, |
| | | # access=mmap.ACCESS_WRITE)) as _mmap: |
| | | if True: |
| | | while True: |
| | | # print("order task") |
| | |
| | | udatas.append(temp) |
| | | if udatas: |
| | | start_time = time.time() |
| | | upload_data(code, "l2_order", udatas) |
| | | # upload_data(code, "l2_order", udatas) |
| | | l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000)) |
| | | # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas) |
| | | use_time = int((time.time() - start_time) * 1000) |
| | | if use_time > 20: |
| | |
| | | |
| | | |
| | | # 运行上传任务 |
| | | def run_upload_task(code, pipe_strategy): |
| | | def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None: |
| | | # 如果代码没有在目标代码中就不需要运行 |
| | | if code not in target_codes: |
| | | return |
| | | # 如果最近的活动时间小于2s就不需要运行 |
| | | if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2: |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, pipe_strategy), daemon=True) |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True) |
| | | t.start() |
| | | |
| | | if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2: |
| | |
| | | t.start() |
| | | |
| | | |
| | | def __test(pipe_strategy): |
| | | def __test(_l2_data_callback): |
| | | code = "002073" |
| | | if code not in tmep_order_detail_queue_dict: |
| | | tmep_order_detail_queue_dict[code] = queue.Queue() |
| | | target_codes.add(code) |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, pipe_strategy), daemon=True) |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True) |
| | | t.start() |
| | | while True: |
| | | try: |
| | |
| | | pass |
| | | |
| | | |
def run_test(_l2_data_callback):
    """Start the ``__test`` driver on a background daemon thread.

    Args:
        _l2_data_callback: Object forwarded unchanged to ``__test``.

    The call returns as soon as the thread has been started; because the
    thread is a daemon it does not keep the interpreter alive on exit.
    """
    # Only the callback-based signature is kept: the earlier
    # ``run_test(pipe_strage)`` definition was immediately rebound by this
    # one and could never be called.
    t = threading.Thread(target=lambda: __test(_l2_data_callback), daemon=True)
    t.start()
| | | |
| | | |
| | | def test(): |
| | | # upload_data("000798", "trading_order_canceled", 30997688, new_sk=True) |
| | | code = "000333" |
| | | tag = l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code) |
| | | with contextlib.closing( |
| | | mmap.mmap(-1, 1000 * 100, tag, |
| | | access=mmap.ACCESS_WRITE)) as _mmap: |
| | | pass |
| | | pass |