| | |
| | | __start_time = time.time() |
| | | try: |
| | | buyno_map = l2_data_util.local_today_buyno_map.get(code) |
| | | # 暂时不需要重新加载获取 |
| | | # if not buyno_map: |
| | | # if trade_manager.CodesTradeStateManager().get_trade_state_cache( |
| | | # code) != trade_manager.TRADE_STATE_NOT_TRADE: |
| | | # l2_data_util.load_l2_data(code) |
| | | # buyno_map = l2_data_util.local_today_buyno_map.get(code) |
| | | if buyno_map is None: |
| | | buyno_map = {} |
| | | |
| | |
| | | limit_up_price = round(float(limit_up_price), 2) |
| | | # 统计卖单 |
| | | big_sell_order_info = HuaXinSellOrderStatisticManager.add_transaction_datas(code, datas, limit_up_price) |
| | | need_cancel, cancel_msg = SCancelBigNumComputer().set_big_sell_order_info_for_cancel(code, |
| | | big_sell_order_info, |
| | | order_begin_pos) |
| | | if need_cancel: |
| | | cancel_msg = f"S撤:{cancel_msg}" |
| | | if is_placed_order: |
| | | need_cancel, cancel_msg = SCancelBigNumComputer().set_big_sell_order_info_for_cancel(code, |
| | | big_sell_order_info, |
| | | order_begin_pos) |
| | | if need_cancel: |
| | | cancel_msg = f"S撤:{cancel_msg}" |
| | | if not need_cancel: |
| | | need_cancel, cancel_msg = FCancelBigNumComputer().need_cancel_for_p(code, big_sell_order_info, |
| | | order_begin_pos) |
| | | if need_cancel: |
| | | L2TradeDataProcessor.cancel_buy(code, cancel_msg) |
| | | |
| | | if not need_cancel: |
| | | need_cancel, cancel_msg = FCancelBigNumComputer().need_cancel_for_p(code, big_sell_order_info, |
| | | order_begin_pos) |
| | | |
| | | if need_cancel: |
| | | L2TradeDataProcessor.cancel_buy(code, cancel_msg) |
| | | |
| | | GCancelBigNumComputer().set_big_sell_order_info(code, big_sell_order_info) |
| | | GCancelBigNumComputer().set_big_sell_order_info(code, big_sell_order_info) |
| | | except Exception as e: |
| | | async_log_util.error(logger_debug, f"卖单统计异常:{big_sell_order_info}") |
| | | logger_debug.exception(e) |
| | |
| | | LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index, buy_progress_index, |
| | | total_datas) |
| | | |
| | | if order_begin_pos and order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index > -1: |
| | | if is_placed_order: |
| | | cancel_result = FCancelBigNumComputer().need_cancel_for_deal_fast(code, buy_progress_index) |
| | | if cancel_result[0]: |
| | | L2TradeDataProcessor.cancel_buy(code, f"F撤:{cancel_result[1]}") |
| | |
| | | buy_progress_index) |
| | | else: |
| | | pass |
| | | if order_begin_pos and order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index > -1: |
| | | if is_placed_order: |
| | | # 触发L撤上重新计算 |
| | | LCancelBigNumComputer().re_compute_l_up_watch_indexes(code, order_begin_pos.buy_single_index) |
| | | |
| | |
| | | hx_logger_l2_debug.exception(e) |
| | | finally: |
| | | use_time = int((time.time() - __start_time) * 1000) |
| | | if use_time > 10: |
| | | if use_time > 5: |
| | | async_log_util.info(hx_logger_l2_upload, f"{code}处理成交用时:{use_time}") |
| | |
| | | if sell_info_num == deal_num: |
| | | use_time = round((time.time() - start_time) * 1000, 3) |
| | | l2_log.info(code, logger_l2_trade_buy, |
| | | f"找到最近的被动涨停卖单数据:{sell_info['val']['orderNo']}, 计算耗时:{use_time}ms, 可以触发下单") |
| | | f"找到最近的被动涨停卖单数据:{sell_info['val']['orderNo']}, 成交数据:{data} 计算耗时:{use_time}ms, 可以触发下单") |
| | | # 成交完成 |
| | | L2TradeSingleDataManager.set_latest_sell_data(code, data) |
| | | l2_log.info(code, logger_l2_trade_buy, "被动卖数据处理完毕") |
| | |
| | | import ctypes |
| | | import mmap |
| | | import contextlib |
| | | import multiprocessing |
| | | import time |
| | | from multiprocessing import Process, Value, Array |
| | | |
| | | from huaxin_client import l2_data_manager |
| | | |
| | | |
def run_process_1(pipe):
    """Write a short counter message into a tagged shared-memory mapping.

    Opens an anonymous 100000-byte mapping tagged 'l2-000333' for writing and,
    for each counter value from 1 to 99, overwrites the start of the mapping
    with ``"msg <n> "``, flushes it, prints how long the write took, and then
    sleeps for one second.

    Args:
        pipe: accepted for the process-target signature; the body never
            reads from or writes to it.
    """
    mapping_name = 'l2-000333'
    mapping_size = 1000 * 100
    # NOTE(review): per the mmap documentation the ``tagname`` keyword belongs
    # to the Windows constructor signature - confirm the target platform.
    shared = mmap.mmap(-1, mapping_size, tagname=mapping_name, access=mmap.ACCESS_WRITE)
    with contextlib.closing(shared) as region:
        seq = 1
        while seq < 100:
            began = time.time()
            # Always rewrite from offset 0 so the newest message replaces the previous one.
            region.seek(0)
            # The "* 1" repeat factor is kept from the original.
            message = (f"msg {seq} " * 1).encode("utf-8")
            region.write(message)
            region.flush()
            print("耗时", time.time() - began)
            time.sleep(1)
            seq += 1
