| | |
| | | import os |
| | | import threading |
| | | |
| | | from task import task_manager |
| | | |
| | | logger_system.info("程序启动Pre:{}", os.getpid()) |
| | | |
| | | from db import redis_manager_delegate as redis_manager |
| | | import huaxin_client.trade_client |
| | | import huaxin_client.l2_client |
| | | import huaxin_client.l1_client |
| | | from huaxin_client import l2_market_client |
| | | |
| | | # 交易服务 |
| | | |
| | | # from huaxin_api import trade_client, l2_client, l1_client |
| | | from servers import server_util, huaxin_trade_api_server, huaxin_trade_server, server |
| | | from servers import server_util, huaxin_trade_server, server |
| | | |
| | | |
| | | def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue, |
| | | def run_strategy(queue_strategy_r_trade_w_: multiprocessing.Queue, |
| | | queue_l1_w_strategy_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_, |
| | | queue_l1_trade_w_strategy_r_, trade_ipc_addr): |
| | | queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, |
| | | trade_ipc_addr): |
| | | """ |
| | | 策略进程 |
| | | |
| | | @param pipe_server: |
| | | @param queue_strategy_r_trade_w_: |
| | | @param queue_l1_w_strategy_r_: |
| | | @param queue_strategy_w_trade_r_: |
| | | @param queue_strategy_w_trade_r_for_read_: |
| | | @param queue_l1_trade_r_strategy_w_: |
| | | @param queue_l1_trade_w_strategy_r_: |
| | | @param trade_ipc_addr: 交易ipc地址(下单地址, 撤单地址) |
| | | @return: |
| | | """ |
| | |
| | | # 初始化参数 |
| | | server.global_data_loader.init() |
| | | |
| | | # 数据服务 |
| | | t1 = threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True) |
| | | t1.start() |
| | | # 开启数据服务器 |
| | | threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True).start() |
| | | |
| | | # 交易接口服务 |
| | | t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server", |
| | | args=(pipe_server, queue_other_w_l2_r, queue_l1_trade_r_strategy_w_), |
| | | daemon=True) |
| | | t1.start() |
| | | # |
| | | # redis后台服务 |
| | | t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True) |
| | | t1.start() |
| | | |
| | | # 运行数据监听服务 |
| | | threading.Thread(target=task_manager.run_data_listener, name="task_manager", |
| | | args=(queue_other_w_l2_r, queue_l1_w_strategy_r_), |
| | | daemon=True).start() |
| | | # |
| | | # 启动华鑫交易服务 |
| | | huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, |
| | | huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_, |
| | | queue_strategy_w_trade_r_for_read_, |
| | | queue_l1_trade_w_strategy_r_, trade_ipc_addr) |
| | | trade_ipc_addr) |
| | | |
| | | |
| | | # 主服务 |
| | |
| | | logger_l2_trade.info("启动程序") |
| | | logger_system.info("启动程序--------") |
| | | log.close_print() |
| | | # 策略与server间的通信 |
| | | pss_server, pss_strategy = multiprocessing.Pipe() |
| | | |
| | | # L2读其他写 |
| | | queue_other_w_l2_r = multiprocessing.Queue() |
| | | # l1 |
| | | queue_l1_w_strategy_r = multiprocessing.Queue() |
| | | queue_l1_r_strategy_w = multiprocessing.Queue() |
| | | # l1交易 |
| | | queue_l1_trade_w_strategy_r = multiprocessing.Queue() |
| | | queue_l1_trade_r_strategy_w = multiprocessing.Queue() |
| | | |
| | | # 交易读策略写 |
| | | queue_strategy_w_trade_r = multiprocessing.Queue() |
| | |
| | | # 下单,撤单ipc地址 |
| | | order_ipc_addr, cancel_order_ipc_addr = "ipc://trade_order.ipc", "ipc://trade_cancel_order.ipc" |
| | | |
| | | # 托管环境下不创建 |
| | | # serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,)) |
| | | # serverProcess.start() |
| | | logger_system.info("主进程ID:{}", os.getpid()) |
| | | |
| | | # L1订阅数据 |
| | |
| | | # cpu_indexes = [i for i in range(23, 30)] |
| | | # psutil.Process(os.getpid()).cpu_affinity(cpu_indexes) |
| | | # 主进程 |
| | | createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w, |
| | | queue_l1_trade_w_strategy_r, (order_ipc_addr, cancel_order_ipc_addr)) |
| | | run_strategy(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read, |
| | | (order_ipc_addr, cancel_order_ipc_addr)) |
| | | |
| | | # 将tradeServer作为主进程 |
| | | l1Process.join() |
| | |
| | | __process_l1_data_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) |
| | | __updating_jx_blocks_codes = set() |
| | | |
@classmethod
def sell(cls, datas):
    """
    Run the cached sell rules against a batch of level-1 quotes.

    Each quote is a tuple laid out as
    (code, price, rise, volume, update time, bid-1 price, bid-1 volume).
    A rule for the same code fires when the quoted bid-1 price (rounded to two
    decimals) is at or below the rule's bid-1 price and, if the rule carries a
    bid-1 volume, the quoted bid-1 volume does not exceed it.  Quotes with an
    empty bid-1 volume are ignored.  A rule id is handled at most once per call,
    and a rule whose refreshed copy is already marked executed is skipped.

    @param datas: iterable of quote tuples in the layout above
    @return: None
    """
    rules = TradeRuleManager().list_can_excut_rules_cache(types=[TradeRuleManager.TYPE_SELL])
    if not rules:
        return
    excuted_rule_ids = set()
    for d in datas:
        code = d[0]
        buy1_volume = d[6]
        buy1_price = d[5]
        if not buy1_volume:
            continue
        for rule in rules:
            if rule.code != code:
                continue

            # -------- decide whether the rule may fire --------
            can_excute = False
            if round(float(buy1_price), 2) <= round(float(rule.buy1_price), 2):
                # The price condition is met.
                if rule.buy1_volume:
                    if rule.buy1_volume >= buy1_volume:
                        # Both price and volume conditions are met.
                        can_excute = True
                        async_log_util.info(logger_trade, f"触发卖规则:量触发{buy1_volume}/{rule.buy1_volume}")
                else:
                    # The rule has no volume threshold: price alone triggers it.
                    can_excute = True
                    async_log_util.info(logger_trade, f"触发卖规则:价格触发{buy1_price}/{rule.buy1_price}")
            if not can_excute:
                continue

            # Capture the id before locking: the lock must be released with the
            # very id it was acquired with, whatever the refresh below returns.
            rule_id = rule.id_
            TradeRuleManager().require_sell_lock(rule_id)
            try:
                if rule_id in excuted_rule_ids:
                    continue
                excuted_rule_ids.add(rule_id)
                # Re-read the rule so the executed flag reflects the latest state.
                # A separate name is used so neither the loop variable nor the
                # lock id is affected by the result of the lookup.
                latest = TradeRuleManager().get_by_id(rule_id)
                if latest.excuted:
                    continue
                # Submit the sell order.
                limit_down_price = gpcode_manager.get_limit_down_price(code)
                limit_up_price = gpcode_manager.get_limit_up_price(code)
                huaxin_sell_util.start_sell(code, latest.sell_volume, latest.sell_price_type, limit_up_price,
                                            limit_down_price,
                                            buy1_price)
                TradeRuleManager().excuted(rule_id)
            except Exception as e:
                logger_debug.exception(e)
            finally:
                TradeRuleManager().release_sell_lock(rule_id)
