Administrator
2025-06-09 8b7972581d0324e3f634b5b5a57a9ed7db1addaf
低吸数据采集
3个文件已修改
1个文件已添加
73 ■■■■■ 已修改文件
api/low_suction_data_pusher.py 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client_test.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_test.py 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/data_server.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/low_suction_data_pusher.py
New file
@@ -0,0 +1,53 @@
"""
低吸数据推送
"""
import json
import requests
import concurrent.futures
__data_push_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=5)
__big_order_data_push_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
def push_limit_up_list(datas):
    """
    推送涨停列表信息
    @param datas:
    @return:
    """
    def push():
        requests.post("http://127.0.0.1:9008/upload_limit_up_list", json=json.dumps(datas), timeout=3)
    __data_push_thread_pool.submit(
        lambda: push())
def push_big_order(datas):
    """
    推送大单信息
    @param datas:
    @return:
    """
    def push():
        requests.post("http://127.0.0.1:9008/upload_big_order_datas", json=json.dumps(datas), timeout=3)
    __big_order_data_push_thread_pool.submit(
        lambda: push())
def push_block_in(datas):
    """
    推送板块流入信息
    @param datas:
    @return:
    """
    def push():
        requests.post("http://127.0.0.1:9008/upload_block_in_datas", json=json.dumps(datas), timeout=3)
    __data_push_thread_pool.submit(
        lambda: push())
huaxin_client/l2_client_test.py
@@ -160,7 +160,7 @@
        order_time = item[4]
        if self.accurate_buy:
            self.add_transaction_data_for_accurate(item, big_order_money_threshold=50e4)
            self.add_transaction_data_for_accurate(item, big_order_money_threshold=100e4)
        if not self.__latest_buy_order:
            # (买单号, 量, 金额, 时间, 最新成交价格)
l2_test.py
@@ -11,6 +11,7 @@
import psutil
import requests
from api import low_suction_data_pusher
from code_attribute import global_data_loader
from huaxin_client import l2_client_test, l1_subscript_codes_manager
from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system, \
@@ -143,7 +144,12 @@
def __save_accurate_big_order(big_accurate_order_queue):
    while True:
        try:
            datas = []
            while not big_accurate_order_queue.empty():
            data = big_accurate_order_queue.get()
                datas.append(data)
            low_suction_data_pusher.push_big_order(datas)
            for data in datas:
            logger_local_huaxin_l2_transaction_accurate_big_order.info(f"{data}")
        except:
            pass
@@ -222,6 +228,10 @@
            __upload_data("jingxuan_rank", json.dumps(fins))
            __upload_data("jingxuan_rank_out", json.dumps(fouts))
            __upload_codes_in_money()
            try:
                low_suction_data_pusher.push_block_in(in_list)
            except:
                pass
        except Exception as e:
            logging.exception(e)
        finally:
servers/data_server.py
@@ -10,6 +10,7 @@
import constant
import inited_data
from api import low_suction_data_pusher
from code_attribute.gpcode_manager import BlackListCodeManager, HumanRemoveForbiddenManager
from l2.huaxin import huaxin_target_codes_manager
from l2.l2_transaction_data_manager import HuaXinBuyOrderManager
@@ -1207,6 +1208,13 @@
                                    lambda: request_new_blocks_codes(update_new_block_plates, new_block_codes.keys()))
                    except Exception as e:
                        logger_debug.exception(e)
                    # 将数据推送至其他项目
                    try:
                        low_suction_data_pusher.push_limit_up_list(result_list_)
                    except:
                        pass
                    self.__kplDataManager.save_data(type_, result_list_)
            except Exception as e:
                logger_debug.exception(e)