| | |
| | | import threading |
| | | import time |
| | | |
| | | from huaxin_client import command_manager, l2_data_transaction_protocol |
| | | from huaxin_client import command_manager, l2_data_transform_protocol |
| | | from huaxin_client import constant |
| | | from huaxin_client import l2_data_manager |
| | | import lev2mdapi |
| | |
| | | for c in del_codes: |
| | | l2_data_manager.target_codes.discard(c) |
| | | for c in add_codes: |
| | | l2_data_manager.run_upload_task(c, pipe_strategy) |
| | | l2_data_manager.run_upload_task(c, l2_data_callback) |
| | | self.__subscribe(add_codes) |
| | | self.__unsubscribe(del_codes) |
| | | |
| | |
| | | data = json.loads(val) |
| | | if data["data"]["type"] == "l2_cmd": |
| | | l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) |
| | | else: |
| | | l2_data_transaction_protocol.set_write_rece_data(data) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | |
| | | pipe_strategy = None |
| | | |
| | | |
| | | def run(pipe_trade, _pipe_strategy): |
| | | def run(pipe_trade, _pipe_strategy, _l2_data_callback: l2_data_transform_protocol.L2DataCallBack)->None: |
| | | logger_system.info("L2进程ID:{}", os.getpid()) |
| | | log.close_print() |
| | | if pipe_trade is not None: |
| | |
| | | t1 = threading.Thread(target=__receive_from_pipe_strategy, args=(_pipe_strategy,), daemon=True) |
| | | t1.start() |
| | | __init_l2() |
| | | |
| | | global l2_data_callback |
| | | l2_data_callback = _l2_data_callback |
| | | |
| | | l2_data_manager.run_upload_common() |
| | | l2_data_manager.run_upload_trading_canceled() |
| | | l2_data_manager.run_log() |
| | | # l2_data_manager.run_test(_pipe_strategy) |
| | | l2_data_manager.run_test(l2_data_callback) |
| | | global l2CommandManager |
| | | l2CommandManager = command_manager.L2CommandManager() |
| | | l2CommandManager.init(MyL2ActionCallback()) |
| | |
| | | import random |
| | | import threading |
| | | import time |
| | | from huaxin_client import socket_util, l2_data_transaction_protocol |
| | | from huaxin_client import socket_util, l2_data_transform_protocol |
| | | |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | |
| | | # 活动时间 |
| | | from huaxin_client.l2_data_transform_protocol import L2DataCallBack |
| | | from log_module import log_export |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \ |
| | | logger_local_huaxin_g_cancel, hx_logger_contact_debug |
| | |
| | | |
| | | |
| | | # 循环读取上传数据 |
| | | def __run_upload_order(code, pipe): |
| | | def __run_upload_order(code: str, l2_data_callback: L2DataCallBack) -> None: |
| | | if code not in tmep_order_detail_queue_dict: |
| | | tmep_order_detail_queue_dict[code] = queue.Queue() |
| | | tag = l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code) |
| | | # with contextlib.closing( |
| | | # mmap.mmap(-1, 1000 * 100, tag, |
| | | # access=mmap.ACCESS_WRITE)) as _mmap: |
| | | if True: |
| | | while True: |
| | | # print("order task") |
| | |
| | | udatas.append(temp) |
| | | if udatas: |
| | | start_time = time.time() |
| | | upload_data(code, "l2_order", udatas) |
| | | # upload_data(code, "l2_order", udatas) |
| | | l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000)) |
| | | # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas) |
| | | use_time = int((time.time() - start_time) * 1000) |
| | | if use_time > 20: |
| | |
| | | |
| | | |
| | | # 运行上传任务 |
| | | def run_upload_task(code, pipe_strategy): |
| | | def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None: |
| | | # 如果代码没有在目标代码中就不需要运行 |
| | | if code not in target_codes: |
| | | return |
| | | # 如果最近的活动时间小于2s就不需要运行 |
| | | if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2: |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, pipe_strategy), daemon=True) |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True) |
| | | t.start() |
| | | |
| | | if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2: |
| | |
| | | t.start() |
| | | |
| | | |
| | | def __test(pipe_strategy): |
| | | def __test(_l2_data_callback): |
| | | code = "002073" |
| | | if code not in tmep_order_detail_queue_dict: |
| | | tmep_order_detail_queue_dict[code] = queue.Queue() |
| | | target_codes.add(code) |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, pipe_strategy), daemon=True) |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True) |
| | | t.start() |
| | | while True: |
| | | try: |
| | |
| | | pass |
| | | |
| | | |
def run_test(_l2_data_callback):
    """Start the L2 order test routine on a background daemon thread.

    Only the callback-based definition is kept; the earlier pipe-based
    variant that was stacked above it is dropped.

    :param _l2_data_callback: callback object passed through unchanged to ``__test``.
    """
    t = threading.Thread(target=lambda: __test(_l2_data_callback), daemon=True)
    t.start()
