Administrator
2024-11-19 cf32945520de905d86f87c04cad426a7402d12d8
删除原有的资金净流入统计方式
10个文件已修改
331 ■■■■■ 已修改文件
huaxin_client/l1_client.py 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_test.py 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/data_server.py 65 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/huaxin_trade_server.py 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/custom_block_in_money_manager.py 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/block_special_codes_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client.py
@@ -11,8 +11,6 @@
from huaxin_client import tool, constant
from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript, logger_debug
from third_data import custom_block_in_money_manager
from third_data.custom_block_in_money_manager import BlockInMoneyRankManager, CodeInMoneyManager
from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi
from utils import tool as out_tool
################B类##################
@@ -137,11 +135,6 @@
            pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time(),
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2,
            pMarketDataField.BidVolume2, pMarketDataField.UpdateTime)
        try:
            custom_block_in_money_manager.CodeInMoneyManager.set_market_info(pMarketDataField.SecurityID, lastPrice,
                                                                             close_price, pMarketDataField.Turnover)
        except:
            pass
__latest_subscript_codes = set()
@@ -217,37 +210,13 @@
        finally:
            time.sleep(3)
def __test_block_in_money():
    """
    Debug helper: feed market info for the tracked codes into CodeInMoneyManager.

    Takes the codes returned by BlockInMoneyRankManager().get_codes(), walks
    them in batches of 200, fetches each batch's "pre_close" and its current
    "price" / "cum_amount", and calls CodeInMoneyManager.set_market_info for
    every code that has both pieces of data.
    @return: None
    """
    codes = BlockInMoneyRankManager().get_codes()
    logger_debug.info("获取到测试净流入代码数量:{}", len(codes))
    page_size = 200
    # Step by batch start offset. The previous "len // page_size + 1" page count
    # produced an empty trailing batch (and two pointless API calls) whenever
    # the number of codes was 0 or an exact multiple of page_size.
    for start in range(0, len(codes), page_size):
        temp_codes = codes[start: start + page_size]
        print(temp_codes)
        # Fetch the latest static info (previous close) for this batch
        latest_infos = HistoryKDatasUtils.get_gp_latest_info(temp_codes, "sec_id,pre_close")
        pre_close_dict = {x["sec_id"]: x["pre_close"] for x in latest_infos}
        current_infos = JueJinApi.get_gp_current_info(temp_codes, "symbol,price,cum_amount")
        # The code is the part of the symbol after the "." separator
        current_infos = {x["symbol"].split(".")[1]: (x["price"], x["cum_amount"]) for x in
                         current_infos}
        for code, (price, cum_amount) in current_infos.items():
            if code not in pre_close_dict:
                continue
            CodeInMoneyManager.set_market_info(code, price, pre_close_dict[code], cum_amount)
def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w, queue_custom_block_in_money, fixed_codes=None):
def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w, fixed_codes=None):
    """
    运行l1订阅任务
    @param queue_l1_w_strategy_r: L1方写,策略方读
    @param queue_l1_r_strategy_w: L1方读,策略方写
    @param queue_custom_block_in_money: 板块流入流出计算结果
    @param fixed_codes: 固定要返回数据的代码
    @return:
    """
@@ -306,19 +275,10 @@
    threading.Thread(target=__run_subscript_task, args=(spi,), daemon=True).start()
    # 测试
    # __test_block_in_money()
    # 等待程序结束
    while True:
        # print("数量", len(level1_data_dict))
        try:
            # 计算流入流出并上传
            custom_block_in_money_manager.BlockInMoneyRankManager().compute()
            val = (custom_block_in_money_manager.BlockInMoneyRankManager().get_in_list(),  custom_block_in_money_manager.BlockInMoneyRankManager().get_out_list())
            val = json.dumps(val)
            queue_custom_block_in_money.put_nowait(val)
            if len(level1_data_dict) < 1:
                continue
            # 根据涨幅排序
l2/l2_data_manager_new.py
@@ -1174,10 +1174,11 @@
                                                                                                          min_money=min_money)
                    if left_count < 1:
                        return False, False, f"没有已挂或者成交的大单", False
            big_deal_order_info = cls.__is_big_order_deal_enough(code)
            if not big_deal_order_info[0]:
                return False, False, big_deal_order_info[1], False
            if not cls.__WantBuyCodesManager.is_in_cache(code):
                # 想买单不需要大单约束
                big_deal_order_info = cls.__is_big_order_deal_enough(code)
                if not big_deal_order_info[0]:
                    return False, False, big_deal_order_info[1], False
            place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code)
            # ------------------第一和第二次下单都必须要有至少一笔未成交的大单--------------------------
