From a9681c7b03a6fde559bf77ef65917d6d4db5d84c Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期一, 17 七月 2023 13:22:57 +0800
Subject: [PATCH] 华鑫适配

---
 trade/trade_result_manager.py                |   11 ++-
 l2/huaxin/huaxin_delegate_postion_manager.py |   47 +++++++++++++++
 trade/trade_manager.py                       |    2 
 trade/huaxin/trade_server.py                 |   31 ++++++++-
 trade/trade_huaxin.py                        |   19 +++---
 trade/huaxin/huaxin_trade_api.py             |    8 ++
 l2/cancel_buy_strategy.py                    |   19 +++++
 l2/l2_data_manager_new.py                    |   11 ++-
 8 files changed, 122 insertions(+), 26 deletions(-)

diff --git a/l2/cancel_buy_strategy.py b/l2/cancel_buy_strategy.py
index 01f9b10..19efcbf 100644
--- a/l2/cancel_buy_strategy.py
+++ b/l2/cancel_buy_strategy.py
@@ -674,12 +674,23 @@
             return int(val)
         return None
 
+    @classmethod
+    def clear(cls, code=None):
+        if code:
+            cls.__getRedis().delete(f"d_cancel_real_order_index-{code}")
+        else:
+            keys = cls.__getRedis().keys("d_cancel_real_order_index-*")
+            if keys:
+                for k in keys:
+                    cls.__getRedis().delete(k)
+
     # 璁剧疆鎴愪氦浣�
     @classmethod
     def set_trade_progress(cls, code, index, buy_exec_index, total_data, local_today_num_operate_map, m_value,
                            limit_up_price):
         # 绂讳笅鍗曟墽琛屼綅2鍒嗛挓鍐呯殑鏈夋晥
-        if tool.trade_time_sub(total_data[-1]['val']['time'], total_data[buy_exec_index]['val']['time']) > constant.D_CANCEL_EXPIRE_TIME:
+        if tool.trade_time_sub(total_data[-1]['val']['time'],
+                               total_data[buy_exec_index]['val']['time']) > constant.D_CANCEL_EXPIRE_TIME:
             return False, "瓒呰繃D鎾ゅ畧鎶ゆ椂闂�"
 
         real_order_index = cls.__get_real_order_index(code)
@@ -709,7 +720,11 @@
     # 璁剧疆鐪熷疄鐨勪笅鍗曚綅缃�
     @classmethod
     def set_real_order_index(cls, code, index):
-        pass
+        cls.__set_real_order_index(code, index)
+
+    @classmethod
+    def cancel_success(cls, code):
+        cls.clear(code)
 
 
 # ---------------------------------L鎾�-------------------------------
diff --git a/l2/huaxin/huaxin_delegate_postion_manager.py b/l2/huaxin/huaxin_delegate_postion_manager.py
new file mode 100644
index 0000000..e45245f
--- /dev/null
+++ b/l2/huaxin/huaxin_delegate_postion_manager.py
@@ -0,0 +1,47 @@
+"""
+鍗庨懌濮旀墭瀹為檯浣嶇疆绠$悊
+"""
+import time
+
+from log_module.log import hx_logger_trade_debug
+
+_place_order_info_dict = {}
+
+
+# 涓嬪崟
+def place_order(code, price, volume, exec_index):
+    _place_order_info_dict[code] = (price, volume, exec_index, time.time())
+
+
+# 鑾峰彇涓嬪崟淇℃伅
+def get_order_info(code):
+    info = _place_order_info_dict.get(code)
+    if info and time.time() - info[3] > 3:
+        # 闂撮殧3s浠ヤ笂灏辨棤鏁堜簡
+        info = None
+        _place_order_info_dict.pop(code)
+    return info
+
+
+# L2鏁版嵁鍒楄〃
+def get_l2_place_order_position(code, datas):
+    order_info = get_order_info(code)
+    if not order_info:
+        # 鏆傛棤涓嬪崟淇℃伅
+        return None
+    price = order_info[0]
+    volume = order_info[1]
+    exec_index = order_info[2]
+    # 鑾峰彇閲�
+    for d in datas:
+        if d["val"]["num"] != volume:
+            continue
+        if abs(float(price) - float(d["val"]["price"])) >= 0.01:
+            continue
+        # 涓嶅彲鑳芥瘮涓嬪崟鎵ц浣嶇疆杩樻棭
+        if d["index"] <= exec_index:
+            continue
+        # 鑾峰彇鍒颁簡涓嬪崟浣嶇疆
+        hx_logger_trade_debug.info(f"鐪熷疄涓嬪崟浣嶇疆锛歿code}-{d['index']}")
+        return d["index"]
+    return None
diff --git a/l2/l2_data_manager_new.py b/l2/l2_data_manager_new.py
index 9dc5807..9bde625 100644
--- a/l2/l2_data_manager_new.py
+++ b/l2/l2_data_manager_new.py
@@ -4,7 +4,7 @@
 from code_attribute import big_money_num_manager, code_volumn_manager, code_data_util, industry_codes_sort, \
     limit_up_time_manager, global_data_loader, gpcode_manager
 import constant
