Administrator
2024-11-21 a0f4a1d5bed0b4be8be122e90d2f95b76f178a94
精简代码/代码归类
1个文件已删除
3个文件已修改
1个文件已添加
858 ■■■■ 已修改文件
main.py 59 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/huaxin_trade_api_server.py 538 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/huaxin_trade_server.py 142 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
task/task_manager.py 111 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_data_update.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py
@@ -11,35 +11,29 @@
import os
import threading
from task import task_manager
logger_system.info("程序启动Pre:{}", os.getpid())
from db import redis_manager_delegate as redis_manager
import huaxin_client.trade_client
import huaxin_client.l2_client
import huaxin_client.l1_client
from huaxin_client import l2_market_client
# 交易服务
# from huaxin_api import trade_client, l2_client, l1_client
from servers import server_util, huaxin_trade_api_server, huaxin_trade_server, server
from servers import server_util, huaxin_trade_server, server
def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue,
                      queue_l1_w_strategy_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_: multiprocessing.Queue,
                      queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_,
                      queue_l1_trade_w_strategy_r_, trade_ipc_addr):
def run_strategy(queue_strategy_r_trade_w_: multiprocessing.Queue,
                 queue_l1_w_strategy_r_: multiprocessing.Queue,
                 queue_strategy_w_trade_r_: multiprocessing.Queue,
                 queue_strategy_w_trade_r_for_read_: multiprocessing.Queue,
                 trade_ipc_addr):
    """
    策略进程
    @param pipe_server:
    @param queue_strategy_r_trade_w_:
    @param queue_l1_w_strategy_r_:
    @param queue_strategy_w_trade_r_:
    @param queue_strategy_w_trade_r_for_read_:
    @param queue_l1_trade_r_strategy_w_:
    @param queue_l1_trade_w_strategy_r_:
    @param trade_ipc_addr: 交易ipc地址(下单地址, 撤单地址)
    @return:
    """
@@ -48,25 +42,18 @@
    # 初始化参数
    server.global_data_loader.init()
    # 数据服务
    t1 = threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True)
    t1.start()
    # 开启数据服务器
    threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True).start()
    # 交易接口服务
    t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server",
                          args=(pipe_server, queue_other_w_l2_r, queue_l1_trade_r_strategy_w_),
                          daemon=True)
    t1.start()
    #
    # redis后台服务
    t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True)
    t1.start()
    # 运行数据监听服务
    threading.Thread(target=task_manager.run_data_listener, name="task_manager",
                     args=(queue_other_w_l2_r, queue_l1_w_strategy_r_),
                     daemon=True).start()
    #
    # 启动华鑫交易服务
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_,
    huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_,
                            queue_strategy_w_trade_r_for_read_,
                            queue_l1_trade_w_strategy_r_, trade_ipc_addr)
                            trade_ipc_addr)
# 主服务
@@ -91,17 +78,12 @@
        logger_l2_trade.info("启动程序")
        logger_system.info("启动程序--------")
        log.close_print()
        # 策略与server间的通信
        pss_server, pss_strategy = multiprocessing.Pipe()
        # L2读其他写
        queue_other_w_l2_r = multiprocessing.Queue()
        # l1
        queue_l1_w_strategy_r = multiprocessing.Queue()
        queue_l1_r_strategy_w = multiprocessing.Queue()
        # l1交易
        queue_l1_trade_w_strategy_r = multiprocessing.Queue()
        queue_l1_trade_r_strategy_w = multiprocessing.Queue()
        # 交易读策略写
        queue_strategy_w_trade_r = multiprocessing.Queue()
@@ -112,9 +94,6 @@
        # 下单,撤单ipc地址
        order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc"
        # 托管环境下不创建
        # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
        # serverProcess.start()
        logger_system.info("主进程ID:{}", os.getpid())
        # L1订阅数据
@@ -149,9 +128,9 @@
        # cpu_indexes = [i for i in range(23, 30)]
        # psutil.Process(os.getpid()).cpu_affinity(cpu_indexes)
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r, (order_ipc_addr, cancel_order_ipc_addr))
        run_strategy(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                     queue_strategy_w_trade_r_for_read,
                     (order_ipc_addr, cancel_order_ipc_addr))
        # 将tradeServer作为主进程
        l1Process.join()