l2_test.py
@@ -1,9 +1,14 @@
import json
import logging
import multiprocessing
import threading
import time
import psutil
import requests
from huaxin_client import l2_client_test, l1_subscript_codes_manager
from log_module.log import logger_local_huaxin_l2_upload
from third_data.custom_block_in_money_manager import CodeInMoneyManager, BlockInMoneyRankManager
def run():
@@ -27,12 +32,53 @@
    while True:
        try:
            data = big_order_queue.get()
            CodeInMoneyManager().add_data(data)
            logger_local_huaxin_l2_upload.info(f"{data}")
        except:
            pass
def __compute_and_upload():
    """
    Endlessly compute the block in/out money ranking and upload it.

    Each cycle recomputes the ranking through BlockInMoneyRankManager, posts
    the first 20 inflow entries, the first 20 outflow entries and the per-code
    money dict to the local data server on port 9004, then sleeps 3 seconds.
    An exception inside a cycle is logged and the loop carries on.
    @return: does not return
    """
    # requests waits indefinitely when no timeout is given; bound each call so a
    # stalled server cannot freeze this loop (the timeout error is logged below).
    request_timeout = 5

    def __upload_data(type_, data_):
        """
        Post one typed payload to the upload_kpl_data endpoint.
        @param type_: value sent under the "type" key
        @param data_: value sent under the "data" key (callers pass a JSON string)
        @return: None
        """
        root_data = {
            "type": type_,
            "data": data_
        }
        requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data),
                      timeout=request_timeout)

    def __upload_codes_in_money():
        """
        Upload the money dict of all codes held by CodeInMoneyManager.
        @return: None
        """
        root_data = {
            "data": json.dumps(CodeInMoneyManager().get_code_money_dict())
        }
        requests.post("http://127.0.0.1:9004/upload_codes_in_money", json.dumps(root_data),
                      timeout=request_timeout)

    while True:
        try:
            BlockInMoneyRankManager().compute()
            in_list = BlockInMoneyRankManager().get_in_list()
            out_list = BlockInMoneyRankManager().get_out_list()
            # Tuple layout: (code, name, strength, main-force net amount);
            # code and strength are filled with 0 here.
            fins = [(0, x[0], 0, x[1]) for x in in_list[:20]]
            fouts = [(0, x[0], 0, x[1]) for x in out_list[:20]]
            # Upload
            __upload_data("jingxuan_rank", json.dumps(fins))
            __upload_data("jingxuan_rank_out", json.dumps(fouts))
            __upload_codes_in_money()
        except Exception as e:
            logging.exception(e)
        finally:
            time.sleep(3)
if __name__ == "__main__":
    # Run the compute-and-upload loop on a background daemon thread.
    uploader = threading.Thread(target=__compute_and_upload, daemon=True)
    uploader.start()
    run()
    # Park the main thread once run() has returned.
    while True:
        time.sleep(2)
main.py
@@ -29,7 +29,7 @@
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_,
                      queue_l1_trade_w_strategy_r_, trade_ipc_addr, queue_custom_block_in_money):
                      queue_l1_trade_w_strategy_r_, trade_ipc_addr):
    """
    策略进程
@@ -41,7 +41,6 @@
    @param queue_l1_trade_r_strategy_w_:
    @param queue_l1_trade_w_strategy_r_:
    @param trade_ipc_addr: 交易ipc地址(下单地址, 撤单地址)
    @param queue_custom_block_in_money: 接收板块流入流出
    @return:
    """
    logger_system.info("策略进程ID:{}", os.getpid())
@@ -67,7 +66,7 @@
    # 启动华鑫交易服务
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_,
                            queue_l1_trade_w_strategy_r_, trade_ipc_addr, queue_custom_block_in_money)
                            queue_l1_trade_w_strategy_r_, trade_ipc_addr)
# 主服务
@@ -100,7 +99,6 @@
        # l1
        queue_l1_w_strategy_r = multiprocessing.Queue()
        queue_l1_r_strategy_w = multiprocessing.Queue()
        queue_custom_block_in_money = multiprocessing.Queue()  # 接收板块流入流出
        # l1交易
        queue_l1_trade_w_strategy_r = multiprocessing.Queue()
        queue_l1_trade_r_strategy_w = multiprocessing.Queue()
@@ -122,7 +120,6 @@
        # L1订阅数据
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run,
                                            args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,
                                                  queue_custom_block_in_money,
                                                  gpcode_manager.BuyOpenLimitUpCodeManager().get_codes(),))
        l1Process.start()
