L2与交易之间的通信采用队列方式/实现L撤单比例动态计算
| | |
| | | return cls._instance |
| | | |
| | | @classmethod |
| | | def init(cls, trade_action_callback: TradeActionCallback, pipe_l2, queue_strategy_trade_read: multiprocessing.Queue): |
| | | def init(cls, trade_action_callback: TradeActionCallback, queue_strategy_trade_read: multiprocessing.Queue): |
| | | cls.action_callback = trade_action_callback |
| | | cls.pipe_l2 = pipe_l2 |
| | | cls.queue_strategy_trade_read = queue_strategy_trade_read |
| | | |
| | | @classmethod |
| | | def process_command(cls, _type, client_id, result_json, sk=None): |
| | | async_log_util.info(logger_local_huaxin_contact_debug, f"process_command: {result_json}") |
| | | # 查看是否是设置L2的代码 |
| | | if _type == CLIENT_TYPE_CMD_L2: |
| | | cls.pipe_l2.send( |
| | | json.dumps({"type": "set_l2_codes", "data": result_json["data"]})) |
| | | return |
| | | |
| | | try: |
| | | data = result_json["data"] |
| | | request_id = result_json.get('request_id') |
| | |
| | | # -*- coding: utf-8 -*- |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import os |
| | | import queue |
| | | import threading |
| | |
| | | api.Init() |
| | | |
| | | |
| | | def __receive_from_pipe_trade(pipe): |
| | | def __receive_from_pipe_trade(queue_trade_w_l2_r: multiprocessing.Queue): |
| | | logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | try: |
| | | value = pipe.recv() |
| | | value = queue_trade_w_l2_r.get() |
| | | if value: |
| | | value = value.decode("utf-8") |
| | | data = json.loads(value) |
| | |
| | | pipe_strategy = None |
| | | |
| | | |
| | | def run(pipe_trade, _pipe_strategy, _l2_data_callback: l2_data_transform_protocol.L2DataCallBack) -> None: |
| | | def run(queue_trade_w_l2_r:multiprocessing.Queue, _pipe_strategy, _l2_data_callback: l2_data_transform_protocol.L2DataCallBack) -> None: |
| | | logger_system.info("L2进程ID:{}", os.getpid()) |
| | | logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | | log.close_print() |
| | | if pipe_trade is not None: |
| | | t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(pipe_trade), daemon=True) |
| | | if queue_trade_w_l2_r is not None: |
| | | t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(queue_trade_w_l2_r), daemon=True) |
| | | t1.start() |
| | | if _pipe_strategy is not None: |
| | | global pipe_strategy |
| | |
| | | # -*- coding: utf-8 -*- |
| | | import contextlib |
| | | import json |
| | | import logging |
| | | import mmap |
| | | import queue |
| | | import random |
| | | import threading |
| | |
| | | ret = api.ReqOrderInsert(req_field, self.req_id) |
| | | if ret != 0: |
| | | raise Exception('ReqOrderInsert fail, ret[%d]' % ret) |
| | | if l2pipe is not None: |
| | | l2pipe.send(json.dumps({"type": "listen_volume", "data": {"code": code, "volume": count}}).encode('utf-8')) |
| | | async_log_util.info(logger_trade, f"{code}华鑫本地真实下单结束") |
| | | return |
| | | |
| | |
| | | pOrderField.OrderRef, pOrderField.OrderLocalID, |
| | | pOrderField.LimitPrice, pOrderField.VolumeTotalOriginal, pOrderField.OrderSysID, |
| | | pOrderField.OrderStatus, pOrderField.InsertTime)) |
| | | if pOrderField.OrderStatus != traderapi.TORA_TSTP_OST_Unknown: |
| | | if pOrderField.OrderStatus == traderapi.TORA_TSTP_OST_Unknown: |
| | | if queue_trade_w_l2_r is not None: |
| | | queue_trade_w_l2_r.put_nowait( |
| | | json.dumps({"type": "listen_volume", "data": {"code": pOrderField.SecurityID, |
| | | "volume": pOrderField.VolumeTotalOriginal}}).encode( |
| | | 'utf-8')) |
| | | else: |
| | | order_data = {"sinfo": pOrderField.SInfo, "securityID": pOrderField.SecurityID, |
| | | "orderLocalID": pOrderField.OrderLocalID, |
| | | "direction": pOrderField.Direction, "orderSysID": pOrderField.OrderSysID, |
| | |
| | | addr, port = constant.SERVER_IP, constant.SERVER_PORT |
| | | |
| | | |
| | | def run(trade_response_: TradeResponse = None, pipe_l2=None, queue_strategy_trade_write_=None, |
| | | def run(trade_response_: TradeResponse = None, queue_trade_w_l2_r_: multiprocessing.Queue = None, |
| | | queue_strategy_trade_write_=None, |
| | | queue_strategy_trade_read=None): |
| | | try: |
| | | logger_system.info("交易进程ID:{}", os.getpid()) |
| | | logger_system.info(f"trade 线程ID:{tool.get_thread_id()}") |
| | | __init_trade_data_server() |
| | | global l2pipe |
| | | l2pipe = pipe_l2 |
| | | global queue_trade_w_l2_r |
| | | queue_trade_w_l2_r = queue_trade_w_l2_r_ |
| | | |
| | | global queue_strategy_trade_write |
| | | queue_strategy_trade_write = queue_strategy_trade_write_ |
| | |
| | | |
| | | global tradeCommandManager |
| | | tradeCommandManager = command_manager.TradeCommandManager() |
| | | tradeCommandManager.init(MyTradeActionCallback(), l2pipe, queue_strategy_trade_read) |
| | | tradeCommandManager.init(MyTradeActionCallback(), queue_strategy_trade_read) |
| | | logger_system.info("华鑫交易服务启动") |
| | | tradeCommandManager.run() |
| | | except Exception as e: |
| | |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from ths import client_manager |
| | | import constant |
| | | from trade.deal_big_money_manager import DealOrderNoManager |
| | | from trade.trade_manager import AccountAvailableMoneyManager |
| | | from utils import global_util, tool |
| | | import threading |
| | |
| | | AccountAvailableMoneyManager() |
| | | |
| | | # 9点25之前删除所有代码 |
| | | if tool.trade_time_sub(tool.get_now_time_str(), "09:25:00") <= 0 or True: |
| | | if tool.trade_time_sub(tool.get_now_time_str(), "09:25:00") <= 0: |
| | | # 删除L2监听代码 |
| | | gpcode_manager.clear_listen_codes() |
| | | # 删除首板代码 |
| | |
| | | LCancelBigNumComputer().clear() |
| | | # 清除D撤数据 |
| | | DCancelBigNumComputer().clear() |
| | | # 清除大单成交数据 |
| | | DealOrderNoManager().clear() |
| | | |
| | | |
| | | # 每日初始化 |
| | |
| | | |
| | | # 设置板块涨停数量(除开自己) |
| | | @classmethod |
| | | def set_block_limit_up_count(cls, code, count): |
| | | cls.__block_limit_up_count_dict[code] = count |
| | | def set_block_limit_up_count(cls, reason_codes_dict): |
| | | for reason in reason_codes_dict: |
| | | codes = reason_codes_dict[reason] |
| | | for c in codes: |
| | | cls.__block_limit_up_count_dict[c] = len(codes) - 1 |
| | | |
| | | # 设置大单成交金额➗固定m值比例 |
| | | @classmethod |
| | | def set_big_num_deal_rate(cls, code, rate): |
| | | cls.__big_num_deal_rate_dict[code] = rate |
| | | logger_l2_l_cancel.debug(code, f"设置大单成交金额比值:{rate}") |
| | | |
| | | |
| | | |
| | | # 计算成交位置之后的大单(特定笔数)的撤单比例 |
| | |
| | | MAX_COUNT = 10 |
| | | watch_indexes = set() |
| | | total_num = 0 |
| | | thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code) |
| | | threshold_num = thresh_hold_money // (float(gpcode_manager.get_limit_up_price(code)) * 100) |
| | | # thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code) |
| | | # threshold_num = thresh_hold_money // (float(gpcode_manager.get_limit_up_price(code)) * 100) |
| | | for i in range(start_index, end_index): |
| | | data = total_datas[i] |
| | | val = data['val'] |
| | |
| | | if left_count > 0: |
| | | total_num += val['num'] * left_count |
| | | watch_indexes.add(i) |
| | | if len(watch_indexes) >= MAX_COUNT or total_num >= threshold_num: |
| | | if len(watch_indexes) >= MAX_COUNT: |
| | | break |
| | | # 保存数据 |
| | | l2_log.l_cancel_debug(code, f"设置成交位临近撤单监控范围:{watch_indexes} 计算范围:{start_index}-{end_index}") |
| | |
| | | |
| | | # 开始撤单 |
| | | def start_cancel(self, code, buy_no, total_datas, buy_order_no_map, local_operate_map, m_val_num): |
| | | # TODO 暂时注释掉G撤 |
| | | return False,"暂时不执行G撤" |
| | | thresh_num = int(m_val_num * 1) |
| | | place_order_index = self.__SecondCancelBigNumComputer.get_real_place_order_index_cache(code) |
| | | if place_order_index is None: |
| | |
| | | __last_buy_single_dict = {} |
| | | __TradeBuyQueue = transaction_progress.TradeBuyQueue() |
| | | __latest_process_order_unique_keys = {} |
| | | __latest_process_not_order_unique_keys = {} |
| | | __latest_process_not_order_unique_keys_count = {} |
| | | # 初始化 |
| | | __TradePointManager = l2_data_manager.TradePointManager() |
| | | __SecondCancelBigNumComputer = SecondCancelBigNumComputer() |
| | |
| | | return False, True, f"尚未获取到当前成交价" |
| | | if float(limit_up_price) - float(trade_price) > 0.00001: |
| | | # 计算信号起始位置到当前的手数 |
| | | buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(code) |
| | | buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos( |
| | | code) |
| | | num_operate_map = local_today_num_operate_map.get(code) |
| | | total_num = 0 |
| | | for i in range(buy_single_index, total_data[-1]["index"] + 1): |
| | |
| | | if compute_end_index < compute_start_index: |
| | | return |
| | | |
| | | unique_key = f"{compute_start_index}-{compute_end_index}" |
| | | if cls.__latest_process_not_order_unique_keys.get(code) == unique_key: |
| | | unique_key = f"{code}-{compute_start_index}-{compute_end_index}" |
| | | if cls.__latest_process_not_order_unique_keys_count.get(unique_key) and cls.__latest_process_not_order_unique_keys_count.get(unique_key) > 2: |
| | | async_log_util.error(logger_l2_error, |
| | | f"重复处理数据:code-{code} start_index-{compute_start_index} end_index-{compute_end_index}") |
| | | return |
| | | cls.__latest_process_not_order_unique_keys[code] = unique_key |
| | | if unique_key not in cls.__latest_process_not_order_unique_keys_count: |
| | | cls.__latest_process_not_order_unique_keys_count[unique_key] = 0 |
| | | cls.__latest_process_not_order_unique_keys_count[unique_key] += 1 |
| | | |
| | | _start_time = tool.get_now_timestamp() |
| | | total_datas = local_today_datas[code] |
| | |
| | | if cls.__last_buy_single_dict.get(code) == _index: |
| | | has_single = None |
| | | _index = None |
| | | |
| | | if _index == 106: |
| | | print("进入调试") |
| | | buy_single_index = _index |
| | | if has_single: |
| | | cls.__last_buy_single_dict[code] = buy_single_index |
| | |
| | | # compute_data_count 用于计算的l2数据数量 |
| | | @classmethod |
| | | def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index): |
| | | |
| | | second_930 = 9 * 3600 + 30 * 60 + 0 |
| | | # 倒数100条数据查询 |
| | | datas = local_today_datas[code] |
| | |
| | | continue |
| | | |
| | | if L2DataUtil.is_limit_up_price_buy(_val): |
| | | |
| | | # 寻找前面continue_count-1个涨停买 |
| | | # for j in range(start_index - 1, -1, -1): |
| | | # if datas[j]["val"] |
| | | if last_index is None or (datas[last_index]["val"]["time"] == datas[i]["val"]["time"]): |
| | | if start is None: |
| | | start = i |
| | |
| | | # from huaxin_api import trade_client, l2_client, l1_client |
| | | |
| | | |
| | | def createTradeServer(pipe_server, queue_strategy_r_trade_w, pipe_l1, pipe_l2, ptl2_l2, psl2_l2, queue_strategy_w_trade_r): |
| | | def createTradeServer(pipe_server, queue_strategy_r_trade_w: multiprocessing.Queue, pipe_l1, pipe_l2, queue_trade_w_l2_r: multiprocessing.Queue, psl2_l2, queue_strategy_w_trade_r: multiprocessing.Queue): |
| | | logger_system.info("策略进程ID:{}", os.getpid()) |
| | | log.close_print() |
| | | # 初始化参数 |
| | |
| | | # |
| | | # 启动L2订阅服务 |
| | | t1 = threading.Thread(target=huaxin_client.l2_client.run, name="l2_client", |
| | | args=(ptl2_l2, psl2_l2, huaxin_trade_server.my_l2_data_callback), |
| | | args=(queue_trade_w_l2_r, psl2_l2, huaxin_trade_server.my_l2_data_callback), |
| | | daemon=True) |
| | | t1.start() |
| | | # |
| | |
| | | # 策略与server间的通信 |
| | | pss_server, pss_strategy = multiprocessing.Pipe() |
| | | |
| | | # 交易与l2之间的通信 |
| | | ptl2_trade, ptl2_l2 = multiprocessing.Pipe() |
| | | # 交易写L2读 |
| | | queue_trade_w_l2_r = multiprocessing.Queue() |
| | | # 策略与l2之间的通信 |
| | | psl2_strategy, psl2_l2 = multiprocessing.Pipe() |
| | | |
| | |
| | | |
| | | # 交易进程 |
| | | tradeProcess = multiprocessing.Process( |
| | | target=lambda: huaxin_client.trade_client.run(None, ptl2_trade, queue_strategy_r_trade_w, queue_strategy_w_trade_r)) |
| | | target=lambda: huaxin_client.trade_client.run(None, queue_trade_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r)) |
| | | tradeProcess.start() |
| | | |
| | | # 主进程 |
| | | createTradeServer(pss_strategy, queue_strategy_r_trade_w, pl1t_strategy, psl2_strategy, ptl2_l2, psl2_l2, queue_strategy_w_trade_r) |
| | | createTradeServer(pss_strategy, queue_strategy_r_trade_w, pl1t_strategy, psl2_strategy, queue_trade_w_l2_r, psl2_l2, queue_strategy_w_trade_r) |
| | | |
| | | # 将tradeServer作为主进程 |
| | | l1Process.join() |
| | |
| | | # @unittest.skip("跳过此单元测试") |
| | | def test_trade(self): |
| | | threading.Thread(target=async_log_util.run_sync,daemon=True).start() |
| | | |
| | | code = "000010" |
| | | code = "002640" |
| | | clear_trade_data(code) |
| | | l2.l2_data_util.load_l2_data(code) |
| | | total_datas = deepcopy(l2.l2_data_util.local_today_datas[code]) |
| | |
| | | from utils import global_util, tool |
| | | from code_attribute import gpcode_manager |
| | | from log_module import log, log_analyse, log_export |
| | | from l2 import code_price_manager, l2_data_util, l2_data_manager_new |
| | | from l2 import code_price_manager, l2_data_util, l2_data_manager_new, cancel_buy_strategy |
| | | from l2.cancel_buy_strategy import HourCancelBigNumComputer |
| | | from output.limit_up_data_filter import IgnoreCodeManager |
| | | from third_data import kpl_util, kpl_data_manager, kpl_api, block_info |
| | |
| | | code_price_manager.Buy1PriceManager().set_limit_up_time(code, limit_up_time) |
| | | add_codes = codes_set - self.__latest_limit_up_codes_set |
| | | self.__latest_limit_up_codes_set = codes_set |
| | | |
| | | if limit_up_reasons: |
| | | # 统计涨停原因的票的个数 |
| | | limit_up_reason_code_dict = {} |
| | | for code in limit_up_reasons: |
| | | b = limit_up_reasons[code] |
| | | if b not in limit_up_reason_code_dict: |
| | | limit_up_reason_code_dict[b] = set() |
| | | limit_up_reason_code_dict[b].add(code) |
| | | cancel_buy_strategy.LCancelRateManager.set_block_limit_up_count(limit_up_reason_code_dict) |
| | | |
| | | |
| | | if add_codes: |
| | | for code in add_codes: |
| | | # 根据涨停原因判断是否可以买 |
| | |
| | | before_blocks_dict): |
| | | pass |
| | | # TODO 测试暂时注释 |
| | | # l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, f"涨停原因({ limit_up_reasons.get(code)})不是老大撤单", "板块撤") |
| | | l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, |
| | | f"涨停原因({limit_up_reasons.get(code)})不是老大撤单", |
| | | "板块撤") |
| | | except Exception as e: |
| | | logger_debug.exception(e) |
| | | kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), result_list_) |
| | |
| | | from l2 import l2_data_util, l2_data_source_util |
| | | |
| | | |
| | | class DealComputeProgressManager: |
| | | # 成交进度计算 |
| | | class DealOrderNoManager: |
| | | __db = 2 |
| | | __redisManager = redis_manager.RedisManager(2) |
| | | __deal_compute_progress_cache = {} |
| | | __deal_orderno_cache = {} |
| | | __last_progress = {} |
| | | __instance = None |
| | | |
| | | def __new__(cls, *args, **kwargs): |
| | | if not cls.__instance: |
| | | cls.__instance = super(DealComputeProgressManager, cls).__new__(cls, *args, **kwargs) |
| | | cls.__instance = super(DealOrderNoManager, cls).__new__(cls, *args, **kwargs) |
| | | cls.__load_datas() |
| | | return cls.__instance |
| | | |
| | |
| | | def __load_datas(cls): |
| | | __redis = cls.__get_redis() |
| | | try: |
| | | keys = RedisUtils.keys(__redis, "deal_compute_info-*") |
| | | keys = RedisUtils.keys(__redis, "deal_orderno-*") |
| | | for k in keys: |
| | | code = k.split("-")[-1] |
| | | val = RedisUtils.get(__redis, k) |
| | | val = RedisUtils.smembers(__redis, k) |
| | | val = json.loads(val) |
| | | tool.CodeDataCacheUtil.set_cache(cls.__deal_compute_progress_cache, code, val) |
| | | val = set(val) |
| | | tool.CodeDataCacheUtil.set_cache(cls.__deal_orderno_cache, code, val) |
| | | |
| | | finally: |
| | | RedisUtils.realse(__redis) |
| | | |
| | | # 获取成交计算进度 |
| | | def __get_deal_compute_progress(self, code): |
| | | val = RedisUtils.get(self.__get_redis(), f"deal_compute_info-{code}") |
| | | if val is None: |
| | | return -1, 0 |
| | | val = json.loads(val) |
| | | return val[0], val[1] |
| | | # 添加订单号 |
| | | def __add_orderno(self, code, orderno): |
| | | RedisUtils.sadd_async(self.__db, f"deal_orderno-{code}", orderno) |
| | | RedisUtils.expire_async(self.__db, f"deal_orderno-{code}", tool.get_expire()) |
| | | |
| | | # 获取成交计算进度 |
| | | def get_deal_compute_progress_cache(self, code): |
| | | cache_result = tool.CodeDataCacheUtil.get_cache(self.__deal_compute_progress_cache, code) |
| | | if cache_result[0]: |
| | | return cache_result[1] |
| | | return -1, 0 |
| | | # 移除订单号 |
| | | def __remove_orderno(self, code, orderno): |
| | | RedisUtils.srem_async(self.__db, f"deal_orderno-{code}", orderno) |
| | | RedisUtils.expire_async(self.__db, f"deal_orderno-{code}", tool.get_expire()) |
| | | |
| | | # 清除数据 |
| | | def clear(self): |
| | | self.__deal_orderno_cache.clear() |
| | | keys = RedisUtils.keys(self.__get_redis(), "deal_orderno-*") |
| | | for k in keys: |
| | | RedisUtils.delete_async(self.__db, k) |
| | | |
| | | def remove_orderno(self, code, orderno): |
| | | if code in self.__deal_orderno_cache: |
| | | if orderno in self.__deal_orderno_cache[code]: |
| | | self.__deal_orderno_cache[code].discard(orderno) |
| | | self.__remove_orderno(code, orderno) |
| | | |
| | | def add_orderno(self, code, orderno): |
| | | if code not in self.__deal_orderno_cache: |
| | | self.__deal_orderno_cache[code] = set() |
| | | self.__deal_orderno_cache[code].add(orderno) |
| | | self.__add_orderno(code, orderno) |
| | | |
| | | # 设置成交进度 |
| | | def __set_deal_compute_progress(self, code, index, money): |
| | | tool.CodeDataCacheUtil.set_cache(self.__deal_compute_progress_cache, code, (index, money)) |
| | | RedisUtils.setex_async(self.__db, f"deal_compute_info-{code}", tool.get_expire(), json.dumps((index, money))) |
| | | |
| | | # 设置成交进度 |
| | | def set_trade_progress(self, code, progress, total_data, local_today_num_operate_map): |
| | | if self.__last_progress.get(code) == progress: |
| | | return |
| | | self.__last_progress[code] = progress |
| | | # 计算从开始位置到成交位置 |
| | | c_index, deal_num = self.get_deal_compute_progress_cache(code) |
| | | process_index = c_index |
| | | for i in range(c_index + 1, progress): |
| | | data = total_data[i] |
| | | val = data['val'] |
| | | process_index = i |
| | | # 是否有大单 |
| | | if not l2_data_util.is_big_money(val): |
| | | continue |
| | | if l2_data_util.L2DataUtil.is_limit_up_price_buy(val): |
| | | # 是否已经取消 |
| | | cancel_data = self.__get_cancel_data(code, data, local_today_num_operate_map) |
| | | if cancel_data is None: |
| | | deal_num += val["num"] * data["re"] |
| | | self.__save_traded_index(code, data["index"]) |
| | | |
| | | self.__set_deal_compute_progress(code, process_index, deal_num) |
| | | |
| | | def get_deal_big_money_num(self, code): |
| | | if code in self.__deal_compute_progress_cache: |
| | | return self.__deal_compute_progress_cache.get(code)[1] |
| | | compute_index, num = self.get_deal_compute_progress_cache(code) |
| | | return num |
| | | |
| | | def __get_cancel_data(self, code, buy_data, local_today_num_operate_map): |
| | | val = buy_data['val'] |
| | | cancel_datas = local_today_num_operate_map.get( |
| | | "{}-{}-{}".format(val["num"], "1", val["price"])) |
| | | if cancel_datas: |
| | | for cancel_data in cancel_datas: |
| | | buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data, |
| | | local_today_num_operate_map) |
| | | if buy_index == buy_data["index"]: |
| | | return cancel_data |
| | | return None |
| | | |
| | | def __save_traded_index(self, code, index): |
| | | RedisUtils.sadd(self.__get_redis(), f"deal_indexes-{code}", index) |
| | | RedisUtils.expire(self.__get_redis(), f"deal_indexes-{code}", tool.get_expire()) |
| | | |
| | | def __get_traded_indexes(self, code): |
| | | return RedisUtils.smembers(self.__get_redis(), f"deal_indexes-{code}") |
| | | |
| | | # 获取成交的索引 |
| | | def get_traded_indexes(self, code): |
| | | return self.__get_traded_indexes(code) |
| | | def get_deal_nums(self, code, orderno_map: dict): |
| | | if code not in self.__deal_orderno_cache: |
| | | return 0 |
| | | total_num = 0 |
| | | for orderno in self.__deal_orderno_cache[code]: |
| | | if orderno_map: |
| | | if orderno in orderno_map: |
| | | data = orderno_map[orderno] |
| | | total_num += data["val"]["num"] * data["re"] |
| | | return total_num |
| | | |
| | | |
| | | def get_deal_big_money_num(code): |
| | | val = DealComputeProgressManager().get_deal_compute_progress_cache(code) |
| | | return val[1] |
| | | val = DealOrderNoManager().get_deal_nums(code, l2_data_util.local_today_buyno_map.get(code)) |
| | | return val |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | pass |
| | |
| | | import constant |
| | | import inited_data |
| | | import outside_api_command_manager |
| | | from code_attribute import gpcode_manager, code_volumn_manager |
| | | from code_attribute import gpcode_manager, code_volumn_manager, global_data_loader |
| | | from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from huaxin_client import l1_subscript_codes_manager, l2_data_transform_protocol |
| | |
| | | from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress, \ |
| | | l2_data_log |
| | | from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer, \ |
| | | GCancelBigNumComputer, SecondCancelBigNumComputer |
| | | GCancelBigNumComputer, SecondCancelBigNumComputer, LCancelRateManager |
| | | from l2.huaxin import huaxin_target_codes_manager |
| | | from l2.huaxin.huaxin_target_codes_manager import HuaXinL1TargetCodesManager |
| | | from l2.l2_data_manager_new import L2TradeDataProcessor |
| | | from l2.l2_data_util import L2DataUtil |
| | | from log_module import async_log_util, log_export |
| | | from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \ |
| | | hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue, \ |
| | |
| | | from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils |
| | | from third_data.kpl_data_manager import KPLDataManager |
| | | from third_data.kpl_util import KPLDataType |
| | | from trade import deal_big_money_manager, current_price_process_manager, trade_huaxin, trade_manager, l2_trade_util |
| | | from trade import deal_big_money_manager, current_price_process_manager, trade_huaxin, trade_manager, l2_trade_util, \ |
| | | l2_trade_factor |
| | | from trade.deal_big_money_manager import DealOrderNoManager |
| | | |
| | | from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_data_update, \ |
| | | huaxin_trade_record_manager |
| | |
| | | buyno_map = l2_data_util.local_today_buyno_map.get(code) |
| | | async_log_util.info(hx_logger_l2_transaction, |
| | | f"{code}的买入订单号数量:{len(buyno_map.keys()) if buyno_map else 0}") |
| | | if buyno_map is None: |
| | | buyno_map = {} |
| | | buy_progress_index = None |
| | | for i in range(len(datas) - 1, -1, -1): |
| | | d = datas[i] |
| | |
| | | async_log_util.info(hx_logger_l2_transaction, f"{code}成交进度:{buyno_map[buy_no]['index']}") |
| | | buy_progress_index = buyno_map[buy_no]["index"] |
| | | break |
| | | # ------统计保存成交大单订单号------ |
| | | origin_ordernos = set() |
| | | for d in datas: |
| | | buy_no = f"{d[6]}" |
| | | origin_ordernos.add(buy_no) |
| | | big_money_count = 0 |
| | | for buy_no in origin_ordernos: |
| | | # 查询是否为大单 |
| | | if buy_no in buyno_map: |
| | | data = buyno_map[buy_no] |
| | | val = data["val"] |
| | | if not L2DataUtil.is_limit_up_price_buy(val): |
| | | continue |
| | | if l2_data_util.is_big_money(val): |
| | | # 涨停买的大单 |
| | | big_money_count += 1 |
| | | DealOrderNoManager().add_orderno(code, buy_no) |
| | | if big_money_count > 0: |
| | | total_deal_nums = DealOrderNoManager().get_deal_nums(code, buyno_map) |
| | | thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code) |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if limit_up_price: |
| | | rate = round(total_deal_nums / (thresh_hold_money // (float(limit_up_price) * 100)), 2) |
| | | LCancelRateManager().set_big_num_deal_rate(code, rate) |
| | | |
| | | if buy_progress_index is not None: |
| | | # 获取执行位时间 |
| | |
| | | m_val = L2PlaceOrderParamsManager.get_base_m_val(code) |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | m_val_num = int(m_val / (float(limit_up_price) * 100)) |
| | | |
| | | # 处理成交大单 |
| | | DealOrderNoManager().remove_orderno(code, f"{order_no}") |
| | | total_deal_nums = DealOrderNoManager().get_deal_nums(code, l2_data_util.local_today_buyno_map.get(code)) |
| | | thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code) |
| | | if limit_up_price: |
| | | rate = round(total_deal_nums / (thresh_hold_money // (float(limit_up_price) * 100)), 2) |
| | | LCancelRateManager().set_big_num_deal_rate(code, rate) |
| | | try: |
| | | need_cancel, msg = cls.__GCancelBigNumComputer.start_cancel(code, f"{order_no}", |
| | | l2_data_util.local_today_datas.get( |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | print(int(tool.to_time_str(1684132912).replace(":", ""))) |
| | | # code = "000536" |
| | | # TradeServerProcessor.l2_transaction(code, |
| | | # [('000536', 2.08, 20000, 143436460, 2014, 31126359, 16718956, 31126358, '1')]) |
| | | while True: |
| | | time.sleep(10) |
| | | code = "002640" |
| | | global_data_loader.init() |
| | | l2_data_util.load_l2_data(code, False, False) |
| | | # DealOrderNoManager().add_orderno(code, "18972810") |
| | | # DealOrderNoManager().add_orderno(code, "18972232") |
| | | # DealOrderNoManager().add_orderno(code, "18972434") |
| | | total_deal_nums = DealOrderNoManager().get_deal_nums(code, l2_data_util.local_today_buyno_map.get(code)) |
| | | print("成交大单手数", total_deal_nums) |
| | | thresh_hold_money = l2_trade_factor.L2PlaceOrderParamsManager.get_base_m_val(code) |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if limit_up_price: |
| | | rate = round(total_deal_nums / (thresh_hold_money // (float(limit_up_price) * 100)), 2) |
| | | LCancelRateManager().set_big_num_deal_rate(code, rate) |
| | | print("撤单比例", LCancelRateManager().get_cancel_rate(code)) |
| | | |