Administrator
2023-08-21 08adeb202828d464e2b61e9f2e7140a1d9380a00
G撤
4个文件已修改
84 ■■■■ 已修改文件
huaxin_client/l2_client.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_data_manager.py 51 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -51,6 +51,8 @@
    # 代码的上次成交的订单唯一索引
    __last_transaction_keys_dict = {}
    # 买入的大单订单号
    def __init__(self, api):
        lev2mdapi.CTORATstpLev2MdSpi.__init__(self)
        self.__api = api
@@ -269,6 +271,10 @@
        min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
        # 输出逐笔成交数据
        if pTransaction['ExecType'] == b"2":
            transaction_big_order_no = l2_data_manager.get_latest_transaction_order_no(code)
            if transaction_big_order_no == pTransaction['BuyNo']:
                # 正在成交的订单撤单了
                l2_data_manager.trading_order_canceled(code, pTransaction['BuyNo'])
            if min_volume is None:
                # 默认筛选50w
                if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000:
@@ -294,6 +300,7 @@
                item["Side"] = "2"
            # 深证撤单
            print("逐笔委托", item)
            l2_data_manager.add_l2_order_detail(item, True)
        else:
            if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201:
@@ -336,6 +343,12 @@
            else:
                self.special_code_volume_for_order_dict.pop(code)
        if not can_listen:
            transaction_big_order_no = l2_data_manager.get_latest_transaction_order_no(code)
            if transaction_big_order_no == pOrderDetail['OrderNO'] and pOrderDetail['OrderStatus'] == b'D':
                # 正在成交的订单撤单了
                l2_data_manager.trading_order_canceled(code,pOrderDetail['OrderNO'])
            min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
            if min_volume is None:
                # 默认筛选50w
@@ -554,6 +567,7 @@
        t1.start()
    __init_l2()
    l2_data_manager.run_upload_common()
    l2_data_manager.run_upload_trading_canceled()
    global l2CommandManager
    l2CommandManager = command_manager.L2CommandManager()
    l2CommandManager.init(MyL2ActionCallback())
huaxin_client/l2_data_manager.py
@@ -10,7 +10,7 @@
from huaxin_client.client_network import SendResponseSkManager
# 活动时间
from log_module.log import logger_local_huaxin_l2_upload, logger_local_huaxin_l2_error
from log_module.log import logger_local_huaxin_l2_error
order_detail_upload_active_time_dict = {}
transaction_upload_active_time_dict = {}
@@ -19,6 +19,21 @@
tmep_transaction_queue_dict = {}
target_codes = set()
common_queue = queue.Queue()
# Queue of (code, order_no) tuples for orders that were canceled while still
# being filled; filled by trading_order_canceled() and drained by the
# trading-canceled upload worker.
trading_canceled_queue = queue.Queue()
# Buy-side order numbers seen so far: security code -> set of order numbers
buy_order_nos_dict = {}
# Most recent big-order transaction order number, keyed by security code
latest_big_order_transaction_order_dict = {}
def get_latest_transaction_order_no(code):
    """Return the latest big-order transaction order number recorded for ``code``.

    The value is read from ``latest_big_order_transaction_order_dict``;
    ``None`` is returned when nothing has been recorded for that code.
    """
    try:
        return latest_big_order_transaction_order_dict[code]
    except KeyError:
        # No transaction has been recorded for this code yet.
        return None
def trading_order_canceled(code_, order_no):
    """Report that an order which was in the middle of being filled got canceled.

    The ``(code_, order_no)`` pair is pushed onto ``trading_canceled_queue``
    so that it can be picked up and uploaded asynchronously.
    """
    canceled_entry = (code_, order_no)
    trading_canceled_queue.put(canceled_entry)