@@ -154,10 +151,7 @@
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r, (order_ipc_addr, cancel_order_ipc_addr),
                          queue_custom_block_in_money)
                          queue_l1_trade_w_strategy_r, (order_ipc_addr, cancel_order_ipc_addr))
        # 将tradeServer作为主进程
        l1Process.join()
servers/data_server.py
@@ -9,6 +9,7 @@
from code_attribute.gpcode_manager import BlackListCodeManager
from l2.l2_transaction_data_manager import HuaXinBuyOrderManager
from log_module.log import logger_system, logger_debug, logger_kpl_limit_up, logger_request_api
from third_data.custom_block_in_money_manager import CodeInMoneyManager
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, LimitUpDataConstant
from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager, CodeLimitUpSequenceManager
from third_data.third_blocks_manager import BlockMapManager
@@ -782,6 +783,18 @@
            params = self.__parse_request()
            result_str = self.__process_kpl_data(params)
            self.__send_response(result_str)
        if url.path == "/upload_codes_in_money":
            # 接收代码净流入金额
            params = self.__parse_request()
            d = params["data"]
            d = json.loads(d)
            try:
                for code in d:
                    CodeInMoneyManager().set_money(code, d[code])
            except Exception as e:
                logging.exception(e)
            result_str =json.dumps({"code": 0})
            self.__send_response(result_str)
    def __process_kpl_data(self, data_origin):
        def do_limit_up(result_list_):
@@ -920,13 +933,15 @@
                self.__kplDataManager.save_data(type_, result_list)
                RealTimeKplMarketData.set_top_5_industry(result_list)
        elif type_ == KPLDataType.JINGXUAN_RANK.value:
            result_list = kpl_util.parseMarketJingXuan(data["data"])
            # result_list = kpl_util.parseMarketJingXuan(data["data"])
            result_list = json.loads(data["data"])
            # 保存精选数据
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
                RealTimeKplMarketData.set_market_jingxuan_blocks(result_list)
        elif type_ == KPLDataType.JINGXUAN_RANK_OUT.value:
            result_list = kpl_util.parseMarketJingXuan(data["data"])
            # result_list = kpl_util.parseMarketJingXuan(data["data"])
            result_list = json.loads(data["data"])
            # 保存精选数据
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
@@ -970,48 +985,4 @@
if __name__ == "__main__":
    # Count the limit-up codes so far, grouped by limit-up reason (block)
    currents = LimitUpDataConstant.current_limit_up_datas
    records = LimitUpDataConstant.history_limit_up_datas
    if not currents:
        currents = KPLDataManager.get_data(KPLDataType.LIMIT_UP)
    # Fall back to loading the historical limit-up records
    if not records:
        KPLLimitUpDataRecordManager.load_total_datas()
        records = KPLLimitUpDataRecordManager.total_datas
    records_map = {x[3]: x for x in records}
    current_codes = [d[0] for d in currents]
    record_codes = [d[3] for d in records]
    # Group the codes by block: once for the historical records, once for the
    # codes that are currently limit-up
    record_reason_dict = {}
    current_reason_dict = {}
    for _codes, _reason_dict in ((record_codes, record_reason_dict), (current_codes, current_reason_dict)):
        for _code in _codes:
            blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(_code) or set()
            for b in blocks:
                _reason_dict.setdefault(b, []).append(_code)
    # (block name, limit-up code count, opened-limit-up count, limit-up time)
    limit_up_reason_statistic_info = [
        (k, len(v), len(v) - len(current_reason_dict.get(k, [])), 0)
        for k, v in record_reason_dict.items()
    ]
    # Ascending sort followed by reverse (kept as two steps so ties keep the
    # same relative order as before)
    limit_up_reason_statistic_info.sort(key=lambda x: x[1] - x[2])
    limit_up_reason_statistic_info.reverse()
    response_data = json.dumps({"code": 0, "data": {"limit_up_count": len(current_codes),
                                                    "open_limit_up_count": len(record_codes) - len(current_codes),
                                                    "limit_up_reason_statistic": limit_up_reason_statistic_info}})
    # data = code_info_output.get_output_params(code, self.__jingxuan_cache_dict, self.__industry_cache_dict,
    #                                           trade_record_date=date)
    run("", 9004)
servers/huaxin_trade_server.py
@@ -577,22 +577,6 @@
                logger_debug.exception(e)
