Administrator
5 天以前 994079acd0ac30a32e2b0391881890be16b0afc0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import json
import multiprocessing
import threading
import time
 
import requests
import schedule
 
from api import outside_api_callback
from api.outside_api_command_manager import ApiCommandManager
from db.redis_manager_delegate import RedisUtils
from huaxin_client import l2_market_client, trade_client, l1_subscript_codes_manager
from log_module import async_log_util
from log_module.log import logger_debug
from server import data_server
from strategy import strategy_manager
from strategy.env_info import RealTimeEnvInfo
from third_data import hx_qc_value_util
from third_data.kpl_block_manager import KPLCodeJXBlocksManager
from trade.huaxin import huaxin_trade_api
from utils import tool, middle_api_protocol
 
 
def __run_l2_market_subscript():
    """
    Launch the L2 market client in a child process and consume its queue.

    A ``multiprocessing.Queue`` is created and passed to
    ``l2_market_client.run`` in a separate process. The calling thread then
    reads from that queue in an endless loop, so this function does not
    return.
    @return:
    """
    tick_queue: multiprocessing.Queue = multiprocessing.Queue()

    def consume_forever():
        # Only messages whose "type" is 'set_target_codes' are acted on;
        # everything else is read from the queue and dropped.
        while True:
            try:
                message = tick_queue.get()
                if message.get("type") != 'set_target_codes':
                    continue
                # [(code, timestamp, price, total volume, total amount, buy5, sell5)]
                ticks = message["data"]["data"]
                if strategy_manager.low_suction_strtegy:
                    strategy_manager.low_suction_strtegy.add_ticks(ticks)
                # Record the time of this batch and how many ticks it held.
                RealTimeEnvInfo().ticks = (tool.get_now_time_str(), len(ticks))
            except Exception as err:
                logger_debug.exception(err)
                # Back off briefly before reading the next message.
                time.sleep(0.1)

    market_process = multiprocessing.Process(target=l2_market_client.run,
                                             args=(tick_queue,))
    market_process.start()
    consume_forever()
 
 
def __init():
    """
    Initialisation: start a daemon thread that drives the ``schedule`` loop.

    The thread registers a daily job at 11:05:00 that downloads the curated
    ("JX") blocks for the subscribed codes, then polls
    ``schedule.run_pending()`` once per second for as long as the process
    lives.
    @return:
    """

    # Periodically refresh the curated blocks of the subscribed codes
    def run_pending():
        # Gather the subscribed codes from both lists; entries are decoded
        # from bytes to str.
        codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
        codes = {x.decode() for x in codes_sh} | {x.decode() for x in codes_sz}

        def download_blocks():
            # Resolve the date when the job fires, not when it is registered,
            # so a process that stays up past midnight still downloads the
            # blocks for the current day.
            return KPLCodeJXBlocksManager(tool.get_now_date_str(), codes).start_download_blocks()

        schedule.every().day.at("11:05:00").do(download_blocks)
        while True:
            try:
                schedule.run_pending()
            except Exception as e:
                # Keep the scheduler thread alive, but leave a trace instead
                # of discarding the failure silently.
                logger_debug.exception(e)
            finally:
                time.sleep(1)

    threading.Thread(target=run_pending, daemon=True).start()
 
 
def test():
    """
    Start-up smoke test, run 10 seconds after being invoked.

    Queries the trading account balance, posts a sample record to the local
    ``upload_big_order_datas`` endpoint, and asks the value-added service for
    the trading day following 2025-06-06. The two query results are written
    to the debug log.
    """
    time.sleep(10)
    account_money = huaxin_trade_api.get_money(blocking=True)
    logger_debug.info(f"测试交易账户获取:{account_money}")

    # Send a sample message to the local data server.
    # NOTE(review): the payload is serialised with json.dumps and then passed
    # through `json=`, so the request body is a JSON-encoded string rather
    # than a JSON object — confirm this is what the server expects.
    sample_payload = json.dumps({"data": [(1, 2, 3, 4, 5)]})
    requests.post("http://127.0.0.1:9008/upload_big_order_datas", json=sample_payload)

    # Exercise the value-added service API.
    next_trading_date = hx_qc_value_util.get_next_trading_date("2025-06-06")
    logger_debug.info(f"测试获取下一个交易日:{next_trading_date}")
 
 
# Script entry point: starts the background services in order, then blocks in
# the L2 market subscription loop.
if __name__ == "__main__":
    # ------- Start the Huaxin value-added service API ------
    threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
    # Wait 10 seconds before continuing (presumably to let the service above
    # finish starting — TODO confirm).
    time.sleep(10)
    # ----- Start data_server on 127.0.0.1:9008 -----
    threading.Thread(target=lambda: data_server.run("127.0.0.1", 9008), daemon=True).start()

    # Start local logging
    threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start()

    # Start redis synchronisation
    threading.Thread(target=lambda: RedisUtils.run_loop(), daemon=True).start()

    # -------- Start the local API interface ----------
    manager = ApiCommandManager(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT,
                                outside_api_callback.MyAPICallback())
    manager.run(blocking=False)

    # -------- Start trading ----------
    huaxin_trade_api.run()

    # Run the start-up smoke test in the background so it does not delay the
    # rest of the start-up sequence.
    threading.Thread(target=test, daemon=True).start()
    # test()

    # ---- Initialisation (starts the scheduler thread) ------------
    __init()

    # Initialise data: create today's strategy instance
    strategy_manager.low_suction_strtegy = strategy_manager.LowSuctionStrategy(tool.get_now_date_str())



    # ------- Start the L2 market subscription ------
    __run_l2_market_subscript()
    # NOTE(review): __run_l2_market_subscript() ends in a `while True` loop
    # with no exit, so this line looks unreachable — confirm.
    print("启动完成")