| | | |
| | | # 保存现价 |
| | | @classmethod |
| | | def __save_l1_current_price(cls, datas): |
| | |
| | | else: |
| | | cls.__process_l1_data_thread_pool.submit( |
| | | lambda: HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id)) |
| | | |
@classmethod
def set_l1_trade_codes_info(cls, data_json):
    """
    Handle one level-1 trade message: store the quotes, then evaluate sell rules.

    @param data_json: parsed message; read as data_json["data"]["data"] for the
                      quote rows and data_json["request_id"] for the request id
    @return: None
    """
    payload = data_json["data"]
    # The value is not used here; the lookup is kept so a message without
    # "request_id" is still rejected exactly as before.
    _request_id = data_json["request_id"]
    rows = payload["data"]
    cls.__save_l1_current_price(rows)
    cls.sell(rows)
| | | |
| | | @classmethod |
| | | def l2_order(cls, code, _datas, timestamp): |
| | |
| | | def trading_order_canceled(cls, code, order_no): |
| | | pass |
| | | |
@classmethod
def test_sell(cls):
    """
    Feed two hand-made quotes for code 600571 through sell().

    @return: None
    """
    # Row layout: (code, price, rise, volume, update time, bid-1 price, bid-1 volume)
    sample_rows = [
        ("600571", price, 9.96, 100000000, tool.get_now_time_str(), price, 10210)
        for price in (12.14, 12.04)
    ]
    cls.sell(sample_rows)
