| | |
| | | cls.pipe_l2 = pipe_l2 |
| | | |
| | | @classmethod |
| | | def __process_command(cls, _type, client_id, result_json): |
| | | def process_command(cls, _type, client_id, result_json): |
| | | try: |
| | | data = result_json["data"] |
| | | print("接收内容", result_json) |
| | |
| | | elif _type == "test": |
| | | cls.action_callback.OnTest(client_id, request_id, data) |
| | | except Exception as e: |
| | | logger_local_huaxin_trade_debug.debug(f"__process_command出错:{result_json}") |
| | | logger_local_huaxin_trade_debug.debug(f"process_command出错:{result_json}") |
| | | logging.exception(e) |
| | | logging.error(result_json) |
| | | |
| | |
| | | if pipe_strategy is None: |
| | | return |
| | | # 本地命令接收 |
| | | while True: |
| | | try: |
| | | val = pipe_strategy.recv() |
| | | if val: |
| | | val = json.loads(val) |
| | | print("run_process_command", val) |
| | | _type = val["type"] |
| | | _data = val["data"] |
| | | # 查看是否是设置L2的代码 |
| | | if _type == CLIENT_TYPE_CMD_L2: |
| | | cls.pipe_l2.send( |
| | | json.dumps({"type": "set_l2_codes", "data": _data})) |
| | | else: |
| | | threading.Thread(target=lambda: cls.__process_command(_type, None, val), daemon=True).start() |
| | | except Exception as e: |
| | | logger_local_huaxin_trade_debug.exception(e) |
| | | logging.exception(e) |
| | | try: |
| | | while True: |
| | | try: |
| | | val = pipe_strategy.recv() |
| | | if val: |
| | | val = json.loads(val) |
| | | print("run_process_command", val) |
| | | _type = val["type"] |
| | | _data = val["data"] |
| | | # 查看是否是设置L2的代码 |
| | | if _type == CLIENT_TYPE_CMD_L2: |
| | | cls.pipe_l2.send( |
| | | json.dumps({"type": "set_l2_codes", "data": _data})) |
| | | else: |
| | | threading.Thread(target=lambda: cls.process_command(_type, None, val), daemon=True).start() |
| | | except Exception as e: |
| | | logger_local_huaxin_trade_debug.exception(e) |
| | | logging.exception(e) |
| | | except Exception as e: |
| | | logger_local_huaxin_trade_debug.exception(e) |
| | | |
| | | # 维护连接数的稳定 |
| | | def run(self, blocking=True): |
New file |
| | |
| | | import json |
| | | import socket |
| | | import socketserver |
| | | |
| | | from huaxin_client.command_manager import TradeCommandManager |
| | | from utils import socket_util |
| | | |
| | | |
| | | class MyTCPServer(socketserver.TCPServer): |
| | | def __init__(self, server_address, RequestHandlerClass): |
| | | socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=True) |
| | | |
| | | |
| | | # 如果使用异步的形式则需要再重写ThreadingTCPServer |
| | | class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass |
| | | |
| | | |
| | | class MyBaseRequestHandle(socketserver.BaseRequestHandler): |
| | | |
| | | def setup(self): |
| | | pass |
| | | |
| | | def handle(self): |
| | | host = self.client_address[0] |
| | | super().handle() |
| | | sk: socket.socket = self.request |
| | | while True: |
| | | try: |
| | | # data = sk.recv(1024*1024, socket.MSG_WAITALL) |
| | | data, header = socket_util.recv_data(sk) |
| | | if data: |
| | | # TODO 处理数据 |
| | | data_json = json.loads(data) |
| | | type_ = data_json['type'] |
| | | TradeCommandManager.process_command(type_, None, data_json) |
| | | finally: |
| | | pass |
New file |
| | |
| | | import mmap |
| | | import contextlib |
| | | import multiprocessing |
| | | import time |
| | | |
| | | |
| | | def run_process_1(pipe): |
| | | with contextlib.closing(mmap.mmap(-1, 1000*100, tagname='l2-000333', access=mmap.ACCESS_WRITE)) as m: |
| | | for i in range(1, 10001): |
| | | m.seek(0) |
| | | m.write(("msg " + str(i)).encode("utf-8")) |
| | | m.flush() |
| | | time.sleep(1) |
| | | |
| | | |
| | | def run_process_2(pipe): |
| | | while True: |
| | | with contextlib.closing(mmap.mmap(-1, 6, tagname='l2-000333', access=mmap.ACCESS_READ)) as m: |
| | | s = m.read(1024) |
| | | s = s.decode('utf-8').replace('\x00', '') |
| | | if s: |
| | | print(s) |
| | | time.sleep(1) |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | p1, p2 = multiprocessing.Pipe() |
| | | serverProcess = multiprocessing.Process(target=run_process_1, args=(p1,)) |
| | | jueJinProcess = multiprocessing.Process(target=run_process_2, args=(p2,)) |
| | | serverProcess.start() |
| | | jueJinProcess.start() |
| | | |
| | | while True: |
| | | time.sleep(2) |
New file |
| | |
| | | import logging |
| | | import multiprocessing |
| | | import threading |
| | | |
| | | import concurrent.futures |
| | | import time |
| | | |
| | | from utils import tool |
| | | |
| | | |
| | | def task_function(task_param, j): |
| | | # 这里是任务的具体逻辑,task_param是任务的参数 |
| | | # 可以在这里进行耗时的计算、IO操作等 |
| | | |
| | | print(time.time(), task_param, j, id(threading.current_thread())) |
| | | |
| | | time.sleep(2) |
| | | |
| | | # 返回任务的结果 |
| | | return f"result:{task_param}" |
| | | |
| | | |
| | | def call_back(result): |
| | | print(time.time(), "call_back", id(threading.current_thread())) |
| | | |
| | | |
| | | def run_process_1(pipe): |
| | | thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20) |
| | | |
| | | def process_data(val): |
| | | print("处理任务", val) |
| | | raise Exception("测试异常") |
| | | |
| | | def recv_data(): |
| | | while True: |
| | | try: |
| | | val = pipe.recv() |
| | | if val: |
| | | thread_pool.submit(process_data, val) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | threading.Thread(target=lambda: recv_data(), daemon=True).start() |
| | | while True: |
| | | time.sleep(2) |
| | | |
| | | |
| | | def run_process_2(pipe): |
| | | def send_data(): |
| | | while True: |
| | | time.sleep(2) |
| | | for i in range(1): |
| | | pipe.send("test: " + tool.get_now_time_str()) |
| | | |
| | | threading.Thread(target=lambda: send_data(), daemon=True).start() |
| | | threading.Thread(target=lambda: send_data(), daemon=True).start() |
| | | while True: |
| | | time.sleep(2) |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | p1, p2 = multiprocessing.Pipe() |
| | | serverProcess = multiprocessing.Process(target=run_process_1, args=(p1,)) |
| | | jueJinProcess = multiprocessing.Process(target=run_process_2, args=(p2,)) |
| | | serverProcess.start() |
| | | jueJinProcess.start() |
| | | |
| | | while True: |
| | | time.sleep(2) |
| | |
| | | code = data["code"] |
| | | timestamp = data.get("time") |
| | | datas = data["data"] |
| | | now_timestamp = int(time.time() * 1000) |
| | | async_log_util.info(hx_logger_l2_orderdetail, |
| | | f"{code}#耗时:{int(time.time() * 1000) - timestamp}#{datas}") |
| | | f"{code}#耗时:{int(time.time() * 1000) - timestamp}-{now_timestamp}#{datas}") |
| | | l2_log.threadIds[code] = random.randint(0, 100000) |
| | | l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, datas) |
| | | finally: |
| | |
| | | logger_l2_g_cancel.info(f"{code}-需要撤单:{msg}") |
| | | else: |
| | | logger_l2_g_cancel.info(f"{code}-不需要撤单:{msg}") |
| | | |
| | | |
| | | except Exception as e: |
| | | logger_l2_g_cancel.error(f"{code}-撤单异常:{str(e)}") |
| | | logger_l2_g_cancel.exception(e) |
| | | finally: |
| | | sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8'))) |
| | | |
| | |
| | | # 获取时间计算范围,返回s |
| | | def get_time_range(self): |
| | | # ts = [pow(3, 1), pow(3, 1), pow(3, 1), pow(3, 2), pow(3, 2), pow(3, 3), pow(3, 3), pow(3, 3)] |
| | | ts = [pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1)] |
| | | ts = [pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 2), pow(2, 2), pow(2, 2), pow(2, 2)] |
| | | # 暂时去除分的影响 |
| | | # if -1 < self.score_index < 3: |
| | | # return ts[0] |