| | |
| | | # 获取空闲的位置数量 |
| | | def get_free_queue_count(self): |
| | | return len(self.queue_list) - len(self.distibuted_code_queue_dict.keys()) |
| | | |
| | | # 回调对象分配 |
| | | class CodeDataCallbackDistributeManager: |
| | | # queue_list |
| | | def __init__(self, callback_list: list): |
| | | flist = [] |
| | | for callback in callback_list: |
| | | flist.append((id(callback), callback)) |
| | | self.callback_list = flist |
| | | self.distibuted_code_callback_dict = {} |
| | | |
| | | # 获取可用的队列 |
| | | def get_available_callback(self): |
| | | distibuted_callbacks_ids = set() |
| | | for code in self.distibuted_code_callback_dict: |
| | | distibuted_callbacks_ids.add(self.distibuted_code_callback_dict[code][0]) |
| | | for callback in self.callback_list: |
| | | if callback[0] not in distibuted_callbacks_ids: |
| | | return callback |
| | | return None |
| | | |
| | | # 为代码分配队列 |
| | | def distribute_callback(self, code): |
| | | if code in self.distibuted_code_callback_dict: |
| | | return self.distibuted_code_callback_dict.get(code) |
| | | callback_info = self.get_available_callback() |
| | | if not callback_info: |
| | | raise Exception("无可用的回调对象") |
| | | self.distibuted_code_callback_dict[code] = callback_info |
| | | return callback_info |
| | | |
| | | # 获取代码分配的队列 |
| | | def get_distributed_callback(self, code): |
| | | if code in self.distibuted_code_callback_dict: |
| | | return self.distibuted_code_callback_dict.get(code)[1] |
| | | else: |
| | | return None |
| | | |
| | | def release_distribute_callback(self, code): |
| | | if code in self.distibuted_code_callback_dict: |
| | | self.distibuted_code_callback_dict.pop(code) |
| | | |
| | | # 获取空闲的位置数量 |
| | | def get_free_queue_count(self): |
| | | return len(self.callback_list) - len(self.distibuted_code_callback_dict.keys()) |
| | |
| | | from huaxin_client import constant |
| | | from huaxin_client import l2_data_manager |
| | | import lev2mdapi |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager |
| | | from huaxin_client.command_manager import L2ActionCallback |
| | | from huaxin_client.l2_data_manager import L2DataUploadManager |
| | | from huaxin_client.l2_data_transform_protocol import L2DataCallBack |
| | | from log_module import log, async_log_util |
| | | from log_module.async_log_util import huaxin_l2_log |
| | | from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \ |
| | |
| | | |
| | | |
| | | def test_add_codes(queue_r): |
| | | time.sleep(5) |
| | | time.sleep(10) |
| | | # if value: |
| | | # if type(value) == bytes: |
| | | # value = value.decode("utf-8") |
| | |
| | | ("002908", int(50 * 10000 / 12.78), 12.78, 12.00, 200)] |
| | | |
| | | queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]})) |
| | | time.sleep(1) |
| | | time.sleep(10) |
| | | while True: |
| | | try: |
| | | spi.l2_data_upload_manager.add_l2_order_detail( |
| | | {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', |
| | | 'OrderTime': '13000015', |
| | |
| | | {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', |
| | | 'OrderTime': '13000015', |
| | | 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) |
| | | # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) |
| | | time.sleep(0.1) |
| | | spi.l2_data_upload_manager.add_l2_order_detail( |
| | | {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', |
| | | 'OrderTime': '13000015', |
| | | 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | finally: |
| | | time.sleep(10) |
| | | |
| | | |
| | | def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue], |
| | | transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, order_ipc_hosts: list) -> None: |
| | | # def test_add_codes(): |
| | | # time.sleep(5) |
| | | # # if value: |
| | | # # if type(value) == bytes: |
| | | # # value = value.decode("utf-8") |
| | | # # data = json.loads(value) |
| | | # # _type = data["type"] |
| | | # # if _type == "listen_volume": |
| | | # # volume = data["data"]["volume"] |
| | | # # code = data["data"]["code"] |
| | | # # spi.set_code_special_watch_volume(code, volume) |
| | | # # elif _type == "l2_cmd": |
| | | # # l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) |
| | | # |
| | | # demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59), |
| | | # ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)] |
| | | # |
| | | # queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]})) |
| | | # time.sleep(1) |
| | | # |
| | | # spi.l2_data_upload_manager.add_l2_order_detail( |
| | | # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', |
| | | # 'OrderTime': '13000015', |
| | | # 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0) |
| | | # spi.l2_data_upload_manager.add_l2_order_detail( |
| | | # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', |
| | | # 'OrderTime': '13000015', |
| | | # 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) |
| | | # time.sleep(0.1) |
| | | # spi.l2_data_upload_manager.add_l2_order_detail( |
| | | # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', |
| | | # 'OrderTime': '13000015', |
| | | # 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, order_ipc_hosts: list, data_callbacks:list) -> None: |
| | | |
| | | logger_system.info("L2进程ID:{}", os.getpid()) |
| | | logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}") |
| | |
| | | # 初始化 |
| | | order_queue_distribute_manager = CodeQueueDistributeManager(order_queues) |
| | | transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues) |
| | | data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks) |
| | | l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager, |
| | | transaction_queue_distribute_manager, market_queue, order_ipc_hosts) |
| | | transaction_queue_distribute_manager, market_queue, order_ipc_hosts, data_callback_distribute_manager) |
| | | __init_l2(l2_data_upload_manager) |
| | | l2_data_manager.run_upload_common() |
| | | l2_data_manager.run_log() |
| | | # 测试 |
| | | # TODO 测试 |
| | | # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start() |
| | | global l2CommandManager |
| | | l2CommandManager = command_manager.L2CommandManager() |
| | |
| | | import queue |
| | | import threading |
| | | import time |
| | | |
| | | import constant |
| | | from huaxin_client import socket_util |
| | | |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | |
| | | # 活动时间 |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager |
| | | from huaxin_client.l2_data_transform_protocol import L2DataCallBack |
| | | from log_module import async_log_util |
| | | from log_module.async_log_util import huaxin_l2_log |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \ |
| | |
| | | class L2DataUploadManager: |
| | | def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, |
| | | transaction_queue_distribute_manager: CodeQueueDistributeManager, |
| | | market_data_queue: multiprocessing.Queue, order_ipc_hosts): |
| | | market_data_queue: multiprocessing.Queue, order_ipc_hosts, data_callback_distribute_manager:CodeDataCallbackDistributeManager): |
| | | |
| | | self.order_queue_distribute_manager = order_queue_distribute_manager |
| | | self.transaction_queue_distribute_manager = transaction_queue_distribute_manager |
| | | self.market_data_queue = market_data_queue |
| | | self.data_callback_distribute_manager = data_callback_distribute_manager |
| | | # 代码分配的对象 |
| | | self.temp_order_queue_dict = {} |
| | | self.temp_transaction_queue_dict = {} |
| | | self.temp_log_queue_dict = {} |
| | | |
| | | self.filter_order_condition_dict = {} |
| | | self.upload_l2_data_task_dict = {} |
| | | self.l2_order_codes = set() |
| | |
| | | |
| | | def add_market_data(self, data): |
| | | # 加入上传队列 |
| | | self.market_data_queue.put_nowait(data) |
| | | # self.market_data_queue.put_nowait(data) |
| | | code = data['securityID'] |
| | | callback = self.data_callback_distribute_manager.get_distributed_callback(code) |
| | | if callback: |
| | | callback.OnMarketData(code, data) |
| | | |
| | | # 分配上传队列 |
| | | def distribute_upload_queue(self, code): |
| | |
| | | self.order_queue_distribute_manager.distribute_queue(code) |
| | | if not self.transaction_queue_distribute_manager.get_distributed_queue(code): |
| | | self.transaction_queue_distribute_manager.distribute_queue(code) |
| | | if not self.data_callback_distribute_manager.get_distributed_callback(code): |
| | | self.data_callback_distribute_manager.distribute_callback(code) |
| | | |
| | | |
| | | if code not in self.temp_order_queue_dict: |
| | | self.temp_order_queue_dict[code] = collections.deque() |
| | |
| | | if code not in self.temp_log_queue_dict: |
| | | self.temp_log_queue_dict[code] = queue.Queue() |
| | | # 分配订单上传协议 |
| | | if not constant.is_windows(): |
| | | self.l2_order_upload_protocol.distribute_upload_host(code) |
| | | |
| | | if code not in self.upload_l2_data_task_dict: |
| | |
| | | self.order_queue_distribute_manager.release_distribute_queue(code) |
| | | self.transaction_queue_distribute_manager.release_distribute_queue(code) |
| | | self.l2_order_upload_protocol.release_distributed_upload_host(code) |
| | | self.data_callback_distribute_manager.release_distribute_callback(code) |
| | | if code in self.temp_order_queue_dict: |
| | | self.temp_order_queue_dict[code].clear() |
| | | self.temp_order_queue_dict.pop(code) |
| | |
| | | if temp_list: |
| | | # 上传数据 |
| | | # self.__upload_l2_data(code, upload_queue, temp_list) |
| | | self.__upload_l2_order_data(code, temp_list) |
| | | # self.__upload_l2_order_data(code, temp_list) |
| | | self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list, time.time()) |
| | | temp_list = [] |
| | | |
| | | else: |
| | | if code not in self.temp_order_queue_dict: |
| | | self.l2_order_codes.discard(code) |
| | |
| | | temp_list.append(data) |
| | | if temp_list: |
| | | # 上传数据 |
| | | self.__upload_l2_data(code, upload_queue, temp_list) |
| | | # self.__upload_l2_data(code, upload_queue, temp_list) |
| | | self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code, temp_list) |
| | | temp_list = [] |
| | | else: |
| | | if code not in self.temp_transaction_queue_dict: |
| | |
| | | self.code_socket_client_dict = {} |
| | | self.rlock = threading.RLock() |
| | | context = zmq.Context() |
| | | if constant.is_windows(): |
| | | return |
| | | for host in self.ipchosts: |
| | | socket = context.socket(zmq.REQ) |
| | | socket.connect(host) |
| | |
| | | return None |
| | | |
| | | |
| | | __re_compute_threading_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) |
| | | |
| | | |
| | | # L2数据列表 |
| | | # 范围(位置信息,可信的类型) |
| | | def get_l2_place_order_position(code, limit_up_price, datas): |
| | | order_info = get_order_info(code) |
| | | if not order_info: |
| | | # 暂无下单信息 |
| | | return None |
| | | return None, order_info |
| | | price = order_info[0] |
| | | volume = order_info[1] |
| | | exec_data = order_info[2] |
| | |
| | | if code in _place_order_info_dict: |
| | | _place_order_info_dict.pop(code) |
| | | __place_order_position[code] = real_place_index_info[0] |
| | | try: |
| | | __re_compute_threading_pool.submit(__recompute_for_slow_time, code, order_info, real_place_index_info[0]) |
| | | except: |
| | | pass |
| | | return real_place_index_info[0] |
| | | return real_place_index_info[0], order_info |
| | | else: |
| | | return None |
| | | return None, order_info |
| | | |
| | | |
| | | # TODO: 需要测试该方法 |
| | | # 因为L2数据慢的问题而重新计算 |
| | | def __recompute_for_slow_time(code, order_info, real_place_index): |
| | | def recompute_for_slow_time(code, order_info, real_place_index): |
| | | try: |
| | | # 计算当前时间是否满足时间条件 |
| | | now_time_str = tool.get_now_time_str() |
| | |
| | | findexes_info.sort(key=lambda x: x[0] - x[1]) |
| | | # 获取成功 |
| | | real_place_index = findexes_info[0][1] |
| | | async_log_util.info(logger_real_place_order_position, |
| | | f"真实下单位矫正:{code}-{real_place_index} 下单数据:{order_info}") |
| | | |
| | | return real_place_index |
| | | finally: |
| | | pass |
| | | else: |
| | | return |
| | | return None |
| | | pass |
| | | except Exception as e: |
| | | logger_real_place_order_position.exception(e) |
| | | return None |
| | | |
| | | |
| | | # 获取真实下单位置 |
| | |
| | | |
| | | import zmq |
| | | |
| | | import constant |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_debug |
| | | |
| | |
| | | t3 = threading.Thread(target=lambda: self.__recive_l2_markets(market_queue), daemon=True) |
| | | t3.start() |
| | | # 接收订单hosts |
| | | if not constant.is_windows(): |
| | | self.__create_ipc_server_hosts(order_ipc_hosts) |
| | | |
| | | def get_active_count(self, type_): |
| | |
| | | local_latest_datas, local_today_canceled_buyno_map |
| | | import l2.l2_data_util |
| | | from log_module.log import logger_l2_trade_buy, logger_l2_process, logger_l2_error, logger_debug, \ |
| | | logger_l2_not_buy_reasons |
| | | logger_l2_not_buy_reasons, logger_real_place_order_position |
| | | |
| | | from trade.trade_data_manager import CodeActualPriceProcessor, PlaceOrderCountManager |
| | | |
| | | from trade.trade_manager import TradeTargetCodeModeManager, AccountAvailableMoneyManager, MarketSituationManager |
| | | |
| | | import concurrent.futures |
| | | |
| | | |
| | | class L2DataManager: |
| | |
| | | __PlaceOrderCountManager = PlaceOrderCountManager() |
| | | __CodeNatureRecordManager = code_nature_analyse.CodeNatureRecordManager() |
| | | __MarketSituationManager = MarketSituationManager() |
| | | __re_compute_threading_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) |
| | | |
| | | # 获取代码评分 |
| | | @classmethod |
| | |
| | | origin_datas.clear() |
| | | |
| | | @classmethod |
| | | def __recompute_real_order_index(cls, code, pre_real_order_index, order_info): |
| | | real_order_index = huaxin_delegate_postion_manager.recompute_for_slow_time(code, order_info, |
| | | pre_real_order_index) |
| | | if real_order_index: |
| | | exec_data = order_info[2] |
| | | order_begin_pos = cls.__get_order_begin_pos( |
| | | code) |
| | | if order_begin_pos and order_begin_pos.buy_exec_index == exec_data["index"]: |
| | | cls.set_real_place_order_index(code, real_order_index, order_begin_pos) |
| | | async_log_util.info(logger_real_place_order_position, |
| | | f"真实下单位矫正:{code}-{real_order_index} 下单数据:{order_info}") |
| | | |
| | | @classmethod |
| | | def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time): |
| | | now_time_str = tool.get_now_time_str() |
| | | if len(add_datas) > 0: |
| | |
| | | # cls.set_real_place_order_index(code, place_order_index, order_begin_pos.buy_single_index) |
| | | else: |
| | | # 获取下单位置 |
| | | place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, float( |
| | | place_order_index, order_info = huaxin_delegate_postion_manager.get_l2_place_order_position( |
| | | code, float( |
| | | gpcode_manager.get_limit_up_price(code)), add_datas) |
| | | if place_order_index: |
| | | order_begin_pos = cls.__get_order_begin_pos( |
| | | code) |
| | | cls.set_real_place_order_index(code, place_order_index, order_begin_pos) |
| | | try: |
| | | cls.__re_compute_threading_pool.submit( |
| | | cls.__recompute_real_order_index, code, place_order_index, order_info) |
| | | except: |
| | | pass |
| | | async_log_util.info(logger_l2_process, "code:{} 获取到下单真实位置:{}", code, place_order_index) |
| | | except: |
| | | async_log_util.error(logger_l2_error, f"{code} 处理真实下单位置出错") |
| | |
| | | |
| | | if cls.__PauseBuyCodesManager.is_in_cache(code): |
| | | return False, True, f"该代码被暂停交易" |
| | | |
| | | if int(tool.get_now_time_str().replace(":", "")) >= 145700: |
| | | now_time_int = int(tool.get_now_time_str().replace(":", "")) |
| | | if now_time_int>= 145700: |
| | | return False, True, f"14:57后不能交易" |
| | | |
| | | |
| | | if 130100>=now_time_int>= 112900: |
| | | return False, True, f"11:29:00-13:01:00不能交易" |
| | | |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | |
| | |
| | | average_rate = cls.__Buy1PriceManager.get_average_rate(code) |
| | | if average_rate and average_rate <= 0.01 and tool.trade_time_sub(tool.get_now_time_str(), "10:30:00") >= 0: |
| | | return False, True, f"均价涨幅({average_rate})小于1%" |
| | | |
| | | |
| | | |
| | | total_data = local_today_datas.get(code) |
| | | order_begin_pos = cls.__get_order_begin_pos( |
| | |
| | | max_space_time_ms = 9*1000 |
| | | break |
| | | |
| | | |
| | | |
| | | if not threshold_num: |
| | | # 目标手数 |
| | | threshold_num = round(threshold_money / (limit_up_price * 100)) |
| | |
| | | |
| | | # place_order_count = trade_data_manager.PlaceOrderCountManager().get_place_order_count(code) |
| | | |
| | | |
| | | # buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"]) |
| | | |
| | | # 可以触发买,当有涨停买信号时才会触发买 |
| | | trigger_buy = True |
| | | |
| | | |
| | | # 如果大单含有率大于50%,则时间囊括范围提高到3s |
| | | if max_num_set and origin_count: |
| | |
| | | for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): |
| | | order_ipc_hosts.append(f"ipc://l2order{i}.ipc") |
| | | |
| | | # 此处将L2的进程与策略进程合并 |
| | | # L2 |
| | | l2Process = multiprocessing.Process( |
| | | target=huaxin_client.l2_client.run, |
| | | args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts)) |
| | | l2Process.start() |
| | | # l2Process = multiprocessing.Process( |
| | | # target=huaxin_client.l2_client.run, |
| | | # args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts,huaxin_trade_server.my_l2_data_callback)) |
| | | # l2Process.start() |
| | | # 将L2的进程改为进程执行 |
| | | threading.Thread(target=huaxin_client.l2_client.run, args=( |
| | | queue_other_w_l2_r, order_queues, transaction_queues, market_queue, order_ipc_hosts, |
| | | huaxin_trade_server.my_l2_data_callbacks), daemon=True).start() |
| | | |
| | | # 主进程 |
| | | createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, |
| | |
| | | |
| | | # 将tradeServer作为主进程 |
| | | l1Process.join() |
| | | l2Process.join() |
| | | # l2Process.join() |
| | | tradeProcess.join() |
| | | except Exception as e: |
| | | logging.exception(e) |
| | |
| | | |
| | | # 回调 |
| | | my_l2_data_callback = MyL2DataCallback() |
| | | my_l2_data_callbacks = [MyL2DataCallback() for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT)] |
| | | my_trade_response = MyTradeResponse() |
| | | l2DataListenManager: L2DataListenManager = None |
| | | |