Administrator
2023-09-04 526b8195c771059c6ab5b48064535fb404982cfc
L撤改为看笔数+纯买额
6个文件已修改
262 ■■■■ 已修改文件
constant.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_data_update.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 172 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/tool.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -111,14 +111,10 @@
# L撤下单之后多久开始守护
L_CANCEL_START_TIME = 1
L_CANCEL_MAX_WATCH_COUNT = 10
L_CANCEL_MIN_WATCH_COUNT = 10
# 撤单比例
L_CANCEL_RATE = 0.6
# 最小金额
L_CANCEL_MIN_MONEY = 50
# 小金额订单数量
L_CANCEL_MIN_MONEY_COUNT = 2
# 大金额
L_CANCEL_BIG_MONEY = 100
huaxin_client/trade_client.py
@@ -598,28 +598,24 @@
                   pOrderField.OrderStatus, pOrderField.InsertTime))
            OrderIDManager.set_system_order_id(pOrderField.SecurityID, pOrderField.SInfo, pOrderField.OrderSysID)
            # 上报订单状态
            if pOrderField.OrderStatus == traderapi.TORA_TSTP_OST_AllTraded or pOrderField.OrderStatus == traderapi.TORA_TSTP_OST_PartTradeCanceled:
                threading.Thread(target=lambda: self.__data_callback(TYPE_DEAL, 0, {"sinfo": pOrderField.SInfo,
                                                                                    "securityId": pOrderField.SecurityID,
                                                                                    "orderLocalId": pOrderField.OrderLocalID,
                                                                                    "orderStatus": pOrderField.OrderStatus,
                                                                                    "statusMsg": pOrderField.StatusMsg,
                                                                                    "orderSysID": pOrderField.OrderSysID,
                                                                                    "accountID": pOrderField.AccountID}),
                                 daemon=True).start()
            if pOrderField.OrderStatus != traderapi.TORA_TSTP_OST_Unknown:
                threading.Thread(target=lambda: self.__data_callback(TYPE_ORDER, 0, {"sinfo": pOrderField.SInfo,
                                                                                     "securityId": pOrderField.SecurityID,
                                                                                     "orderLocalId": pOrderField.OrderLocalID,
                                                                                     "orderStatus": pOrderField.OrderStatus,
                                                                                     "statusMsg": pOrderField.StatusMsg,
                                                                                     "orderSysID": pOrderField.OrderSysID,
                                                                                     "accountID": pOrderField.AccountID}),
                order_data = {"sinfo": pOrderField.SInfo, "securityID": pOrderField.SecurityID,
                              "orderLocalID": pOrderField.OrderLocalID,
                              "direction": pOrderField.Direction, "orderSysID": pOrderField.OrderSysID,
                              "insertTime": pOrderField.InsertTime, "insertDate": pOrderField.InsertDate,
                              "acceptTime": pOrderField.AcceptTime, "cancelTime": pOrderField.CancelTime,
                              "limitPrice": pOrderField.LimitPrice, "accountID": pOrderField.AccountID,
                              "turnover": pOrderField.Turnover,
                              "volume": pOrderField.VolumeTotalOriginal,
                              "volumeTraded": pOrderField.VolumeTraded, "orderStatus": pOrderField.OrderStatus,
                              "orderSubmitStatus": pOrderField.OrderSubmitStatus, "statusMsg": pOrderField.StatusMsg}
                threading.Thread(target=lambda: self.__data_callback(TYPE_ORDER, 0, order_data),
                                 daemon=True).start()
        except Exception as e:
            logger_local_huaxin_trade_debug.exception(e)
            logger_local_huaxin_trade_debug.error("OnRtnOrder 出错")
        except:
            pass
            logger_local_huaxin_trade_debug.error("OnRtnOrder 出错")
    def OnRtnTrade(self, pTradeField: "CTORATstpTradeField") -> "void":
        try:
l2/cancel_buy_strategy.py
@@ -1075,44 +1075,48 @@
    # 设置成交位置,成交位置变化之后相应的监听数据也会发生变化
    def set_trade_progress(self, code, index, total_data):
        # 求动态m值
        volume_rate = code_volumn_manager.get_volume_rate(code)
        volume_rate_index = code_volumn_manager.get_volume_rate_index(volume_rate)
        m_val = L2PlaceOrderParamsManager(code, True, volume_rate, volume_rate_index, None).get_m_val()[0]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        m_val_num = int(m_val / (float(limit_up_price) * 100))
        threshold_num = m_val_num
        old_watch_indexes = self.__get_watch_indexes_cache(code)
        if self.__last_trade_progress_dict.get(code) == index and len(
                old_watch_indexes) >= constant.L_CANCEL_MAX_WATCH_COUNT:
        if self.__last_trade_progress_dict.get(code) == index:
            # 成交进度尚未发生变化且已经监听到了足够的数据
            return
            if len(old_watch_indexes) >= constant.L_CANCEL_MIN_WATCH_COUNT:
                if old_watch_indexes:
                    total_num = 0
                    for i in old_watch_indexes:
                        data = total_data[i]
                        val = data['val']
                        total_num += val['num'] * data['re']
                    if total_num >  threshold_num:
                        return
        self.__last_trade_progress_dict[code] = index
        watch_indexes = set()
        # 小金额
        watch_indexes_small_money = set()
        start_index = index + 1
        end_index = total_data[-1]["index"]
        total_num = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            money = val["num"] * float(val["price"])
            if money <= constant.L_CANCEL_MIN_MONEY * 100:
                continue
            is_small_money = money < constant.L_CANCEL_BIG_MONEY * 100
            if len(
                    watch_indexes_small_money) >= constant.L_CANCEL_MIN_MONEY_COUNT and is_small_money:
                # 小金额的个数已满
                continue
            # 判断当前订单是否已经撤单
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count(code, i, total_data,
                                                                                                  local_today_num_operate_map.get(code))
            if left_count > 0:
                watch_indexes.add(i)
                if is_small_money:
                    # 添加小金额
                    watch_indexes_small_money.add(i)
                total_num += val['num'] * data['re']
                if len(watch_indexes) >= constant.L_CANCEL_MAX_WATCH_COUNT:
                if len(watch_indexes) >= constant.L_CANCEL_MIN_WATCH_COUNT and total_num > threshold_num:
                    break
        l2_log.l_cancel_debug(code, f"设置监听范围,成交进度-{index} , 数据范围:{start_index}-{end_index} 监听范围-{watch_indexes}")
        l2_log.l_cancel_debug(code, f"设置监听范围,成交进度-{index} , 数据范围:{start_index}-{end_index} 监听范围-{watch_indexes} 纯买手数:{total_num}/{threshold_num}")
        # 数据维护
        add_indexes = watch_indexes - old_watch_indexes
        delete_indexes = old_watch_indexes - watch_indexes
trade/huaxin/huaxin_trade_data_update.py
@@ -92,15 +92,15 @@
                    hx_logger_trade_debug.info(f"获取交易数据成功:{type_}")
                except Exception as e1:
                    if str(e1).find("超时") >= 0:
                        # 读取结果超时需要重新请求
                        trade_data_request_queue.put_nowait({"type": type_})
                    # if str(e1).find("超时") >= 0:
                    #     # 读取结果超时需要重新请求
                    #     trade_data_request_queue.put_nowait({"type": type_})
                    raise e1
        except Exception as e:
            hx_logger_trade_debug.exception(e)
        finally:
            # 有0.1s的间隔
            time.sleep(0.1)
            time.sleep(0.01)
def __add_data(data):
trade/huaxin/trade_server.py
@@ -36,7 +36,7 @@
from log_module import async_log_util, log_export
from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \
    hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue, \
    logger_l2_g_cancel, logger_debug, logger_system
    logger_l2_g_cancel, logger_debug, logger_system, logger_trade
from third_data import block_info, kpl_api, kpl_data_manager
from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
@@ -44,10 +44,11 @@
from third_data.kpl_util import KPLDataType
from trade import deal_big_money_manager, current_price_process_manager, trade_huaxin, trade_manager, l2_trade_util
from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_data_update
from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_data_update, \
    huaxin_trade_record_manager
from trade.l2_trade_factor import L2PlaceOrderParamsManager
from trade.trade_manager import TradeTargetCodeModeManager
from utils import socket_util, data_export_util, middle_api_protocol, tool
from utils import socket_util, data_export_util, middle_api_protocol, tool, huaxin_util
trade_data_request_queue = queue.Queue()
@@ -278,7 +279,7 @@
        data = data_json["data"]
        request_id = data_json["request_id"]
        datas = data["data"]
        HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas,request_id = request_id)
        HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id)
    @classmethod
    def l2_order(cls, code, _datas, timestamp):
