From b7000cbf5e67e90abe53e96a4ea931afbf906e24 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期五, 16 九月 2022 18:51:47 +0800
Subject: [PATCH] l2数据计算优化

---
 l2_data_manager.py |  768 +++++++++++++++++++++++++++++++++++++++++++++++-----------
 1 files changed, 615 insertions(+), 153 deletions(-)

diff --git a/l2_data_manager.py b/l2_data_manager.py
index 2ece3bd..3e81904 100644
--- a/l2_data_manager.py
+++ b/l2_data_manager.py
@@ -1,18 +1,19 @@
 import decimal
 import json
+import os
 import time as t
 from datetime import datetime
 
 import data_process
 import l2_data_util
-import mysql
 
 import gpcode_manager
-import mongo_data
+
 import redis_manager
 import tool
 import trade_manager
 from log import logger_l2_trade
+from trade_data_manager import TradeBuyDataManager
 
 _redisManager = redis_manager.RedisManager(1)
 # l2鏁版嵁绠$悊
@@ -91,25 +92,30 @@
     @staticmethod
     def get_buy_cancel_compute_start_data(code):
         redis = TradePointManager.__get_redis()
-        index = redis.get("buy_cancel_compute_index-{}".format(code))
-        total_num = redis.get("buy_cancel_compute_num-{}".format(code))
-        if index is None:
-            return None, 0
+        info = redis.get("buy_cancel_compute_info-{}".format(code))
+        if info is None:
+            return None, None , None
         else:
-            return int(index), int(total_num)
+            info=json.loads(info)
+            return info[0],info[1],info[2]
 
     # 璁剧疆涔版挙鐐逛俊鎭�
-    @staticmethod
-    def set_buy_cancel_compute_start_data(code, num_add, index=None):
+    # buy_num 绾拱棰�  computed_index璁$畻鍒扮殑涓嬫爣  index鎾や拱淇″彿璧风偣
+
+    @classmethod
+    def set_buy_cancel_compute_start_data(cls,code, buy_num,computed_index, index):
         redis = TradePointManager.__get_redis()
         expire = tool.get_expire()
-        if index is not None:
-            redis.setex("buy_cancel_compute_index-{}".format(code), expire, index)
-        key = "buy_cancel_compute_num-{}".format(code)
-        if redis.get(key) is None:
-            redis.setex(key, expire, num_add)
-        else:
-            redis.incrby(key, num_add)
+        redis.setex("buy_cancel_compute_info-{}".format(code), expire, json.dumps((index,buy_num,computed_index)))
+
+    # 澧炲姞鎾や拱鐨勭函涔伴
+    @classmethod
+    def add_buy_nums_for_cancel(cls,code,num_add,computed_index):
+        cancel_index,nums,c_index= cls.get_buy_cancel_compute_start_data(code)
+        if cancel_index is None:
+            raise Exception("鏃犳挙涔颁俊鍙疯褰�")
+        nums+=num_add
+        cls.set_buy_cancel_compute_start_data(code,nums,computed_index)
 
 
 def load_l2_data(code, force=False):
@@ -140,7 +146,8 @@
     l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
 
 
-def saveL2Data(code, datas):
+def saveL2Data(code, datas, msg=""):
+    start_time = round(t.time() * 1000)
     # 鏌ヨ绁ㄦ槸鍚﹀湪寰呯洃鍚殑绁ㄩ噷闈�
     if not gpcode_manager.is_in_gp_pool(code):
         return None
@@ -152,22 +159,21 @@
 
             # 璁$畻淇濈暀鐨勬椂闂�
             expire = tool.get_expire()
-            index = 0
             start_index = redis_instance.get("l2-maxindex-{}".format(code))
             if start_index is None:
-                start_index = 0
+                start_index = -1
             else:
                 start_index = int(start_index)
             max_index = start_index
+            i = 0
             for _data in datas:
-                index = index + 1
-
+                i += 1
                 key = "l2-" + _data["key"]
                 value = redis_instance.get(key)
                 if value is None:
                     # 鏂板
-                    max_index = start_index + index
-                    value = {"index": start_index + index, "re": _data['re']}
+                    max_index = start_index + i
+                    value = {"index": start_index + i, "re": _data["re"]}
                     redis_instance.setex(key, expire, json.dumps(value))
                 else:
                     json_value = json.loads(value)
@@ -179,77 +185,29 @@
     finally:
         redis_instance.delete("l2-save-{}".format(code))
 
+    print("淇濆瓨鏂版暟鎹敤鏃讹細", msg, round(t.time() * 1000) - start_time)
     return datas
 
 
+# TODO 鑾峰彇l2鐨勬暟鎹�
+def get_l2_data_index(code, key):
+    pass
+
+
 def parseL2Data(str):
-    now = int(t.time())
     day = datetime.now().strftime("%Y%m%d")
     dict = json.loads(str)
     data = dict["data"]
     client = dict["client"]
     code = data["code"]
     channel = data["channel"]
+    capture_time = data["captureTime"]
+    process_time = data["processTime"]
     data = data["data"]
-    datas = []
-    dataIndexs = {}
-
-    # 鑾峰彇娑ㄥ仠浠�
     limit_up_price = gpcode_manager.get_limit_up_price(code)
-    same_time_num = {}
-    for item in data:
-        # 瑙f瀽鏁版嵁
-        time = item["time"]
-        if time in same_time_num:
-            same_time_num[time] = same_time_num[time] + 1
-        else:
-            same_time_num[time] = 1
-
-        price = float(item["price"])
-        num = item["num"]
-        limitPrice = item["limitPrice"]
-        # 娑ㄥ仠浠�
-        if limit_up_price is not None and limit_up_price == tool.to_price(decimal.Decimal(price)):
-            limitPrice = 1
-            item["limitPrice"] = "{}".format(limitPrice)
-        operateType = item["operateType"]
-        cancelTime = item["cancelTime"]
-        cancelTimeUnit = item["cancelTimeUnit"]
-        key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
-                                               cancelTimeUnit)
-        if key in dataIndexs:
-            # 鏁版嵁閲嶅娆℃暟+1
-            datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1
-        else:
-            # 鏁版嵁閲嶅娆℃暟榛樿涓�1
-            datas.append({"key": key, "val": item, "re": 1})
-            dataIndexs.setdefault(key, len(datas) - 1)
-    for key in same_time_num:
-        if same_time_num[key] > 50:
-            # 鍙兘淇濆瓨杩�3s鐨勬暟鎹�
-            ts1 = l2_data_util.get_time_as_seconds(datas[-1]["val"]["time"])
-            ts_now = l2_data_util.get_time_as_seconds(datetime.now().strftime("%H:%M:%S"))
-            if abs(ts1 - ts_now) <= 3:
-                # TODO 淇濆瓨鏁版嵁
-                redis = _redisManager.getRedis()
-                redis.set("big_data-{}-{}".format(code, int(round(t.time() * 1000))), str)
-
-    return day, client, channel, code, datas
-
-
-# 绾犳鏁版嵁锛屽皢re瀛楁鏇挎崲涓鸿緝澶у��
-def correct_data(code, _datas):
-    latest_data = local_latest_datas.get(code)
-    if latest_data is None:
-        latest_data = []
-
-    for data in _datas:
-        for _ldata in latest_data:
-            if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]:
-                max_re = max(_ldata["re"], data["re"])
-                _ldata["re"] = max_re
-                data["re"] = max_re
-    return _datas
+    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
+    # 鑾峰彇娑ㄥ仠浠�
+    return day, client, channel, code, capture_time, process_time, datas
 
 
 # 淇濆瓨l2鏁版嵁
@@ -267,55 +225,543 @@
         saveL2Data(code, add_datas)
 
 
-# 鑾峰彇澧為噺鏁版嵁
-def get_add_data(code, datas):
-    if datas is not None and len(datas) < 1:
-        return []
-    last_key = ""
-    __latest_datas = local_latest_datas.get(code)
-    if __latest_datas is not None and len(__latest_datas) > 0:
-        last_key = __latest_datas[-1]["key"]
-    count = 0
-    start_index = -1
-    # 濡傛灉鍘熸潵娌℃湁鏁版嵁
-
-    for n in reversed(datas):
-        count += 1
-        if n["key"] == last_key:
-            start_index = len(datas) - count
-            break
-    if len(last_key) > 0:
-        if start_index < 0 or start_index + 1 >= len(datas):
-            return []
+class L2DataUtil:
+    @classmethod
+    def is_same_time(cls, time1, time2):
+        # TODO 娴嬭瘯
+        # if 1 > 0:
+        #     return True
+        time1_s = time1.split(":")
+        time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
+        time2_s = time2.split(":")
+        time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
+        if abs(time2_second - time1_second) < 3:
+            return True
         else:
-            return datas[start_index + 1:]
-    else:
-        return datas[start_index + 1:]
+            return False
 
+    # 鑾峰彇澧為噺鏁版嵁
+    @classmethod
+    def get_add_data(cls, code, datas, _start_index):
+        if datas is not None and len(datas) < 1:
+            return []
+        last_key = ""
+        __latest_datas = local_latest_datas.get(code)
+        if __latest_datas is not None and len(__latest_datas) > 0:
+            last_key = __latest_datas[-1]["key"]
+        count = 0
+        start_index = -1
+        # 濡傛灉鍘熸潵娌℃湁鏁版嵁
+        # TODO 璁剧疆add_data鐨勫簭鍙�
+        for n in reversed(datas):
+            count += 1
+            if n["key"] == last_key:
+                start_index = len(datas) - count
+                break
 