| | | |
| | | |
def test():
    """Scratch check: map the L2 order region for one code and close it again."""
    # upload_data("000798", "trading_order_canceled", 30997688, new_sk=True)
    code = "000333"
    tag = l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code)
    # mmap objects are context managers; the mapping is closed on exit
    with mmap.mmap(-1, 1000 * 100, tag, access=mmap.ACCESS_WRITE):
        pass
New file |
| | |
| | | """ |
| | | L2数据传输协议 |
| | | """ |
| | | |
| | | |
class L2DataCallBack:
    """Callback interface for L2 data; every hook is a no-op by default."""

    def OnL2Order(self, code, datas, timestamp):
        """Hook for L2 order-detail data; does nothing here."""

    def OnL2Transaction(self, code, datas, timestamp):
        """Hook for L2 transaction data (per the method name); does nothing here."""

    def OnMarketData(self, code, datas, timestamp):
        """Hook for market data (per the method name); does nothing here."""
| | |
| | | # from huaxin_api import trade_client, l2_client, l1_client |
| | | |
| | | |
| | | def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2): |
| | | def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2,ptl2_l2,psl2_l2): |
| | | logger_system.info("策略进程ID:{}", os.getpid()) |
| | | log.close_print() |
| | | # 初始化参数 |
| | |
| | | |
| | | # redis后台服务 |
| | | t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, daemon=True) |
| | | t1.start() |
| | | |
| | | # 启动L2订阅服务 |
| | | t1 = threading.Thread(target=huaxin_client.l2_client.run, args=(ptl2_l2, psl2_l2, trade_server.my_l2_data_callback), daemon=True) |
| | | t1.start() |
| | | |
| | | # 交易服务 |
| | |
| | | logger_system.info("主进程ID:{}", os.getpid()) |
| | | |
| | | tradeServerProcess = multiprocessing.Process(target=createTradeServer, |
| | | args=(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy)) |
| | | args=(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy,ptl2_l2, psl2_l2)) |
| | | tradeServerProcess.start() |
| | | |
| | | # 交易进程与L2进程 |
| | | |
| | | l2Process = multiprocessing.Process(target=huaxin_client.l2_client.run, args=(ptl2_l2, psl2_l2,)) |
| | | l2Process.start() |
| | | # |
| | | tradeProcess = multiprocessing.Process(target=huaxin_client.trade_client.run, args=(ptl2_trade, pst_trade,)) |
| | | tradeProcess.start() |
| | |
| | | # 将tradeServer作为主进程 |
| | | tradeServerProcess.join() |
| | | tradeProcess.join() |
| | | l2Process.join() |
| | |
| | | import multiprocessing |
| | | import threading |
| | | import time |
| | | |
| | | from huaxin_client import l2_data_manager |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_debug |
| | | |
| | | |
def read(pipe):
    """Print every truthy value received from ``pipe`` until the pipe is closed.

    :param pipe: object exposing a blocking ``recv()`` method, e.g. one end of
        ``multiprocessing.Pipe()``.
    """
    while True:
        try:
            val = pipe.recv()
        except EOFError:
            # recv() raises EOFError once the other end is closed and nothing
            # is left to read; stop instead of dying with a traceback.
            return
        if val:
            print("read:", val)
| | | |
| | | |
def write(pipe, message="test", interval=1):
    """Send ``message`` through ``pipe`` repeatedly until the pipe is closed.

    :param pipe: object exposing ``send()``, e.g. one end of ``multiprocessing.Pipe()``.
    :param message: payload sent on every iteration; defaults to ``"test"``.
    :param interval: seconds to sleep between sends; defaults to ``1``.
    """
    while True:
        try:
            pipe.send(message)
        except OSError:
            # A closed or broken connection raises OSError (BrokenPipeError is
            # a subclass); stop instead of dying with a traceback.
            return
        time.sleep(interval)