| | | |
| | | |
| | | def clear_invalid_client(): |
| | | logger_system.info(f"trade_server clear_invalid_client 线程ID:{tool.get_thread_id()}") |
| | |
| | | pass |
| | | finally: |
| | | time.sleep(2) |
| | | |
| | | |
def __recv_pipe_l1(queue_l1_w_strategy_r: multiprocessing.Queue):
    """
    Read JSON messages from the L1 queue and forward target-code updates.

    Loops forever while a queue is supplied.  Only "set_target_codes" messages
    are acted on; one whose "time" field is more than 10 seconds behind
    time.time() * 1000 is dropped.  Any exception is logged to logger_debug
    and the loop continues.

    @param queue_l1_w_strategy_r: queue to read from; None makes the function return at once
    @return: None
    """
    logger_system.info(f"trade_server __recv_pipe_l1 线程ID:{tool.get_thread_id()}")
    if queue_l1_w_strategy_r is None:
        return
    while True:
        try:
            raw = queue_l1_w_strategy_r.get()
            if not raw:
                continue
            message = json.loads(raw)
            msg_type = message["type"]
            sent_at = message.get("time")
            if msg_type != "set_target_codes":
                continue
            async_log_util.info(logger_l2_codes_subscript, f"策略接收到数据")
            # Discard messages older than 10 seconds.
            if time.time() * 1000 - sent_at > 10 * 1000:
                continue
            TradeServerProcessor.set_target_codes(message)
        except Exception as e:
            logger_debug.exception(e)
| | | |
| | | |
| | | # 排得太远撤单 |
| | |
| | | logger_debug.exception(e) |
| | | finally: |
| | | time.sleep(3) |
| | | |
| | | |
def __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r: multiprocessing.Queue):
    """
    Read JSON messages from the L1-trade queue and hand trade quotes to the processor.

    Loops forever while a queue is supplied.  Every non-empty raw message is
    logged; "upload_l1_trade_datas" messages are passed to
    TradeServerProcessor.set_l1_trade_codes_info.  Exceptions are logged and
    the loop continues.

    @param queue_l1_trade_w_strategy_r: queue to read from; None makes the function return at once
    @return: None
    """
    logger_system.info(f"trade_server __recv_pipe_l1_trade 线程ID:{tool.get_thread_id()}")
    if queue_l1_trade_w_strategy_r is None:
        return
    while True:
        try:
            raw = queue_l1_trade_w_strategy_r.get()
            if not raw:
                continue
            async_log_util.info(logger_local_huaxin_l1_trade_info, f"客户端接收:{raw}")
            message = json.loads(raw)
            if message["type"] == "upload_l1_trade_datas":
                # L1 quotes supplied specifically for trading.
                TradeServerProcessor.set_l1_trade_codes_info(message)
                # NOTE(review): nesting reconstructed from a flattened listing -
                # confirm this log belongs inside the type check.
                async_log_util.info(logger_local_huaxin_l1_trade_info, message)
        except Exception as e:
            logger_local_huaxin_l1_trade_info.exception(e)
            logging.exception(e)
