Administrator
2024-05-23 acabf336db0e36846aab914770bde7c71941a6ed
huaxin_client/l2_client_for_cb.py
@@ -21,8 +21,9 @@
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, \
    logger_local_huaxin_l2_transaction, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_error, printlog
from utils import tool
    logger_local_huaxin_l2_transaction, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_error, printlog, \
    logger_trade
from utils import tool, l2_huaxin_util
###B类###
Front_Address = "tcp://10.0.1.101:6900"
@@ -47,6 +48,8 @@
SZ_Bond_Securities = []
set_codes_data_queue = queue.Queue()
market_code_dict = {}
l2_transaction_price_queue = queue.Queue()
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
@@ -76,7 +79,7 @@
            market_type = tool.get_market_type(code)
            if market_type == tool.MARKET_TYPE_SZSE:
                szse_codes.append(code.encode())
            elif  market_type == tool.MARKET_TYPE_SSE:
            elif market_type == tool.MARKET_TYPE_SSE:
                sse_codes.append(code.encode())
        return sse_codes, szse_codes
@@ -234,15 +237,15 @@
                logger_local_huaxin_l2_transaction.info(f"{item}")
                l2_transaction_price_queue.put_nowait(
                    (pTransaction['SecurityID'], pTransaction['TradePrice'], pTransaction['TradeTime']))
                if pTransaction['TradePrice'] == self.limit_up_price_dict.get(pTransaction['SecurityID']):
                    # TODO 成交价是涨停价才输出
                    huaxin_l2_log.info(logger_local_huaxin_l2_transaction, f"{item}")
                    if not self.__is_limit_up_dict.get(pTransaction['SecurityID']):
                        huaxin_l2_log.info(logger_local_huaxin_l2_upload,
                                           f"{pTransaction['TradeTime']} {pTransaction['SecurityID']} 下单")
                        trade_call_back_queue.put_nowait((pTransaction['SecurityID'], pTransaction['TradeTime']))
                    self.__is_limit_up_dict[pTransaction['SecurityID']] = True
                else:
                    self.__is_limit_up_dict[pTransaction['SecurityID']] = False
@@ -281,7 +284,8 @@
            market_call_back_queue.put_nowait((pDepthMarketData['SecurityID'], pDepthMarketData['LastPrice'], rate,
                                               pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1'],
                                               pDepthMarketData['TotalVolumeTrade'], pDepthMarketData['TotalBidVolume'], pDepthMarketData['TotalAskVolume']))
                                               pDepthMarketData['TotalVolumeTrade'], pDepthMarketData['TotalBidVolume'],
                                               pDepthMarketData['TotalAskVolume']))
            code = pDepthMarketData['SecurityID']
            if code.find("00") == 0 or code.find("60") == 0:
                if rate >= 0.05:
@@ -434,6 +438,38 @@
        time.sleep(3)
# Per-code history of recent trade-price changes: {code: [(price, trade_time), ...]}.
# Only written by start_process_transactions().
__latest_transaction_price_dict = {}


def start_process_transactions():
    """Consume trade ticks forever and signal codes whose price jumps within one second.

    Blocks on ``l2_transaction_price_queue``, whose items are indexed as
    ``(code, trade price, trade time)``. For each code it keeps a list of
    ``(price, trade_time)`` entries, appending only when the price differs from
    the most recent entry, and drops leading entries that are more than
    1000 ms (per ``tool.trade_time_sub_with_ms``) older than the newest entry.

    When the newest recorded price is at least 0.1 above the oldest price left
    in the window, the event is logged to ``logger_trade`` and
    ``(code, newest_trade_time)`` is put on ``trade_call_back_queue``.

    The function never returns; it is meant to run in a background thread.
    Errors while handling one tick are logged and the loop continues.
    """
    while True:
        try:
            # code, trade price, trade time
            result = l2_transaction_price_queue.get()
            code, price, trade_time = result[0], result[1], result[2]

            history = __latest_transaction_price_dict.setdefault(code, [])
            # Record only price changes, not every tick at the same price.
            if not history or history[-1][0] != price:
                history.append((price, trade_time))

            # Drop entries more than 1s older than the newest entry. The newest
            # entry is never removed (its distance to itself is 0), so its
            # converted time can be computed once.
            # NOTE(review): assumes convert_time() has no side effects - confirm.
            newest_time = l2_huaxin_util.convert_time(history[-1][1], with_ms=True)
            while tool.trade_time_sub_with_ms(
                    newest_time,
                    l2_huaxin_util.convert_time(history[0][1], with_ms=True)) > 1000:
                # Remove the oldest entry
                del history[0]

            # Round before comparing: binary floats make an exact 10-tick rise
            # such as 10.10 - 10.00 evaluate to 0.0999999..., which would
            # otherwise fail the ">= 0.1" test.
            if round(history[-1][0] - history[0][0], 4) >= 0.1:
                # Rose 10 price levels within 1s
                async_log_util.info(logger_trade, f"1s内连升10档:{code} - {__latest_transaction_price_dict[code]}")
                trade_call_back_queue.put_nowait((code, history[-1][1]))
        except Exception as e:
            # Keep the worker alive, but leave a trace instead of silently
            # discarding the failure.
            logger_local_huaxin_l2_error.exception(e)
def run(trade_call_back_queue_: multiprocessing.Queue, market_call_back_queue_: multiprocessing.Queue) -> None:
    """
    先订阅所有的L2market行情数据,筛选出比较大的涨幅(主板>5%,科创板/创业板>10%)的票,然后订阅其交成交L2数据
@@ -453,6 +489,8 @@
        threading.Thread(target=huaxin_l2_log.run_sync, daemon=True).start()
        threading.Thread(target=start_sub_high_price, daemon=True).start()
        threading.Thread(target=start_process_transactions, daemon=True).start()
        # TODO 测试
        # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start()
        global l2CommandManager