Administrator
2023-11-17 061204832c6eb71d5583fa8ebdd41479100b6b54
L撤修改/已成交统计修改
9个文件已修改
261 ■■■■ 已修改文件
inited_data.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 68 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_manager.py 131 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/transaction_progress.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_export.py 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_factor.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
inited_data.py
@@ -12,6 +12,7 @@
from code_attribute.code_nature_analyse import LatestMaxVolumeManager, HighIncreaseCodeManager, CodeNatureRecordManager
from db.redis_manager_delegate import RedisUtils
from l2.l2_sell_manager import L2MarketSellManager
from l2.l2_transaction_data_manager import HuaXinTransactionDatasProcessor
from ths import client_manager
import constant
from trade.deal_big_money_manager import DealOrderNoManager
@@ -294,6 +295,12 @@
            return target_volume[0], target_volume[0], target_volume[1].strftime("%Y-%m-%d")
# 关闭服务器之前需要同步的数据
def sync_data_before_close_server():
    HuaXinTransactionDatasProcessor().sync_dealing_data_to_db()
if __name__ == '__main__':
    # init_data()+
    init_data()
l2/cancel_buy_strategy.py
@@ -479,7 +479,10 @@
        if not start_compute_index:
            l2_log.h_cancel_debug(code, "尚未找到开始计算位置")
            return
        transaction_index = self.__transaction_progress_index_dict.get(code)
        if transaction_index:
            # 不能计算成交进度以前的数据
            start_compute_index = max(transaction_index + 1, start_compute_index)
        total_datas = local_today_datas.get(code)
        # -----------------计算H上-------------------
@@ -555,8 +558,30 @@
        start_compute_index = self.__start_compute_index_dict.get(code)
        if start_compute_index is None:
            return False
        # 成交位到开始计算位置没有买单之后
        real_place_order_index = self.__SecondCancelBigNumComputer.get_real_place_order_index_cache(code)
        total_datas = local_today_datas.get(code)
        if real_place_order_index and real_place_order_index > transaction_index:
            # 成交位置离我们真实下单的位置只有5笔没撤的大单就需要计算H撤的囊括范围了
            total_left_count = 0
            for index in range(transaction_index + 1, real_place_order_index):
                data = total_datas[index]
                val = data['val']
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                if float(val['price']) * val['num'] < 5000:
                    continue
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code, index,
                                                                                                         total_datas,
                                                                                                         local_today_canceled_buyno_map.get(
                                                                                                             code))
                if left_count > 0:
                    total_left_count += 1
                    if total_left_count > 5:
                        break
            if total_left_count <= 5:
                return True
        # 成交位到开始计算位置没有买单之后
        for index in range(transaction_index + 1, start_compute_index):
            data = total_datas[index]
            val = data['val']
@@ -615,8 +640,9 @@
                    not_cancel_indexes.append(j)
            if not_cancel_indexes:
                temp_count = len(not_cancel_indexes)
                # 取后1/3的数据
                temp_index = int(temp_count * 9 / 10)
                if temp_index + 1 >= temp_count:
                    temp_index = temp_count - 1
                self.__start_compute_index_dict[code] = not_cancel_indexes[temp_index]
        except Exception as e:
            async_log_util.exception(logger_l2_h_cancel, e)
@@ -1183,6 +1209,42 @@
        #     self.compute_watch_index(code, self.__last_trade_progress_dict.get(code),
        #                              self.__real_place_order_index_dict.get(code))
    # 已经成交的索引
    def add_deal_index(self, code, index, buy_single_index):
        watch_indexes = self.__get_watch_indexes_cache(code)
        if not watch_indexes:
            return
        if index not in watch_indexes:
            return
        if buy_single_index is None:
            return
        # 重新囊括1笔
        real_place_order_index = self.__real_place_order_index_dict.get(code)
        if real_place_order_index and real_place_order_index > index:
            total_datas = local_today_datas.get(code)
            min_num = int(5000 / (float(gpcode_manager.get_limit_up_price(code))))
            for j in range(real_place_order_index - 1, index, -1):
                data = total_datas[j]
                val = data['val']
                if data["index"] in watch_indexes:
                    continue
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                if val["num"] < min_num:
                    continue
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(code,
                                                                                                         j,
                                                                                                         total_datas,
                                                                                                         local_today_canceled_buyno_map.get(
                                                                                                             code))
                if left_count > 0:
                    watch_indexes.add(data["index"])
                    break
        # 移除当前index
        watch_indexes.discard(index)
        self.__set_watch_indexes(code, buy_single_index, watch_indexes)
    def __compute_need_cancel(self, code, buy_exec_index, start_index, end_index, total_data, is_first_code):
        watch_indexes = self.__get_watch_indexes_cache(code)
        if not watch_indexes:
l2/l2_transaction_data_manager.py
@@ -1,24 +1,101 @@
"""
L2成交数据处理器
"""
import json
import logging
import time
from code_attribute import gpcode_manager
from db import redis_manager
from db.redis_manager_delegate import RedisUtils
from l2 import l2_data_util, l2_data_manager, transaction_progress
from l2.cancel_buy_strategy import LCancelRateManager, LCancelBigNumComputer, \
    SecondCancelBigNumComputer, HourCancelBigNumComputer, FastCancelBigNumComputer
from l2.l2_data_util import L2DataUtil
from log_module import async_log_util
from log_module.log import logger_l2_trade_buy_queue, hx_logger_l2_upload, hx_logger_l2_debug
from log_module.log import logger_l2_trade_buy_queue, hx_logger_l2_upload, hx_logger_l2_debug, \
    hx_logger_l2_transaction_desc
from trade import current_price_process_manager, trade_manager, l2_trade_factor
from trade.deal_big_money_manager import DealOrderNoManager
from utils import tool
class HuaXinTransactionDatasProcessor:
    __db = 0
    __instance = None
    __redis_manager = redis_manager.RedisManager(0)
    __TradeBuyQueue = transaction_progress.TradeBuyQueue()
    # 正在成交的订单
    __dealing_order_info_dict = {}
    # 最近成交的订单{"code":(订单号,是否成交完成)}
    __latest_deal_order_info_dict = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(HuaXinTransactionDatasProcessor, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def process_huaxin_transaction_datas(cls, code, datas):
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "dealing_order_info-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.__dealing_order_info_dict, code, val)
        finally:
            RedisUtils.realse(__redis)
    # 将数据持久化到数据库
    def sync_dealing_data_to_db(self):
        for code in self.__dealing_order_info_dict:
            RedisUtils.setex(self.__get_redis(), f"dealing_order_info-{code}", tool.get_expire(),
                             json.dumps(self.__dealing_order_info_dict[code]))
    # 统计成交的情况
    def __statistic_deal_desc(self, code, data, total_buy_num):
        if code not in self.__dealing_order_info_dict:
            # 数据格式[订单号,总手数,开始成交时间,结束成交时间, 总买]
            self.__dealing_order_info_dict[code] = [data[6], 0, data[3], data[3], total_buy_num]
        if self.__dealing_order_info_dict[code][0] == data[6]:
            # 成交同一个订单号
            self.__dealing_order_info_dict[code][1] += data[2]
            self.__dealing_order_info_dict[code][3] = data[3]
        else:
            # 保存上一条数据
            async_log_util.info(hx_logger_l2_transaction_desc, f"{code}#{self.__dealing_order_info_dict[code]}")
            # 设置最近成交完成的一条数据
            deal_info = (
                self.__dealing_order_info_dict[code][0],
                self.__dealing_order_info_dict[code][4] == self.__dealing_order_info_dict[code][1])
            self.__latest_deal_order_info_dict[code] = deal_info
            # 初始化本条数据
            self.__dealing_order_info_dict[code] = [data[6], data[2], data[3], data[3], total_buy_num]
            return deal_info
        return None
    # 计算成交进度
    def __compute_latest_trade_progress(self, code, buyno_map, datas):
        buy_progress_index = None
        for i in range(len(datas) - 1, -1, -1):
            d = datas[i]
            buy_no = f"{d[6]}"
            if buyno_map and buy_no in buyno_map:
                async_log_util.info(hx_logger_l2_debug, f"{code}成交进度:{buyno_map[buy_no]['index']}")
                buy_progress_index = buyno_map[buy_no]["index"]
                break
        return buy_progress_index
    def process_huaxin_transaction_datas(self, code, datas):
        # 设置成交价
        current_price_process_manager.set_trade_price(code, datas[-1][1])
        total_datas = l2_data_util.local_today_datas.get(code)
@@ -30,37 +107,28 @@
                        code) != trade_manager.TRADE_STATE_NOT_TRADE:
                    l2_data_util.load_l2_data(code)
                    buyno_map = l2_data_util.local_today_buyno_map.get(code)
            # async_log_util.debug(hx_logger_l2_transaction,
            #                     f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}")
            if buyno_map is None:
                buyno_map = {}
            buy_progress_index = None
            for i in range(len(datas) - 1, -1, -1):
                d = datas[i]
                buy_no = f"{d[6]}"
                if buyno_map and buy_no in buyno_map:
                    async_log_util.info(hx_logger_l2_debug, f"{code}成交进度:{buyno_map[buy_no]['index']}")
                    buy_progress_index = buyno_map[buy_no]["index"]
                    break
            # ------统计保存成交大单订单号------
            origin_ordernos = set()
            for d in datas:
                buy_no = f"{d[6]}"
                origin_ordernos.add(buy_no)
            # ---------------------处理成交大单----------------------
            order_begin_pos = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(code)
            # 计算已经成交的大单
            big_money_count = 0
            for buy_no in origin_ordernos:
                # 查询是否为大单
                if buy_no in buyno_map:
                    data = buyno_map[buy_no]
            for d in datas:
                data = buyno_map.get(f"{d[6]}")
                buy_num = None
                if data:
                    buy_num = data["val"]["num"] * 100
                deal_info = self.__statistic_deal_desc(code, d, buy_num)
                if deal_info and deal_info[1]:
                    data = buyno_map.get(f"{deal_info[0]}")
                    print("已经成交索引:", data["index"])
                    val = data["val"]
                    if not L2DataUtil.is_limit_up_price_buy(val):
                        continue
                    if l2_data_util.is_big_money(val):
                        # 涨停买的大单
                    if l2_data_util.is_big_money(val) and L2DataUtil.is_limit_up_price_buy(val):
                        big_money_count += 1
                        DealOrderNoManager().add_orderno(code, buy_no)
                        DealOrderNoManager().add_orderno(code, deal_info[0])
                    # L后是否有成交,如果有成交就需要除去当前笔数,然后重新囊括一笔
                    LCancelBigNumComputer().add_deal_index(code, data["index"], order_begin_pos.buy_single_index)
            if big_money_count > 0:
                # 统计大单/m值成交比
                total_deal_nums = DealOrderNoManager().get_deal_nums(code, buyno_map)