-from l2.huaxin import l2_huaxin_util
+from l2.huaxin import l2_huaxin_util, huaxin_delegate_postion_manager
 from utils import global_util, ths_industry_util, tool
 import l2_data_util
 from db import redis_manager
@@ -13,7 +13,7 @@
     trade_result_manager, first_code_score_manager
 from l2 import safe_count_manager, l2_data_manager, l2_data_log, l2_log, l2_data_source_util, code_price_manager
 from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \
-    L2LimitUpSellStatisticUtil
+    L2LimitUpSellStatisticUtil, DCancelBigNumComputer
 from l2.l2_data_manager import L2DataException, TradePointManager
 from l2.l2_data_util import local_today_datas, L2DataUtil, local_today_num_operate_map, local_today_buyno_map, \
     local_latest_datas
@@ -231,10 +231,13 @@
                 _start_index = local_today_datas[code][-1]["index"] + 1
             datas = l2_huaxin_util.get_format_l2_datas(code, datas,
                                                        gpcode_manager.get_limit_up_price(code), _start_index)
+            # 鑾峰彇涓嬪崟浣嶇疆
+            place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, datas)
+            if place_order_index:
+                DCancelBigNumComputer.set_real_order_index(code, place_order_index)
+
             __start_time = round(t.time() * 1000)
-            print("鏍煎紡鍖朙2鏁版嵁鎴愬姛", code)
             cls.process_add_datas(code, datas, 0, __start_time)
-            print("huaxin L2鏁版嵁澶勭悊鎴愬姛", code)
         except Exception as e:
             print("huaxin L2鏁版嵁澶勭悊寮傚父", code, str(e))
             logging.exception(e)
diff --git a/trade/huaxin/huaxin_trade_api.py b/trade/huaxin/huaxin_trade_api.py
index 26f1a50..2497d12 100644
--- a/trade/huaxin/huaxin_trade_api.py
+++ b/trade/huaxin/huaxin_trade_api.py
@@ -90,6 +90,14 @@
     def heart(cls, rid):
         cls.active_client_dict[rid] = time.time()
 
+    @classmethod
+    def del_invalid_clients(cls):
+        # 娓呴櫎闀挎椂闂存棤蹇冭烦鐨勫鎴风閫氶亾
+        for k in cls.active_client_dict.keys():
+            if time.time() - cls.active_client_dict[k] > 20:
+                # 蹇冭烦鏃堕棿闂撮殧20s浠ヤ笂瑙嗕负鏃犳晥
+                cls.del_client(k)
+
 
 TRADE_DIRECTION_BUY = 1
 TRADE_DIRECTION_SELL = 2
diff --git a/trade/huaxin/trade_server.py b/trade/huaxin/trade_server.py
index b2344a1..ccfd1f5 100644
--- a/trade/huaxin/trade_server.py
+++ b/trade/huaxin/trade_server.py
@@ -5,17 +5,18 @@
 import random
 import socket
 import socketserver