| | | |
| | | |
| | | class MyL2DataCallback(l2_data_transform_protocol.L2DataCallBack): |
| | |
| | | result_by_volume = radical_buy_strategy.process_limit_up_active_buy_deal(code, transaction_datas) |
| | | async_log_util.info(logger_l2_radical_buy, f"量买入结果判断:{code}, 结果:{result_by_volume} 板块:{buy_blocks}") |
| | | in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks() |
| | | buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b),in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks] |
| | | buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b), |
| | | in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks] |
| | | if result_by_volume[0] != radical_buy_strategy.BUY_MODE_NONE: |
| | | if tool.get_now_time_as_int() < 93200: |
| | | radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code) |
| | |
| | | |
| | | threading.Thread(target=run_pending, daemon=True).start() |
| | | l2_data_util.load_l2_data_all(True) |
| | | # L2成交信号回调 |
| | | L2TradeSingleDataManager.set_callback(MyL2TradeSingleCallback()) |
| | | # 加载自由流通量 |
| | | global_data_loader.load_zyltgb_volume_from_db() |
| | | |
| | | |
| | | def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, |
| | | queue_l1_trade_w_strategy_r, trade_ipc_addr): |
| | | def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr): |
| | | """ |
| | | @param queue_strategy_r_trade_w: |
| | | @param queue_l1_w_strategy_r: |
| | | @param queue_strategy_w_trade_r: |
| | | @param queue_strategy_w_trade_r_for_read: |
| | | @param queue_l1_trade_w_strategy_r: |
| | | @param trade_ipc_addr: 交易IPC地址:(下单ipc地址,撤单ipc地址) |
| | | @return: |
| | | """ |
| | |
| | | huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read, trade_ipc_addr) |
| | | |
| | | # 监听l1那边传过来的代码 |
| | | t1 = threading.Thread(target=lambda: __recv_pipe_l1(queue_l1_w_strategy_r), daemon=True) |
| | | t1.start() |
| | | |
| | | # 监听l1交易那边传过来的代码 |
| | | t1 = threading.Thread(target=lambda: __recv_pipe_l1_trade(queue_l1_trade_w_strategy_r), daemon=True) |
| | | t1.start() |
| | | |
| | | # 下单距离太远取消订单 |
| | | t1 = threading.Thread(target=lambda: __cancel_buy_for_too_far(), daemon=True) |
| | | t1.start() |
| | | |
| | | # 同步异步日志 |
| | | t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True) |
| | | t1.start() |
| | | |
| | | # 同步L2的异步日志 |
| | | l2_log.codeLogQueueDistributeManager.run_async() |
| | | |
| | | t1 = threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True) |
| | | t1.start() |
| | | |
| | | logger_system.info("create TradeServer") |
| | | # 清理无用的客户端 |
| | | t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True) |
| | | t1.start() |
| | | |
| | | logger_system.info("create TradeServer") |
| | | laddr = "0.0.0.0", 10008 |
| | | try: |
| | | tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 注意:参数是MyBaseRequestHandle |
New file |
| | |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import threading |
| | | import time |
| | | |
| | | from db import redis_manager_delegate as redis_manager |
| | | from l2 import l2_log |
| | | from l2.huaxin import huaxin_target_codes_manager |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_system, logger_l2_codes_subscript, logger_debug |
| | | from servers.huaxin_trade_server import TradeServerProcessor |
| | | from third_data import block_info |
| | | from trade.huaxin import huaxin_trade_data_update |
| | | from trade.huaxin.huaxin_trade_api import ClientSocketManager |
| | | from utils import tool |
| | | |
| | | |
def __listen_l1_target_codes(queue_l1_w_strategy_r: multiprocessing.Queue):
    """
    Listen on the L1 queue and apply target-code updates.

    Loops forever while a queue is supplied.  Each non-empty item is parsed as
    JSON; a "set_target_codes" message is forwarded to
    TradeServerProcessor.set_target_codes unless its "time" field is more than
    10 seconds behind time.time() * 1000.  Exceptions are logged to
    logger_debug and the loop continues.

    @param queue_l1_w_strategy_r: queue to read from; None makes the function return at once
    @return: None
    """
    logger_system.info(f"__listen_l1_target_codes 线程ID:{tool.get_thread_id()}")
    if queue_l1_w_strategy_r is None:
        return
    stale_after_ms = 10 * 1000
    while True:
        try:
            item = queue_l1_w_strategy_r.get()
            if item:
                parsed = json.loads(item)
                kind = parsed["type"]
                stamp = parsed.get("time")
                if kind == "set_target_codes":
                    async_log_util.info(logger_l2_codes_subscript, f"策略接收到数据")
                    age_ms = time.time() * 1000 - stamp
                    # Stale messages are dropped without being applied.
                    if age_ms > stale_after_ms:
                        continue
                    TradeServerProcessor.set_target_codes(parsed)
        except Exception as e:
            logger_debug.exception(e)