@@ -69,13 +137,15 @@
                if limit_up_price:
                    rate = round(total_deal_nums / (thresh_hold_money // (float(limit_up_price) * 100)), 2)
                    LCancelRateManager().set_big_num_deal_rate(code, rate)
                    # 获取执行位时间
            order_begin_pos = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(code)
            buy_progress_index = self.__compute_latest_trade_progress(code, buyno_map, datas)
            if buy_progress_index is not None:
                cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index, total_datas)
                self.__TradeBuyQueue.set_traded_index(code, buy_progress_index, total_datas)
                async_log_util.info(logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code,
                                    buy_progress_index)
            if buy_progress_index is not None:
                LCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_single_index, buy_progress_index,
                                                           total_datas)
                FastCancelBigNumComputer().set_trade_progress(code, buy_progress_index)
@@ -92,6 +162,7 @@
                LCancelBigNumComputer().re_compute_l_up_watch_indexes(code, order_begin_pos.buy_single_index)
        except Exception as e:
            logging.exception(e)
            hx_logger_l2_debug.exception(e)
        finally:
            use_time = int((time.time() - __start_time) * 1000)
l2/transaction_progress.py
@@ -191,9 +191,10 @@
        if not last_info or last_info[0] != index:
            if last_info and total_datas:
                val = total_datas[last_info[0]]['val']
                rate = round(val["num"] * float(val["price"]) * 100 / (time.time() - last_info[1]))
                # 成交速率
                self.trade_speed_cache[code] = rate
                if time.time() - last_info[1] > 0:
                    rate = round(val["num"] * float(val["price"]) * 100 / (time.time() - last_info[1]))
                    # 成交速率
                    self.trade_speed_cache[code] = rate
            self.latest_buy_progress_index_cache[code] = (index, time.time())
        self.__save_buy_progress_index(code, index, False)
