L2净流入订阅修改/买流入前几修改/创业板大单数量要求修改/将市场强度纳入策略
11个文件已修改
266 ■■■■■ 已修改文件
huaxin_client/l2_client_test.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_manager.py 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_test.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/data_server.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/huaxin_trade_server.py 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/custom_block_in_money_manager.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_api.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_util.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/radical_buy_data_manager.py 109 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client_test.py
@@ -64,33 +64,37 @@
        #         "ExecType": pTransaction['ExecType'].decode()}
        money = round(item[2] * item[3])
        volume = item[3]
        price = item[2]
        order_time = data["OrderTime"]
        if not self.__latest_buy_order:
            self.__latest_buy_order = [item[0], 0, 0, order_time]
            # (买单号, 量, 金额, 时间, 最新成交价格)
            self.__latest_buy_order = [item[0], 0, 0, order_time, price]
        if self.__latest_buy_order[0] == item[0]:
            self.__latest_buy_order[1] += volume
            self.__latest_buy_order[2] += money
            self.__latest_buy_order[3] = order_time
            self.__latest_buy_order[4] = price
        else:
            if self.__latest_buy_order[2] > 1e6:
                d = (self.__latest_buy_order[0], self.__latest_buy_order[1], self.__latest_buy_order[2], self.__latest_buy_order[3])
                d = (self.__latest_buy_order[0], self.__latest_buy_order[1], self.__latest_buy_order[2], self.__latest_buy_order[3], self.__latest_buy_order[4])
                self.__big_buy_orders.append(d)
                self.big_buy_order_queue.put_nowait(d)
            self.__latest_buy_order = [item[0], volume, money, order_time]
            self.__latest_buy_order = [item[0], volume, money, order_time, price]
        if not self.__latest_sell_order:
            self.__latest_sell_order = [item[1], 0, 0, order_time]
            self.__latest_sell_order = [item[1], 0, 0, order_time, price]
        if self.__latest_sell_order[0] == item[1]:
            self.__latest_sell_order[1] += volume
            self.__latest_sell_order[2] += money
            self.__latest_sell_order[3] = order_time
            self.__latest_sell_order[4] = price
        else:
            if self.__latest_sell_order[2] > 1e6:
                d = (self.__latest_sell_order[0], self.__latest_sell_order[1], self.__latest_sell_order[2], self.__latest_sell_order[3])
                d = (self.__latest_sell_order[0], self.__latest_sell_order[1], self.__latest_sell_order[2], self.__latest_sell_order[3], self.__latest_sell_order[4])
                self.__big_sell_orders.append(d)
                self.big_sell_order_queue.put_nowait(d)
            self.__latest_sell_order = [item[1], volume, money, order_time]
            self.__latest_sell_order = [item[1], volume, money, order_time, price]
