admin
2025-04-14 bf0c5badfe9c0efe7340af7d0a6356461d9ea961
真实交易环境准备
8个文件已修改
130 ■■■■ 已修改文件
constant.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_server.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/command_manager.py 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/constant.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 33 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/order_methods.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin_trade_api.py 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -81,3 +81,6 @@
# 订阅L2代码数据
SUBSCRIPT_L2_CODES = set()
# 是否是仿真交易
IS_SIMULATED_TRADE = False
data_server.py
@@ -10,6 +10,7 @@
import psutil
import constant
from db import redis_manager_delegate as redis_manager, mysql_data_delegate as mysql_data
from db.redis_manager_delegate import RedisUtils
from log_module import log_export
@@ -239,12 +240,13 @@
            print("接收到POST请求:", str(path))
            url = urlparse.urlparse(path)
            if url.path == "/trade_callback":
                # 接受开盘啦数据
                body = self.__parse_request()
                if type(body) != str:
                    huaxin_trade_api.add_trade_callback_data(json.dumps(body))
                else:
                    huaxin_trade_api.add_trade_callback_data(body)
                if constant.IS_SIMULATED_TRADE:
                    # 接受开盘啦数据
                    body = self.__parse_request()
                    if type(body) != str:
                        huaxin_trade_api.add_trade_callback_data(json.dumps(body))
                    else:
                        huaxin_trade_api.add_trade_callback_data(body)
                result_str = json.dumps({"code": 0})
            elif url.path == "/buy":
                # 签名验证
huaxin_client/command_manager.py
@@ -72,9 +72,12 @@
    @classmethod
    def init(cls, trade_action_callback: TradeActionCallback,
             queue_strategy_trade_read_for_trade: multiprocessing.Queue):
             queue_strategy_trade_read_for_trade: multiprocessing.Queue,
             queue_strategy_w_trade_for_query: multiprocessing.Queue
             ):
        cls.action_callback = trade_action_callback
        cls.queue_strategy_w_trade_r = queue_strategy_trade_read_for_trade
        cls.queue_strategy_w_trade_r_for_query = queue_strategy_w_trade_for_query
    @classmethod
    def process_command(cls, _type, client_id, result_json, sk=None):
@@ -128,13 +131,14 @@
            async_log_util.exception(logger_local_huaxin_trade_debug, e)
    # 维护连接数的稳定
    def run(self, blocking=True):
        if blocking:
            self.run_process_command(self.queue_strategy_w_trade_r)
        else:
            # 接受命令
            t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_w_trade_r), daemon=True)
            t1.start()
    def run(self):
        # 接受命令
        t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_w_trade_r), daemon=True)
        t1.start()
        t1 = threading.Thread(target=lambda: self.run_process_command(self.queue_strategy_w_trade_r_for_query),
                              daemon=True)
        t1.start()
# L2指令管理
huaxin_client/constant.py
@@ -2,7 +2,7 @@
# addr, port = "111.230.16.67", 10008
# SERVER_IP = '192.168.3.122'
# 公用服务器:192.168.84.71 托管服务器:192.168.84.126
LOCAL_IP = "192.168.84.126"
LOCAL_IP = "192.168.84.71"
TEST = True
L1_MIN_RATE = 3.0
huaxin_client/trade_client.py
@@ -17,18 +17,16 @@
    logger_local_huaxin_trade_debug, printlog
from utils import socket_util, tool
IS_TEST = True
########B类########
UserID = 'xxxx'
UserID = '388000013942'
# 登陆密码
Password = 'xxxx'
Password = '110808'
# 投资者账户
InvestorID = '388000013349'
InvestorID = '388000013942'
# 经济公司部门代码
DepartmentID = '0003'
# 资金账户
AccountID = '388000013349'
AccountID = '388000013942'
# 沪市股东账号
SSE_ShareHolderID = 'A641420991'
# 深市股东账号
@@ -37,25 +35,6 @@
LOCAL_IP = constant.LOCAL_IP
FRONT_ADDRESS = "tcp://192.168.84.31:6500"
FRONT_ADDRESS1 = "tcp://192.168.84.32:26500"
if IS_TEST:
    # 仿真
    UserID = '00043201'
    # 登陆密码
    Password = '45249973'
    # 投资者账户
    InvestorID = '11160150'
    # 经济公司部门代码
    DepartmentID = '0003'
    # 资金账户
    AccountID = '00043201'
    # 沪市股东账号
    SSE_ShareHolderID = 'A00043201'
    # 深市股东账号
    SZSE_ShareHolderID = '700043201'
    LOCAL_IP = "127.0.0.1"
    FRONT_ADDRESS = "tcp://210.14.72.21:42370"
    FRONT_ADDRESS1 = "tcp://210.14.72.21:42370"