+import threading
 import time
 
 from code_attribute import gpcode_manager
 from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager
-from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer
+from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer
 from l2.huaxin import huaxin_target_codes_manager
 from log_module.log import hx_logger_l2_upload, hx_logger_contact_debug, hx_logger_trade_callback, \
     hx_logger_l2_orderdetail, hx_logger_l2_transaction, hx_logger_l2_market_data, logger_l2_trade_buy_queue
 from trade import deal_big_money_manager
 
-from trade.huaxin import huaxin_trade_api as trade_api, trade_api_server
+from trade.huaxin import huaxin_trade_api as trade_api, trade_api_server, huaxin_trade_api
 
 trade_data_request_queue = queue.Queue()
 
@@ -173,8 +174,10 @@
                                 code)
                             if True:
                                 if buy_progress_index is not None:
-                                    logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}", code, buy_progress_index)
-                                    buy_time = l2_data_util.local_today_datas.get(code)[buy_progress_index]["val"]["time"]
+                                    logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}", code,
+                                                                   buy_progress_index)
+                                    buy_time = l2_data_util.local_today_datas.get(code)[buy_progress_index]["val"][
+                                        "time"]
                                     HourCancelBigNumComputer.set_trade_progress(code, buy_time, buy_exec_index,
                                                                                 buy_progress_index,
                                                                                 l2_data_util.local_today_datas.get(
@@ -185,13 +188,18 @@
                                                                              l2_data_util.local_today_datas.get(
                                                                                  code))
 
-
                                     # 璁$畻澶у崟鎴愪氦棰�
                                     deal_big_money_manager.set_trade_progress(code, buy_progress_index,
                                                                               l2_data_util.local_today_datas.get(
                                                                                   code),
                                                                               l2_data_util.local_today_num_operate_map.get(
                                                                                   code))
+                                    DCancelBigNumComputer.set_trade_progress(code, buy_progress_index, buy_exec_index,
+                                                                             l2_data_util.local_today_datas.get(
+                                                                                 code),
+                                                                             l2_data_util.local_today_num_operate_map.get(
+                                                                                 code), 1000 * 10000,
+                                                                             gpcode_manager.get_limit_up_price(code))
                         except Exception as e:
                             hx_logger_l2_transaction.exception(e)
 
@@ -232,8 +240,21 @@
         super().finish()
 
 
+def clear_invalid_client():
+    while True:
+        try:
+            huaxin_trade_api.ClientSocketManager.del_invalid_clients()
+        except:
+            pass
+        finally:
+            time.sleep(2)
+
+
 def run():
     print("create TradeServer")
+    t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True)
+    t1.start()
+
     laddr = "0.0.0.0", 10008
     tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)  # 娉ㄦ剰锛氬弬鏁版槸MyBaseRequestHandle
     tcpserver.serve_forever()
diff --git a/trade/trade_huaxin.py b/trade/trade_huaxin.py
index 699e5b4..2c9611b 100644
--- a/trade/trade_huaxin.py
+++ b/trade/trade_huaxin.py
@@ -9,6 +9,7 @@
 from log_module.log import logger_juejin_trade
 from trade.huaxin import huaxin_trade_api
 from utils import tool, huaxin_util
+from l2 import huaxin
 
 __context_dict = {}
 
@@ -52,17 +53,15 @@
 
 
 # 閫氳繃閲忎笅鍗�,杩斿洖(浠g爜锛岃处鍙稩D锛岃鍗曞彿)
-def order_volume(code, price, count):
-    if not constant.TRADE_ENABLE:
-        return
+def order_volume(code, price, count, last_data_index):
+
     if code.find("00") != 0 and code.find("60") != 0:
         raise Exception("鍙敮鎸�00寮�澶翠笌60寮�澶寸殑浠g爜涓嬪崟")
-    code_str = code
-    if code[0:2] == '00':
-        code_str = f"SZSE.{code}"
-    elif code[0:2] == '60':
-        code_str = f"SHSE.{code}"
     start_time = time.time()
