From b7000cbf5e67e90abe53e96a4ea931afbf906e24 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 16 九月 2022 18:51:47 +0800 Subject: [PATCH] l2数据计算优化 --- l2_data_manager.py | 768 +++++++++++++++++++++++++++++++++++++++++++++++----------- 1 files changed, 615 insertions(+), 153 deletions(-) diff --git a/l2_data_manager.py b/l2_data_manager.py index 2ece3bd..3e81904 100644 --- a/l2_data_manager.py +++ b/l2_data_manager.py @@ -1,18 +1,19 @@ import decimal import json +import os import time as t from datetime import datetime import data_process import l2_data_util -import mysql import gpcode_manager -import mongo_data + import redis_manager import tool import trade_manager from log import logger_l2_trade +from trade_data_manager import TradeBuyDataManager _redisManager = redis_manager.RedisManager(1) # l2鏁版嵁绠$悊 @@ -91,25 +92,30 @@ @staticmethod def get_buy_cancel_compute_start_data(code): redis = TradePointManager.__get_redis() - index = redis.get("buy_cancel_compute_index-{}".format(code)) - total_num = redis.get("buy_cancel_compute_num-{}".format(code)) - if index is None: - return None, 0 + info = redis.get("buy_cancel_compute_info-{}".format(code)) + if info is None: + return None, None , None else: - return int(index), int(total_num) + info=json.loads(info) + return info[0],info[1],info[2] # 璁剧疆涔版挙鐐逛俊鎭� - @staticmethod - def set_buy_cancel_compute_start_data(code, num_add, index=None): + # buy_num 绾拱棰� computed_index璁$畻鍒扮殑涓嬫爣 index鎾や拱淇″彿璧风偣 + + @classmethod + def set_buy_cancel_compute_start_data(cls,code, buy_num,computed_index, index): redis = TradePointManager.__get_redis() expire = tool.get_expire() - if index is not None: - redis.setex("buy_cancel_compute_index-{}".format(code), expire, index) - key = "buy_cancel_compute_num-{}".format(code) - if redis.get(key) is None: - redis.setex(key, expire, num_add) - else: - redis.incrby(key, num_add) + redis.setex("buy_cancel_compute_info-{}".format(code), expire, json.dumps((index,buy_num,computed_index))) + + # 澧炲姞鎾や拱鐨勭函涔伴 + @classmethod + def add_buy_nums_for_cancel(cls,code,num_add,computed_index): + cancel_index,nums,c_index= cls.get_buy_cancel_compute_start_data(code) + if cancel_index is None: + raise Exception("鏃犳挙涔颁俊鍙疯褰�") + nums+=num_add + cls.set_buy_cancel_compute_start_data(code,nums,computed_index) def load_l2_data(code, force=False): @@ -140,7 +146,8 @@ l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force) -def saveL2Data(code, datas): +def saveL2Data(code, datas, msg=""): + start_time = round(t.time() * 1000) # 鏌ヨ绁ㄦ槸鍚﹀湪寰呯洃鍚殑绁ㄩ噷闈� if not gpcode_manager.is_in_gp_pool(code): return None @@ -152,22 +159,21 @@ # 璁$畻淇濈暀鐨勬椂闂� expire = tool.get_expire() - index = 0 start_index = redis_instance.get("l2-maxindex-{}".format(code)) if start_index is None: - start_index = 0 + start_index = -1 else: start_index = int(start_index) max_index = start_index + i = 0 for _data in datas: - index = index + 1 - + i += 1 key = "l2-" + _data["key"] value = redis_instance.get(key) if value is None: # 鏂板 - max_index = start_index + index - value = {"index": start_index + index, "re": _data['re']} + max_index = start_index + i + value = {"index": start_index + i, "re": _data["re"]} redis_instance.setex(key, expire, json.dumps(value)) else: json_value = json.loads(value) @@ -179,77 +185,29 @@ finally: redis_instance.delete("l2-save-{}".format(code)) + print("淇濆瓨鏂版暟鎹敤鏃讹細", msg, round(t.time() * 1000) - start_time) return datas +# TODO 鑾峰彇l2鐨勬暟鎹� +def get_l2_data_index(code, key): + pass + + def parseL2Data(str): - now = int(t.time()) day = datetime.now().strftime("%Y%m%d") dict = json.loads(str) data = dict["data"] client = dict["client"] code = data["code"] channel = data["channel"] + capture_time = data["captureTime"] + process_time = data["processTime"] data = data["data"] - datas = [] - dataIndexs = {} - - # 鑾峰彇娑ㄥ仠浠� limit_up_price = gpcode_manager.get_limit_up_price(code) - same_time_num = {} - for item in data: - # 瑙f瀽鏁版嵁 - time = item["time"] - if time in same_time_num: - same_time_num[time] = same_time_num[time] + 1 - else: - same_time_num[time] = 1 - - price = float(item["price"]) - num = item["num"] - limitPrice = item["limitPrice"] - # 娑ㄥ仠浠� - if limit_up_price is not None and limit_up_price == tool.to_price(decimal.Decimal(price)): - limitPrice = 1 - item["limitPrice"] = "{}".format(limitPrice) - operateType = item["operateType"] - cancelTime = item["cancelTime"] - cancelTimeUnit = item["cancelTimeUnit"] - key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime, - cancelTimeUnit) - if key in dataIndexs: - # 鏁版嵁閲嶅娆℃暟+1 - datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1 - else: - # 鏁版嵁閲嶅娆℃暟榛樿涓�1 - datas.append({"key": key, "val": item, "re": 1}) - dataIndexs.setdefault(key, len(datas) - 1) - for key in same_time_num: - if same_time_num[key] > 50: - # 鍙兘淇濆瓨杩�3s鐨勬暟鎹� - ts1 = l2_data_util.get_time_as_seconds(datas[-1]["val"]["time"]) - ts_now = l2_data_util.get_time_as_seconds(datetime.now().strftime("%H:%M:%S")) - if abs(ts1 - ts_now) <= 3: - # TODO 淇濆瓨鏁版嵁 - redis = _redisManager.getRedis() - redis.set("big_data-{}-{}".format(code, int(round(t.time() * 1000))), str) - - return day, client, channel, code, datas - - -# 绾犳鏁版嵁锛屽皢re瀛楁鏇挎崲涓鸿緝澶у�� -def correct_data(code, _datas): - latest_data = local_latest_datas.get(code) - if latest_data is None: - latest_data = [] - - for data in _datas: - for _ldata in latest_data: - if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]: - max_re = max(_ldata["re"], data["re"]) - _ldata["re"] = max_re - data["re"] = max_re - return _datas + datas = L2DataUtil.format_l2_data(data, code, limit_up_price) + # 鑾峰彇娑ㄥ仠浠� + return day, client, channel, code, capture_time, process_time, datas # 淇濆瓨l2鏁版嵁 @@ -267,55 +225,543 @@ saveL2Data(code, add_datas) -# 鑾峰彇澧為噺鏁版嵁 -def get_add_data(code, datas): - if datas is not None and len(datas) < 1: - return [] - last_key = "" - __latest_datas = local_latest_datas.get(code) - if __latest_datas is not None and len(__latest_datas) > 0: - last_key = __latest_datas[-1]["key"] - count = 0 - start_index = -1 - # 濡傛灉鍘熸潵娌℃湁鏁版嵁 - - for n in reversed(datas): - count += 1 - if n["key"] == last_key: - start_index = len(datas) - count - break - if len(last_key) > 0: - if start_index < 0 or start_index + 1 >= len(datas): - return [] +class L2DataUtil: + @classmethod + def is_same_time(cls, time1, time2): + # TODO 娴嬭瘯 + # if 1 > 0: + # return True + time1_s = time1.split(":") + time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2]) + time2_s = time2.split(":") + time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2]) + if abs(time2_second - time1_second) < 3: + return True else: - return datas[start_index + 1:] - else: - return datas[start_index + 1:] + return False + # 鑾峰彇澧為噺鏁版嵁 + @classmethod + def get_add_data(cls, code, datas, _start_index): + if datas is not None and len(datas) < 1: + return [] + last_key = "" + __latest_datas = local_latest_datas.get(code) + if __latest_datas is not None and len(__latest_datas) > 0: + last_key = __latest_datas[-1]["key"] + count = 0 + start_index = -1 + # 濡傛灉鍘熸潵娌℃湁鏁版嵁 + # TODO 璁剧疆add_data鐨勫簭鍙� + for n in reversed(datas): + count += 1 + if n["key"] == last_key: + start_index = len(datas) - count + break -def __is_same_time(time1, time2): - # TODO 娴嬭瘯 - # if 1 > 0: - # return True - time1_s = time1.split(":") - time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2]) - time2_s = time2.split(":") - time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2]) - if abs(time2_second - time1_second) < 3: + _add_datas = [] + if len(last_key) > 0: + if start_index < 0 or start_index + 1 >= len(datas): + _add_datas = [] + else: + _add_datas = datas[start_index + 1:] + else: + _add_datas = datas[start_index + 1:] + for i in range(0, len(_add_datas)): + _add_datas[i]["index"] = _start_index + i + + return _add_datas + + # 绾犳鏁版嵁锛屽皢re瀛楁鏇挎崲涓鸿緝澶у�� + @classmethod + def correct_data(cls, code, _datas): + latest_data = local_latest_datas.get(code) + if latest_data is None: + latest_data = [] + save_list = [] + for data in _datas: + for _ldata in latest_data: + if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]: + max_re = max(_ldata["re"], data["re"]) + _ldata["re"] = max_re + data["re"] = max_re + # 淇濆瓨鍒版暟鎹簱锛屾洿鏂皉e鐨勬暟鎹� + save_list.append(_ldata) + if len(save_list) > 0: + saveL2Data(code, save_list, "淇濆瓨绾犳鏁版嵁") + return _datas + + # 澶勭悊l2鏁版嵁 + @classmethod + def format_l2_data(cls, data, code, limit_up_price): + datas = [] + dataIndexs = {} + same_time_num = {} + for item in data: + # 瑙f瀽鏁版嵁 + time = item["time"] + if time in same_time_num: + same_time_num[time] = same_time_num[time] + 1 + else: + same_time_num[time] = 1 + + price = float(item["price"]) + num = item["num"] + limitPrice = item["limitPrice"] + # 娑ㄥ仠浠� + if limit_up_price is not None and limit_up_price == tool.to_price(decimal.Decimal(price)): + limitPrice = 1 + item["limitPrice"] = "{}".format(limitPrice) + operateType = item["operateType"] + cancelTime = item["cancelTime"] + cancelTimeUnit = item["cancelTimeUnit"] + key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime, + cancelTimeUnit) + if key in dataIndexs: + # 鏁版嵁閲嶅娆℃暟+1 + datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1 + else: + # 鏁版嵁閲嶅娆℃暟榛樿涓�1 + datas.append({"key": key, "val": item, "re": 1}) + dataIndexs.setdefault(key, len(datas) - 1) + l2_data_util.save_big_data(code, same_time_num, data) + return datas + + @classmethod + def get_time_as_second(time_str): + ts = time_str.split(":") + return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) + + # 鏄惁鏄定鍋滀环涔� + def is_limit_up_price_buy(val): + if int(val["limitPrice"]) != 1: + return False + + if int(val["operateType"]) != 0: + return False + + price = float(val["price"]) + num = int(val["num"]) + if price * num * 100 < 50 * 10000: + return False return True - else: - return False + + # 鏄惁娑ㄥ仠涔版挙 + def is_limit_up_price_buy_cancel(val): + if int(val["limitPrice"]) != 1: + return False + + if int(val["operateType"]) != 1: + return False + + price = float(val["price"]) + num = int(val["num"]) + if price * num * 100 < 50 * 10000: + return False + return True -def process_data(code, datas): +# L2浜ゆ槗鏁版嵁澶勭悊鍣� +class L2TradeDataProcessor: + unreal_buy_dict = {} + + @classmethod + # 鏁版嵁澶勭悊鍏ュ彛 + # datas: 鏈鎴浘鏁版嵁 + # capture_timestamp:鎴浘鏃堕棿鎴� + def process(cls, code, datas, capture_timestamp): + now_time_str = datetime.now().strftime("%H:%M:%S") + __start_time = round(t.time() * 1000) + try: + if len(datas) > 0: + # 鍒ゆ柇浠锋牸鍖洪棿鏄惁姝g‘ + if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])): + raise L2DataException(L2DataException.CODE_PRICE_ERROR, + "鑲′环涓嶅尮閰� code-{} price-{}".format(code, datas[0]["val"]["price"])) + # 鍔犺浇鍘嗗彶鏁版嵁 + load_l2_data(code) + # 绾犳鏁版嵁 + datas = L2DataUtil.correct_data(code, datas) + _start_index = 0 + if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0: + _start_index = local_today_datas[code][-1]["index"] + add_datas = L2DataUtil.get_add_data(code, datas, _start_index) + if len(add_datas) > 0: + # 鎷兼帴鏁版嵁 + local_today_datas[code].extend(add_datas) + l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas) + total_datas = local_today_datas[code] + # 涔板叆纭鐐瑰鐞� + TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas, + total_datas[-1], + add_datas) + if len(add_datas) > 0: + latest_time = add_datas[len(add_datas) - 1]["val"]["time"] + # 鏃堕棿宸笉鑳藉お澶ф墠鑳藉鐞� + if L2DataUtil.is_same_time(now_time_str, latest_time): + # 鍒ゆ柇鏄惁宸茬粡鎸傚崟 + state = trade_manager.get_trade_state(code) + if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: + # 宸叉寕鍗� + cls.process_order(code, add_datas) + else: + # 鏈寕鍗� + cls.process_not_order(code, add_datas) + # 淇濆瓨鏁版嵁 + save_l2_data(code, datas, add_datas) + finally: + if code in cls.unreal_buy_dict: + cls.unreal_buy_dict.pop(code) + + # 澶勭悊鏈寕鍗� + @classmethod + def process_not_order(cls, code, add_datas): + + + # 澶勭悊宸叉寕鍗� + @classmethod + def process_order(cls, code, add_datas): + # 鑾峰彇涔嬪墠鏄惁鏈夎褰曠殑鎾や拱淇″彿 + cancel_index, buy_num_for_cancel,computed_index= cls.has_order_cancel_begin_pos(code) + buy_index, buy_num = cls.get_order_begin_pos(code) + if cancel_index is None: + # 鏃犳挙鍗曚俊鍙疯捣濮嬬偣璁板綍 + cancel_index = cls.compute_order_cancel_begin_single(code, len(add_datas) + 3, 3) + buy_num_for_cancel = 0 + computed_index=buy_index + if cancel_index is not None: + # 鑾峰彇闃堝�� 鏈変拱鎾や俊鍙凤紝缁熻鎾や拱绾拱棰� + threshold_money=10000000 + cls.start_compute_cancel(code,cancel_index,computed_index,buy_num_for_cancel,threshold_money) + else: + # 鏃犱拱鎾や俊鍙�,缁堟鎵ц + pass + + #寮�濮嬭绠楁挙鐨勪俊鍙� + @classmethod + def start_compute_cancel(cls,code,cancel_index, compute_start_index,origin_num,threshold_money): + # sure_type 0-铏氭嫙鎸備拱浣� 1-鐪熷疄鎸備拱浣� + computed_index , buy_num_for_cancel,sure_type = cls.sum_buy_num_for_cancel_order(code,compute_start_index,origin_num,threshold_money) + total_datas = local_today_datas[code] + if computed_index is not None: + # 鍙戝嚭鎾や拱淇″彿锛岄渶瑕佹挙涔� + if cls.unreal_buy_dict.get(code) is not None: + # 鏈夎櫄鎷熶笅鍗� + # 鍒犻櫎铏氭嫙涓嬪崟鏍囪 + cls.unreal_buy_dict.pop(code) + # TODO 鍒犻櫎涓嬪崟鏍囪浣嶇疆 + pass + else: + # 鏃犺櫄鎷熶笅鍗曪紝闇�瑕佹墽琛屾挙鍗� + logger_l2_trade.info( + "鎵ц鎾ら攢锛歿} - {}".format(code, json.dumps(total_datas[computed_index]))) + try: + trade_manager.start_cancel_buy(code) + # 鍙栨秷涔板叆鏍囪瘑 + TradePointManager.delete_buy_point(code) + TradePointManager.delete_buy_cancel_point(code) + except Exception as e: + pass + + if computed_index < len(local_today_datas[code])-1: + # TODO鏁版嵁灏氭湭澶勭悊瀹�,閲嶆柊杩涘叆涓嬪崟璁$畻娴佺▼ + cls.start_compute_buy(code,computed_index+1,0,threshold_money) + pass + else: + #鏃犻渶鎾や拱锛岃褰曟挙涔颁俊鍙� + TradePointManager.set_buy_cancel_compute_start_data(code,buy_num_for_cancel,len(total_datas)-1,cancel_index) + # 鍒ゆ柇鏄惁鏈夎櫄鎷熶笅鍗� + unreal_buy_info=cls.unreal_buy_dict.get(code) + if unreal_buy_info is not None: + # unreal_buy_info 鐨勫唴瀹规牸寮忎负锛�(瑙︽硶涔版搷浣滀笅鏍�,鎴浘鏃堕棿) + # 鐪熷疄涓嬪崟 + logger_l2_trade.info( + "鎵ц涔板叆锛歿} ".format(code)) + try: + trade_manager.start_buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]], + unreal_buy_info[0]) + TradePointManager.delete_buy_cancel_point(code) + except Exception as e: + pass + pass + else: + #缁堟鎵ц + pass + + + + @classmethod + def start_compute_buy(cls,code,compute_start_index,origin_num,threshold_money): + total_datas=local_today_datas[code] + # 鑾峰彇涔板叆淇″彿璁$畻璧峰浣嶇疆 + index, num = cls.get_order_begin_pos(code) + # 鏄惁涓烘柊鑾峰彇鍒扮殑浣嶇疆 + new_get_pos = False + if index is None: + # 鏈変拱鍏ヤ俊鍙� + has_single, index = cls.compute_order_begin_pos(code, len(total_datas) - compute_start_index , 3) + if has_single: + num = 0 + new_get_pos = True + # TODO 璁板綍涔板叆淇″彿浣嶇疆 + if index is None: + # 鏈幏鍙栧埌涔板叆淇″彿锛岀粓姝㈢▼搴� + return None + + + # 涔板叆绾拱棰濈粺璁� + # TODO 鑾峰彇闃堝�� + threshold_money=10000000 + compute_index,buy_nums = cls.sum_buy_num_for_order(code,compute_start_index,num,threshold_money) + if compute_index is not None: + # 杈惧埌涓嬪崟鏉′欢 + # 铏氭嫙涓嬪崟 + cls.unreal_buy_dict[code]=(compute_index,capture_time) + else: + # TODO 鏈揪鍒颁笅鍗曟潯浠讹紝淇濆瓨绾拱棰濓紝璁剧疆绾拱棰� + + + pass + + + + # 鑾峰彇涓嬪崟璧峰淇″彿 + @classmethod + def get_order_begin_pos(cls, code): + index, num = TradePointManager.get_buy_compute_start_data(code) + return index, num + + # 鑾峰彇鎾ゅ崟璧峰浣嶇疆 + @classmethod + def has_order_cancel_begin_pos(cls): + # cancel_index:鎾ゅ崟淇″彿璧风偣 + # buy_num_for_cancel锛氫粠鎸傚叆鐐硅绠楃殑绾拱棰� + # computed_index 璁$畻鐨勬渶鍚庝綅缃� + cancel_index, buy_num_for_cancel,computed_index = TradePointManager.get_buy_cancel_compute_start_data(code) + return cancel_index, buy_num_for_cancel,computed_index + + # 璁$畻涓嬪崟璧峰淇″彿 + # compute_data_count 鐢ㄤ簬璁$畻鐨刲2鏁版嵁鏁伴噺 + def compute_order_begin_pos(self, code, compute_data_count, continue_count): + # 鍊掓暟100鏉℃暟鎹煡璇� + datas = local_today_datas[code] + __len = len(datas) + if __len < continue_count: + return None + start_index = 0 + if compute_data_count > __len: + compute_data_count = __len + + if __len > compute_data_count: + start_index = __len - compute_data_count + __time = None + _limit_up_count_1s = 0 + _limit_up_count_1s_start_index = -1 + + for i in range(start_index, __len - (continue_count - 1)): + _val = datas[i]["val"] + # 鏃堕棿瑕�>=09:30:00 + if L2DataUtil.get_time_as_second(_val["time"]) < second_930: + continue + + # 鏈夎繛缁�4涓定鍋滀拱灏辨爣璁拌绠楄捣濮嬬偣 + if L2DataUtil.is_limit_up_price_buy(_val): + index_0 = i + index_1 = -1 + index_2 = -1 + # index_3 = -1 + for j in range(index_0 + 1, __len): + # 娑ㄥ仠涔� + if L2DataUtil.is_limit_up_price_buy(datas[j]["val"]): + index_1 = j + break + + if index_1 > 0: + for j in range(index_1 + 1, __len): + # 娑ㄥ仠涔� + if L2DataUtil.is_limit_up_price_buy(datas[j]["val"]): + index_2 = j + break + # if index_2 > 0: + # for j in range(index_2 + 1, __len): + # # 娑ㄥ仠涔� + # if datas[j]["val"]["limitPrice"] == 1 and datas[j]["val"]["operateType"] == 0: + # index_3 = j + if index_1 - index_0 == 1 and index_2 - index_1 == 1: # and index_3 - index_2 == 1 + logger_l2_trade.info("鎵惧埌鐗╃悊杩炵画娑ㄥ仠涔� {},{},{}".format(code, i, datas[i])) + return i + # 鍚�1s鍐呮湁涓嶈繛缁殑4涓定鍋滀拱锛堝鏋滈亣涔版挙灏遍噸鏂拌绠楋紝涓棿鍙棿闅斾笉娑ㄥ仠涔帮級鏍囪璁$畻璧峰鐐� + if L2DataUtil.is_limit_up_price_buy(_val): + # 娑ㄥ仠涔� + if __time is None: + _time = datas[i]["val"]["time"] + _limit_up_count_1s = 1 + _limit_up_count_1s_start_index = i + elif _time == _val["time"]: + _limit_up_count_1s += 1 + else: + _time = datas[i]["val"]["time"] + _limit_up_count_1s = 1 + _limit_up_count_1s_start_index = i + elif _val["operateType"] == 1: + # 涔版挙 + _time = None + _limit_up_count_1s = 0 + _limit_up_count_1s_start_index = -1 + + if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1: + logger_l2_trade.info("鎵惧埌鍚屼竴绉掕繛缁定鍋滀拱 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i])) + return _limit_up_count_1s_start_index + + return None + + # 鏄惁鏈夋挙閿�淇″彿 + @classmethod + def compute_order_cancel_begin_single(cls, code, compute_data_count, continue_count): + datas = local_today_datas[code] + __len = len(datas) + if __len < continue_count: + return None + start_index = 0 + if compute_data_count > __len: + compute_data_count = __len + + if __len > compute_data_count: + start_index = __len - compute_data_count + for i in range(start_index, __len - (continue_count - 1)): + _val = datas[i]["val"] + if L2DataUtil.get_time_as_second(_val["time"]) < second_930: + continue + # 鏈夎繛缁�3涓拱鎾� + if L2DataUtil.is_limit_up_price_buy_cancel(_val): + index_0 = i + index_1 = -1 + index_2 = -1 + for j in range(index_0 + 1, __len): + # 娑ㄥ仠涔� + if L2DataUtil.is_limit_up_price_buy_cancel(datas[j]["val"]): + index_1 = j + break + + if index_1 > 0: + for j in range(index_1 + 1, __len): + # 娑ㄥ仠涔� + if L2DataUtil.is_limit_up_price_buy_cancel(datas[j]["val"]): + index_2 = j + break + if index_1 - index_0 == 1 and index_2 - index_1 == 1: + logger_l2_trade.info("杩炵画3涓定鍋滀拱鎾� {},{},{}".format(code, i, json.dumps(datas[i]))) + return i + return None + + # 淇濆瓨涓嬪崟浣嶇疆 + def save_order_pos(self): + pass + + # 鏄惁鍙互涓嬪崟 + def is_can_order(self): + pass + + # 铏氭嫙涓嬪崟 + def unreal_order(self): + pass + + # 璁剧疆铏氭嫙鎸備拱浣� + def set_unreal_sure_order_pos(self): + pass + + # 鑾峰彇棰勪及鎸備拱浣� + @classmethod + def get_sure_order_pos(cls, code): + index, data = TradeBuyDataManager.get_buy_sure_position(code) + if index is None: + return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1] + else: + return 1, index, data + + # 缁熻涔板叆鍑�涔伴噺 + @classmethod + def sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money): + total_datas = local_today_datas[code] + buy_nums = origin_num + limit_up_price = gpcode_manager.get_limit_up_price(code) + if limit_up_price is None: + raise Exception("娑ㄥ仠浠锋棤娉曡幏鍙�") + threshold_num = threshold_money / (limit_up_price * 100) + for i in range(compute_start_index, len(total_datas)): + _val = total_datas[i]["val"] + # 鏈夎繛缁�4涓定鍋滀拱灏辨爣璁拌绠楄捣濮嬬偣 + if L2DataUtil.is_limit_up_price_buy(_val): + # 娑ㄥ仠涔� + buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) + if buy_nums >= threshold_num: + return i, buy_nums + elif L2DataUtil.is_limit_up_price_buy_cancel(_val): + # 娑ㄥ仠涔版挙 + buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) + return None, buy_nums + + # 鍚屼竴鏃堕棿涔板叆鐨勬鐜囪绠� + @classmethod + def get_same_time_property(cls, code): + # TODO 涓庢澘鍧楃儹搴︽湁鍏� + return 0.5 + + # 缁熻涔版挙鍑�涔伴噺 + @classmethod + def sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money): + buy_nums = origin_num + total_datas = local_today_datas[code] + limit_up_price = gpcode_manager.get_limit_up_price(code) + if limit_up_price is None: + raise Exception("娑ㄥ仠浠锋棤娉曡幏鍙�") + threshold_num = threshold_money / (limit_up_price * 100) + # 鑾峰彇棰勪及鎸備拱浣� sure_type:0 铏氭嫙鎸備拱 1 瀹為檯鎸備拱 + sure_type, sure_pos, sure_data = cls.get_sure_order_pos(code) + same_time_property = cls.get_same_time_property(code) + # 鍚屼竴绉掞紝鍦ㄩ浼颁拱鍏ヤ綅涔嬪悗鐨勬暟鎹箣鍜� + property_buy_num_count = 0 + for i in range(start_index, len(total_datas)): + data = total_datas[i] + _val = data["val"] + if L2DataUtil.is_limit_up_price_buy(_val): + # 娑ㄥ仠涔� + if i < sure_pos: + buy_nums += int(_val["num"]) * int(data["re"]) + elif sure_data["val"]["time"] == _val["time"]: + # 鍚屼竴绉掍拱鍏ワ紝鑰屼笖杩樺湪棰勪及涔板叆浣嶄箣鍚� + property_buy_num_count += int(_val["num"]) * int(data["re"]) + + elif L2DataUtil.is_limit_up_price_buy_cancel(_val): + # 娑ㄥ仠鎾や拱 + # 鍒ゆ柇涔板叆浣嶇疆鏄惁鍦ㄤ拱鍏ヤ俊鍙蜂箣鍓� + buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, total_datas) + if buy_index is not None: + # 鎵惧埌涔版挙鏁版嵁鐨勪拱鍏ョ偣 + if buy_index < sure_pos: + buy_nums -= int(_val["num"]) * int(data["re"]) + elif sure_data["val"]["time"] == _val["time"]: + # 鍚屼竴绉�,鑰屼笖杩樺湪棰勪及涔板叆浣嶄箣鍚庢寜姒傜巼璁$畻 + property_buy_num_count -= int(_val["num"]) * int(data["re"]) + else: + # TODO 鏈壘鍒颁拱鎾ゆ暟鎹殑涔板叆鐐� + pass + + property_buy_num = round(property_buy_num_count * same_time_property) + if buy_nums + property_buy_num <= threshold_num: + return i, buy_nums + property_buy_num,sure_type + return None, buy_nums + round(property_buy_num_count * same_time_property),sure_type + + +def process_data(code, datas, capture_timestamp): now_time_str = datetime.now().strftime("%H:%M:%S") __start_time = round(t.time() * 1000) try: if len(datas) > 0: # 鍒ゆ柇浠锋牸鍖洪棿鏄惁姝g‘ if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])): - - raise L2DataException(L2DataException.CODE_PRICE_ERROR, "鑲′环涓嶅尮閰� code-{} price-{}".format(code,datas[0]["val"]["price"])) + raise L2DataException(L2DataException.CODE_PRICE_ERROR, + "鑲′环涓嶅尮閰� code-{} price-{}".format(code, datas[0]["val"]["price"])) # 鍔犺浇鍘嗗彶鏁版嵁 load_l2_data(code) # 绾犳鏁版嵁 @@ -325,7 +771,11 @@ # 鎷兼帴鏁版嵁 local_today_datas[code].extend(add_datas) l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas) - + total_datas = local_today_datas[code] + # 涔板叆纭鐐瑰鐞� + TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas, total_datas[-1], + add_datas) + if len(add_datas) > 0: latest_time = add_datas[len(add_datas) - 1]["val"]["time"] # 鏃堕棿宸笉鑳藉お澶ф墠鑳藉鐞� if __is_same_time(now_time_str, latest_time): @@ -342,11 +792,14 @@ # 娌℃湁璁$畻寮�濮嬬偣 c_index = __get_limit_up_buy_start(code, len(add_datas) + 3, 3) if c_index is not None: - total_datas = local_today_datas[code] + logger_l2_trade.info("鎵惧埌涔扮偣锛歿} - {}".format(code, json.dumps(total_datas[c_index]))) # 瑙﹀彂鏁版嵁鍒嗘瀽 锛岃幏鍙栬繛缁定鍋滄爣璁版暟鎹� buy_nums = 0 + # 鑾峰彇娑ㄥ仠浠� + limit_up_price = gpcode_manager.get_limit_up_price(code) + last_data_index = -1 for i in range(c_index, len(total_datas)): _val = total_datas[i]["val"] # 鏈夎繛缁�4涓定鍋滀拱灏辨爣璁拌绠楄捣濮嬬偣 @@ -356,24 +809,32 @@ elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1: # 娑ㄥ仠涔版挙 buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) + if buy_nums * limit_up_price * 100 > 1000 * 10000: + last_data_index = i + break TradePointManager.set_buy_compute_start_data(code, buy_nums, c_index) - # 鑾峰彇娑ㄥ仠浠� - limit_up_price = gpcode_manager.get_limit_up_price(code) + if limit_up_price is not None: - if buy_nums * limit_up_price * 100 > 1000 * 10000: + if last_data_index > -1: # 澶т簬1000w灏变拱 logger_l2_trade.info( "鎵ц涔板叆锛歿} - 璁$畻缁撴潫鐐癸細 {}".format(code, json.dumps(total_datas[-1]))) try: - trade_manager.start_buy(code) + trade_manager.start_buy(code, capture_timestamp, total_datas[last_data_index], + last_data_index) TradePointManager.delete_buy_cancel_point(code) except Exception as e: pass else: # 鏈夎绠楀紑濮嬬偣,璁$畻鏂板鐨勬暟鎹� - buy_nums = 0 + buy_nums = c_num + last_data = None + last_data_index = len(total_datas) - len(add_datas) - 1 + # 鑾峰彇娑ㄥ仠浠� + limit_up_price = gpcode_manager.get_limit_up_price(code) for data in add_datas: + last_data_index += 1 _val = data["val"] if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 0: # 娑ㄥ仠涔� @@ -381,16 +842,17 @@ elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1: # 娑ㄥ仠涔版挙 buy_nums -= int(_val["num"]) * int(data["re"]) + if buy_nums * limit_up_price * 100 > 1000 * 10000: + last_data = data + break + TradePointManager.set_buy_compute_start_data(code, buy_nums) - latest_num = c_num + buy_nums - # 鑾峰彇娑ㄥ仠浠� - limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is not None: - if latest_num * limit_up_price * 100 > 1000 * 10000: + if last_data is not None: # 澶т簬1000w灏变拱 logger_l2_trade.info("鎵ц涔板叆锛歿} - 璁$畻缁撴潫鐐癸細 {}".format(code, json.dumps(add_datas[-1]))) try: - trade_manager.start_buy(code) + trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index) TradePointManager.delete_buy_cancel_point(code) except Exception as e: pass @@ -697,25 +1159,25 @@ if __name__ == "__main__": - # 鍒犻櫎澶ф暟鎹� - redis = redis_manager.RedisManager(1).getRedis() - keys = redis.keys("big_data*") - for key in keys: - redis.delete(key) - # print("big_data-{}-{}".format("123", int(round(t.time() * 1000)))) - # load_l2_data("002868") - # keys= local_today_num_operate_map["002868"] - # for k in keys: - # print(len( local_today_num_operate_map["002868"][k])) - # pass - # __set_buy_compute_start_data("000000", 100, 1) - # __set_buy_compute_start_data("000000", 100) - # __set_l2_data_latest_count("000333", 20) - # print(type(get_l2_data_latest_count("000333"))) - # datas = ["2", "3", "4", "5"] - # print(datas[4:]) - # print(decimal.Decimal("19.294").quantize(decimal.Decimal("0.00"), decimal.ROUND_HALF_UP)) - - # 鑾峰彇澧為噺鏁版嵁 - # 淇濆瓨鏁版嵁 - # 鎷兼帴鏁版嵁 + code = "000868" + local_today_datas.setdefault(code, []) + path = "C:/Users/Administrator/Desktop/demo/000868/" + for file_name in os.listdir(path): + p = "{}{}".format(path, file_name) + f = open(p) + for line in f.readlines(): # 渚濇璇诲彇姣忚 + line = line.strip() + data = json.loads(line) + result = __format_l2_data(data, code, 10.00) + add_datas = get_add_data(code, result) + print("澧炲姞鐨勬暟閲忥細", len(add_datas)) + if len(add_datas) > 0: + # 鎷兼帴鏁版嵁 + local_today_datas[code].extend(add_datas) + if code in local_latest_datas: + local_latest_datas[code] = result + else: + local_latest_datas.setdefault(code, result) + f.close() + for d in local_today_datas[code]: + print(d["val"]["time"], d["val"]["num"], d["val"]["operateType"], d["re"]) -- Gitblit v1.8.0