@@ -1105,7 +1084,7 @@
addr, port = "127.0.0.1", 9004
def run(queue_strategy_r_trade_w=None, queue_strategy_w_trade_r=None):
def run(queue_strategy_r_trade_w=None, queue_strategy_w_trade_r=None, queue_strategy_w_trade_for_query_r=None):
    try:
        logger_system.info("交易进程ID:{}", os.getpid())
        logger_system.info(f"trade 线程ID:{tool.get_thread_id()}")
@@ -1119,7 +1098,7 @@
        global tradeCommandManager
        tradeCommandManager = command_manager.TradeCommandManager()
        tradeCommandManager.init(MyTradeActionCallback(), queue_strategy_w_trade_r)
        tradeCommandManager.init(MyTradeActionCallback(), queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r)
        logger_system.info("华鑫交易服务启动")
        tradeCommandManager.run()
    except Exception as e:
main.py
@@ -1,6 +1,7 @@
# coding=utf-8
from __future__ import print_function, absolute_import, unicode_literals
import logging
import multiprocessing
import threading
import time
@@ -19,7 +20,7 @@
from strategy import kpl_api, data_cache, check_timer, all_K_line, instant_time_market, account_management, \
    order_methods, local_data_management, kpl_data_manager, market_sentiment_analysis, plate_strength_analysis, \
    selling_strategy
from huaxin_client import l2_market_client, l2_client
from huaxin_client import l2_market_client, l2_client, trade_client
from log_module import async_log_util, log
from trade import huaxin_trade_data_update, huaxin_trade_api
from utils import hx_qc_value_util, huaxin_util, juejin_api, tool
@@ -111,7 +112,8 @@
    # 开启开盘啦 涨停列表 和 全盘个股概念板块 接口线程
    # 涨停概念线程
    # threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, daemon=True).start()    #该行代码为只运行单一线程不回调数据的方式
    threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, args=(kpl_limit_up_process,), daemon=True).start()
    threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, args=(kpl_limit_up_process,),
                     daemon=True).start()
    # # 开盘啦的板块强度下的个股强度回调函数
    def get_market_sift_plate_its_stock_power_process(market_sift_plate_stock_dict):
@@ -201,6 +203,7 @@
l2_data_callbacks = []
# 订阅持仓L2数据
def __subscript_position_l2():
    """
@@ -227,6 +230,8 @@
# 第三步:执行策略的初始设置
if __name__ == '__main__':
    log.close_print()
    class MyMarketDataCallback(l2_market_client.L2MarketDataCallback):
        def on_markets(self, datas):
            """
@@ -250,8 +255,17 @@
    # redis 数据同步
    threading.Thread(target=RedisUtils.run_loop, daemon=True).start()
    # 策略与交易通信队列
    queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue()
    # 不是模拟盘的时候启动交易
    if not constant.IS_SIMULATED_TRADE:
        multiprocessing.Process(target=trade_client.run, args=(
            queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r,),
                                daemon=True).start()
    # 启动交易
    order_methods.run()
    order_methods.run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r)
    # 运行华鑫增值服务进程,用于获取K线与交易日历
    threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