| | | |
| | | |
def __listen_l2_subscript_target_codes(queue_other_w_l2_r: multiprocessing.Queue):
    """
    Forward pending L2 subscription code lists to the L2 queue.

    Polls HuaXinL2SubscriptCodesManager.pop() forever, sleeping 0.01 s after
    every iteration.  A popped entry is read as (time, rows, request id); rows
    younger than 20 seconds are sent on as a JSON command after
    block_info.init_code has been called for each row's first field.
    Exceptions are logged and the loop continues.

    @param queue_other_w_l2_r: queue the JSON command is put on
    @return: None (never returns)
    """
    logger_system.info("启动读取L2订阅代码队列")
    while True:
        try:
            popped = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.pop()
            if not popped:
                continue
            queued_at, rows, request_id = popped[0], popped[1], popped[2]
            logger_l2_codes_subscript.info("({})读取L2代码处理队列:数量-{}", request_id, len(rows))
            # Only entries from the last 20 seconds are processed.
            if not (time.time() - queued_at < 20):
                continue
            # Row layout: (code, price, rise, volume, time)
            if not rows:
                # Nothing to forward.
                continue

            for code in [row[0] for row in rows]:
                block_info.init_code(code)
            command = json.dumps({"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                  "data": rows})
            queue_other_w_l2_r.put_nowait(command)
            # Loading blocks for these codes between 09:25 and 09:29 used to be
            # triggered here; that step is disabled.
            logger_l2_codes_subscript.info("({})发送到华鑫L2代码处理队列:数量-{}", request_id, len(rows))
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
        finally:
            # Runs on every path, including the early `continue`s above.
            time.sleep(0.01)
| | | |
| | | |
def run_data_listener(queue_other_w_l2_r, queue_l1_w_strategy_r):
    """
    Start the data-listening background workers, then keep the caller alive.

    Starts the trade-data update task, the L1 and L2 listeners, the redis
    loop and the async log sync workers as daemon threads, and then sleeps in
    an endless loop.

    @param queue_other_w_l2_r: queue handed to the L2 subscription listener
    @param queue_l1_w_strategy_r: queue handed to the L1 target-code listener
    @return: None (never returns)
    """

    def start_daemon(target, **thread_kwargs):
        # Every worker here is a daemon thread started immediately.
        threading.Thread(target=target, daemon=True, **thread_kwargs).start()

    # Trade data update task.
    huaxin_trade_data_update.run()

    # Data coming from L1.
    start_daemon(lambda: __listen_l1_target_codes(queue_l1_w_strategy_r))
    # L2 subscription requests.
    start_daemon(lambda: __listen_l2_subscript_target_codes(queue_other_w_l2_r))
    # Asynchronous redis sync loop.
    start_daemon(redis_manager.RedisUtils.run_loop, name="redis")
    # Flush the asynchronous logs.
    start_daemon(lambda: async_log_util.run_sync())
    # Flush the asynchronous L2 logs.
    l2_log.codeLogQueueDistributeManager.run_async()
    start_daemon(lambda: async_log_util.l2_data_log.run_sync())

    # Park this thread; the work happens in the daemon threads above.
    while True:
        time.sleep(5)
| | |
| | | """ |
| | | 华鑫交易数据更新 |
| | | """ |
| | | import json |
| | | import logging |
| | | import queue |
| | | import threading |
| | |
| | | from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager |
| | | |
| | | from trade.huaxin.huaxin_trade_order_processor import HuaxinOrderEntity, TradeResultProcessor |
| | | from utils import huaxin_util, tool, init_data_util |
| | | from utils import huaxin_util |
| | | import concurrent.futures |
| | | |
| | | trade_data_request_queue = queue.Queue() |
| | |
| | | |
| | | |
| | | # 运行 |
| | | def run(queue_l1_trade_r_strategy_w_, queue_other_w_l2_r_): |
| | | global queue_l1_trade_r_strategy_w, queue_other_w_l2_r |
| | | queue_l1_trade_r_strategy_w = queue_l1_trade_r_strategy_w_ |
| | | queue_other_w_l2_r = queue_other_w_l2_r_ |
| | | def run(): |
| | | t1 = threading.Thread(target=lambda: __read_update_task_queue(), daemon=True) |
| | | t1.start() |