| | |
| | | |
| | | class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi): |
| | | latest_codes_set = set() |
| | | codes_volume_and_price_dict = {} |
| | | |
| | | special_code_volume_for_order_dict = {} |
| | | # 已经订阅的代码 |
| | | subscripted_codes = set() |
| | |
| | | self.__api = api |
| | | self.is_login = False |
| | | self.l2_data_upload_manager = l2_data_upload_manager |
| | | self.codes_volume_and_price_dict = {} |
| | | |
| | | def __split_codes(self, codes): |
| | | szse_codes = [] |
| | |
| | | code = d[0] |
| | | codes.add(code) |
| | | self.codes_volume_and_price_dict[code] = (d[1], d[2]) |
| | | self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], d[2]) |
| | | self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], float(d[2])) |
| | | add_codes = codes - self.subscripted_codes |
| | | del_codes = self.subscripted_codes - codes |
| | | print("add del codes", add_codes, del_codes) |
| | |
def set_code_special_watch_volume(self, code, volume):
    """Temporarily add ``volume`` to the order filter of ``code``.

    The base filter (minimum volume, limit-up price) is read from
    ``self.codes_volume_and_price_dict``.  When an entry exists, the filter
    is registered again on the upload manager together with the special
    volumes ``{volume, constant.SHADOW_ORDER_VOLUME}`` and an expiry
    timestamp three seconds from now.  When no entry exists, nothing is
    registered (unpacking the missing entry would otherwise raise
    ``TypeError``).

    Args:
        code: Security code whose filter is updated.
        volume: Order volume to watch for.
    """
    base_condition = self.codes_volume_and_price_dict.get(code)
    if base_condition:
        min_volume, limit_up_price = base_condition[0], base_condition[1]
        # The special volumes are only valid for 3 seconds.
        self.l2_data_upload_manager.set_order_fileter_condition(
            code, min_volume, limit_up_price,
            {volume, constant.SHADOW_ORDER_VOLUME},
            time.time() + 3)

    # The request is logged whether or not a base filter was found.
    async_log_util.info(logger_local_huaxin_l2_subscript, f"设置下单量监听:{code}-{volume}")
| | | |
| | |
| | | |
| | | def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue], |
| | | transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None: |
| | | # def test_add_codes(): |
| | | # time.sleep(5) |
| | | # # if value: |
| | | # # if type(value) == bytes: |
| | | # # value = value.decode("utf-8") |
| | | # # data = json.loads(value) |
| | | # # _type = data["type"] |
| | | # # if _type == "listen_volume": |
| | | # # volume = data["data"]["volume"] |
| | | # # code = data["data"]["code"] |
| | | # # spi.set_code_special_watch_volume(code, volume) |
| | | # # elif _type == "l2_cmd": |
| | | # # l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) |
| | | # |
| | | # demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59), |
| | | # ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)] |
| | | # |
| | | # queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]})) |
| | | # time.sleep(1) |
| | | # |
| | | # spi.l2_data_upload_manager.add_l2_order_detail( |
| | | # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', |
| | | # 'OrderTime': '13000015', |
| | | # 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0) |
| | | # spi.l2_data_upload_manager.add_l2_order_detail( |
| | | # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', |
| | | # 'OrderTime': '13000015', |
| | | # 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) |
| | | # time.sleep(0.1) |
| | | # spi.l2_data_upload_manager.add_l2_order_detail( |
| | | # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', |
| | | # 'OrderTime': '13000015', |
| | | # 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | |
| | | |
| | | logger_system.info("L2进程ID:{}", os.getpid()) |
| | | logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | |
| | | __init_l2(l2_data_upload_manager) |
| | | l2_data_manager.run_upload_common() |
| | | l2_data_manager.run_log() |
| | | # 测试 |
| | | # threading.Thread(target=lambda: test_add_codes(),daemon=True).start() |
| | | global l2CommandManager |
| | | l2CommandManager = command_manager.L2CommandManager() |
| | | l2CommandManager.init(MyL2ActionCallback()) |
| | |
| | | time.sleep(2) |
| | | |
| | | |
def test():
    """Manual smoke test: build local queues, inject demo traffic, call ``run``.

    A background thread pushes one demo subscription command and a few
    hand-made order details while ``run`` executes in the calling thread.
    """

    def test_add_codes():
        """Push the demo command and order details into the client."""
        # Wait before injecting anything; presumably this gives run() time
        # to finish its start-up -- TODO confirm.
        time.sleep(5)

        demo_datas = [
            ("603002", int(50 * 10000 / 6.35), 6.35),
            ("002654", int(50 * 10000 / 15.59), 15.59),
            ("603701", int(50 * 10000 / 14.28), 14.28),
            ("002908", int(50 * 10000 / 12.78), 12.78),
        ]

        def make_order(volume, sub_seq, order_no):
            # All demo orders share everything except these three fields.
            return {'SecurityID': '603002', 'Price': 6.35, 'Volume': volume, 'Side': "1",
                    'OrderType': '0', 'OrderTime': '13000015',
                    'MainSeq': 2, 'SubSeq': sub_seq, 'OrderNO': order_no, 'OrderStatus': 'D'}

        # Send only the first demo code through the command queue.
        subscribe_cmd = {"type": "l2_cmd", "data": [demo_datas[0]]}
        queue_r.put_nowait(json.dumps(subscribe_cmd))
        time.sleep(1)

        spi.l2_data_upload_manager.add_l2_order_detail(make_order(275000, 6739147, 5512466), 0)
        spi.l2_data_upload_manager.add_l2_order_detail(make_order(200, 6739148, 5512467), 0)

        # Ask for a special watch on volume 100, then send a matching order.
        listen_cmd = {"type": "listen_volume", "data": {"code": "603002", "volume": 100}}
        queue_r.put_nowait(json.dumps(listen_cmd))
        time.sleep(0.1)
        spi.l2_data_upload_manager.add_l2_order_detail(make_order(100, 6739148, 5512467), 0)

    queue_r = multiprocessing.Queue()
    market_queue = multiprocessing.Queue()
    # Twenty order queues and twenty transaction queues.
    order_queues = [multiprocessing.Queue() for _ in range(20)]
    transaction_queues = [multiprocessing.Queue() for _ in range(20)]

    feeder = threading.Thread(target=test_add_codes)
    feeder.start()

    run(queue_r, order_queues, transaction_queues, market_queue)
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | # run(None, None, None) |
| | | # spi.set_codes_data([("000333", 12000)]) |
| | | input() |
| | |
| | | self.temp_order_queue_dict = {} |
| | | self.temp_transaction_queue_dict = {} |
| | | self.filter_order_condition_dict = {} |
| | | self.upload_l2_data_task_dict = {} |
| | | self.l2_order_codes = set() |
| | | self.l2_transaction_codes = set() |
| | | |
| | | # 设置订单过滤条件 |
| | | def set_order_fileter_condition(self, code, min_volume, limit_up_price, special_volumes=None, |
| | |
| | | if code in self.filter_order_condition_dict and not special_volumes and not special_volumes_expire_time: |
| | | self.filter_order_condition_dict[code][0] = (min_volume, limit_up_price) |
| | | else: |
| | | self.filter_order_condition_dict[code] = [(min_volume, limit_up_price), special_volumes, special_volumes_expire_time] |
| | | self.filter_order_condition_dict[code] = [(min_volume, limit_up_price), special_volumes, |
| | | special_volumes_expire_time] |
| | | |
| | | # 过滤订单 |
| | | def __filter_order(self, item): |
| | |
| | | |
| | | # 分配上传队列 |
def distribute_upload_queue(self, code):
    """Allocate upload queues, staging deques and upload workers for ``code``.

    Each step is skipped when it has already been done for ``code``, so the
    method can be called repeatedly without creating duplicates:

    * an order queue and a transaction queue are requested from the two
      distribute managers when none is assigned yet;
    * an empty ``collections.deque`` is created in ``temp_order_queue_dict``
      and ``temp_transaction_queue_dict`` when missing;
    * one order-upload thread and one transaction-upload thread are started
      as daemon threads and recorded in ``upload_l2_data_task_dict`` when no
      entry exists for ``code``.

    Args:
        code: Security code to allocate resources for.
    """
    if not self.order_queue_distribute_manager.get_distributed_queue(code):
        self.order_queue_distribute_manager.distribute_queue(code)
    if not self.transaction_queue_distribute_manager.get_distributed_queue(code):
        self.transaction_queue_distribute_manager.distribute_queue(code)
    if code not in self.temp_order_queue_dict:
        self.temp_order_queue_dict[code] = collections.deque()
    if code not in self.temp_transaction_queue_dict:
        self.temp_transaction_queue_dict[code] = collections.deque()

    # Start the workers only once per code; starting them unconditionally
    # would leave several upload loops running for the same code.
    if code not in self.upload_l2_data_task_dict:
        order_task = threading.Thread(target=self.__run_upload_order_task, args=(code,), daemon=True)
        order_task.start()
        transaction_task = threading.Thread(target=self.__run_upload_transaction_task, args=(code,),
                                            daemon=True)
        transaction_task.start()
        self.upload_l2_data_task_dict[code] = (order_task, transaction_task)
| | | # 释放已经分配的队列 |
| | | |
def release_distributed_upload_queue(self, code):
    """Release everything that was allocated for ``code``.

    Both distribute managers are asked to release their queue, the staging
    deques for orders and transactions are removed and emptied, and the
    recorded upload tasks are forgotten.  Calling this for a code that has
    nothing allocated only reaches the two managers.

    Args:
        code: Security code whose resources are released.
    """
    self.order_queue_distribute_manager.release_distribute_queue(code)
    self.transaction_queue_distribute_manager.release_distribute_queue(code)
    for staging in (self.temp_order_queue_dict, self.temp_transaction_queue_dict):
        # pop() with a default avoids the separate membership test and the
        # KeyError that follows if the key vanishes in between.
        pending = staging.pop(code, None)
        if pending is not None:
            pending.clear()
    # Forget the workers so a later allocation for this code can start new ones.
    self.upload_l2_data_task_dict.pop(code, None)
| | | |
def __upload_l2_data(self, code, _queue, datas):
    """Enqueue one batch for ``code`` on ``_queue`` without blocking.

    The item placed on the queue is the tuple ``(code, datas, timestamp)``,
    where ``timestamp`` is ``time.time()`` taken at enqueue time.

    NOTE(review): if ``_queue`` is a ``multiprocessing.Queue``, the item is
    serialized later by a background thread, so callers should not mutate
    ``datas`` after this call -- confirm against the callers.
    """
    payload = (code, datas, time.time())
    _queue.put_nowait(payload)
| | |
| | | upload_queue = queue_info[1] |
| | | while True: |
| | | try: |
| | | if not q: |
| | | if len(q) > 0: |
| | | data = q.popleft() |
| | | # 前置数据处理,过滤掉无用的数据 |
| | | data = self.__filter_order(data) |
| | |
| | | if temp_list: |
| | | # 上传数据 |
| | | self.__upload_l2_data(code, upload_queue, temp_list) |
| | | temp_list.clear() |
| | | temp_list = [] |
| | | else: |
| | | if code not in self.temp_order_queue_dict: |
| | | self.l2_order_codes.discard(code) |
| | | break |
| | | self.l2_order_codes.add(code) |
| | | time.sleep(0.001) |
| | | |
| | | except: |
| | | pass |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | finally: |
| | | pass |
| | | |
| | |
| | | temp_list = [] |
| | | while True: |
| | | try: |
| | | if not q: |
| | | if len(q) > 0: |
| | | data = q.popleft() |
| | | data = self.__filter_transaction(data) |
| | | if data: |
| | |
| | | if temp_list: |
| | | # 上传数据 |
| | | self.__upload_l2_data(code, upload_queue, temp_list) |
| | | temp_list.clear() |
| | | temp_list = [] |
| | | else: |
| | | if code not in self.temp_transaction_queue_dict: |
| | | self.l2_transaction_codes.discard(code) |
| | | break |
| | | self.l2_transaction_codes.add(code) |
| | | time.sleep(0.002) |
| | | except: |
| | | pass |
| | | finally: |
| | | pass |
| | | |
| | | # 释放已经分配的队列 |
def release_distributed_upload_queue(self, code):
    """Release the queues, staging deques and task record held for ``code``.

    NOTE(review): a definition with the same name appears earlier in this
    file; if both sit in the same class, this later one is the one that
    takes effect, so it must also clear ``upload_l2_data_task_dict``.
    Otherwise a code that is released and allocated again would find its
    old task entry and never get new upload workers.

    Args:
        code: Security code whose resources are released.
    """
    self.order_queue_distribute_manager.release_distribute_queue(code)
    self.transaction_queue_distribute_manager.release_distribute_queue(code)
    for staging in (self.temp_order_queue_dict, self.temp_transaction_queue_dict):
        # Remove the staging deque first, then drop whatever was still pending.
        pending = staging.pop(code, None)
        if pending is not None:
            pending.clear()
    self.upload_l2_data_task_dict.pop(code, None)
| | | |
| | | |
| | | def add_target_code(code): |
| | |
| | | |
| | | |
def __test():
    """Placeholder for a manual test; it currently performs no work."""
    # Allocate data (no implementation is present here).
    pass
| | | |
| | | |
| | |
| | | |
| | | def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue, |
| | | queue_l1_w_strategy_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_: multiprocessing.Queue, order_queues_, transaction_queues_, market_queue_): |
| | | queue_strategy_w_trade_r_: multiprocessing.Queue, order_queues_, transaction_queues_, |
| | | market_queue_): |
| | | logger_system.info("策略进程ID:{}", os.getpid()) |
| | | log.close_print() |
| | | # 初始化参数 |
| | |
| | | logger_system.error(f"端口服务器:{9004} 启动失败") |
| | | |
| | | |
| | | if __name__ == '__main__1': |
| | | huaxin_client.l2_client.test() |
| | | |
| | | if __name__ == '__main__': |
| | | try: |
| | | logger_l2_trade.info("启动程序") |