+    # 淇濆瓨涓嬪崟淇℃伅
+    huaxin.huaxin_delegate_postion_manager.place_order(code, price, count, last_data_index)
+    if not constant.TRADE_ENABLE:
+        return
     result = huaxin_trade_api.order(1, code, count, price)
     print("鍗庨懌涓嬪崟鑰楁椂", time.time() - start_time)
     logger_juejin_trade.info(f"{code}锛氫笅鍗曡�楁椂{round(time.time() - start_time, 3)}s")
@@ -72,8 +71,8 @@
         if result['code'] == 0:
             result = result["data"]
             if result["orderStatus"] == huaxin_util.TORA_TSTP_OST_Rejected:
-                logger_juejin_trade.info(f"{code}锛氫笅鍗曞け璐ワ細{result['statusMsg']}")
-                raise Exception(result["statusMsg"])
+                logger_juejin_trade.info(f"{code}锛氫笅鍗曞け璐ワ細{result.get('statusMsg')}")
+                raise Exception(result.get('statusMsg'))
             else:
                 TradeOrderIdManager.add_order_id(code, result["accountID"], result["orderSysID"])
                 logger_juejin_trade.info(f"{code}锛氫笅鍗曟垚鍔� orderSysID:{result['orderSysID']}")
diff --git a/trade/trade_manager.py b/trade/trade_manager.py
index 5ef7525..b523699 100644
--- a/trade/trade_manager.py
+++ b/trade/trade_manager.py
@@ -357,7 +357,7 @@
             if constant.TRADE_WAY == constant.TRADE_WAY_JUEJIN:
                 trade_juejin.order_volume(code, price, count)
             elif constant.TRADE_WAY == constant.TRADE_WAY_HUAXIN:
-                trade_huaxin.order_volume(code, price, count)
+                trade_huaxin.order_volume(code, price, count,last_data_index)
         else:
             guiTrade.buy(code, price)
         __place_order_success(code, capture_timestamp, last_data, last_data_index)
diff --git a/trade/trade_result_manager.py b/trade/trade_result_manager.py
index 4982b86..ea550cb 100644
--- a/trade/trade_result_manager.py
+++ b/trade/trade_result_manager.py
@@ -5,7 +5,7 @@
 
 from l2 import l2_data_manager
 from l2.cancel_buy_strategy import HourCancelBigNumComputer, SecondCancelBigNumComputer, L2LimitUpSellStatisticUtil, \
-    LCancelBigNumComputer
+    LCancelBigNumComputer, DCancelBigNumComputer
 from l2.l2_data_util import local_today_datas, local_today_num_operate_map
 from l2.safe_count_manager import BuyL2SafeCountManager
 from log_module.log import logger_l2_error
@@ -33,7 +33,8 @@
     f5 = dask.delayed(__buyL2SafeCountManager.save_place_order_info)(code, buy_single_index, buy_exec_index,
                                                                      total_datas[-1]["index"])
     f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
-    dask.compute(f1, f2, f3, f4, f5, f6)
+    f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code)
+    dask.compute(f1, f2, f3, f4, f5, f6, f7)
 
 
 # 鐪熷疄涔版垚鍔�
@@ -91,7 +92,8 @@
     f4 = dask.delayed(l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy)(code)
     f5 = dask.delayed(l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy)(code)
     f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
-    dask.compute(f1, f2, f3, f4, f5, f6)
+    f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code)
+    dask.compute(f1, f2, f3, f4, f5, f6, f7)
 
 
 if __name__ == "__main__":
@@ -101,4 +103,5 @@
     f4 = dask.delayed(l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy)(code)
     f5 = dask.delayed(l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy)(code)
     f6 = dask.delayed(SecondCancelBigNumComputer.cancel_success)(code)
-    dask.compute(f2, f3, f4, f5, f6)
+    f7 = dask.delayed(DCancelBigNumComputer.cancel_success)(code)
+    dask.compute(f2, f3, f4, f5, f6, f7)

--
Gitblit v1.8.0