-def __is_same_time(time1, time2):
-    # TODO 娴嬭瘯
-    # if 1 > 0:
-    #     return True
-    time1_s = time1.split(":")
-    time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
-    time2_s = time2.split(":")
-    time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
-    if abs(time2_second - time1_second) < 3:
+        _add_datas = []
+        if len(last_key) > 0:
+            if start_index < 0 or start_index + 1 >= len(datas):
+                _add_datas = []
+            else:
+                _add_datas = datas[start_index + 1:]
+        else:
+            _add_datas = datas[start_index + 1:]
+        for i in range(0, len(_add_datas)):
+            _add_datas[i]["index"] = _start_index + i
+
+        return _add_datas
+
+    # 绾犳鏁版嵁锛屽皢re瀛楁鏇挎崲涓鸿緝澶у��
+    @classmethod
+    def correct_data(cls, code, _datas):
+        latest_data = local_latest_datas.get(code)
+        if latest_data is None:
+            latest_data = []
+        save_list = []
+        for data in _datas:
+            for _ldata in latest_data:
+                if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]:
+                    max_re = max(_ldata["re"], data["re"])
+                    _ldata["re"] = max_re
+                    data["re"] = max_re
+                    # 淇濆瓨鍒版暟鎹簱锛屾洿鏂皉e鐨勬暟鎹�
+                    save_list.append(_ldata)
+        if len(save_list) > 0:
+            saveL2Data(code, save_list, "淇濆瓨绾犳鏁版嵁")
+        return _datas
+
+    # 澶勭悊l2鏁版嵁
+    @classmethod
+    def format_l2_data(cls, data, code, limit_up_price):
+        datas = []
+        dataIndexs = {}
+        same_time_num = {}
+        for item in data:
+            # 瑙f瀽鏁版嵁
+            time = item["time"]
+            if time in same_time_num:
+                same_time_num[time] = same_time_num[time] + 1
+            else:
+                same_time_num[time] = 1
+
+            price = float(item["price"])
+            num = item["num"]
+            limitPrice = item["limitPrice"]
+            # 娑ㄥ仠浠�
+            if limit_up_price is not None and limit_up_price == tool.to_price(decimal.Decimal(price)):
+                limitPrice = 1
+                item["limitPrice"] = "{}".format(limitPrice)
+            operateType = item["operateType"]
+            cancelTime = item["cancelTime"]
+            cancelTimeUnit = item["cancelTimeUnit"]
+            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
+                                                   cancelTimeUnit)
+            if key in dataIndexs:
+                # 鏁版嵁閲嶅娆℃暟+1
+                datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1
+            else:
+                # 鏁版嵁閲嶅娆℃暟榛樿涓�1
+                datas.append({"key": key, "val": item, "re": 1})
+                dataIndexs.setdefault(key, len(datas) - 1)
+        l2_data_util.save_big_data(code, same_time_num, data)
+        return datas
+
+    @classmethod
+    def get_time_as_second(time_str):
+        ts = time_str.split(":")
+        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
+
+    # 鏄惁鏄定鍋滀环涔�
+    def is_limit_up_price_buy(val):
+        if int(val["limitPrice"]) != 1:
+            return False
+
+        if int(val["operateType"]) != 0:
+            return False
+
+        price = float(val["price"])
+        num = int(val["num"])
+        if price * num * 100 < 50 * 10000:
+            return False
         return True