log_module/log.py
@@ -202,6 +202,9 @@
        logger.add(self.get_hx_path("l2", "transaction"),
                   filter=lambda record: record["extra"].get("name") == "hx_l2_transaction",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_hx_path("l2", "transaction_desc"),
                   filter=lambda record: record["extra"].get("name") == "hx_l2_transaction_desc",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_hx_path("l2", "orderdetail"),
                   filter=lambda record: record["extra"].get("name") == "hx_l2_orderdetail",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -362,6 +365,7 @@
# -------------------------------华鑫日志---------------------------------
hx_logger_l2_orderdetail = __mylogger.get_logger("hx_l2_orderdetail")
hx_logger_l2_transaction = __mylogger.get_logger("hx_l2_transaction")
hx_logger_l2_transaction_desc = __mylogger.get_logger("hx_l2_transaction_desc")
hx_logger_l2_market_data = __mylogger.get_logger("hx_l2_market_data")
hx_logger_l2_upload = __mylogger.get_logger("hx_l2_upload")
hx_logger_l2_debug = __mylogger.get_logger("hx_l2_debug")
log_module/log_export.py
@@ -414,9 +414,32 @@
    return fdatas
# 读取系统日志
def load_huaxin_transaction_map():
    path = f"{constant.get_path_prefix()}/logs/huaxin/l2/transaction.{tool.get_now_date_str()}.log"
    fdatas = {}
    if os.path.exists(path):
        with open(path, 'r', encoding="utf-8") as f:
            lines = f.readlines()
            for line in lines:
                if line:
                    try:
                        data = line.split(" - ")[1].strip()
                        if data.startswith("["):
                            data = data[data.find("]") + 1:].strip()
                        code = data.split("#")[0]
                        l2_data = eval( data.split("#")[1])
                        if code not in fdatas:
                            fdatas[code]=[]
                        fdatas[code].append(l2_data)
                    except:
                        pass
    return fdatas
if __name__ == '__main__':
    datas = get_trade_progress("000026")
    print(datas)
    fdatas = load_huaxin_transaction_map()
    print(len(fdatas))
    # print(get_h_cancel_compute_info("603912"))
    # logger_l2_h_cancel.info("test")
test/l2_trade_test.py
@@ -18,7 +18,7 @@
from trade.huaxin import huaxin_trade_api
from utils import tool
from db import redis_manager_delegate as redis_manager
from l2 import l2_log, l2_data_manager, transaction_progress, l2_data_manager_new
from l2 import l2_log, l2_data_manager, transaction_progress, l2_data_manager_new, l2_transaction_data_manager
from l2.transaction_progress import TradeBuyQueue
from third_data import kpl_util, kpl_data_manager, block_info
from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager, CodePlateKeyBuyManager, KPLCodeJXBlockManager
@@ -85,6 +85,7 @@
                except Exception as e:
                    pass
    @unittest.skip("跳过此单元测试")
    def test_trade(self):
        threading.Thread(target=async_log_util.run_sync, daemon=True).start()
        code = "000026"
@@ -227,6 +228,16 @@
                                                     yesterday_codes,
                                                     block_info.get_before_blocks_dict())
    def test_transaction(self):
        threading.Thread(target=async_log_util.run_sync, daemon=True).start()
        code = "003019"
        l2.l2_data_util.load_l2_data(code)
        # 读取成交数据
        transaction_map = log_export.load_huaxin_transaction_map()
        fdatas = transaction_map.get(code)
        for d in fdatas:
            l2_transaction_data_manager.HuaXinTransactionDatasProcessor().process_huaxin_transaction_datas(code, d)
# class TestTrade(unittest.TestCase):
#     processor = L2TradeDataProcessor()
trade/huaxin/huaxin_trade_server.py
@@ -309,7 +309,7 @@
    def l2_transaction(cls, code, datas):
        async_log_util.info(hx_logger_l2_transaction, f"{code}#{datas}")
        if datas:
            HuaXinTransactionDatasProcessor.process_huaxin_transaction_datas(code, datas)
            HuaXinTransactionDatasProcessor().process_huaxin_transaction_datas(code, datas)
    @classmethod
    def l2_market_data(cls, code, data):
trade/l2_trade_factor.py
@@ -547,7 +547,7 @@
    # print(L2TradeFactorUtil.factors_to_string("003004"))
    # for i in range(2, 150):
    print(19, L2TradeFactorUtil.get_base_safe_val(100000000 * 19))
    print(22, L2TradeFactorUtil.get_base_safe_val(100000000 * 22))
    # print(L2TradeFactorUtil.get_limit_up_time_rate("11:30:00"))
    # print(L2TradeFactorUtil.get_limit_up_time_rate("13:00:00"))
    # print(L2TradeFactorUtil.get_limit_up_time_rate("13:48:00"))