Administrator
2023-08-29 770624f05f8dd627be15442f15c82033010a63cc
修改L2数据传输方式
4个文件已修改
1个文件已添加
226 ■■■■ 已修改文件
huaxin_client/l2_client.py 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 63 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_transaction_protocol.py 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_analyse.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 70 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -6,7 +6,7 @@
import threading
import time
from huaxin_client import command_manager
from huaxin_client import command_manager, l2_data_transaction_protocol
from huaxin_client import constant
from huaxin_client import l2_data_manager
import lev2mdapi
@@ -127,7 +127,7 @@
        for c in del_codes:
            l2_data_manager.target_codes.discard(c)
        for c in add_codes:
            l2_data_manager.run_upload_task(c)
            l2_data_manager.run_upload_task(c, pipe_strategy)
        self.__subscribe(add_codes)
        self.__unsubscribe(del_codes)
@@ -565,24 +565,31 @@
                data = json.loads(val)
                if data["data"]["type"] == "l2_cmd":
                    l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
                else:
                    l2_data_transaction_protocol.set_write_rece_data(data)
        except Exception as e:
            logging.exception(e)
def run(pipe_trade, pipe_strategy):
pipe_strategy = None
def run(pipe_trade, _pipe_strategy):
    logger_system.info("L2进程ID:{}", os.getpid())
    log.close_print()
    if pipe_trade is not None:
        t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(pipe_trade), daemon=True)
        t1.start()
    if pipe_strategy is not None:
        t1 = threading.Thread(target=__receive_from_pipe_strategy, args=(pipe_strategy,), daemon=True)
    if _pipe_strategy is not None:
        global pipe_strategy
        pipe_strategy = _pipe_strategy
        t1 = threading.Thread(target=__receive_from_pipe_strategy, args=(_pipe_strategy,), daemon=True)
        t1.start()
    __init_l2()
    l2_data_manager.run_upload_common()
    l2_data_manager.run_upload_trading_canceled()
    l2_data_manager.run_log()
    # l2_data_manager.run_test()
    l2_data_manager.run_test()
    global l2CommandManager
    l2CommandManager = command_manager.L2CommandManager()
    l2CommandManager.init(MyL2ActionCallback())
huaxin_client/l2_data_manager.py
@@ -1,18 +1,20 @@
# -*- coding: utf-8 -*-
import contextlib
import json
import logging
import mmap
import queue
import random
import threading
import time
from huaxin_client import socket_util
from huaxin_client import socket_util, l2_data_transaction_protocol
from huaxin_client.client_network import SendResponseSkManager
# 活动时间
from log_module import log_export
from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \
    logger_local_huaxin_g_cancel
    logger_local_huaxin_g_cancel, hx_logger_contact_debug
order_detail_upload_active_time_dict = {}
transaction_upload_active_time_dict = {}
@@ -159,31 +161,36 @@
# 循环读取上传数据
def __run_upload_order(code):
def __run_upload_order(code, pipe):
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    while True:
        # print("order task")
        try:
            if code not in target_codes:
                break
            order_detail_upload_active_time_dict[code] = time.time()
            udatas = []
            while not tmep_order_detail_queue_dict[code].empty():
                temp = tmep_order_detail_queue_dict[code].get()
                udatas.append(temp)
            if udatas:
                start_time = time.time()
                upload_data(code, "l2_order", udatas)
                use_time = int((time.time() - start_time) * 1000)
                if use_time > 20:
                    logger_local_huaxin_l2_upload.info(f"{code}-上传代码耗时:{use_time}ms")
            time.sleep(0.01)
    with contextlib.closing(mmap.mmap(-1, 1000 * 100, tagname=l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code), access=mmap.ACCESS_WRITE)) as _mmap:
        while True:
            # print("order task")
            try:
                if code not in target_codes:
                    break
                # 打开共享内存
                order_detail_upload_active_time_dict[code] = time.time()
                udatas = []
                while not tmep_order_detail_queue_dict[code].empty():
                    temp = tmep_order_detail_queue_dict[code].get()
                    udatas.append(temp)
                if udatas:
                    start_time = time.time()
                    # upload_data(code, "l2_order", udatas)
                    l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas)
                    use_time = int((time.time() - start_time) * 1000)
                    if use_time > 20:
                        logger_local_huaxin_l2_upload.info(f"{code}-上传代码耗时:{use_time}ms")
        except Exception as e:
            logger_local_huaxin_l2_error.error(f"上传订单数据出错:{str(e)}")
            pass
                time.sleep(0.01)
            except Exception as e:
                hx_logger_contact_debug.exception(e)
                logger_local_huaxin_l2_error.error(f"上传订单数据出错:{str(e)}")
                pass