servers/huaxin_trade_api_server.py
File was deleted
servers/huaxin_trade_server.py
@@ -291,59 +291,6 @@
    __process_l1_data_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    __updating_jx_blocks_codes = set()
    @classmethod
    def sell(cls, datas):
        rules = TradeRuleManager().list_can_excut_rules_cache(types=[TradeRuleManager.TYPE_SELL])
        excuted_rule_ids = set()
        if rules:
            for d in datas:
                code = d[0]
                # 格式 (代码,现价,涨幅,量,更新时间,买1价格,买1量)
                buy1_volume = d[6]
                buy1_price = d[5]
                if buy1_volume:
                    for r in rules:
                        # 生效时间
                        if r.code == code:
                            # --------判断是否可以执行--------
                            can_excute = False
                            if round(float(buy1_price), 2) <= round(float(r.buy1_price), 2):
                                # 价格已经触发
                                if r.buy1_volume:
                                    if r.buy1_volume >= buy1_volume:
                                        # 量价触发
                                        can_excute = True
                                        async_log_util.info(logger_trade, f"触发卖规则:量触发{buy1_volume}/{r.buy1_volume}")
                                else:
                                    can_excute = True
                                    async_log_util.info(logger_trade, f"触发卖规则:价格触发{buy1_price}/{r.buy1_price}")
                                    # 价格触发
                                # 获取价格类型
                                if not can_excute:
                                    continue
                                # 请求卖出锁
                                TradeRuleManager().require_sell_lock(r.id_)
                                try:
                                    if r.id_ in excuted_rule_ids:
                                        continue
                                    excuted_rule_ids.add(r.id_)
                                    # 获取最新的执行状况
                                    r = TradeRuleManager().get_by_id(r.id_)
                                    if r.excuted:
                                        continue
                                    # 提交卖
                                    limit_down_price = gpcode_manager.get_limit_down_price(code)
                                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                                    huaxin_sell_util.start_sell(code, r.sell_volume, r.sell_price_type, limit_up_price,
                                                                limit_down_price,
                                                                buy1_price)
                                    TradeRuleManager().excuted(r.id_)
                                except Exception as e:
                                    logger_debug.exception(e)
                                finally:
                                    TradeRuleManager().release_sell_lock(r.id_)
    # 保存现价
    @classmethod
    def __save_l1_current_price(cls, datas):
@@ -400,14 +347,6 @@
        else:
            cls.__process_l1_data_thread_pool.submit(
                lambda: HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id))
    @classmethod
    def set_l1_trade_codes_info(cls, data_json):
        data = data_json["data"]
        request_id = data_json["request_id"]
        datas = data["data"]
        cls.__save_l1_current_price(datas)
        cls.sell(datas)
    @classmethod
    def l2_order(cls, code, _datas, timestamp):
@@ -536,13 +475,6 @@
    def trading_order_canceled(cls, code, order_no):
        pass
    @classmethod
    def test_sell(cls):
        # (代码, 现价, 涨幅, 量, 更新时间, 买1价格, 买1量)
        datas = [("600571", 12.14, 9.96, 100000000, tool.get_now_time_str(), 12.14, 10210),
                 ("600571", 12.04, 9.96, 100000000, tool.get_now_time_str(), 12.04, 10210)]
        cls.sell(datas)
def clear_invalid_client():
    logger_system.info(f"trade_server clear_invalid_client 线程ID:{tool.get_thread_id()}")
@@ -553,28 +485,6 @@
            pass
        finally:
            time.sleep(2)
def __recv_pipe_l1(queue_l1_w_strategy_r: multiprocessing.Queue):
    logger_system.info(f"trade_server __recv_pipe_l1 线程ID:{tool.get_thread_id()}")
    if queue_l1_w_strategy_r is not None:
        while True:
            try:
                val = queue_l1_w_strategy_r.get()
                if val:
                    val = json.loads(val)
                    # print("收到来自L1的数据:", val["type"])
                    # 处理数据
                    type_ = val["type"]
                    timestamp = val.get("time")
                    # 大于10s的数据放弃处理
                    if type_ == "set_target_codes":
                        async_log_util.info(logger_l2_codes_subscript, f"策略接收到数据")
                        if time.time() * 1000 - timestamp > 10 * 1000:
                            continue
                        TradeServerProcessor.set_target_codes(val)
            except Exception as e:
                logger_debug.exception(e)
