2025-06-09 | Administrator | ![]() |
2025-06-09 | Administrator | ![]() |
2025-06-09 | Administrator | ![]() |
2025-06-09 | Administrator | ![]() |
api/low_suction_data_pusher.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
huaxin_client/l2_client_test.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
l2_test.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
servers/data_server.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
trade/buy_radical/radical_buy_data_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
api/low_suction_data_pusher.py
New file @@ -0,0 +1,53 @@ """ 低吸数据推送 """ import json import requests import concurrent.futures __data_push_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=5) __big_order_data_push_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) def push_limit_up_list(datas): """ 推送涨停列表信息 @param datas: @return: """ def push(): requests.post("http://127.0.0.1:9008/upload_limit_up_list", json=json.dumps(datas), timeout=3) __data_push_thread_pool.submit( lambda: push()) def push_big_order(datas): """ 推送大单信息 @param datas: @return: """ def push(): requests.post("http://127.0.0.1:9008/upload_big_order_datas", json=json.dumps(datas), timeout=3) __big_order_data_push_thread_pool.submit( lambda: push()) def push_block_in(datas): """ 推送板块流入信息 @param datas: @return: """ def push(): requests.post("http://127.0.0.1:9008/upload_block_in_datas", json=json.dumps(datas), timeout=3) __data_push_thread_pool.submit( lambda: push()) huaxin_client/l2_client_test.py
@@ -160,7 +160,7 @@ order_time = item[4] if self.accurate_buy: self.add_transaction_data_for_accurate(item, big_order_money_threshold=50e4) self.add_transaction_data_for_accurate(item, big_order_money_threshold=100e4) if not self.__latest_buy_order: # (买单号, 量, 金额, 时间, 最新成交价格) l2_test.py
@@ -11,6 +11,7 @@ import psutil import requests from api import low_suction_data_pusher from code_attribute import global_data_loader from huaxin_client import l2_client_test, l1_subscript_codes_manager from log_module.log import logger_local_huaxin_l2_transaction_big_order, logger_system, \ @@ -143,8 +144,14 @@ def __save_accurate_big_order(big_accurate_order_queue): while True: try: data = big_accurate_order_queue.get() logger_local_huaxin_l2_transaction_accurate_big_order.info(f"{data}") datas = [] while not big_accurate_order_queue.empty(): data = big_accurate_order_queue.get() datas.append(data) if datas: low_suction_data_pusher.push_big_order(datas) for data in datas: logger_local_huaxin_l2_transaction_accurate_big_order.info(f"{data}") except: pass @@ -222,6 +229,10 @@ __upload_data("jingxuan_rank", json.dumps(fins)) __upload_data("jingxuan_rank_out", json.dumps(fouts)) __upload_codes_in_money() try: low_suction_data_pusher.push_block_in(in_list) except: pass except Exception as e: logging.exception(e) finally: servers/data_server.py
@@ -10,6 +10,7 @@ import constant import inited_data from api import low_suction_data_pusher from code_attribute.gpcode_manager import BlackListCodeManager, HumanRemoveForbiddenManager from l2.huaxin import huaxin_target_codes_manager from l2.l2_transaction_data_manager import HuaXinBuyOrderManager @@ -1207,6 +1208,13 @@ lambda: request_new_blocks_codes(update_new_block_plates, new_block_codes.keys())) except Exception as e: logger_debug.exception(e) # 将数据推送至其他项目 try: low_suction_data_pusher.push_limit_up_list(result_list_) except: pass self.__kplDataManager.save_data(type_, result_list_) except Exception as e: logger_debug.exception(e) trade/buy_radical/radical_buy_data_manager.py
@@ -141,19 +141,19 @@ limit_up_price_money_list = [] pre_limit_up_price_money_list = [] deal_order_list = BigOrderDealManager().get_total_buy_data_list(code) deal_order_ids = set() exclude_deal_order_ids = set() if deal_order_list: for x in deal_order_list: if opened_time and int(opened_time.replace(":", "")) > int( l2_huaxin_util.convert_time(x[3]).replace(":", "")): # 开板时间之前 continue deal_order_ids.add(x[0]) # if opened_time and int(opened_time.replace(":", "")) > int( # l2_huaxin_util.convert_time(x[3]).replace(":", "")): # # 炸板之前成交的订单不计算在内 # continue exclude_deal_order_ids.add(x[0]) for info in buy_money_list: if info[1] != limit_up_price: continue limit_up_price_money_list.append(info[0]) if info[2] in deal_order_ids: if info[2] in exclude_deal_order_ids: continue pre_limit_up_price_money_list.append((info[0], info[2])) total_deal_buy_money += info[0] @@ -163,7 +163,7 @@ self.set_big_order_threshold(code, threshold_money) self.__already_total_deal_big_order_money[code] = (total_deal_buy_money, pre_limit_up_price_money_list) async_log_util.info(logger_l2_radical_buy_data, f"之前的大买单:{code}-{total_deal_buy_money}-{total_deal_buy_money_info_list}") f"开板时间:{opened_time} 之前的大买单:{code}-{total_deal_buy_money}-{total_deal_buy_money_info_list}") # 处理大卖单 pre_limit_up_price_money_sell_list = [] if True: @@ -243,6 +243,19 @@ """ if code in self.__already_total_deal_big_order_money: return self.__already_total_deal_big_order_money[code][0] return 0 def get_deal_big_order_money_of_lack(self, code, exclude_order_ids): """ 获取没有在本地L2成交队列中的成交大单 @param exclude_order_ids: @param code: @return: """ if code in self.__already_total_deal_big_order_money: # [(金额,价格,订单号)] money_info_list = self.__already_total_deal_big_order_money[code][1] return sum([x[0] for x in money_info_list if x[2] not in exclude_order_ids]) return 0 def get_sell_deal_big_order_money(self, code, threshold_money): @@ -1782,7 +1795,7 @@ TOTAL_BIG_DEAL_MONEY_THRESHOLD_COUNT = round( code_volumn_manager.CodeVolumeManager().get_radical_buy_refer_volume(code, limit_up_price) * limit_up_price / 1e8, 2) * 2.5 2) * 3 if tool.is_ge_code(code): TOTAL_BIG_DEAL_MONEY_THRESHOLD_COUNT *= 1.5 return TOTAL_BIG_DEAL_MONEY_THRESHOLD_COUNT * threshold_money_per_order