# 买入的大单订单号
l2/l2_transaction_data_manager.py
@@ -84,6 +84,16 @@
            return 0
        return int(sum([x[2] for x in self.__total_buy_datas_dict[code]]))
    def get_total_buy_order_ids(self, code):
        """
        获取成交大单的订单号
        @param code:
        @return:
        """
        if code not in self.__total_buy_datas_dict:
            return set()
        return set([x[0] for x in self.__total_buy_datas_dict[code]])
    def get_total_buy_count(self, code):
        """
        获取大单成交的笔数
l2_test.py
@@ -28,6 +28,7 @@
        url = urlparse.urlparse(path)
        response_data = ""
        if url.path == "/get_block_codes_money":
            # 获取板块对应的代码与该代码的净流入
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            block = ps_dict.get('block')
            try:
@@ -35,6 +36,17 @@
                response_data = json.dumps({"code": 0, "data": fdatas})
            except Exception as e:
                response_data = json.dumps({"code": 1, "msg": str(e)})
        elif url.path == "/get_big_buy_order_list":
            # Get the list of big buy orders for the given code
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict.get('code')
            try:
                fdatas = CodeInMoneyManager().get_big_buy_money_list(code)
                if fdatas is None:
                    fdatas = []
                response_data = json.dumps({"code": 0, "data": fdatas})
            except Exception as e:
                response_data = json.dumps({"code": 1, "msg": str(e)})
        self.send_response(200)
        # 发给请求客户端的响应数据
        self.send_header('Content-type', 'application/json')
servers/data_server.py
@@ -1077,6 +1077,14 @@
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
                RealTimeKplMarketData.set_market_jingxuan_out_blocks(result_list)
        elif type_ == KPLDataType.MARKET_STRONG.value:
            strong = data["data"]
            logger_debug.debug("开盘啦市场强度:{}", strong)
            # 保存市场热度
            if strong is not None:
                RealTimeKplMarketData.set_market_strong(strong)
        return json.dumps({"code": 0})
    def __send_response(self, data):
servers/huaxin_trade_server.py
@@ -11,6 +11,7 @@
import threading
import time
import requests
import schedule
import constant
@@ -52,7 +53,7 @@
from trade.huaxin.huaxin_trade_record_manager import DelegateRecordManager
from trade.order_statistic import DealAndDelegateWithBuyModeDataManager
from trade.buy_radical.radical_buy_data_manager import RadicalBuyDataManager, RadicalBuyBlockManager, \
    EveryLimitupBigDealOrderManager, RadicalCodeMarketInfoManager
    EveryLimitupBigDealOrderManager, RadicalCodeMarketInfoManager, RadicalBigOrderThresholdManager
from trade.sell.sell_rule_manager import TradeRuleManager
from trade.trade_data_manager import RadicalBuyDealCodesManager
from trade.trade_manager import CodesTradeStateManager
@@ -405,6 +406,19 @@
            finally:
                cls.__updating_jx_blocks_codes.discard(code_)
        def pull_pre_deal_big_orders(code_):
            response_data = requests.get(
                "http://127.0.0.1:9005/get_big_buy_order_list?code=" + code_)
            r_str = response_data.text
            response_data = json.loads(r_str)
            if response_data["code"] == 0:
                datas = response_data["data"]
                logger_debug.info(f"拉取炸板前成交的大单:{code_}-{datas}")
                if datas:
                    RadicalBigOrderThresholdManager().set_big_deal_order_list(code_, datas, gpcode_manager.get_limit_up_price_as_num(code_))
        time_str = f"{data['dataTimeStamp']}"
        if time_str.startswith("9"):
            time_str = "0" + time_str
@@ -480,6 +494,10 @@
        if data["sell"] and len(data["sell"]) > 1 and data["sell"][1][1] > 0:
            # 出现卖二
            radical_buy_strategy.clear_data(code, force=True)
            if RadicalBigOrderThresholdManager().is_need_update(code):
                #  炸板更新数据
                cls.__sell_thread_pool.submit(
                    lambda: pull_pre_deal_big_orders(code))
        # 设置扫入数据
        RadicalCodeMarketInfoManager().set_market_info(code, time_str, round(float(limit_up_price), 2), data["buy"][0],
third_data/code_plate_key_manager.py
@@ -352,6 +352,24 @@
    __top_jx_out_blocks = []
    # 精选板块流入金额
    __jx_blocks_in_money_dict = {}
    # 市场行情热度,默认为60
    __market_strong = 60
    @classmethod
    def get_jingxuan_in_block_threshold_count(cls):
        """
        获取买精选流入前几
        @return:
        """
        score = 60
        if cls.__market_strong is not None:
            score = int(cls.__market_strong)
        score = score // 10
        if score >= 10:
            score = 9
        if score < 6:
            score = 5
        return score * 2 - 2
    @classmethod
    def set_market_jingxuan_blocks(cls, datas):
@@ -365,6 +383,8 @@
        THRESHOLD_MONEY = min(THRESHOLD_MONEY, 10000)
        THRESHOLD_MONEY = max(THRESHOLD_MONEY, 1000)
        THRESHOLD_MONEY = THRESHOLD_MONEY * 10000
        # 最大数量
        MAX_COUNT = cls.get_jingxuan_in_block_threshold_count()
        cls.top_in_list_cache = datas
        blocks = set()
@@ -394,7 +414,7 @@
            if has_code:
                count += 1
            if count >= 10:
            if count >= MAX_COUNT:
                break
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_in, f"原数据:{datas[:20]} 板块:{blocks}")
@@ -430,6 +450,19 @@
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_out, f"原数据:{datas[:10]} 板块:{blocks}")
        cls.__top_jx_out_blocks = list(blocks)
    @classmethod
    def set_market_strong(cls, strong):
        """
        Store the latest market strength value on the class.
        @param strong: market strength value; it is kept as given and read back
                       by get_market_strong / get_jingxuan_in_block_threshold_count
        @return: None
        """
        cls.__market_strong = strong
    @classmethod
    def get_market_strong(cls):
        """
        Return the market strength currently stored on the class.
        @return: the value last passed to set_market_strong (the class-level default is 60)
        """
        return cls.__market_strong
    @classmethod
    def get_top_market_jingxuan_blocks(cls):
@@ -1146,4 +1179,5 @@
if __name__ == "__main__":
    pass
    # Manual check: a strength of 120 exceeds the clamp in
    # get_jingxuan_in_block_threshold_count (tens capped at 9), so this prints 16.
    RealTimeKplMarketData.set_market_strong(120)
    print(RealTimeKplMarketData.get_jingxuan_in_block_threshold_count())
third_data/custom_block_in_money_manager.py
@@ -16,7 +16,10 @@
@tool.singleton
class CodeInMoneyManager:
    def __init__(self):
        # Total net inflow money per code (buys are added, sells subtracted)
        self.__code_money_dict = {}
        # Big buy orders per code: list of (money, last deal price, buy order id)
        self.__code_big_buy_mmoney_list_dict = {}
        # Populate the caches; the loader itself is defined below
        self.__load_data()
    def __load_data(self):
@@ -38,7 +41,13 @@
        if tool.is_ge_code(code) and item[2][2] < 299e4 and item[2][1] < 290000:
            return
        if item[1] == 0:
            # item[2]的数据结构:  (买单号, 量, 金额, 时间, 最新成交价格)
            self.__code_money_dict[code] += item[2][2]
            if code not in self.__code_big_buy_mmoney_list_dict:
                self.__code_big_buy_mmoney_list_dict[code] = []
            # 大买单信息:(金额,最后价格)
            if len(item[2]) >= 5:
                self.__code_big_buy_mmoney_list_dict[code].append((item[2][2], item[2][4], item[2][0]))
        else:
            self.__code_money_dict[code] -= item[2][2]
@@ -53,6 +62,14 @@
    def set_money(self, code, money):
        """
        Overwrite the stored net inflow money for a code.
        @param code: stock code
        @param money: value to store for the code
        @return: None
        """
        self.__code_money_dict[code] = money
    def get_big_buy_money_list(self, code):
        """
        Get the list of big buy orders recorded for a code.
        @param code: stock code
        @return: [(money, price, order id)], or None when nothing has been recorded for the code
        """
        return self.__code_big_buy_mmoney_list_dict.get(code)
@tool.singleton
class BlockInMoneyRankManager:
third_data/kpl_api.py
@@ -286,6 +286,17 @@
    result = json.loads(result)
    return result
def getMarketStrong():
    """
    Request the market strength figure from the longhuvip review endpoint.
    :return: the "strong" field of the response's "info" object, converted to int
    """
    query = "a=DiskReview&apiv=w35&c=HomeDingPan&VerSion=5.13.0.0&PhoneOSNew=1&DeviceID=d6f20ce9-fa08-31c9-a493-536ebb8e9773&"
    response = __base_request("https://apphwhq.longhuvip.com/w1/api/index.php", query)
    payload = json.loads(response.text)
    return int(payload["info"]["strong"])
if __name__ == "__main__":
    result = getZLJECodesRank(0)
third_data/kpl_data_manager.py
@@ -499,6 +499,12 @@
        #     # 大于20s就需要更新
        #     threading.Thread(target=cls.run_market_jingxuan_out, daemon=True).start()
        key = "market_strong"
        if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
            logger_debug.info("任务修复-开盘啦:市场强度")
            # 大于20s就需要更新
            threading.Thread(target=cls.run_market_strong, daemon=True).start()
    @classmethod
    def run_limit_up_task(cls):
        # 关闭log
@@ -559,6 +565,23 @@
                time.sleep(3)
    @classmethod
    def run_market_strong(cls):
        """
        精选流出
        @return:
        """
        while True:
            try:
                if tool.is_trade_time() or True:
                    strong_value = kpl_api.getMarketStrong()
                    cls.__upload_data("market_strong", strong_value)
            except:
                pass
            finally:
                cls.__latest_update_time_dict["market_strong"] = time.time()
                time.sleep(3)
    @classmethod
    # 运行拉取任务
    def run_pull_task(cls):
        def get_bidding_money():
@@ -600,6 +623,7 @@
                    time.sleep(3)
        threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
        threading.Thread(target=cls.run_market_strong, daemon=True).start()
        # threading.Thread(target=get_bidding_money, daemon=True).start()
        # threading.Thread(target=get_market_industry, daemon=True).start()
        # threading.Thread(target=cls.run_market_jingxuan_in, daemon=True).start()
third_data/kpl_util.py
@@ -66,6 +66,7 @@
    INDUSTRY_RANK = "industry_rank"
    JINGXUAN_RANK = "jingxuan_rank"
    JINGXUAN_RANK_OUT = "jingxuan_rank_out"
    MARKET_STRONG = "market_strong"
def __parseDaBanItemData(data, type):
    if type == DABAN_TYPE_BIDDING:
trade/buy_radical/radical_buy_data_manager.py
@@ -28,6 +28,102 @@
@tool.singleton
class RadicalBigOrderThresholdManager:
    """
    Big-order threshold management for the radical-buy strategy.

    Keeps two values per stock code:
    - the money threshold of a "big order", cached in memory and persisted to
      redis db 3 under "radical_big_order_threshold-<code>";
    - the total money of big orders dealt at the limit-up price that
      BigOrderDealManager does not know about yet (in memory only).
    """
    __db = 3
    # Smallest threshold that is ever stored; also the fallback for codes without one (2.99 million)
    __MIN_THRESHOLD_MONEY = 2990000
    # Prices closer than this are treated as the same price (guards against float noise)
    __PRICE_TOLERANCE = 0.001
    # code -> big order money threshold
    __big_order_threshold = {}
    # Accumulated money of big orders already dealt: covers the first limit-up before subscription
    __already_total_deal_big_order_money = {}
    __redis_manager = redis_manager.RedisManager(__db)

    def __init__(self):
        self.__load_data()

    def __load_data(self):
        """
        Restore the persisted thresholds from redis into the in-memory cache.
        """
        keys = redis_manager.RedisUtils.keys(self.__get_redis(), "radical_big_order_threshold-*")
        for k in keys:
            val = redis_manager.RedisUtils.get(self.__get_redis(), k)
            if val is None:
                # The key expired between listing and reading it
                continue
            code = k.split("-")[1]
            self.__big_order_threshold[code] = int(val)

    def __get_redis(self):
        return self.__redis_manager.getRedis()

    @classmethod
    def __is_same_price(cls, price, limit_up_price):
        """
        Tell whether a deal price is the limit-up price.

        Everything that compares equal still matches; numeric prices that differ
        by less than the tolerance match as well. Values that cannot be
        subtracted (e.g. None) never match unless they are equal.
        """
        if price == limit_up_price:
            return True
        try:
            return abs(price - limit_up_price) < cls.__PRICE_TOLERANCE
        except TypeError:
            return False

    def set_big_order_threshold(self, code, threshold_money):
        """
        Set the big order threshold of a code.

        The value is always logged; values below 2.99 million are not stored.
        Stored values go to the in-memory cache and, asynchronously, to redis
        with the expiry given by tool.get_expire().
        @param code: stock code
        @param threshold_money: threshold money
        @return: None
        """
        async_log_util.info(logger_l2_radical_buy_data, f"大单阈值数据:{code}-{threshold_money}")
        if threshold_money < self.__MIN_THRESHOLD_MONEY:
            # Below 2.99 million: do not save
            return
        self.__big_order_threshold[code] = threshold_money
        redis_manager.RedisUtils.setex_async(self.__db, f"radical_big_order_threshold-{code}", tool.get_expire(),
                                             f"{threshold_money}")

    def set_big_deal_order_list(self, code, money_list, limit_up_price):
        """
        Digest the list of dealt big orders of a code.

        Only orders dealt at the limit-up price count. Their average money is
        submitted as the new threshold, and the money of those whose order id is
        not among BigOrderDealManager().get_total_buy_order_ids(code) is stored
        as the already-dealt total of the code.
        @param code: stock code
        @param money_list: [(money, price, order id)]
        @param limit_up_price: limit-up price of the code
        @return: None
        """
        # Big orders dealt at the limit-up price that are not counted elsewhere yet
        total_deal_money = 0
        limit_up_price_money_list = []
        deal_order_ids = BigOrderDealManager().get_total_buy_order_ids(code)
        for info in money_list:
            if not self.__is_same_price(info[1], limit_up_price):
                continue
            limit_up_price_money_list.append(info[0])
            if info[2] in deal_order_ids:
                continue
            total_deal_money += info[0]
        if limit_up_price_money_list:
            average_money = sum(limit_up_price_money_list) // len(limit_up_price_money_list)
            self.set_big_order_threshold(code, average_money)
        self.__already_total_deal_big_order_money[code] = total_deal_money

    def get_big_order_threshold(self, code):
        """
        Get the big order threshold of a code.
        @param code: stock code
        @return: the stored threshold, or 2990000 when the code has none
        """
        return self.__big_order_threshold.get(code, self.__MIN_THRESHOLD_MONEY)

    def get_deal_big_order_money(self, code):
        """
        Get the already-dealt big order money of a code.
        @param code: stock code
        @return: the total stored by set_big_deal_order_list, or 0 when the code has none
        """
        return self.__already_total_deal_big_order_money.get(code, 0)

    def is_need_update(self, code):
        """
        Tell whether the big order data of a code should be pulled.
        @param code: stock code
        @return: True only when a non-zero threshold is already stored for the code
        """
        # NOTE(review): this returns False for codes without a stored threshold, so a first pull
        # depends on a threshold being set elsewhere - confirm this is the intended direction.
        if not self.__big_order_threshold.get(code):
            # No big order has dealt
            return False
        return True
@tool.singleton
class RadicalCodeMarketInfoManager:
    """
    激进买的票行情数据管理