def __recv_pipe_block_in_money(queue_custom_block_in_money: multiprocessing.Queue):
    """
    Receive block in/out money results from the queue and hand them on.

    Logs the thread id, then (unless the queue is None) blocks on the queue
    forever. Every non-empty message is JSON-decoded into (in_list, out_list)
    and passed to RealTimeKplMarketData; exceptions are logged and the loop
    continues.
    @param queue_custom_block_in_money: queue carrying JSON strings, or None
    @return: None (returns immediately only when the queue is None)
    """
    logger_system.info(f"trade_server __recv_pipe_block_in_money 线程ID:{tool.get_thread_id()}")
    if queue_custom_block_in_money is None:
        return
    while True:
        try:
            raw = queue_custom_block_in_money.get()
            if not raw:
                continue
            payload = json.loads(raw)
            in_list, out_list = payload[0], payload[1]
            # logger_debug.info(f"接收到流入流出数据:{in_list[:20]}, {out_list[:20]}")
            RealTimeKplMarketData.set_market_jingxuan_blocks_from_custom(in_list)
            RealTimeKplMarketData.set_market_jingxuan_out_blocks_from_custom(out_list)
        except Exception as e:
            logger_debug.exception(e)
# 排得太远撤单
def __cancel_buy_for_too_far():
    while True:
@@ -1036,16 +1020,14 @@
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        queue_l1_trade_w_strategy_r, trade_ipc_addr, queue_custom_block_in_money):
        queue_l1_trade_w_strategy_r, trade_ipc_addr):
    """
    @param queue_strategy_r_trade_w:
    @param queue_l1_w_strategy_r:
    @param queue_strategy_w_trade_r:
    @param queue_strategy_w_trade_r_for_read:
    @param queue_l1_trade_w_strategy_r:
    @param trade_ipc_addr: 交易IPC地址:(下单ipc地址,撤单ipc地址)
    @param queue_custom_block_in_money: 接收板块流入流出队列
    @return:
    """
    logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}")
@@ -1084,10 +1066,6 @@
        l2_log.codeLogQueueDistributeManager.run_async()
        t1 = threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True)
        t1.start()
        # 读取板块资金流入
        t1 = threading.Thread(target=lambda: __recv_pipe_block_in_money(queue_custom_block_in_money), daemon=True)
        t1.start()
        logger_system.info("create TradeServer")
third_data/code_plate_key_manager.py
@@ -360,8 +360,6 @@
        #         break
        #     blocks.add(data[1])
        # cls.__top_jx_blocks = blocks
        if True:
            return
        blocks = set()
        for data in datas:
            if data[1] in constant.KPL_INVALID_BLOCKS:
@@ -377,35 +375,12 @@
        cls.__top_jx_blocks = BlockMapManager().filter_blocks(blocks)
    @classmethod
    def set_market_jingxuan_blocks_from_custom(cls, datas):
        """
        设置自定义精选流入数据
        @param datas:[(板块,流入金额)]
        @return:
        """
        blocks = set()
        for data in datas:
            if data[0] in constant.KPL_INVALID_BLOCKS:
                continue
            if data[1] < 5e7:
                continue
            blocks.add(data[0])
            if len(blocks) >= 10:
                break
            blocks.add(kpl_util.filter_block(data[0]))
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_in, f"原数据:{datas[:20]} 板块:{blocks}")
        cls.__top_jx_blocks = BlockMapManager().filter_blocks(blocks)
    @classmethod
    def set_market_jingxuan_out_blocks(cls, datas):
        """
        设置精选流出数据
        @param datas:
        @return:
        """
        if True:
            return
        blocks = set()
        for data in datas:
            if data[1] in constant.KPL_INVALID_BLOCKS:
@@ -417,28 +392,6 @@
            if len(blocks) >= 10:
                break
            blocks.add(kpl_util.filter_block(data[1]))
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_out, f"原数据:{datas[:10]} 板块:{blocks}")
        cls.__top_jx_out_blocks = BlockMapManager().filter_blocks(blocks)
    @classmethod
    def set_market_jingxuan_out_blocks_from_custom(cls, datas):
        """
        设置自定义精选流出数据
        @param datas:[(板块,流入金额)]
        @return:
        """
        blocks = set()
        for data in datas:
            if data[0] in constant.KPL_INVALID_BLOCKS:
                continue
            if data[1] > -5e7:
                # 过滤5千万以上的
                break
            blocks.add(data[0])
            if len(blocks) >= 10:
                break
            blocks.add(kpl_util.filter_block(data[0]))
        # 记录精选流出日志
        async_log_util.info(logger_kpl_jx_out, f"原数据:{datas[:10]} 板块:{blocks}")
        cls.__top_jx_out_blocks = BlockMapManager().filter_blocks(blocks)