# 添加委托详情
@@ -37,6 +52,11 @@
    #                 "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'],
    #                 "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'],
    #                 "OrderStatus": pOrderDetail['OrderStatus'].decode()}
    if data['Side'] == "1":
        # 记录所有买入的订单号
        if data['SecurityID'] not in buy_order_nos_dict:
            buy_order_nos_dict[data['SecurityID']] = set()
        buy_order_nos_dict[data['SecurityID']].add(data['OrderNO'])
    tmep_order_detail_queue_dict[code].put(
        (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
@@ -48,7 +68,6 @@
    code = data["SecurityID"]
    if code not in tmep_transaction_queue_dict:
        tmep_transaction_queue_dict[code] = queue.Queue()
    # 原来的格式
    #  item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
    #                     "TradeVolume": pTransaction['TradeVolume'],
@@ -56,6 +75,11 @@
    #                     "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], "SellNo": pTransaction['SellNo'],
    #                     "ExecType": pTransaction['ExecType'].decode()}
    # 判断是否为大单成交
    code = data['SecurityID']
    if code in buy_order_nos_dict:
        if data['BuyNo'] in buy_order_nos_dict[code]:
            latest_big_order_transaction_order_dict[code] = data['BuyNo']
    tmep_transaction_queue_dict[code].put((data['SecurityID'], data['TradePrice'], data['TradeVolume'],
                                           data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'],
                                           data['SellNo'], data['ExecType']))
@@ -114,15 +138,16 @@
    # print("请求开始", uid, len(datas), len(fdata), f"{fdata[:20]}...{fdata[-20:]}")
    result = None
    start_time = time.time()
    logger_local_huaxin_l2_upload.info(f"{code} 上传数据开始-{_type}")
    # logger_local_huaxin_l2_upload.info(f"{code} 上传数据开始-{_type}")
    try:
        result = send_response(key, fdata.encode('utf-8'))
    except Exception as e:
        logging.exception(e)
    finally:
        pass
        # print("请求结束", uid, result)
        logger_local_huaxin_l2_upload.info(
            f"{code} 上传数据耗时-{_type}: {round((time.time() - start_time) * 1000, 1)} 数据量:{len(datas)}")
        # logger_local_huaxin_l2_upload.info(
        #     f"{code} 上传数据耗时-{_type}: {round((time.time() - start_time) * 1000, 1)} 数据量:{len(datas)}")
    # print("上传结果", result)
@@ -183,6 +208,17 @@
            logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}")
def __run_upload_trading_canceled():
    """Worker loop that uploads "order canceled while being filled" events.

    Blocks on ``trading_canceled_queue`` and, for every ``(code, order_no)``
    entry, calls ``upload_data(code, "trading_order_canceled", order_no)``.
    Any exception is logged and the loop keeps running, so one failed upload
    does not stop the worker. The function never returns.
    """
    print("__run_upload_trading_canceled")
    while True:
        try:
            # Blocks until a cancel event is available.
            item = trading_canceled_queue.get()
            code_, order_no = item[0], item[1]
            upload_data(code_, "trading_order_canceled", order_no)
        except Exception as e:
            logger_local_huaxin_l2_error.exception(e)
            # The message used to say "common data" (copied from the common
            # uploader), which misreported the failing worker.
            logger_local_huaxin_l2_error.error(f"上传正在成交订单撤单数据出错:{str(e)}")
# 运行上传任务
def run_upload_task(code):
    # 如果代码没有在目标代码中就不需要运行
@@ -203,6 +239,11 @@
    t.start()
def run_upload_trading_canceled():
    """Start the trading-canceled upload worker in a background daemon thread.

    The thread runs ``__run_upload_trading_canceled`` and, being a daemon,
    does not keep the process alive on its own.
    """
    threading.Thread(
        target=lambda: __run_upload_trading_canceled(),
        daemon=True,
    ).start()
if __name__ == "__main__":
    code = "603809"
    target_codes.add(code)
l2/l2_data_manager_new.py
@@ -1152,10 +1152,6 @@
    @classmethod
    def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count,
                                  threshold_money, buy_single_index, max_num_set):
        def get_threshold_count():
            count = threshold_count
            return count
        _start_time = t.time()
        total_datas = local_today_datas[code]
        # is_first_code = gpcode_manager.FirstCodeManager().is_in_first_record_cache(code)
@@ -1213,9 +1209,9 @@
                    # 只统计59万以上的金额
                    buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count += int(total_datas[i]["re"])
                    if buy_nums >= threshold_num and buy_count >= get_threshold_count():
                    if buy_nums >= threshold_num and buy_count >= threshold_count:
                        logger_l2_trade_buy.info(
                            f"{code}获取到买入执行点:{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count} 目标纯买单数:{get_threshold_count()}, 大单数量:{len(max_buy_num_set)}")
                            f"{code}获取到买入执行点:{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count} 目标纯买单数:{threshold_count}, 大单数量:{len(max_buy_num_set)}")
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                if _val["num"] >= bigger_num:
                    # 只统计59万以上的金额
@@ -1254,13 +1250,13 @@
            for i in max_buy_num_set:
                max_buy_num_set_count += total_datas[i]["re"]
            # 有撤单信号,且小于阈值
            if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and max_buy_num_set_count >= big_num_count:
            if buy_nums >= threshold_num and buy_count >= threshold_count and trigger_buy and max_buy_num_set_count >= big_num_count:
                return i, buy_nums, buy_count, None, max_buy_num_set
        l2_log.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{} 目标大单数量:{}",
                         compute_start_index,
                         buy_nums,
                         threshold_num, buy_count, get_threshold_count(), max_buy_num_set_count, big_num_count)
                         threshold_num, buy_count, threshold_count, max_buy_num_set_count, big_num_count)
        return None, buy_nums, buy_count, None, max_buy_num_set
trade/huaxin/trade_server.py
@@ -332,6 +332,13 @@
                            logging.exception(e)
                        finally:
                            sk.sendall(socket_util.load_header(json.dumps({"code": 0}).encode(encoding='utf-8')))
                    elif data_json["type"] == "trading_order_canceled":
                        data = data_json["data"]
                        code = data["code"]
                        order_no = data["data"]
                        hx_logger_l2_upload.info(f"{code}-正在成交的订单撤单,order_no:{order_no}")
                        # 执行撤单
                        l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "G撤撤单", "G撤")
                else:
                    # 断开连接
                    break