Administrator
2024-11-19 cf32945520de905d86f87c04cad426a7402d12d8
huaxin_client/l1_client.py
@@ -11,8 +11,6 @@
from huaxin_client import tool, constant
from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript, logger_debug
from third_data import custom_block_in_money_manager
from third_data.custom_block_in_money_manager import BlockInMoneyRankManager, CodeInMoneyManager
from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi
from utils import tool as out_tool
################B类##################
@@ -137,11 +135,6 @@
            pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time(),
            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2,
            pMarketDataField.BidVolume2, pMarketDataField.UpdateTime)
        try:
            custom_block_in_money_manager.CodeInMoneyManager.set_market_info(pMarketDataField.SecurityID, lastPrice,
                                                                             close_price, pMarketDataField.Turnover)
        except:
            pass
__latest_subscript_codes = set()
@@ -217,37 +210,13 @@
        finally:
            time.sleep(3)
def __test_block_in_money():
    codes = BlockInMoneyRankManager().get_codes()
    logger_debug.info("获取到测试净流入代码数量:{}", len(codes))
    page_size = 200
    total_page = len(codes) // page_size + 1
    for i in range(0, total_page):
        temp_codes = codes[i * page_size: (i + 1) * page_size]
        print(temp_codes)
        # 获取最近的信息
        latest_infos = HistoryKDatasUtils.get_gp_latest_info(temp_codes, "sec_id,pre_close")
        pre_close_dict = {x["sec_id"]: x["pre_close"] for x in latest_infos}
        current_infos = JueJinApi.get_gp_current_info(temp_codes, "symbol,price,cum_amount")
        current_infos = {x["symbol"].split(".")[1]: (x["price"], x["cum_amount"]) for x in
                         current_infos}
        for code in current_infos:
            if code not in pre_close_dict:
                continue
            CodeInMoneyManager.set_market_info(code, current_infos[code][0], pre_close_dict[code],
                                               current_infos[code][1])
def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w, queue_custom_block_in_money, fixed_codes=None):
def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w, fixed_codes=None):
    """
    运行l1订阅任务
    @param queue_l1_w_strategy_r: L1方写,策略方读
    @param queue_l1_r_strategy_w: L1方读,策略方写
    @param queue_custom_block_in_money: 板块流入流出计算结果
    @param fixed_codes: 固定要返回数据的代码
    @return:
    """
@@ -306,19 +275,10 @@
    threading.Thread(target=__run_subscript_task, args=(spi,), daemon=True).start()
    # 测试
    # __test_block_in_money()
    # 等待程序结束
    while True:
        # print("数量", len(level1_data_dict))
        try:
            # 计算流入流出并上传
            custom_block_in_money_manager.BlockInMoneyRankManager().compute()
            val = (custom_block_in_money_manager.BlockInMoneyRankManager().get_in_list(),  custom_block_in_money_manager.BlockInMoneyRankManager().get_out_list())
            val = json.dumps(val)
            queue_custom_block_in_money.put_nowait(val)
            if len(level1_data_dict) < 1:
                continue
            # 根据涨幅排序