Administrator
2024-10-31 b855b811e3753ffcb35f145c985bb32f4b550038
排1之前的数据准备
12个文件已修改
198 ■■■■ 已修改文件
api/outside_api_command_callback.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/gpcode_manager.py 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client.py 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_target_codes_manager.py 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/huaxin_trade_server.py 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_order_processor.py 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/radical_buy_strategy.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/tool.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/outside_api_command_callback.py
@@ -1341,7 +1341,22 @@
                self.send_response({"code": 0, "data": {}, "msg": f""},
                                   client_id,
                                   request_id)
            elif ctype == "get_buy_open_limit_up_codes":
                # 获取隔夜单排1的代码
                codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()
                if not codes:
                    codes = set()
                self.send_response({"code": 0, "data": list(codes), "msg": f""},
                                   client_id,
                                   request_id)
            elif ctype == "set_buy_open_limit_up_codes":
                # 设置隔夜单排1的代码
                codes = data.get("codes")
                gpcode_manager.BuyOpenLimitUpCodeManager().set_codes(set(codes))
                self.send_response({"code": 0, "data": list(codes), "msg": f""},
                                   client_id,
                                   request_id)
        except Exception as e:
code_attribute/gpcode_manager.py
@@ -1,6 +1,7 @@
"""
股票代码管理器
"""
import copy
import json
import time
@@ -19,6 +20,47 @@
__db = 0
class BuyOpenLimitUpCodeManager:
    """
    排1代码管理
    """
    __db = 2
    __redisManager = redis_manager.RedisManager(2)
    __instance = None
    __codes_cache = set()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(BuyOpenLimitUpCodeManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    @classmethod
    def __load_data(cls):
        val = RedisUtils.get(cls.__get_redis(), "buy_open_limit_up_codes")
        if val:
            val = json.loads(val)
            cls.__codes_cache = set(val)
    def set_codes(self, codes):
        self.__codes_cache = copy.deepcopy(codes)
        RedisUtils.set_async(self.__db, "buy_open_limit_up_codes", json.dumps(list(codes)))
    def get_codes(self):
        return self.__codes_cache
    def is_in_cache(self, code):
        if not self.__codes_cache:
            return False
        if code in self.__codes_cache:
            return True
        return False
class CodesNameManager:
    __mysqldb = Mysqldb()
    __code_name_dict = {}
huaxin_client/l1_client.py
@@ -9,7 +9,7 @@
from huaxin_client import socket_util, l1_subscript_codes_manager
import xmdapi
from huaxin_client import tool, constant
from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript
from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript, logger_debug
from utils import tool as out_tool
################B类##################
@@ -129,9 +129,10 @@
        if out_tool.get_limit_up_rate(pMarketDataField.SecurityID) > 1.1001:
            # 涨停板20%以上的打折
            rate = rate / 2
        # (代码, 现价, 涨幅, 量, 当前时间, 买1价, 买1量, 买2价, 买2量, 更新时间)
        level1_data_dict[pMarketDataField.SecurityID] = (
            pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time(),
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1)
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2, pMarketDataField.BidVolume2, pMarketDataField.UpdateTime)
__latest_subscript_codes = set()
@@ -208,7 +209,16 @@
            time.sleep(3)
def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w):
def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w, fixed_codes=None):
    """
    运行l1订阅任务
    @param queue_l1_w_strategy_r: L1方写,策略方读
    @param queue_l1_r_strategy_w: L1方读,策略方写
    @param fixed_codes: 固定要返回数据的代码
    @return:
    """
    if fixed_codes is None:
        fixed_codes = set()
    logger_local_huaxin_l1.info("运行l1订阅服务")
    codes_sh = []
    codes_sz = []