def __run_upload_trans(code):
@@ -245,13 +252,13 @@
# 运行上传任务
def run_upload_task(code):
def run_upload_task(code, pipe_strategy):
    # 如果代码没有在目标代码中就不需要运行
    if code not in target_codes:
        return
    # 如果最近的活动时间小于2s就不需要运行
    if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[code] > 2:
        t = threading.Thread(target=lambda: __run_upload_order(code), daemon=True)
        t = threading.Thread(target=lambda: __run_upload_order(code, pipe_strategy), daemon=True)
        t.start()
    if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[code] > 2:
@@ -278,9 +285,13 @@
def __test():
    """Debug driver that feeds fixed sample data into the upload pipeline.

    Registers code ``002073`` as a target code, makes sure it has an
    order-detail queue, and then loops forever: every 5 seconds it reports a
    fixed cancelled order through ``trading_order_canceled`` and enqueues one
    fixed order-detail record for ``002073``.

    This function never returns.
    """
    code = "002073"
    if code not in tmep_order_detail_queue_dict:
        tmep_order_detail_queue_dict[code] = queue.Queue()
    target_codes.add(code)
    while True:
        try:
            trading_order_canceled("603106", 6114878)
            tmep_order_detail_queue_dict[code].put_nowait(
                [code, 0.0, 88100, '1', '2', 103831240, 2011, 18190761, 18069131, 'D', 1693276711224])
        except Exception as e:
            # Record the failure instead of hiding it; a bare ``except`` would
            # also swallow SystemExit / KeyboardInterrupt.
            logging.exception(e)
        # Sleep outside the try so a persistent error cannot turn this into a
        # busy loop.
        time.sleep(5)
huaxin_client/l2_data_transaction_protocol.py
New file
@@ -0,0 +1,70 @@
"""
L2数据传输协议
"""
import json
# 写入结果字典
import random
import time
from log_module.log import hx_logger_contact_debug
__write_result_dict = {}
def __create_request_id(code, type_):
    """Build an identifier for one send/acknowledge round trip.

    The id is ``code``, ``type_``, the current time in milliseconds (the float
    produced by ``time.time() * 1000``) and a random integer in
    ``[0, 100000]``, joined with underscores.
    """
    millis = time.time() * 1000
    nonce = random.randint(0, 100000)
    return f"{code}_{type_}_{millis}_{nonce}"
# Message type used for L2 order-detail transfers.
TYPE_L2_ORDER = "l2_order"


def __get_mmap_tag_name(code, type):
    """Return the shared-memory tag name ``"<type>_<code>"``."""
    return "{}_{}".format(type, code)


def get_mmap_tag_name_for_l2_order(code):
    """Return the shared-memory tag name used for L2 order details of ``code``."""
    tag_name = __get_mmap_tag_name(code, TYPE_L2_ORDER)
    return tag_name