| | | |
| | | |
if __name__ == "__main__":
    # Manual smoke test: one upload, then a pipe writer/reader pair runs until Enter is pressed.
    l2_data_manager.upload_data("000798", "trading_order_canceled", 30997688, new_sk=True)
    sender, receiver = multiprocessing.Pipe()
    for worker, conn in ((write, sender), (read, receiver)):
        threading.Thread(target=worker, args=(conn,), daemon=True).start()
    # Keep the main thread alive; the daemon threads end with the process.
    input()
| | |
| | | from code_attribute import gpcode_manager, code_volumn_manager |
| | | from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from huaxin_client import l1_subscript_codes_manager, l2_data_transaction_protocol |
| | | from huaxin_client import l1_subscript_codes_manager, l2_data_transform_protocol |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress |
| | | from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer, \ |
| | |
| | | logging.exception(e) |
| | | |
| | | |
l2_order_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)


def __recv_pipe_l2(pipe_l2):
    """Receive messages from the L2 pipe and dispatch L2 order notifications.

    Each received value is parsed as JSON. Messages whose ``type`` equals
    ``l2_data_transaction_protocol.TYPE_L2_ORDER`` are submitted to
    ``l2_order_thread_pool``; the worker reads the payload from the mmap region
    named after the code and acknowledges it with ``set_read_l2_order``.
    Does nothing when ``pipe_l2`` is ``None``; returns once the pipe is closed.

    :param pipe_l2: connection to the L2 process, or ``None``.
    """

    def process_l2_order(code, request_id):
        try:
            # Read the data from shared memory
            with contextlib.closing(
                    mmap.mmap(-1, 1000 * 100, l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code),
                              access=mmap.ACCESS_READ)) as m:
                s = m.read(1000 * 100)
                s = s.decode('utf-8').replace('\x00', '')
                hx_logger_contact_debug.info("策略客户端(code-{} request_id-{}):读取到共享内存数据", code, request_id)
                if s:
                    print(len(s), s)
                    data = json.loads(s)
                    code = data["code"]
                    # Parsed for the (currently disabled) l2_order call below
                    timestamp = data.get("time")
                    datas = data["data"]
                    try:
                        # TradeServerProcessor.l2_order(code, datas, timestamp)
                        pass
                    finally:
                        hx_logger_contact_debug.info("策略客户端(code-{} request_id-{}):数据处理完毕", code, request_id)
                        l2_data_transaction_protocol.set_read_l2_order(pipe_l2, request_id)
        except Exception as e:
            # The Future returned by submit() is never inspected, so an
            # exception raised here would otherwise vanish silently.
            logging.exception(e)

    if pipe_l2 is None:
        return
    while True:
        try:
            val = pipe_l2.recv()
            if val:
                val = json.loads(val)
                print("收到来自L2的数据:", val["type"])
                # Dispatch on message type
                type_ = val["type"]
                if type_ == l2_data_transaction_protocol.TYPE_L2_ORDER:
                    request_id = val["request_id"]
                    # Handle the L2 data on the worker pool
                    code = val["data"]["code"]
                    hx_logger_contact_debug.info("策略客户端(code-{} request_id-{}):接受到来自L2客户端的数据", code, request_id)
                    l2_order_thread_pool.submit(process_l2_order, code, request_id)
        except EOFError:
            # The other end was closed: recv() would raise on every iteration,
            # so leave the loop instead of spinning and logging forever.
            break
        except Exception as e:
            logging.exception(e)
| | | |
| | | |
| | | class OutsideApiCommandCallback(outside_api_command_manager.ActionCallback): |
| | | @classmethod |
| | | def __send_response(cls, data_bytes): |
| | |
| | | logger_debug.error(e) |
| | | |
| | | |
class MyL2DataCallback(l2_data_transform_protocol.L2DataCallBack):
    """L2 callback that forwards order data to ``TradeServerProcessor``."""

    def OnL2Order(self, code, datas, timestamp):
        """Pass the order data for ``code`` to ``TradeServerProcessor.l2_order``."""
        TradeServerProcessor.l2_order(code, datas, timestamp)

    def OnL2Transaction(self, code, datas, timestamp):
        """Nothing is done with this callback."""

    def OnMarketData(self, code, datas, timestamp):
        """Nothing is done with this callback."""


# Module-level callback instance
my_l2_data_callback = MyL2DataCallback()
| | | |
| | | |
| | | def run(pipe_trade, pipe_l1, pipe_l2): |
| | | # 执行一些初始化数据 |
| | | block_info.init() |