| | | |
| | | |
# NOTE(review): this span looks like two definitions overlaid by a diff view;
# confirm which one the real file contains before changing any code here.
#   * run_process_2(pipe): in an endless loop, maps the 'l2-000333' tag
#     read-only, reads 1000 * 100 bytes, decodes them as UTF-8, strips NUL
#     characters, prints the length and text when non-empty, then sleeps 1s.
#   * run_process_1(arr): in an endless loop, takes the payload length from
#     arr[0], decodes arr[1:length + 1] as UTF-8, prints the eval() of the
#     text when non-empty, then sleeps 1s.
| | | def run_process_2(pipe): |
| | | def run_process_1(arr:Array): |
| | | while True: |
| | | with contextlib.closing(mmap.mmap(-1, 1000 * 100, tagname='l2-000333', access=mmap.ACCESS_READ)) as m: |
| | | s = m.read(1000 * 100) |
| | | s = s.decode('utf-8').replace('\x00', '') |
| | | if s: |
| | | print(len(s), s) |
| | | time.sleep(1) |
| | | data_lenth = arr[0] |
| | | data = bytes(arr[1:data_lenth+1]).decode('utf-8') |
| | | if data: |
# NOTE(review): eval() executes whatever text was read from the shared array;
# ast.literal_eval would be the safer choice if the writer is not fully trusted.
| | | print(eval(data)) |
| | | time.sleep(1) |
| | | |
| | | |
def run_process_2(arr: Array, payload: bytes = b"['000333',13.89]"):
    """Publish a length-prefixed byte message into the shared array.

    Layout written into ``arr``: slot 0 holds ``len(payload)`` and slots
    ``1 .. len(payload)`` hold the payload bytes.

    Args:
        arr: mutable integer sequence shared with the reader; the ``__main__``
            block of this file creates it as an ``Array`` of ``ctypes.c_byte``.
        payload: bytes to publish. Defaults to the message the original
            implementation hard-coded, so existing callers are unaffected.

    Raises:
        ValueError: if the payload length cannot be stored in a single signed
            byte slot, or if the payload plus its length slot does not fit
            in ``arr``.
    """
    size = len(payload)
    # The length prefix lives in one c_byte slot; ctypes does no overflow
    # checking, so a larger value would silently wrap instead of failing.
    if size > 127:
        raise ValueError(f"payload too long for a one-byte length prefix: {size}")
    if size + 1 > len(arr):
        raise ValueError(f"payload of {size} bytes does not fit in array of {len(arr)} slots")
    # Store the bytes before the length so that a reader polling slot 0 is
    # less likely to see a length whose payload has not been written yet.
    arr[1:size + 1] = payload
    arr[0] = size
| | | |
| | | |
# Script entry point: builds the shared state, creates the two worker
# processes and starts them.
# NOTE(review): serverProcess and jueJinProcess are each assigned twice below,
# first with the Pipe ends (p1 / p2) and then with the shared array. Read top
# to bottom, only the second pair (args=(arr,)) would be started. This looks
# like two revisions overlaid by a diff view - confirm which pair is intended.
| | | if __name__ == '__main__': |
# Shared c_byte array initialised from range(1024*1024).
# NOTE(review): multiprocessing.Array also accepts a plain integer size, which
# yields a zero-filled array - confirm whether the range initialiser is intentional.
| | | arr = Array(ctypes.c_byte, range(1024*1024)) |
| | | p1, p2 = multiprocessing.Pipe() |
| | | serverProcess = multiprocessing.Process(target=run_process_1, args=(p1,)) |
| | | jueJinProcess = multiprocessing.Process(target=run_process_2, args=(p2,)) |
| | | serverProcess = multiprocessing.Process(target=run_process_1, args=(arr,)) |
| | | jueJinProcess = multiprocessing.Process(target=run_process_2, args=(arr,)) |
| | | serverProcess.start() |
| | | jueJinProcess.start() |
| | | |