-    else:
-        return False
+
+    # 鏄惁娑ㄥ仠涔版挙
+    def is_limit_up_price_buy_cancel(val):
+        if int(val["limitPrice"]) != 1:
+            return False
+
+        if int(val["operateType"]) != 1:
+            return False
+
+        price = float(val["price"])
+        num = int(val["num"])
+        if price * num * 100 < 50 * 10000:
+            return False
+        return True
 
 
-def process_data(code, datas):
+# L2浜ゆ槗鏁版嵁澶勭悊鍣�
+class L2TradeDataProcessor:
+    unreal_buy_dict = {}
+
+    @classmethod
+    # 鏁版嵁澶勭悊鍏ュ彛
+    # datas: 鏈鎴浘鏁版嵁
+    # capture_timestamp:鎴浘鏃堕棿鎴�
+    def process(cls, code, datas, capture_timestamp):
+        now_time_str = datetime.now().strftime("%H:%M:%S")
+        __start_time = round(t.time() * 1000)
+        try:
+            if len(datas) > 0:
+                # 鍒ゆ柇浠锋牸鍖洪棿鏄惁姝g‘
+                if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
+                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
+                                          "鑲′环涓嶅尮閰� code-{} price-{}".format(code, datas[0]["val"]["price"]))
+                # 鍔犺浇鍘嗗彶鏁版嵁
+                load_l2_data(code)
+                # 绾犳鏁版嵁
+                datas = L2DataUtil.correct_data(code, datas)
+                _start_index = 0
+                if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0:
+                    _start_index = local_today_datas[code][-1]["index"]
+                add_datas = L2DataUtil.get_add_data(code, datas, _start_index)
+                if len(add_datas) > 0:
+                    # 鎷兼帴鏁版嵁
+                    local_today_datas[code].extend(add_datas)
+                    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
+                total_datas = local_today_datas[code]
+                # 涔板叆纭鐐瑰鐞�
+                TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas,
+                                                                   total_datas[-1],
+                                                                   add_datas)
+                if len(add_datas) > 0:
+                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
+                    # 鏃堕棿宸笉鑳藉お澶ф墠鑳藉鐞�
+                    if L2DataUtil.is_same_time(now_time_str, latest_time):
+                        # 鍒ゆ柇鏄惁宸茬粡鎸傚崟
+                        state = trade_manager.get_trade_state(code)
+                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
+                            # 宸叉寕鍗�
+                            cls.process_order(code, add_datas)
+                        else:
+                            # 鏈寕鍗�
+                            cls.process_not_order(code, add_datas)
+                # 淇濆瓨鏁版嵁
+                save_l2_data(code, datas, add_datas)
+        finally:
+            if code in cls.unreal_buy_dict:
+                cls.unreal_buy_dict.pop(code)
+
+    # 澶勭悊鏈寕鍗�
+    @classmethod
+    def process_not_order(cls, code, add_datas):
+
+
+    # 澶勭悊宸叉寕鍗�
+    @classmethod
+    def process_order(cls, code, add_datas):
+        # 鑾峰彇涔嬪墠鏄惁鏈夎褰曠殑鎾や拱淇″彿
+        cancel_index, buy_num_for_cancel,computed_index= cls.has_order_cancel_begin_pos(code)
+        buy_index, buy_num = cls.get_order_begin_pos(code)
+        if cancel_index is None:
+            # 鏃犳挙鍗曚俊鍙疯捣濮嬬偣璁板綍
+            cancel_index = cls.compute_order_cancel_begin_single(code, len(add_datas) + 3, 3)
+            buy_num_for_cancel = 0
+            computed_index=buy_index
+        if cancel_index is not None:
+            # 鑾峰彇闃堝�� 鏈変拱鎾や俊鍙凤紝缁熻鎾や拱绾拱棰�
+            threshold_money=10000000
+            cls.start_compute_cancel(code,cancel_index,computed_index,buy_num_for_cancel,threshold_money)
+        else:
+            # 鏃犱拱鎾や俊鍙�,缁堟鎵ц
+            pass
+
+    #寮�濮嬭绠楁挙鐨勪俊鍙�
+    @classmethod
+    def start_compute_cancel(cls,code,cancel_index, compute_start_index,origin_num,threshold_money):
+        # sure_type 0-铏氭嫙鎸備拱浣�  1-鐪熷疄鎸備拱浣�
+        computed_index , buy_num_for_cancel,sure_type = cls.sum_buy_num_for_cancel_order(code,compute_start_index,origin_num,threshold_money)
+        total_datas = local_today_datas[code]
+        if computed_index is not None:
+            # 鍙戝嚭鎾や拱淇″彿锛岄渶瑕佹挙涔�
+            if cls.unreal_buy_dict.get(code) is not None:
+                # 鏈夎櫄鎷熶笅鍗�
+                # 鍒犻櫎铏氭嫙涓嬪崟鏍囪
+                cls.unreal_buy_dict.pop(code)
+                # TODO 鍒犻櫎涓嬪崟鏍囪浣嶇疆
+                pass
+            else:
+                # 鏃犺櫄鎷熶笅鍗曪紝闇�瑕佹墽琛屾挙鍗�
+                logger_l2_trade.info(
+                    "鎵ц鎾ら攢锛歿} - {}".format(code, json.dumps(total_datas[computed_index])))
+                try:
+                    trade_manager.start_cancel_buy(code)
+                    # 鍙栨秷涔板叆鏍囪瘑
+                    TradePointManager.delete_buy_point(code)
+                    TradePointManager.delete_buy_cancel_point(code)
+                except Exception as e:
+                    pass
+
+            if computed_index < len(local_today_datas[code])-1:
+                # TODO鏁版嵁灏氭湭澶勭悊瀹�,閲嶆柊杩涘叆涓嬪崟璁$畻娴佺▼
+                cls.start_compute_buy(code,computed_index+1,0,threshold_money)
+                pass
+        else:
+            #鏃犻渶鎾や拱锛岃褰曟挙涔颁俊鍙�
+            TradePointManager.set_buy_cancel_compute_start_data(code,buy_num_for_cancel,len(total_datas)-1,cancel_index)
+            # 鍒ゆ柇鏄惁鏈夎櫄鎷熶笅鍗�
+            unreal_buy_info=cls.unreal_buy_dict.get(code)
+            if unreal_buy_info is not None:
+                # unreal_buy_info 鐨勫唴瀹规牸寮忎负锛�(瑙︽硶涔版搷浣滀笅鏍�,鎴浘鏃堕棿)
+                # 鐪熷疄涓嬪崟
+                logger_l2_trade.info(
+                    "鎵ц涔板叆锛歿} ".format(code))
+                try:
+                    trade_manager.start_buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]],
+                                            unreal_buy_info[0])
+                    TradePointManager.delete_buy_cancel_point(code)
+                except Exception as e:
+                    pass
+                pass
+            else:
+                #缁堟鎵ц
+                pass
+
+
+
+    @classmethod
+    def start_compute_buy(cls,code,compute_start_index,origin_num,threshold_money):
+        total_datas=local_today_datas[code]
+        # 鑾峰彇涔板叆淇″彿璁$畻璧峰浣嶇疆
+        index, num = cls.get_order_begin_pos(code)
+        # 鏄惁涓烘柊鑾峰彇鍒扮殑浣嶇疆
+        new_get_pos = False
+        if index is None:
+            # 鏈変拱鍏ヤ俊鍙�
+            has_single, index = cls.compute_order_begin_pos(code, len(total_datas) - compute_start_index , 3)
+            if has_single:
+                num = 0
+                new_get_pos = True
+                # TODO 璁板綍涔板叆淇″彿浣嶇疆
+        if index is None:
+            # 鏈幏鍙栧埌涔板叆淇″彿锛岀粓姝㈢▼搴�
+            return None
+
+
+        # 涔板叆绾拱棰濈粺璁�
+        # TODO 鑾峰彇闃堝��
+        threshold_money=10000000
+        compute_index,buy_nums = cls.sum_buy_num_for_order(code,compute_start_index,num,threshold_money)
+        if compute_index is not None:
+            # 杈惧埌涓嬪崟鏉′欢
+            # 铏氭嫙涓嬪崟
+            cls.unreal_buy_dict[code]=(compute_index,capture_time)
+        else:
+            # TODO 鏈揪鍒颁笅鍗曟潯浠讹紝淇濆瓨绾拱棰濓紝璁剧疆绾拱棰�
+
+
+        pass
+
+
+
+    # 鑾峰彇涓嬪崟璧峰淇″彿
+    @classmethod
+    def get_order_begin_pos(cls, code):
+        index, num = TradePointManager.get_buy_compute_start_data(code)
+        return index, num
+
+    # 鑾峰彇鎾ゅ崟璧峰浣嶇疆
+    @classmethod
+    def has_order_cancel_begin_pos(cls):
+        # cancel_index:鎾ゅ崟淇″彿璧风偣
+        # buy_num_for_cancel锛氫粠鎸傚叆鐐硅绠楃殑绾拱棰�
+        # computed_index 璁$畻鐨勬渶鍚庝綅缃�
+        cancel_index, buy_num_for_cancel,computed_index = TradePointManager.get_buy_cancel_compute_start_data(code)
+        return cancel_index, buy_num_for_cancel,computed_index
+
+    # 璁$畻涓嬪崟璧峰淇″彿
+    # compute_data_count 鐢ㄤ簬璁$畻鐨刲2鏁版嵁鏁伴噺
+    def compute_order_begin_pos(self, code, compute_data_count, continue_count):
+        # 鍊掓暟100鏉℃暟鎹煡璇�
+        datas = local_today_datas[code]
+        __len = len(datas)
+        if __len < continue_count:
+            return None
+        start_index = 0
+        if compute_data_count > __len:
+            compute_data_count = __len
+
+        if __len > compute_data_count:
+            start_index = __len - compute_data_count
+        __time = None
+        _limit_up_count_1s = 0
+        _limit_up_count_1s_start_index = -1
+
+        for i in range(start_index, __len - (continue_count - 1)):
+            _val = datas[i]["val"]
+            # 鏃堕棿瑕�>=09:30:00
+            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
+                continue
+
+            # 鏈夎繛缁�4涓定鍋滀拱灏辨爣璁拌绠楄捣濮嬬偣
+            if L2DataUtil.is_limit_up_price_buy(_val):
+                index_0 = i
+                index_1 = -1
+                index_2 = -1
+                # index_3 = -1
+                for j in range(index_0 + 1, __len):
+                    # 娑ㄥ仠涔�
+                    if L2DataUtil.is_limit_up_price_buy(datas[j]["val"]):
+                        index_1 = j
+                        break
+
+                if index_1 > 0:
+                    for j in range(index_1 + 1, __len):
+                        # 娑ㄥ仠涔�
+                        if L2DataUtil.is_limit_up_price_buy(datas[j]["val"]):
+                            index_2 = j
+                            break
+                # if index_2 > 0:
+                #     for j in range(index_2 + 1, __len):
+                #         # 娑ㄥ仠涔�
+                #         if datas[j]["val"]["limitPrice"] == 1 and datas[j]["val"]["operateType"] == 0:
+                #             index_3 = j
+                if index_1 - index_0 == 1 and index_2 - index_1 == 1:  # and index_3 - index_2 == 1
+                    logger_l2_trade.info("鎵惧埌鐗╃悊杩炵画娑ㄥ仠涔� {},{},{}".format(code, i, datas[i]))
+                    return i
+            # 鍚�1s鍐呮湁涓嶈繛缁殑4涓定鍋滀拱锛堝鏋滈亣涔版挙灏遍噸鏂拌绠楋紝涓棿鍙棿闅斾笉娑ㄥ仠涔帮級鏍囪璁$畻璧峰鐐�
+            if L2DataUtil.is_limit_up_price_buy(_val):
+                # 娑ㄥ仠涔�
+                if __time is None:
+                    _time = datas[i]["val"]["time"]
+                    _limit_up_count_1s = 1
+                    _limit_up_count_1s_start_index = i
+                elif _time == _val["time"]:
+                    _limit_up_count_1s += 1
+                else:
+                    _time = datas[i]["val"]["time"]
+                    _limit_up_count_1s = 1
+                    _limit_up_count_1s_start_index = i
+            elif _val["operateType"] == 1:
+                # 涔版挙
+                _time = None
+                _limit_up_count_1s = 0
+                _limit_up_count_1s_start_index = -1
+
+            if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1:
+                logger_l2_trade.info("鎵惧埌鍚屼竴绉掕繛缁定鍋滀拱 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i]))
+                return _limit_up_count_1s_start_index
+
+        return None
+
+    # 鏄惁鏈夋挙閿�淇″彿
+    @classmethod
+    def compute_order_cancel_begin_single(cls, code, compute_data_count, continue_count):
+        datas = local_today_datas[code]
+        __len = len(datas)
+        if __len < continue_count:
+            return None
+        start_index = 0
+        if compute_data_count > __len:
+            compute_data_count = __len
+
+        if __len > compute_data_count:
+            start_index = __len - compute_data_count
+        for i in range(start_index, __len - (continue_count - 1)):
+            _val = datas[i]["val"]
+            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
+                continue
+            # 鏈夎繛缁�3涓拱鎾�
+            if L2DataUtil.is_limit_up_price_buy_cancel(_val):
+                index_0 = i
+                index_1 = -1
+                index_2 = -1
+                for j in range(index_0 + 1, __len):
+                    # 娑ㄥ仠涔�
+                    if L2DataUtil.is_limit_up_price_buy_cancel(datas[j]["val"]):
+                        index_1 = j
+                        break
+
+                if index_1 > 0:
+                    for j in range(index_1 + 1, __len):
+                        # 娑ㄥ仠涔�
+                        if L2DataUtil.is_limit_up_price_buy_cancel(datas[j]["val"]):
+                            index_2 = j
+                            break
+                if index_1 - index_0 == 1 and index_2 - index_1 == 1:
+                    logger_l2_trade.info("杩炵画3涓定鍋滀拱鎾� {},{},{}".format(code, i, json.dumps(datas[i])))
+                    return i
+        return None
+
+    # 淇濆瓨涓嬪崟浣嶇疆
+    def save_order_pos(self):
+        pass
+
+    # 鏄惁鍙互涓嬪崟
+    def is_can_order(self):
+        pass
+
+    # 铏氭嫙涓嬪崟
+    def unreal_order(self):
+        pass
+
+    # 璁剧疆铏氭嫙鎸備拱浣�
+    def set_unreal_sure_order_pos(self):
+        pass
+
+    # 鑾峰彇棰勪及鎸備拱浣�
+    @classmethod
+    def get_sure_order_pos(cls, code):
+        index, data = TradeBuyDataManager.get_buy_sure_position(code)
+        if index is None:
+            return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1]
+        else:
+            return 1, index, data
+
+    # 缁熻涔板叆鍑�涔伴噺
+    @classmethod
+    def sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money):
+        total_datas = local_today_datas[code]
+        buy_nums = origin_num
+        limit_up_price = gpcode_manager.get_limit_up_price(code)
+        if limit_up_price is None:
+            raise Exception("娑ㄥ仠浠锋棤娉曡幏鍙�")
+        threshold_num = threshold_money / (limit_up_price * 100)
+        for i in range(compute_start_index, len(total_datas)):
+            _val = total_datas[i]["val"]
+            # 鏈夎繛缁�4涓定鍋滀拱灏辨爣璁拌绠楄捣濮嬬偣
+            if L2DataUtil.is_limit_up_price_buy(_val):
+                # 娑ㄥ仠涔�
+                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
+                if buy_nums >= threshold_num:
+                    return i, buy_nums
+            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
+                # 娑ㄥ仠涔版挙
+                buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
+        return None, buy_nums
+
+    # 鍚屼竴鏃堕棿涔板叆鐨勬鐜囪绠�
+    @classmethod
+    def get_same_time_property(cls, code):
+        # TODO 涓庢澘鍧楃儹搴︽湁鍏�
+        return 0.5
+
+    # 缁熻涔版挙鍑�涔伴噺
+    @classmethod
+    def sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money):
+        buy_nums = origin_num
+        total_datas = local_today_datas[code]
+        limit_up_price = gpcode_manager.get_limit_up_price(code)
+        if limit_up_price is None:
+            raise Exception("娑ㄥ仠浠锋棤娉曡幏鍙�")
+        threshold_num = threshold_money / (limit_up_price * 100)
+        # 鑾峰彇棰勪及鎸備拱浣� sure_type:0 铏氭嫙鎸備拱 1 瀹為檯鎸備拱
+        sure_type, sure_pos, sure_data = cls.get_sure_order_pos(code)
+        same_time_property = cls.get_same_time_property(code)
+        # 鍚屼竴绉掞紝鍦ㄩ浼颁拱鍏ヤ綅涔嬪悗鐨勬暟鎹箣鍜�
+        property_buy_num_count = 0
+        for i in range(start_index, len(total_datas)):
+            data = total_datas[i]
+            _val = data["val"]
+            if L2DataUtil.is_limit_up_price_buy(_val):
+                # 娑ㄥ仠涔�
+                if i < sure_pos:
+                    buy_nums += int(_val["num"]) * int(data["re"])
+                elif sure_data["val"]["time"] == _val["time"]:
+                    # 鍚屼竴绉掍拱鍏ワ紝鑰屼笖杩樺湪棰勪及涔板叆浣嶄箣鍚�
+                    property_buy_num_count += int(_val["num"]) * int(data["re"])
+
+            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
+                # 娑ㄥ仠鎾や拱
+                # 鍒ゆ柇涔板叆浣嶇疆鏄惁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓�
+                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, total_datas)
+                if buy_index is not None:
+                    # 鎵惧埌涔版挙鏁版嵁鐨勪拱鍏ョ偣
+                    if buy_index < sure_pos:
+                        buy_nums -= int(_val["num"]) * int(data["re"])
+                    elif sure_data["val"]["time"] == _val["time"]:
+                        # 鍚屼竴绉�,鑰屼笖杩樺湪棰勪及涔板叆浣嶄箣鍚庢寜姒傜巼璁$畻
+                        property_buy_num_count -= int(_val["num"]) * int(data["re"])
+                else:
+                    # TODO 鏈壘鍒颁拱鎾ゆ暟鎹殑涔板叆鐐�
+                    pass
+
+            property_buy_num = round(property_buy_num_count * same_time_property)
+            if buy_nums + property_buy_num <= threshold_num:
+                return i, buy_nums + property_buy_num,sure_type
+        return None, buy_nums + round(property_buy_num_count * same_time_property),sure_type
+
+
+def process_data(code, datas, capture_timestamp):
     now_time_str = datetime.now().strftime("%H:%M:%S")
     __start_time = round(t.time() * 1000)
     try:
         if len(datas) > 0:
             # 鍒ゆ柇浠锋牸鍖洪棿鏄惁姝g‘
             if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
-
-                raise L2DataException(L2DataException.CODE_PRICE_ERROR, "鑲′环涓嶅尮閰� code-{} price-{}".format(code,datas[0]["val"]["price"]))
+                raise L2DataException(L2DataException.CODE_PRICE_ERROR,
+                                      "鑲′环涓嶅尮閰� code-{} price-{}".format(code, datas[0]["val"]["price"]))
             # 鍔犺浇鍘嗗彶鏁版嵁
             load_l2_data(code)
             # 绾犳鏁版嵁
@@ -325,7 +771,11 @@
                 # 鎷兼帴鏁版嵁
                 local_today_datas[code].extend(add_datas)
                 l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
-
+            total_datas = local_today_datas[code]
+            # 涔板叆纭鐐瑰鐞�
+            TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas, total_datas[-1],
+                                                               add_datas)
+            if len(add_datas) > 0:
                 latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                 # 鏃堕棿宸笉鑳藉お澶ф墠鑳藉鐞�
                 if __is_same_time(now_time_str, latest_time):
@@ -342,11 +792,14 @@
                         # 娌℃湁璁$畻寮�濮嬬偣
                         c_index = __get_limit_up_buy_start(code, len(add_datas) + 3, 3)
                         if c_index is not None:
-                            total_datas = local_today_datas[code]
+
                             logger_l2_trade.info("鎵惧埌涔扮偣锛歿} - {}".format(code, json.dumps(total_datas[c_index])))
 
                             # 瑙﹀彂鏁版嵁鍒嗘瀽 锛岃幏鍙栬繛缁定鍋滄爣璁版暟鎹�
                             buy_nums = 0
+                            # 鑾峰彇娑ㄥ仠浠�
+                            limit_up_price = gpcode_manager.get_limit_up_price(code)
+                            last_data_index = -1
                             for i in range(c_index, len(total_datas)):
                                 _val = total_datas[i]["val"]
                                 # 鏈夎繛缁�4涓定鍋滀拱灏辨爣璁拌绠楄捣濮嬬偣
@@ -356,24 +809,32 @@
                                 elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                     # 娑ㄥ仠涔版挙
                                     buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
+                                if buy_nums * limit_up_price * 100 > 1000 * 10000:
+                                    last_data_index = i
+                                    break
 
                             TradePointManager.set_buy_compute_start_data(code, buy_nums, c_index)
-                            # 鑾峰彇娑ㄥ仠浠�
-                            limit_up_price = gpcode_manager.get_limit_up_price(code)
+
                             if limit_up_price is not None:
-                                if buy_nums * limit_up_price * 100 > 1000 * 10000:
+                                if last_data_index > -1:
                                     # 澶т簬1000w灏变拱
                                     logger_l2_trade.info(
                                         "鎵ц涔板叆锛歿} - 璁$畻缁撴潫鐐癸細 {}".format(code, json.dumps(total_datas[-1])))
                                     try:
-                                        trade_manager.start_buy(code)
+                                        trade_manager.start_buy(code, capture_timestamp, total_datas[last_data_index],
+                                                                last_data_index)
                                         TradePointManager.delete_buy_cancel_point(code)
                                     except Exception as e:
                                         pass
                     else:
                         # 鏈夎绠楀紑濮嬬偣,璁$畻鏂板鐨勬暟鎹�
-                        buy_nums = 0
+                        buy_nums = c_num
+                        last_data = None
+                        last_data_index = len(total_datas) - len(add_datas) - 1
+                        # 鑾峰彇娑ㄥ仠浠�
+                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                         for data in add_datas:
+                            last_data_index += 1
                             _val = data["val"]
                             if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 0:
                                 # 娑ㄥ仠涔�
@@ -381,16 +842,17 @@
                             elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                 # 娑ㄥ仠涔版挙
                                 buy_nums -= int(_val["num"]) * int(data["re"])
+                            if buy_nums * limit_up_price * 100 > 1000 * 10000:
+                                last_data = data
+                                break
+
                         TradePointManager.set_buy_compute_start_data(code, buy_nums)
-                        latest_num = c_num + buy_nums
-                        # 鑾峰彇娑ㄥ仠浠�
-                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                         if limit_up_price is not None:
-                            if latest_num * limit_up_price * 100 > 1000 * 10000:
+                            if last_data is not None:
                                 # 澶т簬1000w灏变拱
                                 logger_l2_trade.info("鎵ц涔板叆锛歿} - 璁$畻缁撴潫鐐癸細 {}".format(code, json.dumps(add_datas[-1])))
                                 try:
-                                    trade_manager.start_buy(code)
+                                    trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index)
                                     TradePointManager.delete_buy_cancel_point(code)
                                 except Exception as e:
                                     pass
