Administrator
2024-05-23 acabf336db0e36846aab914770bde7c71941a6ed
bug修复
1个文件已添加
2个文件已修改
199 ■■■■■ 已修改文件
huaxin_client/l2_client_for_cb.py 50 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/l2_huaxin_util.py 141 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client_for_cb.py
@@ -21,8 +21,9 @@
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, \
    logger_local_huaxin_l2_transaction, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_error, printlog
from utils import tool
    logger_local_huaxin_l2_transaction, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_error, printlog, \
    logger_trade
from utils import tool, l2_huaxin_util
###B类###
Front_Address = "tcp://10.0.1.101:6900"
@@ -47,6 +48,8 @@
SZ_Bond_Securities = []
set_codes_data_queue = queue.Queue()
market_code_dict = {}
l2_transaction_price_queue = queue.Queue()
class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
@@ -234,15 +237,15 @@
                logger_local_huaxin_l2_transaction.info(f"{item}")
                l2_transaction_price_queue.put_nowait(
                    (pTransaction['SecurityID'], pTransaction['TradePrice'], pTransaction['TradeTime']))
                if pTransaction['TradePrice'] == self.limit_up_price_dict.get(pTransaction['SecurityID']):
                    # TODO 成交价是涨停价才输出
                    huaxin_l2_log.info(logger_local_huaxin_l2_transaction, f"{item}")
                    if not self.__is_limit_up_dict.get(pTransaction['SecurityID']):
                        huaxin_l2_log.info(logger_local_huaxin_l2_upload,
                                           f"{pTransaction['TradeTime']} {pTransaction['SecurityID']} 下单")
                        trade_call_back_queue.put_nowait((pTransaction['SecurityID'], pTransaction['TradeTime']))
                    self.__is_limit_up_dict[pTransaction['SecurityID']] = True
                else:
                    self.__is_limit_up_dict[pTransaction['SecurityID']] = False
@@ -281,7 +284,8 @@
            market_call_back_queue.put_nowait((pDepthMarketData['SecurityID'], pDepthMarketData['LastPrice'], rate,
                                               pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1'],
                                               pDepthMarketData['TotalVolumeTrade'], pDepthMarketData['TotalBidVolume'], pDepthMarketData['TotalAskVolume']))
                                               pDepthMarketData['TotalVolumeTrade'], pDepthMarketData['TotalBidVolume'],
                                               pDepthMarketData['TotalAskVolume']))
            code = pDepthMarketData['SecurityID']
            if code.find("00") == 0 or code.find("60") == 0:
                if rate >= 0.05:
@@ -434,6 +438,38 @@
        time.sleep(3)
__latest_transaction_price_dict = {}
def start_process_transactions():
    """
    Consume trade ticks forever and raise a signal on a fast price run-up.

    Blocks on ``l2_transaction_price_queue`` for tuples whose first three
    elements are ``(code, trade_price, trade_time)``. For every code a list of
    ``(price, trade_time)`` entries is kept in
    ``__latest_transaction_price_dict``; leading entries are dropped until the
    oldest and newest kept entries are at most 1000 ms apart. When the newest
    kept price is at least 0.1 above the oldest kept price, the event is logged
    and ``(code, newest_trade_time)`` is put on ``trade_call_back_queue``.

    A failure while handling one tick is logged and the loop carries on with
    the next tick; the function never returns.
    """
    while True:
        try:
            # (code, trade price, trade time)
            tick = l2_transaction_price_queue.get()
            code, price, trade_time = tick[0], tick[1], tick[2]
            window = __latest_transaction_price_dict.setdefault(code, [])
            # Only record the tick when its price differs from the last recorded price
            if not window or window[-1][0] != price:
                window.append((price, trade_time))
            # Drop entries from the front until the window spans at most 1000 ms
            while True:
                newest = l2_huaxin_util.convert_time(window[-1][1], with_ms=True)
                oldest = l2_huaxin_util.convert_time(window[0][1], with_ms=True)
                if tool.trade_time_sub_with_ms(newest, oldest) <= 1000:
                    break
                # Remove the oldest entry
                del window[0]
            if window[-1][0] - window[0][0] >= 0.1:
                # Price climbed by at least 0.1 inside the window
                # (the log message below describes this as 10 price levels within 1s)
                async_log_util.info(logger_trade, f"1s内连升10档:{code} - {window}")
                trade_call_back_queue.put_nowait((code, window[-1][1]))
        except Exception as e:
            # Keep the worker alive, but leave a trace instead of swallowing the error
            logger_local_huaxin_l2_error.exception(e)
def run(trade_call_back_queue_: multiprocessing.Queue, market_call_back_queue_: multiprocessing.Queue) -> None:
    """
    先订阅所有的L2market行情数据,筛选出比较大的涨幅(主板>5%,科创板/创业板>10%)的票,然后订阅其交成交L2数据
@@ -453,6 +489,8 @@
        threading.Thread(target=huaxin_l2_log.run_sync, daemon=True).start()
        threading.Thread(target=start_sub_high_price, daemon=True).start()
        threading.Thread(target=start_process_transactions, daemon=True).start()
        # TODO 测试
        # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start()
        global l2CommandManager
main.py
@@ -127,7 +127,7 @@
        for r in results:
            cb_code = r["securityID"]
            underlying_code = target_codes_manager.get_underlying_code(cb_code)
            cb_market = code_market_manager.get_market_info(code)
            cb_market = code_market_manager.get_market_info(cb_code)
            underlying_market = code_market_manager.get_market_info(underlying_code)
            if cb_market:
                r["marketInfo"] = {"code": cb_market.code, "name": r["securityName"],
@@ -180,6 +180,8 @@
    # print("下单:", huaxin_trade_api.order(1, "127075", 10, 140.5, blocking=True))
__deal_codes = set()
def read_l2_results(trade_call_back_queue):
    while True:
        try:
@@ -188,6 +190,9 @@
                async_log_util.info(logger_trade, f"正股涨停,准备买入可转债:{result}")
                # 获取可以买的代码
                code, trade_time = result[0], result[1]
                if code in __deal_codes:
                    async_log_util.info(logger_trade, f"已经下单过:{result}")
                    continue
                # 获取股票代码的可转债代码
                cb_code = target_codes_manager.get_cb_code(code)
                # 获取可转债的涨停价
@@ -196,6 +201,7 @@
                    async_log_util.info(logger_trade, f"准备下单:{cb_code}-{limit_up_price}")
                    # 买入20股
                    result = huaxin_trade_api.order(1, cb_code, 20, round(float(limit_up_price), 3), blocking=True)
                    __deal_codes.add(code)
                    async_log_util.info(logger_trade, f"可转债下单结果:{result}")
        except Exception as e:
            logger_debug.exception(e)
utils/l2_huaxin_util.py
New file
@@ -0,0 +1,141 @@
"""
华鑫LV2处理工具类
"""
# Tick-by-tick order handling.
# Layout of a raw tick-by-tick order `item`:
# (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'],
# data['OrderTime'],data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'])
def convert_time(time_str, with_ms=False):
    """
    Turn a compact numeric time into ``HH:MM:SS`` (optionally ``HH:MM:SS.mmm``).

    @param time_str: time as digits (int or str); a value whose text starts
                     with "9" gets a leading "0" so the hour is two digits
    @param with_ms: when True, append "." plus the digits after the seconds,
                    right-padded with "0" to at least three characters
    @return: the formatted time string
    """
    digits = str(time_str)
    if digits[:1] == "9":
        # Hour 9 arrives as a single digit; pad it to two
        digits = "0" + digits
    clock = ":".join((digits[:2], digits[2:4], digits[4:6]))
    if not with_ms:
        return clock
    # Everything after the seconds is the sub-second part
    return clock + "." + digits[6:].ljust(3, "0")
def __convert_order(item, limit_up_price):
    """
    Convert one raw tick-by-tick order tuple into the internal dict format.

    Positions read from ``item``: 1 price, 2 volume, 3 side, 5 order time,
    6 main sequence, 7 sub sequence, 8 order number, 9 order status.

    @param item: raw order tuple
    @param limit_up_price: limit-up price; also used as the price when the
                           order carries a price <= 0
    @return: dict with keys time, tms, price, num, limitPrice, operateType,
             cancelTime, cancelTimeUnit, orderNo, mainSeq, subSeq
    """
    raw_time = "{}".format(item[5])
    if raw_time[:1] == "9":
        # Hour 9 arrives as a single digit; pad it to two
        raw_time = "0" + raw_time
    clock = ":".join((raw_time[:2], raw_time[2:4], raw_time[4:6]))
    millis = raw_time[6:].ljust(3, "0")

    # Shenzhen buy-cancel records carry no price; fall back to the limit-up price for now
    order_price = limit_up_price if item[1] <= 0 else item[1]
    at_limit_up = 1 if abs(limit_up_price - order_price) < 0.001 else 0

    # 0 = buy, 1 = buy cancel, 2 = sell, 3 = sell cancel
    is_cancel = item[9] == 'D'
    is_buy = item[3] == '1'
    operate_type = (0 if is_buy else 2) + (1 if is_cancel else 0)

    return {
        "time": clock,
        "tms": millis,
        "price": order_price,
        "num": item[2] // 100,
        "limitPrice": at_limit_up,
        "operateType": operate_type,
        "cancelTime": 0,
        "cancelTimeUnit": 0,
        "orderNo": item[8],
        "mainSeq": item[6],
        "subSeq": item[7],
    }
def __format_l2_data(origin_datas, code, limit_up_price, filter_not_limit_up=True, filter_limit_up_sell=False):
    """
    Filter and de-duplicate converted L2 order records.

    Every record is a dict providing "price", "num", "limitPrice",
    "operateType", "mainSeq" and "subSeq". Records are modified in place:
    when ``limit_up_price`` is given, "limitPrice" is overwritten with the
    string "1" (price within 0.001 of the limit-up price) or "0".

    @param origin_datas: converted order records
    @param code: security code, used as the prefix of the de-duplication key
    @param limit_up_price: limit-up price, or None to keep each record's
                           existing "limitPrice"
    @param filter_not_limit_up: drop buys / buy cancels (operateType 0 or 1)
                                that are not at the limit-up price, unless
                                their "num" equals 1
    @param filter_limit_up_sell: drop sells (operateType 2) at the limit-up price
    @return: list of {"key": "<code>-<mainSeq>-<subSeq>", "val": record,
             "re": number of times that key was seen}, in first-seen order
    """
    datas = []
    # key -> position of the matching entry in `datas`
    index_by_key = {}
    for item in origin_datas:
        price = item["price"]
        num = item["num"]
        # Re-derive the limit-up flag from the price when a limit-up price is known
        if limit_up_price is not None:
            item["limitPrice"] = "1" if abs(price - limit_up_price) < 0.001 else "0"
        operate_type = item["operateType"]
        # Non-limit-up buys and buy cancels are not needed
        if (filter_not_limit_up and int(item["limitPrice"]) != 1
                and int(operate_type) in (0, 1) and num != 1):
            continue
        # Optionally drop sells placed at the limit-up price
        if filter_limit_up_sell and int(item["limitPrice"]) == 1 and int(operate_type) == 2:
            continue
        key = "{}-{}-{}".format(code, item["mainSeq"], item["subSeq"])
        if key in index_by_key:
            # Seen before: bump the repeat counter of the first occurrence
            datas[index_by_key[key]]["re"] += 1
        else:
            # First occurrence starts with a repeat count of 1
            index_by_key[key] = len(datas)
            datas.append({"key": key, "val": item, "re": 1})
    return datas
def get_format_l2_datas(code, origin_datas, limit_up_price, start_index, filter_limit_up_sell=False):
    """
    Format raw Huaxin L2 order tuples and number the resulting entries.

    @param code: security code
    @param origin_datas: raw tick-by-tick order tuples
    @param limit_up_price: limit-up price (converted with float())
    @param start_index: value written to "index" of the first returned entry;
                        later entries count up from it
    @param filter_limit_up_sell: whether to drop sells at the limit-up price
    @return: list of formatted entries, each with an added "index" key
    """
    limit_price = float(limit_up_price)
    # Step 1: raw tuples -> internal dict records
    converted = [__convert_order(raw, limit_price) for raw in origin_datas]
    # Step 2: filter / de-duplicate; non-limit-up records are deliberately not filtered here
    formatted = __format_l2_data(converted, code, limit_price, filter_not_limit_up=False,
                                 filter_limit_up_sell=filter_limit_up_sell)
    # Step 3: assign consecutive indexes starting at start_index
    for offset, entry in enumerate(formatted):
        entry["index"] = start_index + offset
    return formatted
if __name__ == "__main__":
    # Manual smoke check: convert a few sample raw orders and print the result.
    # Local import: only this entry point needs it.
    import ast

    ds = ["('605167', 10.08, 68500, '1', '0', 9303108, 2, 439438, 436472, 'D', 1695864632451)",
          "('603439', 17.97, 27800, '1', '0', 9304966, 6, 435127, 407524, 'D', 1695864649883)",
          "('002369', 0.0, 100800, '1', '2', 93051880, 2011, 1431910, 1160638, 'D', 1695864651875)"
          ]
    for d in ds:
        # literal_eval parses the tuple literal without executing arbitrary code
        print(__convert_order(ast.literal_eval(d), 15.55))