@@ -294,69 +295,91 @@
        if datas:
            # 设置成交价
            current_price_process_manager.set_trade_price(code, datas[-1][1])
        try:
            buyno_map = l2_data_util.local_today_buyno_map.get(code)
            if not buyno_map:
                if trade_manager.CodesTradeStateManager().get_trade_state(code) != trade_manager.TRADE_STATE_NOT_TRADE:
                    l2_data_util.load_l2_data(code)
                    buyno_map = l2_data_util.local_today_buyno_map.get(code)
            async_log_util.info(hx_logger_l2_transaction, f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}")
            buy_progress_index = None
            for i in range(len(datas) - 1, -1, -1):
                d = datas[i]
                buy_no = f"{d[6]}"
                if buyno_map and buy_no in buyno_map:
                    async_log_util.info(hx_logger_l2_transaction, f"{code}成交进度:{buyno_map[buy_no]['index']}")
                    buy_progress_index = buyno_map[buy_no]["index"]
                    break
            total_datas = l2_data_util.local_today_datas.get(code)
            try:
                buyno_map = l2_data_util.local_today_buyno_map.get(code)
                if not buyno_map:
                    if trade_manager.CodesTradeStateManager().get_trade_state(
                            code) != trade_manager.TRADE_STATE_NOT_TRADE:
                        l2_data_util.load_l2_data(code)
                        buyno_map = l2_data_util.local_today_buyno_map.get(code)
                async_log_util.info(hx_logger_l2_transaction,
                                    f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}")
                buy_progress_index = None
                for i in range(len(datas) - 1, -1, -1):
                    d = datas[i]
                    buy_no = f"{d[6]}"
                    if buyno_map and buy_no in buyno_map:
                        async_log_util.info(hx_logger_l2_transaction, f"{code}成交进度:{buyno_map[buy_no]['index']}")
                        buy_progress_index = buyno_map[buy_no]["index"]
                        break
            # 获取执行位时间
            buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
                code)
            if True:
                if buy_progress_index is not None:
                    cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index)
                    total_datas = l2_data_util.local_today_datas.get(code)
                    num_operate_map = l2_data_util.local_today_num_operate_map.get(
                        code)
                    async_log_util.info(logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code, buy_progress_index)
                    buy_time = total_datas[buy_progress_index]["val"]["time"]
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if buy_exec_index:
                        need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
                # 获取执行位时间
                buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
                    code)
                if True:
                    if buy_progress_index is not None:
                        cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index)
                        num_operate_map = l2_data_util.local_today_num_operate_map.get(
                            code)
                        async_log_util.info(logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code,
                                            buy_progress_index)
                        buy_time = total_datas[buy_progress_index]["val"]["time"]
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        if buy_exec_index:
                            need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
                                                                                          buy_progress_index,
                                                                                          buy_exec_index,
                                                                                          total_datas,
                                                                                          num_operate_map,
                                                                                          num * 100 * float(
                                                                                              limit_up_price),
                                                                                          limit_up_price)
                            if need_cancel:
                                L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")
                        f1 = dask.delayed(HourCancelBigNumComputer().set_trade_progress)(code, buy_time,
                                                                                         buy_exec_index,
                                                                                         buy_progress_index,
                                                                                         total_datas,
                                                                                         num_operate_map)
                        f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code,
                                                                                      buy_progress_index,
                                                                                      buy_exec_index,
                                                                                      total_datas,
                                                                                      num_operate_map,
                                                                                      num * 100 * float(
                                                                                          limit_up_price),
                                                                                      limit_up_price)
                        if need_cancel:
                            L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")
                                                                                      total_datas)
                        f3 = dask.delayed(
                            deal_big_money_manager.DealComputeProgressManager().set_trade_progress)(
                            code,
                            buy_progress_index,
                            total_datas,
                            num_operate_map)
                    f1 = dask.delayed(HourCancelBigNumComputer().set_trade_progress)(code, buy_time,
                                                                                     buy_exec_index,
                                                                                     buy_progress_index,
                                                                                     total_datas,
                                                                                     num_operate_map)
                    f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code,
                                                                                  buy_progress_index,
                                                                                  total_datas)
                    f3 = dask.delayed(
                        deal_big_money_manager.DealComputeProgressManager().set_trade_progress)(
                        code,
                        buy_progress_index,
                        total_datas,
                        num_operate_map)
                        f4 = dask.delayed(
                            SecondCancelBigNumComputer().set_transaction_index)(
                            code,
                            buy_progress_index)
                    f4 = dask.delayed(
                        SecondCancelBigNumComputer().set_transaction_index)(
                        code,
                        buy_progress_index)
                    dask.compute(f1, f2, f3, f4)
        except Exception as e:
            hx_logger_l2_transaction.exception(e)
                        dask.compute(f1, f2, f3, f4)
                    else:
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        # 涨停价成交
                        if limit_up_price and abs(float(limit_up_price) - float(datas[-1][1])) < 0.01:
                            # 买入订单号
                            # 获取开盘啦的涨停时间
                            kpl_total_datas = kpl_data_manager.KPLLimitUpDataRecordManager.total_datas
                            if total_datas and kpl_total_datas:
                                limit_up_time = None
                                for d in kpl_total_datas:
                                    if d[3] == code:
                                        limit_up_time = int(d[5])
                                if limit_up_time:
                                    if int(tool.to_time_str(limit_up_time).replace(":", "")) < int(
                                            total_datas[0]["val"]["time"].replace(":", "")):
                                        # 涨停时间比第一条数据时间还早
                                        l2_trade_util.forbidden_trade(code)
                                        logger_trade.warning(f"{code}添加到禁止买入,原因:涨停时间比第一条数据时间还早")
            except Exception as e:
                hx_logger_l2_transaction.exception(e)
    @classmethod
    def l2_market_data(cls, code, data):
