import json
|
import multiprocessing
|
import threading
|
import time
|
|
import requests
|
import schedule
|
|
from api import outside_api_callback
|
from api.outside_api_command_manager import ApiCommandManager
|
from db.redis_manager_delegate import RedisUtils
|
from huaxin_client import l2_market_client, trade_client, l1_subscript_codes_manager
|
from log_module import async_log_util
|
from log_module.log import logger_debug
|
from server import data_server
|
from strategy import strategy_manager
|
from strategy.env_info import RealTimeEnvInfo
|
from third_data import hx_qc_value_util
|
from third_data.kpl_block_manager import KPLCodeJXBlocksManager
|
from trade.huaxin import huaxin_trade_api
|
from utils import tool, middle_api_protocol
|
|
|
def __run_l2_market_subscript():
    """
    Launch the L2 market client in a child process and consume what it sends back.

    A multiprocessing queue is passed to ``l2_market_client.run`` in a new process.
    The calling thread then reads that queue in an endless loop, so this function
    does not return under normal operation.

    @return:
    """
    tick_queue: multiprocessing.Queue = multiprocessing.Queue()

    def handle_message(message):
        """Process one queue message; only 'set_target_codes' messages are acted on."""
        if message.get("type") != 'set_target_codes':
            return
        # Rows: [(code, timestamp, price, total volume, total turnover, bid 5, ask 5)]
        ticks = message["data"]["data"]
        strategy = strategy_manager.low_suction_strtegy
        if strategy:
            strategy.add_ticks(ticks)
        # NOTE(review): recorded for every batch, even when no strategy is installed
        # -- confirm this is the intended nesting.
        RealTimeEnvInfo().ticks = (tool.get_now_time_str(), len(ticks))

    def consume_forever():
        """Block on the queue forever, logging (not propagating) any failure."""
        while True:
            try:
                handle_message(tick_queue.get())
            except Exception as err:
                logger_debug.exception(err)
                # Short pause before retrying; presumably avoids a tight error loop.
                time.sleep(0.1)

    market_process = multiprocessing.Process(
        target=l2_market_client.run,
        args=(tick_queue,),
    )
    market_process.start()
    consume_forever()
|
|
|
def __init():
    """
    Initialise background jobs.

    Starts a daemon thread that registers a daily 11:05:00 job downloading the
    selected ("JX") blocks for the subscribed codes, and then drives the
    ``schedule`` loop once per second.

    @return:
    """

    def download_jx_blocks(codes):
        """Download today's selected blocks for ``codes``."""
        # Resolve the date when the job fires, not when it was registered, so a
        # process that stays up past midnight does not reuse a stale date.
        return KPLCodeJXBlocksManager(tool.get_now_date_str(), codes).start_download_blocks()

    # Periodically refresh the selected blocks of the codes
    def run_pending():
        """Register the daily job, then run the scheduler loop forever."""
        # Codes arrive as bytes (they are decoded here) in two groups.
        codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
        codes = {x.decode() for x in codes_sh} | {x.decode() for x in codes_sz}
        schedule.every().day.at("11:05:00").do(download_jx_blocks, codes)
        while True:
            try:
                schedule.run_pending()
            except Exception as e:
                # Keep the scheduler alive, but leave a trace of the failure
                # instead of swallowing it silently.
                logger_debug.exception(e)
            finally:
                time.sleep(1)

    threading.Thread(target=run_pending, daemon=True).start()
|
|
|
def test():
    """
    Start-up smoke check.

    Sleeps 10 seconds, then queries the trade account balance, posts a sample
    record to the local data server and asks the value-added API for the
    trading day after 2025-06-06. The two query results are written to the
    debug log.

    @return:
    """
    time.sleep(10)

    account_money = huaxin_trade_api.get_money(blocking=True)
    logger_debug.info(f"测试交易账户获取:{account_money}")

    # Send a sample message
    # NOTE(review): ``json=`` is given an already-serialised string, so the request
    # body is a JSON string literal rather than an object -- confirm the endpoint
    # expects this double encoding.
    sample_payload = json.dumps({"data": [(1, 2, 3, 4, 5)]})
    requests.post("http://127.0.0.1:9008/upload_big_order_datas", json=sample_payload)

    # Call the value-added service API
    next_trading_date = hx_qc_value_util.get_next_trading_date("2025-06-06")
    logger_debug.info(f"测试获取下一个交易日:{next_trading_date}")
|
|
|
if __name__ == "__main__":
    # Script entry point: start every background service, then hand the main
    # thread over to the L2 market consumer loop.

    # ------- Start the Huaxin value-added service API ------
    threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
    # Presumably gives the service time to come up before it is used -- TODO confirm.
    time.sleep(10)

    # ----- Start data_server -----
    threading.Thread(target=data_server.run, args=("127.0.0.1", 9008), daemon=True).start()

    # Start local logging
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()

    # Start redis synchronisation
    threading.Thread(target=RedisUtils.run_loop, daemon=True).start()

    # -------- Start the local API interface ----------
    manager = ApiCommandManager(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT,
                                outside_api_callback.MyAPICallback())
    manager.run(blocking=False)

    # -------- Start trading ----------
    huaxin_trade_api.run()

    threading.Thread(target=test, daemon=True).start()

    # ---- Initialisation ------------
    __init()

    # Initialise data
    strategy_manager.low_suction_strtegy = strategy_manager.LowSuctionStrategy(tool.get_now_date_str())

    # The L2 subscription below consumes its queue in an endless loop on this
    # thread and never returns, so the completion message has to be printed
    # before entering it (after the call it was unreachable).
    print("启动完成")

    # ------- Start the L2 market subscription ------
    __run_l2_market_subscript()
|