@@ -277,10 +287,14 @@
            threshold_rate = constant.L1_MIN_RATE_PRE if now_time_int < int(
                "094000") else constant.L1_MIN_RATE
            for d in list_:
                if d[2] >= threshold_rate:
                if d[2] >= threshold_rate or d[0] in fixed_codes:
                    # 涨幅小于5%的需要删除
                    flist.append(d)
            flist.sort(key=lambda x: x[2], reverse=True)
            # 将固定代码的排在最前
            for code in fixed_codes:
                if code in level1_data_dict:
                    flist.insert(0, level1_data_dict[code])
            # 正式交易之前先处理比较少的数据,不然处理时间久造成数据拥堵
            MAX_COUNT = 500
            if now_time_int < int("092600"):
@@ -295,6 +309,7 @@
                __upload_codes_info(queue_l1_w_strategy_r, datas)
        except Exception as e:
            logging.exception(e)
            logger_debug.exception(e)
        finally:
            time.sleep(3)
huaxin_client/trade_client.py
@@ -118,8 +118,19 @@
        cls.__session_id = session_id
        cls.__front_id = front_id
    # sinfo char(32)
    def buy(self, code, count, price, sinfo, order_ref, shadow_price=None):
    # sinfo
    def buy(self, code, count, price, sinfo, order_ref, shadow_price=None, cancel_shadow_order=True):
        """
        下单
        @param code:
        @param count:
        @param price:
        @param sinfo:char(32)
        @param order_ref:
        @param shadow_price: 影子单价格
        @param cancel_shadow_order: 是否撤影子单
        @return:
        """
        if not ENABLE_ORDER:
            return
        if sinfo in self.__buy_sinfo_set:
@@ -203,6 +214,7 @@
                shadow_cancel_order_ref = shadow_order_ref + 1
                # 深证停留50ms上证停留200ms
                delay_s = 0.05 if tool.is_sz_code(code) else 0.2
                if cancel_shadow_order:
                self.cancel_buy(code, f"s_c_{shadow_order_ref}", order_sys_id=None,
                                order_ref=shadow_order_ref,
                                order_action_ref=None, delay_s=delay_s)
@@ -901,6 +913,9 @@
            order_ref = data.get("order_ref")
            shadow_price = data.get("shadow_price")
            blocking = data.get("blocking")
            cancel_shadow = data.get("cancel_shadow")
            if cancel_shadow is None:
                cancel_shadow = True
            if direction == 1:
                async_log_util.info(logger_trade, f"{code}华鑫本地开始下单")
@@ -911,8 +926,7 @@
                    # threading.Thread(target=lambda: self.__tradeSimpleApi.buy(code, volume, price, sinfo, order_ref),
                    #                  daemon=True).start()
                    self.trade_thread_pool.submit(self.__tradeSimpleApi.buy, code, volume, price, sinfo, order_ref,
                                                  shadow_price)
                                                  shadow_price=shadow_price, cancel_shadow=cancel_shadow)
                    async_log_util.info(logger_trade, f"{code}华鑫本地下单线程结束")
                except Exception as e:
                    send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_ORDER, client_id,
l2/huaxin/huaxin_target_codes_manager.py
@@ -65,6 +65,9 @@
    def set_level_1_codes_datas(cls, datas, request_id=None):
        async_log_util.info(logger_l2_codes_subscript, f"({request_id})接受到L1的数据,开始预处理")
        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
        fixed_codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()
        if fixed_codes is None:
            fixed_codes = set()
        # 订阅的代码
        flist = []
        temp_volumns = []
@@ -92,8 +95,8 @@
            # 如果现价是0.0就取买1价
            price = d[1] if d[1] > 0 else d[5]
            # 格式 (代码,现价,涨幅,量,更新时间,买1价格,买1量)
            # 剔除昨日涨停的票
            if code in yesterday_codes:
            # 剔除昨日涨停的票且不在固定代码中的票
            if code in yesterday_codes and code not in fixed_codes:
                continue
            # 剔除股价大于40块的票
            if price > constant.MAX_SUBSCRIPT_CODE_PRICE:
l2/l2_data_manager_new.py
@@ -723,11 +723,18 @@
        if now_time_int >= 145700:
            return False, True, f"14:57后不能交易", True
        # 二板以上的票不买
        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
        if yesterday_codes and code in yesterday_codes:
            return False, True, f"不买高位板", True
        if cls.__TradeTargetCodeModeManager.get_mode_cache() == TradeTargetCodeModeManager.MODE_ONLY_BUY_WANT_CODES:
            if not cls.__WantBuyCodesManager.is_in_cache(
                    code) and not gpcode_manager.GreenListCodeManager().is_in_cache(code):
                return False, True, f"只买想买:没在想买单和绿单", True
        return True, False, f"", False
    @classmethod
main.py
@@ -1,6 +1,7 @@
"""
GUI管理
"""
from code_attribute import gpcode_manager
from log_module import log
from log_module.log import logger_l2_trade, logger_system
import logging
@@ -113,7 +114,8 @@
        # L1订阅数据
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run,
                                            args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,))
                                            args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,
                                                  gpcode_manager.BuyOpenLimitUpCodeManager().get_codes(),))
        l1Process.start()
        l2MarketProcess = multiprocessing.Process(target=l2_market_client.run,
servers/huaxin_trade_server.py
@@ -55,7 +55,7 @@
from trade.sell.sell_rule_manager import TradeRuleManager
from trade.trade_data_manager import RadicalBuyDealCodesManager
from trade.trade_manager import CodesTradeStateManager
from utils import socket_util, middle_api_protocol, tool, huaxin_util, global_util, trade_util
from utils import socket_util, middle_api_protocol, tool, huaxin_util, global_util, trade_util, init_data_util
trade_data_request_queue = queue.Queue()
@@ -354,6 +354,21 @@
            L1DataManager.set_l1_current_price(code, price)
            huaxin_l1_data_manager.set_buy1_data(code, d[5], d[6])
    @classmethod
    def __process_buy_open_limit_up_datas(cls, datas):
        """
        处理排1的数据
        @param datas: [(代码, 现价, 涨幅, 量, 当前时间, 买1价, 买1量, 买2价, 买2量, 更新时间)]
        @return:
        """
        # 9:25之后不再处理
        if tool.get_now_time_as_int() > int("092500"):
            return
        for d in datas:
            if gpcode_manager.BuyOpenLimitUpCodeManager().is_in_cache(d[0]):
                #09:19:50 到 09:20:00判断是否要撤单
                if int("09:19:50") <=tool.get_now_time_as_int()<int("09:20:00"):
                    async_log_util.info(logger_debug, f"排1撤单:{d}")
    # 获取L1现价
    @classmethod
    def get_l1_current_price(cls, code):
@@ -366,6 +381,7 @@
        request_id = data_json["request_id"]
        datas = data["data"]
        cls.__save_l1_current_price(datas)
        cls.__process_buy_open_limit_up_datas(datas)
        # 9:30之前采用非线程
        if int(tool.get_now_time_str().replace(":", "")) < int("093000"):
            HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id)
@@ -896,13 +912,22 @@
# 预埋单
def __test_pre_place_order():
    logger_debug.info("进入预埋单测试")
    price = round(21.98*1.1, 2)
    code = "002253"
    shadow_price = tool.get_shadow_price(price)
    codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()
    if codes:
        for code in codes:
            # 获取昨日收盘价格
            limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
            if not limit_up_price:
                init_data_util.re_set_price_pre(code)
                limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
            if not limit_up_price:
                logger_debug.info(f"没有获取到涨停价:{code}")
                continue
            shadow_price = tool.get_shadow_price(limit_up_price)
    if not constant.TRADE_ENABLE:
        return
    try:
        result = huaxin_trade_api.order(1, code, 100, price, blocking=True,
                result = huaxin_trade_api.order(1, code, 100, limit_up_price, blocking=True,
                                        shadow_price=shadow_price)
        async_log_util.info(logger_trade, f"{code}下单结束:{result}")
    except Exception as e:
trade/huaxin/huaxin_trade_api.py
@@ -80,9 +80,13 @@
                if huaxin_util.is_deal(order.orderStatus):
                    # 如果成交了需要刷新委托列表
                    huaxin_trade_data_update.add_delegate_list("卖成交")
            TradeResultProcessor.process_buy_order(order)
            need_cancel = TradeResultProcessor.process_buy_order(order)
            if need_cancel:
                # 需要撤买单
                threading.Thread(target=lambda:  cancel_order(2, order.code, order.orderSysID), daemon=True).start()
            need_watch_cancel = TradeResultProcessor.process_sell_order(order)
            if need_watch_cancel:
                # 需要撤卖单
                threading.Thread(target=lambda: __cancel_order(order.code, order.orderRef), daemon=True).start()
        finally:
            try:
@@ -492,6 +496,10 @@
    if not request_id:
        request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE)
    for i in range(1):
        cancel_shadow = True
        if int(tool.get_now_time_str().replace(":", "")) < int("091500"):
            # 预埋单不能撤影子单
            cancel_shadow = False
        request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                               {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 1,
                                "direction": direction,
@@ -499,7 +507,11 @@
                                "order_ref": order_ref,
                                "volume": volume,
                                "price_type": price_type,
                                "price": price, "shadow_price": shadow_price, "sinfo": sinfo, "blocking": blocking},
                                "price": price,
                                "shadow_price": shadow_price,
                                "sinfo": sinfo,
                                "blocking": blocking,
                                "cancel_shadow": cancel_shadow},
                               request_id=request_id,
                               is_trade=True)
    try:
trade/huaxin/huaxin_trade_order_processor.py
@@ -117,6 +117,12 @@
    # 处理买单
    @classmethod
    def process_buy_order(cls, order: HuaxinOrderEntity):
        """
        处理买单
        @param order:
        @return: 是否需要撤单
        """
        # 处理下单成功
        def process_order_success(order_: HuaxinOrderEntity, delay_s=0.0):
            if delay_s > 0:
@@ -135,16 +141,22 @@
        # 只处理买入单
        if order.direction != str(huaxin_util.TORA_TSTP_D_Buy):
            return
            return False
        # 只处理正式订单,不处理影子订单
        if order.is_shadow_order:
            return
            # 9:15之前下的影子单
            if order.insertTime and int(order.insertTime.replace(":", "")) < int("091500"):
                # 是委托状态的影子单且是交易所已接受的状态
                if order.orderStatus == huaxin_util.TORA_TSTP_OST_Accepted:
                    # 需要撤单
                    return True
            return False
        # 同一订单号只有状态变化了才需要处理
        key = f"{order.insertDate}_{order.code}_{order.orderSysID}_{order.orderStatus}"
        if key in cls.__processed_keys:
            return
            return False
        try:
            async_log_util.info(hx_logger_trade_debug, f"处理华鑫订单:{key}")
            cls.__processed_keys.add(key)
@@ -169,6 +181,7 @@
                    CancelOrderManager().cancel_success(order.code, order.orderRef, order.orderSysID)
        except Exception as e:
            async_log_util.exception(hx_logger_trade_debug, e)
        return False
    # 返回是否要监控撤单
    @classmethod
trade/radical_buy_strategy.py
@@ -152,7 +152,8 @@
        THRESHOLD_RATE = radical_buy_data_manager.get_volume_rate_threshold(code, volume_rate)
        if rate >= THRESHOLD_RATE:
            # 成交的比例
            if total_sell > 1000 * 1e4:
            # if total_sell > 1000 * 1e4:
            if total_sell >= 0:
                return BUY_MODE_DIRECT, f"剩余涨停总卖额-{selling_num * price},原涨停总卖-{total_sell},已成交额-{__deal_active_buy_total_money[code]},成交比例-{rate}/{THRESHOLD_RATE}"
            else:
                return BUY_MODE_BY_L2, f"剩余涨停总卖额小于500w-{selling_num * price},原涨停总卖-{total_sell},已成交额-{__deal_active_buy_total_money[code]},成交比例-{rate}/{THRESHOLD_RATE} "
utils/tool.py
@@ -54,6 +54,11 @@
    return time_str
def get_now_time_as_int():
    time_str = datetime.datetime.now().strftime("%H:%M:%S")
    return int(time_str.replace(":", ""))
def get_now_time_with_ms_str():
    now = datetime.datetime.now()
    ms = int(now.microsecond / 1000)