@@ -782,12 +805,18 @@
    def OnTradeCallback(self, data_json):
        data_json = data_json["data"]
        type_ = data_json["type"]
        if type_ == 0:
            # 获取是否交易成功
            data =  data_json["data"]
            order_status = data["orderStatus"]
            huaxin_trade_record_manager.DelegateRecordManager.add([data])
            if huaxin_util.is_deal(order_status):
                l2_trade_util.forbidden_trade(data["securityID"])
                # 成交,更新成交列表与资金列表
                huaxin_trade_data_update.add_deal_list()
                huaxin_trade_data_update.add_money_list()
        # 记录交易反馈日志
        hx_logger_trade_callback.info(data_json)
        # 重新请求委托列表与资金
        huaxin_trade_data_update.add_delegate_list("交易回调")
        huaxin_trade_data_update.add_deal_list()
        huaxin_trade_data_update.add_money_list()
        async_log_util.info(hx_logger_trade_callback, data_json)
    def OnTradeResponse(self, data_json):
        hx_logger_trade_callback.info(f"response:request_id-{data_json['request_id']}")
@@ -839,8 +868,9 @@
if __name__ == "__main__":
    code = "000536"
    TradeServerProcessor.l2_transaction(code,
                                        [('000536', 2.08, 20000, 143436460, 2014, 31126359, 16718956, 31126358, '1')])
    print(int(tool.to_time_str(1684132912).replace(":", "")))
    # code = "000536"
    # TradeServerProcessor.l2_transaction(code,
    #                                     [('000536', 2.08, 20000, 143436460, 2014, 31126359, 16718956, 31126358, '1')])
    while True:
        time.sleep(10)
utils/tool.py
@@ -212,7 +212,7 @@
if __name__ == "__main__":
    print(to_time_str(1639532673))
    print(int(to_time_str(1684132912).replace(":", "")))
    # print(trade_time_sub("11:29:59", 5))
    # print(trade_time_sub("10:29:59", 10))
    # print(trade_time_add_second("13:29:59", 60))