@@ -1007,10 +1103,16 @@
    @param code:
    @return:(缺少的资金,总成交金额, 要求的大单金额)
    """
    THRESHOLD_MONEY = RadicalBigOrderThresholdManager().get_big_order_threshold(code)
    TOTAL_BIG_DEAL_MONEY_THRESHOLD_COUNT = round(
        code_volumn_manager.CodeVolumeManager().get_max_volume_in_5days(code) * limit_up_price / 1e8, 2)
    TOTAL_BIG_DEAL_MONEY_THRESHOLD_MONEY = TOTAL_BIG_DEAL_MONEY_THRESHOLD_COUNT * 299 * 10000
    if tool.is_ge_code(code):
        TOTAL_BIG_DEAL_MONEY_THRESHOLD_COUNT *= 3.3
    TOTAL_BIG_DEAL_MONEY_THRESHOLD_MONEY = TOTAL_BIG_DEAL_MONEY_THRESHOLD_COUNT * THRESHOLD_MONEY
    deal_big_order_money = BigOrderDealManager().get_total_buy_money(code)
    # 添加订阅之前缺失的大单
    deal_big_order_money += RadicalBigOrderThresholdManager().get_deal_big_order_money(code)
    try:
        # 获取正在成交的订单
        dealing_order_info = HuaXinBuyOrderManager().get_dealing_order_info(code)
@@ -1049,7 +1151,10 @@
        current_threshold_count = 1
    if before_time:
        current_threshold_count = int(round(0.4 * money_y * 1.5))
    current_threshold_money = current_threshold_count * 299 * 10000
    THRESHOLD_MONEY = RadicalBigOrderThresholdManager().get_big_order_threshold(code)
    current_threshold_money = current_threshold_count * THRESHOLD_MONEY
    # ==========判断总大单成交================
    total_lack_money_info = get_total_deal_big_order_info(code, limit_up_price)