From a9681c7b03a6fde559bf77ef65917d6d4db5d84c Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期一, 17 七月 2023 13:22:57 +0800 Subject: [PATCH] 华鑫适配 --- trade/trade_result_manager.py | 11 ++- l2/huaxin/huaxin_delegate_postion_manager.py | 47 +++++++++++++++ trade/trade_manager.py | 2 trade/huaxin/trade_server.py | 31 ++++++++- trade/trade_huaxin.py | 19 +++--- trade/huaxin/huaxin_trade_api.py | 8 ++ l2/cancel_buy_strategy.py | 19 +++++ l2/l2_data_manager_new.py | 11 ++- 8 files changed, 122 insertions(+), 26 deletions(-) diff --git a/l2/cancel_buy_strategy.py b/l2/cancel_buy_strategy.py index 01f9b10..19efcbf 100644 --- a/l2/cancel_buy_strategy.py +++ b/l2/cancel_buy_strategy.py @@ -674,12 +674,23 @@ return int(val) return None + @classmethod + def clear(cls, code=None): + if code: + cls.__getRedis().delete(f"d_cancel_real_order_index-{code}") + else: + keys = cls.__getRedis().keys("d_cancel_real_order_index-*") + if keys: + for k in keys: + cls.__getRedis().delete(k) + # 璁剧疆鎴愪氦浣� @classmethod def set_trade_progress(cls, code, index, buy_exec_index, total_data, local_today_num_operate_map, m_value, limit_up_price): # 绂讳笅鍗曟墽琛屼綅2鍒嗛挓鍐呯殑鏈夋晥 - if tool.trade_time_sub(total_data[-1]['val']['time'], total_data[buy_exec_index]['val']['time']) > constant.D_CANCEL_EXPIRE_TIME: + if tool.trade_time_sub(total_data[-1]['val']['time'], + total_data[buy_exec_index]['val']['time']) > constant.D_CANCEL_EXPIRE_TIME: return False, "瓒呰繃D鎾ゅ畧鎶ゆ椂闂�" real_order_index = cls.__get_real_order_index(code) @@ -709,7 +720,11 @@ # 璁剧疆鐪熷疄鐨勪笅鍗曚綅缃� @classmethod def set_real_order_index(cls, code, index): - pass + cls.__set_real_order_index(code, index) + + @classmethod + def cancel_success(cls, code): + cls.clear(code) # ---------------------------------L鎾�------------------------------- diff --git a/l2/huaxin/huaxin_delegate_postion_manager.py b/l2/huaxin/huaxin_delegate_postion_manager.py new file mode 100644 index 0000000..e45245f --- /dev/null +++ b/l2/huaxin/huaxin_delegate_postion_manager.py @@ -0,0 +1,47 @@ +""" +鍗庨懌濮旀墭瀹為檯浣嶇疆绠$悊 +""" +import time + +from log_module.log import hx_logger_trade_debug + +_place_order_info_dict = {} + + +# 涓嬪崟 +def place_order(code, price, volume, exec_index): + _place_order_info_dict[code] = (price, volume, exec_index, time.time()) + + +# 鑾峰彇涓嬪崟淇℃伅 +def get_order_info(code): + info = _place_order_info_dict.get(code) + if info and time.time() - info[3] > 3: + # 闂撮殧3s浠ヤ笂灏辨棤鏁堜簡 + info = None + _place_order_info_dict.pop(code) + return info + + +# L2鏁版嵁鍒楄〃 +def get_l2_place_order_position(code, datas): + order_info = get_order_info(code) + if not order_info: + # 鏆傛棤涓嬪崟淇℃伅 + return None + price = order_info[0] + volume = order_info[1] + exec_index = order_info[2] + # 鑾峰彇閲� + for d in datas: + if d["val"]["num"] != volume: + continue + if abs(float(price) - float(d["val"]["price"])) >= 0.01: + continue + # 涓嶅彲鑳芥瘮涓嬪崟鎵ц浣嶇疆杩樻棭 + if d["index"] <= exec_index: + continue + # 鑾峰彇鍒颁簡涓嬪崟浣嶇疆 + hx_logger_trade_debug.info(f"鐪熷疄涓嬪崟浣嶇疆锛歿code}-{d['index']}") + return d["index"] + return None diff --git a/l2/l2_data_manager_new.py b/l2/l2_data_manager_new.py index 9dc5807..9bde625 100644 --- a/l2/l2_data_manager_new.py +++ b/l2/l2_data_manager_new.py @@ -4,7 +4,7 @@ from code_attribute import big_money_num_manager, code_volumn_manager, code_data_util, industry_codes_sort, \ limit_up_time_manager, global_data_loader, gpcode_manager import constant -from l2.huaxin import l2_huaxin_util +from l2.huaxin import l2_huaxin_util, huaxin_delegate_postion_manager from utils import global_util, ths_industry_util, tool import l2_data_util from db import redis_manager @@ -13,7 +13,7 @@ trade_result_manager, first_code_score_manager from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log, l2_data_source_util, code_price_manager from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \ - L2LimitUpSellStatisticUtil + L2LimitUpSellStatisticUtil, DCancelBigNumComputer from l2.l2_data_manager import L2DataException, TradePointManager from l2.l2_data_util import local_today_datas, L2DataUtil, local_today_num_operate_map, local_today_buyno_map, \ local_latest_datas @@ -231,10 +231,13 @@ _start_index = local_today_datas[code][-1]["index"] + 1 datas = l2_huaxin_util.get_format_l2_datas(code, datas, gpcode_manager.get_limit_up_price(code), _start_index) + # 鑾峰彇涓嬪崟浣嶇疆 + place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, datas) + if place_order_index: + DCancelBigNumComputer.set_real_order_index(code, place_order_index) + __start_time = round(t.time() * 1000) - print("鏍煎紡鍖朙2鏁版嵁鎴愬姛", code) cls.process_add_datas(code, datas, 0, __start_time) - print("huaxin L2鏁版嵁澶勭悊鎴愬姛", code) except Exception as e: print("huaxin L2鏁版嵁澶勭悊寮傚父", code, str(e)) logging.exception(e) diff --git a/trade/huaxin/huaxin_trade_api.py b/trade/huaxin/huaxin_trade_api.py index 26f1a50..2497d12 100644 --- a/trade/huaxin/huaxin_trade_api.py +++ b/trade/huaxin/huaxin_trade_api.py @@ -90,6 +90,14 @@ def heart(cls, rid): cls.active_client_dict[rid] = time.time() + @classmethod + def del_invalid_clients(cls): + # 娓呴櫎闀挎椂闂存棤蹇冭烦鐨勫鎴风閫氶亾 + for k in cls.active_client_dict.keys(): + if time.time() - cls.active_client_dict[k] > 20: + # 蹇冭烦鏃堕棿闂撮殧20s浠ヤ笂瑙嗕负鏃犳晥 + cls.del_client(k) + TRADE_DIRECTION_BUY = 1 TRADE_DIRECTION_SELL = 2 diff --git a/trade/huaxin/trade_server.py b/trade/huaxin/trade_server.py index b2344a1..ccfd1f5 100644 --- a/trade/huaxin/trade_server.py +++ b/trade/huaxin/trade_server.py @@ -5,17 +5,18 @@ import random import socket import socketserver +import threading import time from code_attribute import gpcode_manager from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager -from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer +from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer from l2.huaxin import huaxin_target_codes_manager from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \ hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue from trade import deal_big_money_manager -from trade.huaxin import huaxin_trade_api as trade_api, trade_api_server +from trade.huaxin import huaxin_trade_api as trade_api, trade_api_server, huaxin_trade_api trade_data_request_queue = queue.Queue() @@ -173,8 +174,10 @@ code) if True: if buy_progress_index is not None: - logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}", code, buy_progress_index) - buy_time = l2_data_util.local_today_datas.get(code)[buy_progress_index]["val"]["time"] + logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}", code, + buy_progress_index) + buy_time = l2_data_util.local_today_datas.get(code)[buy_progress_index]["val"][ + "time"] HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index, buy_progress_index, l2_data_util.local_today_datas.get( @@ -185,13 +188,18 @@ l2_data_util.local_today_datas.get( code)) - # 璁$畻澶у崟鎴愪氦棰� deal_big_money_manager.set_trade_progress(code, buy_progress_index, l2_data_util.local_today_datas.get( code), l2_data_util.local_today_num_operate_map.get( code)) + DCancelBigNumComputer.set_trade_progress(code, buy_progress_index, buy_exec_index, + l2_data_util.local_today_datas.get( + code), + l2_data_util.local_today_num_operate_map.get( + code), 1000 * 10000, + gpcode_manager.get_limit_up_price(code)) except Exception as e: hx_logger_l2_transaction.exception(e) @@ -232,8 +240,21 @@ super().finish() +def clear_invalid_client(): + while True: + try: + huaxin_trade_api.ClientSocketManager.del_invalid_clients() + except: + pass + finally: + time.sleep(2) + + def run(): print("create TradeServer") + t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True) + t1.start() + laddr = "0.0.0.0", 10008 tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle) # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle tcpserver.serve_forever() diff --git a/trade/trade_huaxin.py b/trade/trade_huaxin.py index 699e5b4..2c9611b 100644 --- a/trade/trade_huaxin.py +++ b/trade/trade_huaxin.py @@ -9,6 +9,7 @@ from log_module.log import logger_juejin_trade from trade.huaxin import huaxin_trade_api from utils import tool, huaxin_util +from l2 import huaxin __context_dict = {} @@ -52,17 +53,15 @@ # 閫氳繃閲忎笅鍗�,杩斿洖(浠g爜锛岃处鍙稩D锛岃鍗曞彿) -def order_volume(code, price, count): - if not constant.TRADE_ENABLE: - return +def order_volume(code, price, count, last_data_index): + if code.find("00") != 0 and code.find("60") != 0: raise Exception("鍙敮鎸�00寮�澶翠笌60寮�澶寸殑浠g爜涓嬪崟") - code_str = code - if code[0:2] == '00': - code_str = f"SZSE.{code}" - elif code[0:2] == '60': - code_str = f"SHSE.{code}" start_time = time.time() + # 淇濆瓨涓嬪崟淇℃伅 + huaxin.huaxin_delegate_postion_manager.place_order(code, price, count, last_data_index) + if not constant.TRADE_ENABLE: + return result = huaxin_trade_api.order(1, code, count, price) print("鍗庨懌涓嬪崟鑰楁椂", time.time() - start_time) logger_juejin_trade.info(f"{code}锛氫笅鍗曡�楁椂{round(time.time() - start_time, 3)}s") @@ -72,8 +71,8 @@ if result['code'] == 0: result = result["data"] if result["orderStatus"] == huaxin_util.TORA_TSTP_OST_Rejected: - logger_juejin_trade.info(f"{code}锛氫笅鍗曞け璐ワ細{result['statusMsg']}") - raise Exception(result["statusMsg"]) + logger_juejin_trade.info(f"{code}锛氫笅鍗曞け璐ワ細{result.get('statusMsg')}") + raise Exception(result.get('statusMsg')) else: TradeOrderIdManager.add_order_id(code, result["accountID"], result["orderSysID"]) logger_juejin_trade.info(f"{code}锛氫笅鍗曟垚鍔� orderSysID:{result['orderSysID']}") diff --git a/trade/trade_manager.py b/trade/trade_manager.py index 5ef7525..b523699 100644 --- a/trade/trade_manager.py +++ b/trade/trade_manager.py @@ -357,7 +357,7 @@ if constant.TRADE_WAY == constant.TRADE_WAY_JUEJIN: trade_juejin.order_volume(code, price, count) elif constant.TRADE_WAY == constant.TRADE_WAY_HUAXIN: - trade_huaxin.order_volume(code, price, count) + trade_huaxin.order_volume(code, price, count,last_data_index) else: guiTrade.buy(code, price) __place_order_success(code, capture_timestamp, last_data, last_data_index) diff --git a/trade/trade_result_manager.py b/trade/trade_result_manager.py index 4982b86..ea550cb 100644 --- a/trade/trade_result_manager.py +++ b/trade/trade_result_manager.py @@ -5,7 +5,7 @@ from l2 import l2_data_manager from l2.cancel_buy_strategy import HourCancelBigNumComputer, SecondCancelBigNumComputer, L2LimitUpSellStatisticUtil, \ - LCancelBigNumComputer + LCancelBigNumComputer, DCancelBigNumComputer from l2.l2_data_util import local_today_datas, local_today_num_operate_map from l2.safe_count_manager import BuyL2SafeCountManager from log_module.log import logger_l2_error @@ -33,7 +33,8 @@ f5 = dask.delayed(__buyL2SafeCountManager.save_place_order_info)(code, buy_single_index, buy_exec_index, total_datas[-1]["index"]) f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code) - dask.compute(f1, f2, f3, f4, f5, f6) + f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code) + dask.compute(f1, f2, f3, f4, f5, f6, f7) # 鐪熷疄涔版垚鍔� @@ -91,7 +92,8 @@ f4 = dask.delayed(l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy)(code) f5 = dask.delayed(l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy)(code) f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code) - dask.compute(f1, f2, f3, f4, f5, f6) + f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code) + dask.compute(f1, f2, f3, f4, f5, f6, f7) if __name__ == "__main__": @@ -101,4 +103,5 @@ f4 = dask.delayed(l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy)(code) f5 = dask.delayed(l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy)(code) f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code) - dask.compute(f2, f3, f4, f5, f6) + f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code) + dask.compute(f2, f3, f4, f5, f6, f7) -- Gitblit v1.8.0