# 排得太远撤单
@@ -641,28 +551,6 @@
            logger_debug.exception(e)
        finally:
            time.sleep(3)
def __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r: multiprocessing.Queue):
    logger_system.info(f"trade_server __recv_pipe_l1_trade 线程ID:{tool.get_thread_id()}")
    if queue_l1_trade_w_strategy_r is not None:
        while True:
            try:
                val = queue_l1_trade_w_strategy_r.get()
                if val:
                    async_log_util.info(logger_local_huaxin_l1_trade_info, f"客户端接收:{val}")
                    val = json.loads(val)
                    # print("收到来自L1的数据:", val["type"])
                    # 处理数据
                    type_ = val["type"]
                    if type_ == "upload_l1_trade_datas":
                        # 处理专为交易提供的L1数据
                        TradeServerProcessor.set_l1_trade_codes_info(val)
                        async_log_util.info(logger_local_huaxin_l1_trade_info, val)
            except Exception as e:
                logger_local_huaxin_l1_trade_info.exception(e)
                logging.exception(e)
class MyL2DataCallback(l2_data_transform_protocol.L2DataCallBack):
@@ -873,7 +761,8 @@
                    result_by_volume = radical_buy_strategy.process_limit_up_active_buy_deal(code, transaction_datas)
                    async_log_util.info(logger_l2_radical_buy, f"量买入结果判断:{code}, 结果:{result_by_volume} 板块:{buy_blocks}")
                    in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks()
                    buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b),in_blocks.index(b) if b in  in_blocks else -1) for b in buy_blocks]
                    buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b),
                                              in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks]
                    if result_by_volume[0] != radical_buy_strategy.BUY_MODE_NONE:
                        if tool.get_now_time_as_int() < 93200:
                            radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code)
@@ -1018,19 +907,17 @@
    threading.Thread(target=run_pending, daemon=True).start()
    l2_data_util.load_l2_data_all(True)
    # L2成交信号回调
    L2TradeSingleDataManager.set_callback(MyL2TradeSingleCallback())
    # 加载自由流通量
    global_data_loader.load_zyltgb_volume_from_db()
def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read,
        queue_l1_trade_w_strategy_r, trade_ipc_addr):
def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr):
    """
    @param queue_strategy_r_trade_w:
    @param queue_l1_w_strategy_r:
    @param queue_strategy_w_trade_r:
    @param queue_strategy_w_trade_r_for_read:
    @param queue_l1_trade_w_strategy_r:
    @param trade_ipc_addr: 交易IPC地址:(下单ipc地址,撤单ipc地址)
    @return:
    """
@@ -1050,32 +937,15 @@
        huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r,
                                        queue_strategy_w_trade_r_for_read, trade_ipc_addr)
        # 监听l1那边传过来的代码
        t1 = threading.Thread(target=lambda: __recv_pipe_l1(queue_l1_w_strategy_r), daemon=True)
        t1.start()
        # 监听l1交易那边传过来的代码
        t1 = threading.Thread(target=lambda: __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r), daemon=True)
        t1.start()
        # 下单距离太远取消订单
        t1 = threading.Thread(target=lambda: __cancel_buy_for_too_far(), daemon=True)
        t1.start()
        # 同步异步日志
        t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True)
        t1.start()
        # 同步L2的异步日志
        l2_log.codeLogQueueDistributeManager.run_async()
        t1 = threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True)
        t1.start()
        logger_system.info("create TradeServer")
        # 清理无用的客户端
        t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
        t1.start()
        logger_system.info("create TradeServer")
        laddr = "0.0.0.0", 10008
        try:
            tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)  # 注意:参数是MyBaseRequestHandle
