From 4231dc5dba02568b70e2caa4a0fe7c6455223c5c Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期二, 10 十二月 2024 10:13:08 +0800 Subject: [PATCH] 订阅 --- trade/current_price_process_manager.py | 20 ++++++--- trade/buy_radical/radical_buy_data_manager.py | 31 ++++++++++++--- l2/huaxin/huaxin_target_codes_manager.py | 2 third_data/code_plate_key_manager.py | 1 servers/huaxin_trade_server.py | 38 +++++++------------ l2/cancel_buy_strategy.py | 20 ++++++++- 6 files changed, 71 insertions(+), 41 deletions(-) diff --git a/l2/cancel_buy_strategy.py b/l2/cancel_buy_strategy.py index d1d3cb1..717c977 100644 --- a/l2/cancel_buy_strategy.py +++ b/l2/cancel_buy_strategy.py @@ -261,8 +261,9 @@ """ watch_indexes = self.__watch_indexes_cache.get(code) if not watch_indexes: - return False,None, "鏃犲ぇ鍗曠洃鍚�" + return False, None, "鏃犲ぇ鍗曠洃鍚�" total_datas = local_today_datas.get(code) + need_compute = False for i in range(start_index, end_index + 1): data = total_datas[i] val = data["val"] @@ -276,9 +277,22 @@ if buy_index is None: continue if buy_index in watch_indexes: - return True, data, f"澶у崟鎾ゅ崟({buy_index})" - return False,None, "鏃犲ぇ鍗曟挙鍗�" + need_compute = True + break + if need_compute: + cancel_count = 0 + cancel_data = None + for index in watch_indexes: + cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(code, index, + total_datas, + local_today_canceled_buyno_map.get( + code)) + if cancel_data: + cancel_count += 1 + if cancel_count >= len(watch_indexes): + return True, cancel_data, f"澶у崟鎾ゅ崟({watch_indexes})" + return False, None, "鏃犲ぇ鍗曟挙鍗�" def __clear_data(self, code): if code in self.__watch_indexes_cache: diff --git a/l2/huaxin/huaxin_target_codes_manager.py b/l2/huaxin/huaxin_target_codes_manager.py index 136c45d..ce3e62d 100644 --- a/l2/huaxin/huaxin_target_codes_manager.py +++ b/l2/huaxin/huaxin_target_codes_manager.py @@ -14,7 +14,7 @@ from third_data import kpl_data_manager, kpl_api, history_k_data_manager from third_data.code_plate_key_manager import RealTimeKplMarketData from trade import current_price_process_manager -from utils import tool, global_util, init_data_util +from utils import tool, global_util redisManager = redis_manager.RedisManager(4) l2_codes_queue = queue.Queue(maxsize=1000) diff --git a/servers/huaxin_trade_server.py b/servers/huaxin_trade_server.py index ec24b7f..8cebfae 100644 --- a/servers/huaxin_trade_server.py +++ b/servers/huaxin_trade_server.py @@ -3,7 +3,6 @@ import hashlib import json import logging -import multiprocessing import queue import random import socket @@ -11,7 +10,6 @@ import threading import time -import requests import schedule import constant @@ -19,7 +17,8 @@ from cancel_strategy.s_l_h_cancel_strategy import SCancelBigNumComputer from code_attribute import gpcode_manager, code_volumn_manager, global_data_loader, zyltgb_util from code_attribute.code_l1_data_manager import L1DataManager -from code_attribute.gpcode_manager import CodePrePriceManager, CodesNameManager, GreenListCodeManager +from code_attribute.gpcode_manager import CodePrePriceManager, CodesNameManager, \ + WantBuyCodesManager from huaxin_client import l2_data_transform_protocol from huaxin_client.trade_transform_protocol import TradeResponse from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, transaction_progress, \ @@ -38,17 +37,17 @@ from log_module import async_log_util, log_export from log_module.log import hx_logger_contact_debug, hx_logger_trade_callback, \ hx_logger_l2_orderdetail, hx_logger_l2_market_data, logger_l2_g_cancel, logger_debug, \ - logger_system, logger_trade, logger_local_huaxin_l1_trade_info, logger_l2_codes_subscript, logger_l2_radical_buy + logger_system, logger_trade, logger_l2_radical_buy from third_data import block_info, kpl_data_manager, history_k_data_manager, huaxin_l1_data_manager from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager, RealTimeKplMarketData from third_data.history_k_data_util import JueJinApi from trade import l2_trade_util, \ trade_data_manager, trade_constant, buy_open_limit_up_strategy -from trade.buy_radical import radical_buy_data_manager, radical_buy_strategy, block_special_codes_manager +from trade.buy_radical import radical_buy_data_manager, radical_buy_strategy from trade.buy_money_count_setting import BuyMoneyAndCountSetting, BuyMoneyUtil from trade.huaxin import huaxin_trade_api as trade_api, huaxin_trade_api, huaxin_trade_data_update, \ - huaxin_trade_record_manager, huaxin_sell_util + huaxin_trade_record_manager from api.outside_api_command_callback import OutsideApiCommandCallback from trade.huaxin.huaxin_trade_record_manager import DelegateRecordManager from trade.order_statistic import DealAndDelegateWithBuyModeDataManager @@ -406,18 +405,6 @@ finally: cls.__updating_jx_blocks_codes.discard(code_) - def pull_pre_deal_big_orders(code_): - response_data = requests.get( - "http://127.0.0.1:9005/get_big_buy_order_list?code=" + code_) - r_str = response_data.text - response_data = json.loads(r_str) - if response_data["code"] == 0: - datas = response_data["data"] - logger_debug.info(f"鎷夊彇鐐告澘鍓嶆垚浜ょ殑澶у崟锛歿code_}-{datas}") - if datas: - RadicalBigOrderThresholdManager().set_big_deal_order_list(code_, datas, gpcode_manager.get_limit_up_price_as_num(code_)) - - time_str = f"{data['dataTimeStamp']}" if time_str.startswith("9"): @@ -490,14 +477,17 @@ L2MarketSellManager().set_current_total_sell_data(code, time_str, data["totalAskVolume"] * data["avgAskPrice"], data["totalAskVolume"], sell_1_info, data.get("sell")) + # 鐐告澘 + if sell_1_info and sell_1_info[1] > 0: + if RadicalBigOrderThresholdManager().is_need_update(code): + # 鐐告澘鏇存柊鏁版嵁 + cls.__sell_thread_pool.submit( + lambda: radical_buy_data_manager.pull_pre_deal_big_orders(code)) if data["sell"] and len(data["sell"]) > 1 and data["sell"][1][1] > 0: # 鍑虹幇鍗栦簩 radical_buy_strategy.clear_data(code, force=True) - if RadicalBigOrderThresholdManager().is_need_update(code): - # 鐐告澘鏇存柊鏁版嵁 - cls.__sell_thread_pool.submit( - lambda: pull_pre_deal_big_orders(code)) + # 璁剧疆鎵叆鏁版嵁 RadicalCodeMarketInfoManager().set_market_info(code, time_str, round(float(limit_up_price), 2), data["buy"][0], @@ -748,7 +738,7 @@ # 鍒ゆ柇浠婃棩鎵叆鐨勪唬鐮佹暟閲忔槸鍚﹀ぇ浜庨槇鍊� radical_buy_setting = BuyMoneyAndCountSetting().get_radical_buy_setting() MAX_COUNT = 4 if radical_buy_setting is None else radical_buy_setting[0] - if not GreenListCodeManager().is_in_cache(code): + if not WantBuyCodesManager().is_in_cache(code): # 鍔犵豢涓嶅垽鏂澘鍧楁槸鍚︽垚浜� if len(deal_codes) >= MAX_COUNT: async_log_util.info(logger_l2_radical_buy, f"鎵叆鎴愪氦浠g爜涓暟澶т簬{MAX_COUNT}涓細{code}-{deal_codes}") @@ -807,7 +797,7 @@ buy_blocks_with_money = [(b, RealTimeKplMarketData.get_jx_block_in_money(b), in_blocks.index(b) if b in in_blocks else -1) for b in buy_blocks] if result_by_volume[0] != radical_buy_strategy.BUY_MODE_NONE: - if not GreenListCodeManager().is_in_cache(code): + if not WantBuyCodesManager().is_in_cache(code): # 鍔犵豢鐨勪笉闇�瑕佸垽鏂涓嬮棶棰� if tool.get_now_time_as_int() < 93100: radical_buy_data_manager.ExcludeIndexComputeCodesManager.add_code(code) diff --git a/third_data/code_plate_key_manager.py b/third_data/code_plate_key_manager.py index 0c4314e..1ba3ba9 100644 --- a/third_data/code_plate_key_manager.py +++ b/third_data/code_plate_key_manager.py @@ -398,6 +398,7 @@ # 杩囨护鍑烘潵涓哄悓涓�涓澘鍧楀氨鍙畻1涓暟閲� fb = BlockMapManager().filter_blocks({data[1]}) if blocks & fb: + blocks.add(data[1]) continue for b in fb: diff --git a/trade/buy_radical/radical_buy_data_manager.py b/trade/buy_radical/radical_buy_data_manager.py index c4fbc16..5ebc05d 100644 --- a/trade/buy_radical/radical_buy_data_manager.py +++ b/trade/buy_radical/radical_buy_data_manager.py @@ -5,11 +5,13 @@ import logging import time +import requests + import constant import l2_data_util from code_attribute import code_nature_analyse, code_volumn_manager, gpcode_manager from code_attribute.code_l1_data_manager import L1DataManager -from code_attribute.gpcode_manager import GreenListCodeManager +from code_attribute.gpcode_manager import WantBuyCodesManager from db import redis_manager_delegate as redis_manager from db.redis_manager_delegate import RedisUtils from l2.huaxin import l2_huaxin_util @@ -92,7 +94,7 @@ average_money = sum(limit_up_price_money_list) // len(limit_up_price_money_list) self.set_big_order_threshold(code, average_money) self.__already_total_deal_big_order_money[code] = total_deal_money - async_log_util.info(logger_l2_radical_buy_data, f"涔嬪墠鐨勫ぇ鍗曪細{code}-{total_deal_money_info_list}") + async_log_util.info(logger_l2_radical_buy_data, f"涔嬪墠鐨勫ぇ鍗曪細{code}-{total_deal_money}-{total_deal_money_info_list}") def get_big_order_threshold(self, code): """ @@ -205,8 +207,8 @@ @param total_sell_volume: 鎬诲崠閲� @return: 鏄惁鍙互涔�, 鍘熷洜 """ - if GreenListCodeManager().is_in_cache(code): - return True, "宸插姞缁�" + if WantBuyCodesManager().is_in_cache(code): + return True, "宸插姞鎯�" k_format = None if not constant.TEST: @@ -965,8 +967,8 @@ keys_, info = cls.get_code_blocks(code) if not keys_: return set(), "娌¤幏鍙栧埌鏉垮潡" - if GreenListCodeManager().is_in_cache(code): - return set(keys_), "宸插姞缁�" + if WantBuyCodesManager().is_in_cache(code): + return set(keys_), "宸插姞鎯�" # 鑾峰彇绮鹃�夊噣娴佸叆 jx_in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks() @@ -1355,5 +1357,22 @@ return round(0 - 0.44 * fvolume_rate + 0.722, 3) +def pull_pre_deal_big_orders(code_): + """ + 鎷夊彇璁㈤槄涔嬪墠鎴愪氦鐨勫ぇ鍗� + @param code_: + @return: + """ + response_data = requests.get( + "http://127.0.0.1:9005/get_big_buy_order_list?code=" + code_) + r_str = response_data.text + response_data = json.loads(r_str) + if response_data["code"] == 0: + datas = response_data["data"] + if datas: + RadicalBigOrderThresholdManager().set_big_deal_order_list(code_, datas, + gpcode_manager.get_limit_up_price_as_num(code_)) + + if __name__ == '__main__': pass diff --git a/trade/current_price_process_manager.py b/trade/current_price_process_manager.py index a9c32a3..7653318 100644 --- a/trade/current_price_process_manager.py +++ b/trade/current_price_process_manager.py @@ -11,15 +11,19 @@ import constant from code_attribute import gpcode_manager from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, LimitUpDataConstant +from trade.buy_radical import radical_buy_data_manager from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager, RadicalBuyDataManager from utils import tool, import_util from trade import trade_manager, l2_trade_util, trade_constant from trade.trade_data_manager import CodeActualPriceProcessor, RadicalBuyDealCodesManager +import concurrent.futures trade_gui = import_util.import_lib("trade.trade_gui") __actualPriceProcessor = CodeActualPriceProcessor() + +__pre_big_order_deal_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) latest_add_codes = set() @@ -179,17 +183,19 @@ # 鏄惁鍜屼笂娆′竴鏍� try: add_code_set = set(add_code_list) - # global latest_add_codes - # if not latest_add_codes: - # latest_add_codes = set() + global latest_add_codes + if not latest_add_codes: + latest_add_codes = set() # # 鍒ゆ柇璁剧疆鐨勪唬鐮佹槸鍚︾浉鍚� # dif1 = latest_add_codes - add_code_set - # dif2 = add_code_set - latest_add_codes - # if dif1 or dif2: + dif2 = add_code_set - latest_add_codes + if dif2: + # 鏂板鍔犵殑璁㈤槄闇�瑕佹媺鍙栦箣鍓嶇殑澶у崟 + for c in dif2: + __pre_big_order_deal_thread_pool.submit(radical_buy_data_manager.pull_pre_deal_big_orders, c) if True: - global latest_add_codes async_log_util.info(logger_l2_codes_subscript, - f"({request_id})棰勫鐞嗘柊澧炶闃呬唬鐮侊細{add_code_set - latest_add_codes}") + f"({request_id})棰勫鐞嗘柊澧炶闃呬唬鐮侊細{dif2}") latest_add_codes = add_code_set add_datas = [] for d in add_code_list: -- Gitblit v1.8.0