import json
import multiprocessing
import threading
import time

import requests
import schedule

from api import outside_api_callback
from api.outside_api_command_manager import ApiCommandManager
from db.redis_manager_delegate import RedisUtils
from huaxin_client import l2_market_client, trade_client, l1_subscript_codes_manager
from log_module import async_log_util
from log_module.log import logger_debug
from server import data_server
from strategy import strategy_manager
from strategy.env_info import RealTimeEnvInfo
from third_data import hx_qc_value_util
from third_data.kpl_block_manager import KPLCodeJXBlocksManager
from trade.huaxin import huaxin_trade_api
from utils import tool, middle_api_protocol


def __run_l2_market_subscript():
    """Start the L2 market client in a child process and consume its output.

    Creates a ``multiprocessing.Queue``, launches ``l2_market_client.run`` with
    it in a separate process, then blocks the caller in ``read_results``, which
    loops forever over the queue.
    """

    def read_results():
        """Feed ``set_target_codes`` messages from the queue to the strategy."""
        while True:
            try:
                data = queue_l1_w_strategy_r.get()
                if data.get("type") == 'set_target_codes':
                    # [(code, timestamp, price, total volume, total turnover, bid 5, ask 5)]
                    market_data_list = data["data"]["data"]
                    # The strategy is assigned by the entry point; skip ticks until it exists.
                    if strategy_manager.low_suction_strtegy:
                        strategy_manager.low_suction_strtegy.add_ticks(market_data_list)
                    # Record when the last batch arrived and how many entries it held.
                    RealTimeEnvInfo().ticks = (tool.get_now_time_str(), len(market_data_list))
            except Exception as e:
                logger_debug.exception(e)
                # Brief pause so a persistently failing message source does not spin.
                time.sleep(0.1)

    queue_l1_w_strategy_r: multiprocessing.Queue = multiprocessing.Queue()
    l2MarketProcess = multiprocessing.Process(target=l2_market_client.run, args=(queue_l1_w_strategy_r,))
    l2MarketProcess.start()
    read_results()


def __init():
    """Initialise background work: the daily refresh of the codes' selected blocks.

    Starts a daemon thread that registers a job at 08:05:00 every day and then
    drives the ``schedule`` loop once per second.
    """

    # Periodically refresh the selected blocks of the subscribed codes.
    def run_pending():
        # Collect the subscribed codes once, decoding them from bytes.
        codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
        codes = {x.decode() for x in codes_sh} | {x.decode() for x in codes_sz}

        def download_today_blocks():
            # Resolve the date when the job fires, not when the thread starts,
            # so a process that stays up across days does not reuse a stale date.
            return KPLCodeJXBlocksManager(tool.get_now_date_str(), codes).start_download_blocks()

        schedule.every().day.at("08:05:00").do(download_today_blocks)
        while True:
            try:
                schedule.run_pending()
            except Exception as e:
                # Keep the scheduler alive, but leave a trace of the failure.
                logger_debug.exception(e)
            finally:
                time.sleep(1)

    threading.Thread(target=run_pending, daemon=True).start()


def test():
    """Smoke-test the trade account, the local data server and the value-added API.

    Waits 10 seconds, then queries the account money, posts a sample payload to
    the local data server and asks for the trading day after 2025-06-06. The
    account and trading-day results are logged.
    """
    time.sleep(10)
    result = huaxin_trade_api.get_money(blocking=True)
    logger_debug.info(f"测试交易账户获取:{result}")
    # Send a sample message to the local data server.
    # NOTE(review): ``json=`` already serialises its argument, so passing
    # ``json.dumps(...)`` sends a JSON-encoded *string*. Left unchanged because
    # the server's expectation is not visible here - confirm against the
    # ``upload_big_order_datas`` handler.
    requests.post("http://127.0.0.1:9008/upload_big_order_datas", json=json.dumps({"data": [(1, 2, 3, 4, 5)]}))
    # Query the value-added service API.
    result = hx_qc_value_util.get_next_trading_date("2025-06-06")
    logger_debug.info(f"测试获取下一个交易日:{result}")


if __name__ == "__main__":
    # ------- Start the Huaxin value-added service API ------
    threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
    time.sleep(10)
    # ----- Start data_server -----
    threading.Thread(target=lambda: data_server.run("127.0.0.1", 9008), daemon=True).start()
    # Start local logging
    threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start()
    # Start redis synchronisation
    threading.Thread(target=lambda: RedisUtils.run_loop(), daemon=True).start()
    # -------- Start the local API interface ----------
    manager = ApiCommandManager(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT,
                                outside_api_callback.MyAPICallback())
    manager.run(blocking=False)
    # -------- Start trading ----------
    huaxin_trade_api.run()
    threading.Thread(target=test, daemon=True).start()
    # test()
    # ---- Initialisation ------------
    __init()
    # Initialise data
    strategy_manager.low_suction_strtegy = strategy_manager.LowSuctionStrategy(tool.get_now_date_str())
    # ------- Start the L2 market subscription ------
    __run_l2_market_subscript()
    # NOTE(review): the call above ends in an endless read loop, so this line
    # is only reached if that call ever returns.
    print("启动完成")