# Send L2 order details
def send_l2_order_detail(pipe, _mmap, code, data_json):
    """Publish one batch of L2 order details and wait for the acknowledgement.

    The batch is serialized to JSON (with ``code`` and the current time in
    milliseconds) and written into ``_mmap`` starting at offset 0. A small JSON
    notification carrying a fresh request id is then sent through ``pipe``.
    The call polls ``__write_result_dict`` for that request id up to 1000
    times, sleeping 1 ms between polls.

    Whatever the outcome, the bytes written to ``_mmap`` are overwritten with
    NUL bytes and the request id is dropped from ``__write_result_dict``.

    Args:
        pipe: Object whose ``send`` method receives the notification string.
        _mmap: Writable mmap buffer that the payload is written into.
        code: Code the data belongs to.
        data_json: JSON-serializable payload.

    Raises:
        Exception: If no acknowledgement shows up within the polling budget.
    """
    payload = json.dumps({"code": code, "data": data_json, "time": int(time.time() * 1000)}).encode("utf-8")
    _mmap.seek(0)
    _mmap.write(payload)
    _mmap.flush()
    request_id = __create_request_id(code, TYPE_L2_ORDER)
    hx_logger_contact_debug.info("L2客户端(code-{} request_id-{}):开始发送数据", code, request_id)
    pipe.send(json.dumps({"type": TYPE_L2_ORDER, "data": {"code": code}, "request_id": request_id}))
    try:
        for _ in range(1000):
            if request_id in __write_result_dict:
                hx_logger_contact_debug.info("L2客户端(code-{} request_id-{}):读取发送结果成功", code, request_id)
                break
            time.sleep(0.001)
        else:
            hx_logger_contact_debug.info("L2客户端(code-{} request_id-{}):读取发送结果超时", code, request_id)
            raise Exception("读取内容超时")
    finally:
        # Clear the shared memory. Writing b"" writes nothing, so the payload
        # has to be overwritten explicitly; otherwise a later, shorter payload
        # would be followed by the tail of this one and no longer be valid JSON.
        _mmap.seek(0)
        _mmap.write(b"\x00" * len(payload))
        _mmap.flush()
        __write_result_dict.pop(request_id, None)
# Record a received acknowledgement
def set_write_rece_data(data_json):
    """Store an acknowledgement message under its request id.

    Messages whose ``request_id`` is missing or falsy are ignored. Otherwise
    the whole message is saved in ``__write_result_dict`` keyed by that id.
    """
    request_id = data_json.get("request_id")
    if not request_id:
        return
    hx_logger_contact_debug.info("L2客户端(code- request_id-{}):获取到反馈结果", request_id)
    __write_result_dict[request_id] = data_json
def set_read_l2_order(pipe, request_id):
    """Send the acknowledgement for ``request_id`` through ``pipe``.

    The acknowledgement is a JSON string holding the request id and a
    ``data`` object whose ``type`` is ``"l2_order"``.
    """
    hx_logger_contact_debug.info("策略客户端(code- request_id-{}):开始反馈结果", request_id)
    ack = {"request_id": request_id, "data": {"type": "l2_order"}}
    pipe.send(json.dumps(ack))
log_module/log_analyse.py
@@ -73,7 +73,7 @@
                data = datas[2].split(" - ")[1].strip()
                contents = data.split("#")
                code = contents[0]
                use_time = int(contents[1].strip().split(":")[1])
                use_time = int(contents[1].strip().split(":")[1].split("-")[-2])
                l2_data = contents[2]
                l2_data = eval(l2_data)
                max_time_data = None
@@ -103,4 +103,4 @@
if __name__ == "__main__":
    analyze_l2_data_transformation("D:\\logs\\huaxin_l2\\orderdetail.2023-08-24.log")
    analyze_l2_data_transformation("D:\\logs\\huaxin_l2\\orderdetail.2023-08-29.log")
trade/huaxin/trade_server.py
@@ -1,8 +1,11 @@
import concurrent.futures
import contextlib
import datetime
import hashlib
import io
import json
import logging
import mmap
import queue
import random
import socket
@@ -21,7 +24,7 @@
from code_attribute import gpcode_manager, code_volumn_manager
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from huaxin_client import l1_subscript_codes_manager
from huaxin_client import l1_subscript_codes_manager, l2_data_transaction_protocol
from huaxin_client.client_network import SendResponseSkManager
from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress
from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer, \
@@ -185,11 +188,7 @@
                            code = data["code"]
                            timestamp = data.get("time")
                            datas = data["data"]
                            now_timestamp = int(time.time() * 1000)
                            async_log_util.info(hx_logger_l2_orderdetail,
                                                f"{code}#耗时:{int(time.time() * 1000) - timestamp}-{now_timestamp}#{datas}")
                            l2_log.threadIds[code] = random.randint(0, 100000)
                            l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, datas)
                            TradeServerProcessor.l2_order(code, datas, timestamp)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