strategy/order_methods.py
@@ -135,7 +135,8 @@
    :return:  尝试返回的订单数据
    """
    logger.info(f"当前个股持仓手数【当前函数被调用时传进来的同步数据data_cache中的持仓数据】==={position_volume_yesterday}")
    logger.info(
        f"当前个股持仓手数【当前函数被调用时传进来的同步数据data_cache中的持仓数据】==={position_volume_yesterday}")
    # sell_order_volume = int(position_volume_yesterday * part_of_volume)
    sell_order_volume = round(position_volume_yesterday * part_of_volume / 100) * 100
    logger.info(f"当前计划比例==={part_of_volume},当前委托量==={sell_order_volume}")
@@ -152,7 +153,8 @@
            # data_cache.account_positions[index]['volume'] = position_volume_yesterday - sell_order_volume
            if data_cache.account_positions_dict[index]['currentPosition'] <= 0:
                logger.info(f"data_cache.account_positions == {data_cache.account_positions_dict}")
                logger.info(f"下单后,【{sec_name}】的剩余可用持仓数量==={data_cache.account_positions_dict[index]['currentPosition']}")
                logger.info(
                    f"下单后,【{sec_name}】的剩余可用持仓数量==={data_cache.account_positions_dict[index]['currentPosition']}")
                # 本票本次卖票,可用仓位为0或小于0,,移除【可用持仓代码】集合
                '''
                全局变量中的可用个股数量,由于只在【集合竞价】阶段用,如果移除会影响进入次数,暂不考虑使用
@@ -173,7 +175,7 @@
        sell_order_by_volume(symbol, 100, sec_name, current_price)
def run():
def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r):
    class MyTradeCallback(huaxin_trade_api.TradeCallback):
        def on_order(self, order_info):
            """
@@ -195,6 +197,6 @@
            threading.Thread(target=lambda: middle_api_protocol.push(
                middle_api_protocol.load_push_msg({"type": "order", "data": order_info})), daemon=True).start()
    queue = multiprocessing.Queue()
    huaxin_trade_api.run_trade(queue, MyTradeCallback())
    huaxin_trade_api.run_trade(queue_strategy_r_trade_w, MyTradeCallback(), queue_strategy_w_trade_r,
                               queue_strategy_w_trade_for_query_r)
    threading.Thread(target=data_server.run, daemon=True).start()
trade/huaxin_trade_api.py
@@ -9,6 +9,7 @@
import threading
import time
import constant
from log_module import async_log_util
from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop, hx_logger_trade_callback, logger_trade, \
    logger_system
@@ -85,16 +86,20 @@
# 设置交易通信队列
# 暂时不会使用该方法
def run_trade(queue_strategy_r_trade_w_, trade_callback_: TradeCallback):
def run_trade(queue_strategy_r_trade_w_, trade_callback_: TradeCallback, queue_strategy_w_trade_r_,
              queue_strategy_w_trade_for_query_r_):
    """
    :param queue_strategy_w_trade_for_query_r_: 策略写交易读(用于数据查询)
    :param queue_strategy_w_trade_r_: 策略写交易读
    :param trade_callback_: 订单回调
    :param queue_strategy_r_trade_w_: 接收交易结果数据队列
    :return:
    """
    global queue_strategy_r_trade_w, trade_callback
    global queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r, trade_callback
    queue_strategy_r_trade_w = queue_strategy_r_trade_w_
    queue_strategy_w_trade_r = queue_strategy_w_trade_r_
    queue_strategy_w_trade_for_query_r = queue_strategy_w_trade_for_query_r_
    trade_callback = trade_callback_
    # 读取交易结果
    threading.Thread(target=lambda: __run_recv_queue_trade(queue_strategy_r_trade_w_), daemon=True).start()
@@ -110,11 +115,6 @@
# 交易通道的错误次数
trade_pipe_channel_error_count = 0
# pipe的交易通道是否正常
def is_pipe_channel_normal():
    return True
# 测试交易通道
@@ -314,7 +314,15 @@
                     }
        root_data = socket_util.encryp_client_params_sign(root_data)
        start_time = time.time()
        threading.Thread(target=__request_delegate, args=(request_id, _type, data,), daemon=True).start()
        if constant.IS_SIMULATED_TRADE:
            # =========模拟盘交易代理请求==========
            threading.Thread(target=__request_delegate, args=(request_id, _type, data,), daemon=True).start()
        else:
            # ===========真实盘交易===============
            if is_trade:
                queue_strategy_w_trade_r.put_nowait(root_data)
            else:
                queue_strategy_w_trade_for_query_r.put_nowait(root_data)
        use_time = int((time.time() - start_time) * 1000)
        if use_time > 10: