Administrator
2024-04-18 43606f4ba2813d173da7156439d25bab6c3ca690
代码优化
3个文件已修改
77 ■■■■■ 已修改文件
l2/l2_transaction_data_processor.py 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/place_order_single_data_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_mmap.py 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_processor.py
@@ -37,12 +37,6 @@
        __start_time = time.time()
        try:
            buyno_map = l2_data_util.local_today_buyno_map.get(code)
            # 暂时不需要重新加载获取
            # if not buyno_map:
            #     if trade_manager.CodesTradeStateManager().get_trade_state_cache(
            #             code) != trade_manager.TRADE_STATE_NOT_TRADE:
            #         l2_data_util.load_l2_data(code)
            #         buyno_map = l2_data_util.local_today_buyno_map.get(code)
            if buyno_map is None:
                buyno_map = {}
@@ -57,20 +51,19 @@
                    limit_up_price = round(float(limit_up_price), 2)
                # 统计卖单
                big_sell_order_info = HuaXinSellOrderStatisticManager.add_transaction_datas(code, datas, limit_up_price)
                need_cancel, cancel_msg = SCancelBigNumComputer().set_big_sell_order_info_for_cancel(code,
                                                                                                     big_sell_order_info,
                                                                                                     order_begin_pos)
                if need_cancel:
                    cancel_msg = f"S撤:{cancel_msg}"
                if is_placed_order:
                    need_cancel, cancel_msg = SCancelBigNumComputer().set_big_sell_order_info_for_cancel(code,
                                                                                                         big_sell_order_info,
                                                                                                         order_begin_pos)
                    if need_cancel:
                        cancel_msg = f"S撤:{cancel_msg}"
                    if not need_cancel:
                        need_cancel, cancel_msg = FCancelBigNumComputer().need_cancel_for_p(code, big_sell_order_info,
                                                                                            order_begin_pos)
                    if need_cancel:
                        L2TradeDataProcessor.cancel_buy(code, cancel_msg)
                if not need_cancel:
                    need_cancel, cancel_msg = FCancelBigNumComputer().need_cancel_for_p(code, big_sell_order_info,
                                                                                        order_begin_pos)
                if need_cancel:
                    L2TradeDataProcessor.cancel_buy(code, cancel_msg)
                GCancelBigNumComputer().set_big_sell_order_info(code, big_sell_order_info)
                    GCancelBigNumComputer().set_big_sell_order_info(code, big_sell_order_info)
            except Exception as e:
                async_log_util.error(logger_debug, f"卖单统计异常:{big_sell_order_info}")
                logger_debug.exception(e)
@@ -110,7 +103,7 @@
                LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index, buy_progress_index,
                                                           total_datas)
                if order_begin_pos and order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index > -1:
                if is_placed_order:
                    cancel_result = FCancelBigNumComputer().need_cancel_for_deal_fast(code, buy_progress_index)
                    if cancel_result[0]:
                        L2TradeDataProcessor.cancel_buy(code, f"F撤:{cancel_result[1]}")
@@ -121,7 +114,7 @@
                                                                     buy_progress_index)
            else:
                pass
            if order_begin_pos and order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index > -1:
            if is_placed_order:
                # 触发L撤上重新计算
                LCancelBigNumComputer().re_compute_l_up_watch_indexes(code, order_begin_pos.buy_single_index)
@@ -130,5 +123,5 @@
            hx_logger_l2_debug.exception(e)
        finally:
            use_time = int((time.time() - __start_time) * 1000)
            if use_time > 10:
            if use_time > 5:
                async_log_util.info(hx_logger_l2_upload, f"{code}处理成交用时:{use_time}")
l2/place_order_single_data_manager.py
@@ -105,7 +105,7 @@
                if sell_info_num == deal_num:
                    use_time = round((time.time() - start_time) * 1000, 3)
                    l2_log.info(code, logger_l2_trade_buy,
                                f"找到最近的被动涨停卖单数据:{sell_info['val']['orderNo']}, 计算耗时:{use_time}ms, 可以触发下单")
                                f"找到最近的被动涨停卖单数据:{sell_info['val']['orderNo']}, 成交数据:{data} 计算耗时:{use_time}ms, 可以触发下单")
                    # 成交完成
                    L2TradeSingleDataManager.set_latest_sell_data(code, data)
                    l2_log.info(code, logger_l2_trade_buy, "被动卖数据处理完毕")
test/test_mmap.py
@@ -1,37 +1,33 @@
import ctypes
import mmap
import contextlib
import multiprocessing
import time
from multiprocessing import Process, Value, Array
from huaxin_client import l2_data_manager
def run_process_1(pipe):
    tag = 'l2-000333'
    with contextlib.closing(mmap.mmap(-1, 1000 * 100, tagname=tag, access=mmap.ACCESS_WRITE)) as m:
        for i in range(1, 100):
            start = time.time()
            m.seek(0)
            m.write((f"msg {i} " * 1).encode("utf-8"))
            m.flush()
            print("耗时", time.time() - start)
            time.sleep(1)
def run_process_2(pipe):
def run_process_1(arr:Array):
    while True:
        with contextlib.closing(mmap.mmap(-1, 1000 * 100, tagname='l2-000333', access=mmap.ACCESS_READ)) as m:
            s = m.read(1000 * 100)
            s = s.decode('utf-8').replace('\x00', '')
            if s:
                print(len(s), s)
            time.sleep(1)
        data_lenth = arr[0]
        data = bytes(arr[1:data_lenth+1]).decode('utf-8')
        if data:
            print(eval(data))
        time.sleep(1)
def run_process_2(arr:Array):
    str_=b"['000333',13.89]"
    arr[0] = len(str_)
    arr[1:len(str_)+1] = str_
if __name__ == '__main__':
    arr = Array(ctypes.c_byte, range(1024*1024))
    p1, p2 = multiprocessing.Pipe()
    serverProcess = multiprocessing.Process(target=run_process_1, args=(p1,))
    jueJinProcess = multiprocessing.Process(target=run_process_2, args=(p2,))
    serverProcess = multiprocessing.Process(target=run_process_1, args=(arr,))
    jueJinProcess = multiprocessing.Process(target=run_process_2, args=(arr,))
    serverProcess.start()
    jueJinProcess.start()