@@ -362,7 +361,9 @@
                            is_normal = l2_data_util.load_l2_data(code, load_latest=False)
                            volume_rate = code_volumn_manager.get_volume_rate(code)
                            volume_rate_index = code_volumn_manager.get_volume_rate_index(volume_rate)
                            m_val = L2PlaceOrderParamsManager(code, True, volume_rate, volume_rate_index, None).get_m_val()[0]
                            m_val = \
                                L2PlaceOrderParamsManager(code, True, volume_rate, volume_rate_index, None).get_m_val()[
                                    0]
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            m_val_num = int(m_val / (float(limit_up_price) * 100))
@@ -407,6 +408,14 @@
        datas = data["data"]
        HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas)
    @classmethod
    def l2_order(cls, code, _datas, timestamp):
        now_timestamp = int(time.time() * 1000)
        async_log_util.info(hx_logger_l2_orderdetail,
                            f"{code}#耗时:{int(time.time() * 1000) - timestamp}-{now_timestamp}#{_datas}")
        l2_log.threadIds[code] = random.randint(0, 100000)
        l2_data_manager_new.L2TradeDataProcessor.process_huaxin(code, _datas)
def clear_invalid_client():
    while True:
@@ -430,6 +439,51 @@
                    type_ = val["type"]
                    if type_ == "set_target_codes":
                        TradeServerProcessor.set_target_codes(val)
            except Exception as e:
                logging.exception(e)
# Worker pool on which incoming L2 order notifications are processed.
l2_order_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)


def __recv_pipe_l2(pipe_l2):
    """Receive notifications from the L2 pipe and acknowledge them.

    Each message received from ``pipe_l2`` is parsed as JSON. For messages of
    type ``TYPE_L2_ORDER`` a task is submitted to ``l2_order_thread_pool``; the
    task reads the payload from the shared memory tagged for that code, parses
    it and sends the acknowledgement back through ``pipe_l2``.

    Does nothing when ``pipe_l2`` is ``None``. The receive loop ends when the
    other end of the pipe is closed.

    NOTE(review): the call that forwards the parsed data to
    ``TradeServerProcessor.l2_order`` is commented out, so the payload is
    currently parsed and acknowledged but not processed.
    """

    def process_l2_order(code, request_id):
        # Runs on a pool worker. Nobody inspects the returned Future, so any
        # error must be logged here or it would be lost silently.
        try:
            # Read the data from shared memory
            with contextlib.closing(
                    mmap.mmap(-1, 1000 * 100,
                              tagname=l2_data_transaction_protocol.get_mmap_tag_name_for_l2_order(code),
                              access=mmap.ACCESS_READ)) as m:
                s = m.read(1000 * 100)
                # Unused space in the buffer is NUL bytes; strip it before parsing.
                s = s.decode('utf-8').replace('\x00', '')
                hx_logger_contact_debug.info("策略客户端(code-{} request_id-{}):读取到共享内存数据", code, request_id)
                if s:
                    print(len(s), s)
                    data = json.loads(s)
                    code = data["code"]
                    timestamp = data.get("time")
                    datas = data["data"]
                    try:
                        # TradeServerProcessor.l2_order(code, datas, timestamp)
                        pass
                    finally:
                        # Always acknowledge, even if processing failed.
                        hx_logger_contact_debug.info("策略客户端(code-{} request_id-{}):数据处理完毕", code, request_id)
                        l2_data_transaction_protocol.set_read_l2_order(pipe_l2, request_id)
        except Exception as e:
            logging.exception(e)

    if pipe_l2 is None:
        return
    while True:
        try:
            val = pipe_l2.recv()
            if val:
                val = json.loads(val)
                print("收到来自L2的数据:", val["type"])
                # Dispatch on the message type
                type_ = val["type"]
                if type_ == l2_data_transaction_protocol.TYPE_L2_ORDER:
                    request_id = val["request_id"]
                    # Handle the L2 data
                    code = val["data"]["code"]
                    hx_logger_contact_debug.info("策略客户端(code-{} request_id-{}):接受到来自L2客户端的数据", code, request_id)
                    l2_order_thread_pool.submit(process_l2_order, code, request_id)
        except EOFError as e:
            # The other end closed the pipe: every later recv() would fail the
            # same way, so stop instead of spinning and flooding the log.
            logging.exception(e)
            break
        except Exception as e:
            logging.exception(e)
@@ -751,7 +805,7 @@
            logger_debug.error(e)
def run(pipe_trade, pipe_l1):
def run(pipe_trade, pipe_l1, pipe_l2):
    # 执行一些初始化数据
    block_info.init()