@@ -697,25 +1159,25 @@
 
 
 if __name__ == "__main__":
-    # 鍒犻櫎澶ф暟鎹�
-    redis = redis_manager.RedisManager(1).getRedis()
-    keys = redis.keys("big_data*")
-    for key in keys:
-        redis.delete(key)
-    # print("big_data-{}-{}".format("123", int(round(t.time() * 1000))))
-    # load_l2_data("002868")
-    # keys= local_today_num_operate_map["002868"]
-    # for k in keys:
-    #     print(len( local_today_num_operate_map["002868"][k]))
-    # pass
-    # __set_buy_compute_start_data("000000", 100, 1)
-    # __set_buy_compute_start_data("000000", 100)
-    # __set_l2_data_latest_count("000333", 20)
-    # print(type(get_l2_data_latest_count("000333")))
-    # datas = ["2", "3", "4", "5"]
-    # print(datas[4:])
-    # print(decimal.Decimal("19.294").quantize(decimal.Decimal("0.00"), decimal.ROUND_HALF_UP))
-
-    # 鑾峰彇澧為噺鏁版嵁
-    # 淇濆瓨鏁版嵁
-    # 鎷兼帴鏁版嵁
+    code = "000868"
+    local_today_datas.setdefault(code, [])
+    path = "C:/Users/Administrator/Desktop/demo/000868/"
+    for file_name in os.listdir(path):
+        p = "{}{}".format(path, file_name)
+        f = open(p)
+        for line in f.readlines():  # 渚濇璇诲彇姣忚
+            line = line.strip()
+            data = json.loads(line)
+            result = __format_l2_data(data, code, 10.00)
+            add_datas = get_add_data(code, result)
+            print("澧炲姞鐨勬暟閲忥細", len(add_datas))
+            if len(add_datas) > 0:
+                # 鎷兼帴鏁版嵁
+                local_today_datas[code].extend(add_datas)
+            if code in local_latest_datas:
+                local_latest_datas[code] = result
+            else:
+                local_latest_datas.setdefault(code, result)
+        f.close()
+    for d in local_today_datas[code]:
+        print(d["val"]["time"], d["val"]["num"], d["val"]["operateType"], d["re"])

--
Gitblit v1.8.0