task/task_manager.py
New file
@@ -0,0 +1,111 @@
import json
import logging
import multiprocessing
import threading
import time
from db import redis_manager_delegate as redis_manager
from l2 import l2_log
from l2.huaxin import huaxin_target_codes_manager
from log_module import async_log_util
from log_module.log import logger_system, logger_l2_codes_subscript, logger_debug
from servers.huaxin_trade_server import TradeServerProcessor
from third_data import block_info
from trade.huaxin import huaxin_trade_data_update
from trade.huaxin.huaxin_trade_api import ClientSocketManager
from utils import tool
def __listen_l1_target_codes(queue_l1_w_strategy_r: multiprocessing.Queue):
    logger_system.info(f"__listen_l1_target_codes 线程ID:{tool.get_thread_id()}")
    if queue_l1_w_strategy_r is not None:
        while True:
            try:
                val = queue_l1_w_strategy_r.get()
                if val:
                    val = json.loads(val)
                    # print("收到来自L1的数据:", val["type"])
                    # 处理数据
                    type_ = val["type"]
                    timestamp = val.get("time")
                    # 大于10s的数据放弃处理
                    if type_ == "set_target_codes":
                        async_log_util.info(logger_l2_codes_subscript, f"策略接收到数据")
                        if time.time() * 1000 - timestamp > 10 * 1000:
                            continue
                        TradeServerProcessor.set_target_codes(val)
            except Exception as e:
                logger_debug.exception(e)
def __listen_l2_subscript_target_codes(queue_other_w_l2_r: multiprocessing.Queue):
    """
    监听L2订阅目标代码
    @param queue_other_w_l2_r:
    @return:
    """
    logger_system.info("启动读取L2订阅代码队列")
    while True:
        try:
            _datas = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.pop()
            if _datas:
                times = _datas[0]
                datas = _datas[1]
                request_id = _datas[2]
                logger_l2_codes_subscript.info("({})读取L2代码处理队列:数量-{}", request_id, len(datas))
                # 只处理20s内的数据
                if time.time() - times < 20:
                    # 获取涨停列表中的数据
                    # datas中的数据格式:(代码, 现价, 涨幅, 量, 时间)
                    if not datas:
                        # 没有数据需要处理
                        continue
                    # 再次获取代码
                    codes = [d[0] for d in datas]
                    for code in codes:
                        block_info.init_code(code)
                    root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                 "data": datas}
                    queue_other_w_l2_r.put_nowait(json.dumps(root_data))
                    # 如果在9:25-9:29 需要加载板块
                    # if int("092500") < int(tool.get_now_time_str().replace(":", "")) < int("092900"):
                    #     for d in datas:
                    #         threading.Thread(target=lambda: KPLCodeJXBlockManager().load_jx_blocks(d[0],
                    #                                                                                gpcode_manager.get_price(
                    #                                                                                    d[0]),
                    #                                                                                float(d[2]),
                    #                                                                                KPLLimitUpDataRecordManager.get_current_reasons()),
                    #                          daemon=True).start()
                    #         time.sleep(0.2)
                    logger_l2_codes_subscript.info("({})发送到华鑫L2代码处理队列:数量-{}", request_id, len(datas))
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
        finally:
            time.sleep(0.01)
def run_data_listener(queue_other_w_l2_r, queue_l1_w_strategy_r):
    """
    运行数据监听器
    @param queue_other_w_l2_r:
    @return:
    """
    # 交易数据更新任务
    huaxin_trade_data_update.run()
    # 接收来自L1的数据
    threading.Thread(target=lambda: __listen_l1_target_codes(queue_l1_w_strategy_r), daemon=True).start()
    # 接收L2订阅
    threading.Thread(target=lambda: __listen_l2_subscript_target_codes(queue_other_w_l2_r), daemon=True).start()
    # 运行异步redis同步服务
    threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True).start()
    # 同步异步日志
    threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True).start()
    # 同步L2的异步日志
    l2_log.codeLogQueueDistributeManager.run_async()
    threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True).start()
    while True:
        time.sleep(5)
trade/huaxin/huaxin_trade_data_update.py
@@ -1,7 +1,6 @@
"""
华鑫交易数据更新
"""
import json
import logging
import queue
import threading
@@ -16,7 +15,7 @@
from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager
from trade.huaxin.huaxin_trade_order_processor import HuaxinOrderEntity, TradeResultProcessor
from utils import huaxin_util, tool, init_data_util
from utils import huaxin_util
import concurrent.futures
trade_data_request_queue = queue.Queue()
@@ -185,9 +184,6 @@
# 运行
def run(queue_l1_trade_r_strategy_w_, queue_other_w_l2_r_):
    global queue_l1_trade_r_strategy_w, queue_other_w_l2_r
    queue_l1_trade_r_strategy_w = queue_l1_trade_r_strategy_w_
    queue_other_w_l2_r = queue_other_w_l2_r_
def run():
    t1 = threading.Thread(target=lambda: __read_update_task_queue(), daemon=True)
    t1.start()