third_data/custom_block_in_money_manager.py
@@ -10,28 +10,39 @@
from utils import tool
@tool.singleton
class CodeInMoneyManager:
    """
    单个票流入的金额计算
    """
    __code_money_dict = {}
    def __init__(self):
        """
        Start with an empty per-code money dict, then fill it via __load_data().
        """
        self.__code_money_dict = {}
        self.__load_data()
    @classmethod
    def set_market_info(cls, code, price, pre_price, current_amount):
        """
        设置市场行情信息
        @param code:
        @param price:
        @param pre_price:
        @param current_amount:
        @return:
        """
        money = (price - pre_price) * current_amount / pre_price
        cls.__code_money_dict[code] = int(money)
    def __load_data(self):
        with open(f"{constant.get_path_prefix()}\\logs\\huaxin_local\\l2\\upload.{tool.get_now_date_str()}.log") as f:
            lines = f.readlines()
            for line in lines:
                line = line.split(" - ")[1]
                item = eval(line)
                self.add_data(item)
    @classmethod
    def get_money(cls, code):
        """
        Look up the value stored for code in the class-level dict.
        @param code: key to look up
        @return: the stored value, or None when code has no entry
        """
        return cls.__code_money_dict.get(code)
    def add_data(self, item):
        code = item[0]
        if code not in self.__code_money_dict:
            self.__code_money_dict[code] = 0
        if item[1] == 0:
            self.__code_money_dict[code] += item[2][2]
        else:
            self.__code_money_dict[code] -= item[2][2]
    def get_code_money_dict(self):
        """
        Return the internal per-code money dict itself (not a copy).
        """
        return self.__code_money_dict
    def get_money(self, code):
        if code in self.__code_money_dict:
            return self.__code_money_dict.get(code)
        return 0
    def set_money(self, code, money):
        """
        Overwrite the value stored for code.
        @param code: key to store under
        @param money: value to store
        @return: None
        """
        self.__code_money_dict[code] = money
@tool.singleton
@@ -76,7 +87,7 @@
        codes = self.codes
        block_money = {}
        for code in codes:
            money = CodeInMoneyManager.get_money(code)
            money = CodeInMoneyManager().get_money(code)
            if money is None:
                continue
            before_fblocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(code)
@@ -101,3 +112,10 @@
    def get_out_list(self):
        """
        Return the current value of the private __out_list attribute.
        """
        return self.__out_list
if __name__ == '__main__':
    # Manual check: print the money value held for code 300264
    print(CodeInMoneyManager().get_money("300264"))
    # BlockInMoneyRankManager().compute()
    # print(BlockInMoneyRankManager().get_in_list()[:20])
    # print(BlockInMoneyRankManager().get_out_list()[:20])
third_data/kpl_data_manager.py
@@ -487,17 +487,17 @@
            logger_debug.info("任务修复-开盘啦:涨停列表")
            # 大于20s就需要更新
            threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
        key = "jingxuan_rank"
        if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
            logger_debug.info("任务修复-开盘啦:精选流入列表")
            # 大于20s就需要更新
            threading.Thread(target=cls.run_market_jingxuan_in, daemon=True).start()
        key = "jingxuan_rank_out"
        if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
            logger_debug.info("任务修复-开盘啦:精选流出列表")
            # 大于20s就需要更新
            threading.Thread(target=cls.run_market_jingxuan_out, daemon=True).start()
        # key = "jingxuan_rank"
        # if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
        #     logger_debug.info("任务修复-开盘啦:精选流入列表")
        #     # 大于20s就需要更新
        #     threading.Thread(target=cls.run_market_jingxuan_in, daemon=True).start()
        #
        # key = "jingxuan_rank_out"
        # if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
        #     logger_debug.info("任务修复-开盘啦:精选流出列表")
        #     # 大于20s就需要更新
        #     threading.Thread(target=cls.run_market_jingxuan_out, daemon=True).start()
    @classmethod
    def run_limit_up_task(cls):
@@ -602,8 +602,8 @@
        threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
        # threading.Thread(target=get_bidding_money, daemon=True).start()
        # threading.Thread(target=get_market_industry, daemon=True).start()
        threading.Thread(target=cls.run_market_jingxuan_in, daemon=True).start()
        threading.Thread(target=cls.run_market_jingxuan_out, daemon=True).start()
        # threading.Thread(target=cls.run_market_jingxuan_in, daemon=True).start()
        # threading.Thread(target=cls.run_market_jingxuan_out, daemon=True).start()
@tool.singleton
trade/buy_radical/block_special_codes_manager.py
@@ -193,5 +193,5 @@
    datas = AnalysisBlockSpecialCodesManager().get_block_special_codes()
    for d in datas:
        print(d)
    # BlockSpecialCodesManager().set_block_codes_list(datas)
    BlockSpecialCodesManager().set_block_codes_list(datas)
    print(datas)