Administrator
2025-06-24 50805343f897aadad3da99c3dacefcbc095455e5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import json
import multiprocessing
import pickle
import threading
import time
 
import requests
import schedule
 
from api import outside_api_callback
from api.outside_api_command_manager import ApiCommandManager
from db.redis_manager_delegate import RedisUtils
from huaxin_client import l2_market_client, trade_client, l1_subscript_codes_manager
from log_module import async_log_util
from log_module.log import logger_debug, logger_system
from server import data_server
from strategy import strategy_manager
from strategy.env_info import RealTimeEnvInfo
from strategy.strategy_variable_factory import DataLoader
from third_data import hx_qc_value_util
from third_data.history_k_data_manager import TradeDateManager
from third_data.kpl_block_manager import KPLCodeJXBlocksManager
from trade.huaxin import huaxin_trade_api
from utils import tool, middle_api_protocol
 
 
def __run_l2_market_subscript():
    """
    Start the L2 market client in a child process and consume its messages.

    A multiprocessing queue is handed to `l2_market_client.run`; this function
    then blocks forever, unpickling each queue item. Messages whose "type" is
    'set_target_codes' are forwarded to the low-suction strategy (when one is
    set) and their arrival time / size is recorded on `RealTimeEnvInfo`.
    @return: never returns normally
    """
    queue_l1_w_strategy_r: multiprocessing.Queue = multiprocessing.Queue()
    market_process = multiprocessing.Process(
        target=l2_market_client.run,
        args=(queue_l1_w_strategy_r,),
    )
    market_process.start()

    while True:
        try:
            # The payload is produced by the child process started above.
            message = pickle.loads(queue_l1_w_strategy_r.get())
            if message.get("type") != 'set_target_codes':
                continue
            # [(code, timestamp, price, total volume, total amount, bid-5, ask-5)]
            ticks = message["data"]["data"]
            if strategy_manager.low_suction_strtegy:
                strategy_manager.low_suction_strtegy.add_ticks(ticks)
            RealTimeEnvInfo().ticks = (tool.get_now_time_str(), len(ticks))
        except Exception as e:
            logger_debug.exception(e)
            # Short pause after a failure before reading the next item.
            time.sleep(0.1)
 
 
def __init():
    """
    Start the background scheduler that refreshes daily reference data.

    A daemon thread registers two daily `schedule` jobs and then polls the
    scheduler once per second:
      * 08:05:00 - download the curated (JX) blocks for the subscribed codes;
      * 08:10:00 - reload the leading limit-up plate data.
    @return: None
    """

    def update_leading_limit_up_datas():
        """
        Refresh the leading limit-up plate information.
        @return: {"code": 0} immediately; the loading itself runs on a daemon thread
        """
        print("*******更新领涨信息********")

        if tool.get_now_time_str() < '16:00:00':
            # Before 16:00 use the current date.
            day = tool.get_now_date_str()
        else:
            # From 16:00 on use the next trading day.
            day = TradeDateManager().get_next_trade_day(tool.get_now_date_str())
        data_loader = DataLoader(day)

        def update():
            for p in data_loader.get_limit_up_reasons_with_plate_code():
                data_loader.load_plate_codes(p[0], p[1])

        threading.Thread(target=update, daemon=True).start()
        return {"code": 0}

    def run_pending():
        """Register the daily jobs and drive the scheduler loop forever."""
        codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
        codes = {x.decode() for x in codes_sh} | {x.decode() for x in codes_sz}

        def download_jx_blocks():
            # Resolve the date when the job fires, not when it is registered:
            # the job repeats every day, so a date captured at startup would be
            # stale on every run after the first midnight.
            return KPLCodeJXBlocksManager(tool.get_now_date_str(), codes).start_download_blocks()

        schedule.every().day.at("08:05:00").do(download_jx_blocks)
        schedule.every().day.at("08:10:00").do(update_leading_limit_up_datas)
        while True:
            try:
                schedule.run_pending()
            except Exception as e:
                # Keep the scheduler alive, but make job failures visible
                # instead of discarding them silently.
                logger_debug.exception(e)
            finally:
                time.sleep(1)

    threading.Thread(target=run_pending, daemon=True).start()
 
 
def test():
    """
    Manual smoke test of the trade account, the local data server and the
    value-added service API. Results are written to the debug log.
    @return: None
    """
    time.sleep(10)

    # Query the trading account.
    money_result = huaxin_trade_api.get_money(blocking=True)
    logger_debug.info(f"测试交易账户获取:{money_result}")

    # Push a sample record to the local data server.
    # NOTE(review): the payload is already a JSON string and is passed through
    # `json=`, so it is encoded a second time - confirm the endpoint expects that.
    payload = json.dumps({"data": [(1, 2, 3, 4, 5)]})
    requests.post("http://127.0.0.1:9008/upload_big_order_datas", json=payload)

    # Query the value-added service API.
    next_trading_date = hx_qc_value_util.get_next_trading_date("2025-06-06")
    logger_debug.info(f"测试获取下一个交易日:{next_trading_date}")
 
 
if __name__ == "__main__":
    # ------- start the Huaxin value-added service API ------
    threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
    # NOTE(review): presumably gives the service time to come up - confirm.
    time.sleep(10)
    # ----- start data_server -----
    threading.Thread(target=data_server.run, args=("127.0.0.1", 9008), daemon=True).start()

    # Start the local async log writer.
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()

    # Start the redis sync loop.
    threading.Thread(target=RedisUtils.run_loop, daemon=True).start()

    # -------- start the local API interface ----------
    manager = ApiCommandManager(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT,
                                outside_api_callback.MyAPICallback())
    manager.run(blocking=False)

    try:
        # -------- start trading ----------
        huaxin_trade_api.run()
    except Exception as e:
        # Startup continues as before, but the failure is now recorded
        # instead of being dropped silently.
        logger_system.exception(e)

    # ---- initialisation ------------
    __init()
    # Create the strategy object and load its data.
    strategy_manager.low_suction_strtegy = strategy_manager.LowSuctionStrategy(tool.get_now_date_str())
    logger_system.info("初始化策略对象成功")
    try:
        strategy_manager.low_suction_strtegy.load_data()
        logger_system.info("加载策略数据成功")
    except Exception as e:
        logger_system.error(f"加载策略数据失败:{str(e)}")
        logger_system.exception(e)

    # ------- start the L2 market subscription (blocks) ------
    try:
        __run_l2_market_subscript()
        logger_system.info("系统结束")
    except KeyboardInterrupt:
        # Preserve the original quiet exit on Ctrl-C.
        pass
    except Exception as e:
        # A crash of the subscription loop ends the program; log the reason.
        logger_system.exception(e)