| | |
| | | import decimal |
| | | import json |
| | | import os |
| | | import time as t |
| | | from datetime import datetime |
| | | |
| | | import data_process |
| | | import l2_data_util |
| | | import mysql |
| | | |
| | | import gpcode_manager |
| | | import mongo_data |
| | | |
| | | import redis_manager |
| | | import tool |
| | | import trade_manager |
| | | from log import logger_l2_trade |
| | | from trade_data_manager import TradeBuyDataManager |
| | | |
| | | _redisManager = redis_manager.RedisManager(1) |
| | | # l2数据管理 |
| | |
| | | @staticmethod |
| | | def get_buy_cancel_compute_start_data(code): |
| | | redis = TradePointManager.__get_redis() |
| | | index = redis.get("buy_cancel_compute_index-{}".format(code)) |
| | | total_num = redis.get("buy_cancel_compute_num-{}".format(code)) |
| | | if index is None: |
| | | return None, 0 |
| | | info = redis.get("buy_cancel_compute_info-{}".format(code)) |
| | | if info is None: |
| | | return None, None , None |
| | | else: |
| | | return int(index), int(total_num) |
| | | info=json.loads(info) |
| | | return info[0],info[1],info[2] |
| | | |
| | | # 设置买撤点信息 |
| | | @staticmethod |
| | | def set_buy_cancel_compute_start_data(code, num_add, index=None): |
| | | # buy_num 纯买额 computed_index计算到的下标 index撤买信号起点 |
| | | |
| | | @classmethod |
| | | def set_buy_cancel_compute_start_data(cls,code, buy_num,computed_index, index): |
| | | redis = TradePointManager.__get_redis() |
| | | expire = tool.get_expire() |
| | | if index is not None: |
| | | redis.setex("buy_cancel_compute_index-{}".format(code), expire, index) |
| | | key = "buy_cancel_compute_num-{}".format(code) |
| | | if redis.get(key) is None: |
| | | redis.setex(key, expire, num_add) |
| | | else: |
| | | redis.incrby(key, num_add) |
| | | redis.setex("buy_cancel_compute_info-{}".format(code), expire, json.dumps((index,buy_num,computed_index))) |
| | | |
| | | # 增加撤买的纯买额 |
@classmethod
def add_buy_nums_for_cancel(cls, code, num_add, computed_index):
    """Add ``num_add`` to the net buy amount stored for the cancel computation.

    Reads the stored ``(cancel_index, nums, computed_index)`` record for
    ``code`` and writes it back with the increased amount and the new
    ``computed_index``.

    Raises:
        Exception: if no cancel-signal record exists for ``code``.
    """
    cancel_index, nums, _ = cls.get_buy_cancel_compute_start_data(code)
    if cancel_index is None:
        raise Exception("无撤买信号记录")
    # The setter takes (code, buy_num, computed_index, index); the cancel
    # signal index must be written back or the call fails with a TypeError.
    cls.set_buy_cancel_compute_start_data(code, nums + num_add, computed_index, cancel_index)
| | | |
| | | |
| | | def load_l2_data(code, force=False): |
| | |
| | | l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force) |
| | | |
| | | |
| | | def saveL2Data(code, datas): |
| | | def saveL2Data(code, datas, msg=""): |
| | | start_time = round(t.time() * 1000) |
| | | # 查询票是否在待监听的票里面 |
| | | if not gpcode_manager.is_in_gp_pool(code): |
| | | return None |
| | |
| | | |
| | | # 计算保留的时间 |
| | | expire = tool.get_expire() |
| | | index = 0 |
| | | start_index = redis_instance.get("l2-maxindex-{}".format(code)) |
| | | if start_index is None: |
| | | start_index = 0 |
| | | start_index = -1 |
| | | else: |
| | | start_index = int(start_index) |
| | | max_index = start_index |
| | | i = 0 |
| | | for _data in datas: |
| | | index = index + 1 |
| | | |
| | | i += 1 |
| | | key = "l2-" + _data["key"] |
| | | value = redis_instance.get(key) |
| | | if value is None: |
| | | # 新增 |
| | | max_index = start_index + index |
| | | value = {"index": start_index + index, "re": _data['re']} |
| | | max_index = start_index + i |
| | | value = {"index": start_index + i, "re": _data["re"]} |
| | | redis_instance.setex(key, expire, json.dumps(value)) |
| | | else: |
| | | json_value = json.loads(value) |
| | |
| | | finally: |
| | | redis_instance.delete("l2-save-{}".format(code)) |
| | | |
| | | print("保存新数据用时:", msg, round(t.time() * 1000) - start_time) |
| | | return datas |
| | | |
| | | |
| | | # TODO 获取l2的数据 |
def get_l2_data_index(code, key):
    """Placeholder (marked TODO in the file); does nothing and returns None."""
    return None
| | | |
| | | |
| | | def parseL2Data(str): |
| | | now = int(t.time()) |
| | | day = datetime.now().strftime("%Y%m%d") |
| | | dict = json.loads(str) |
| | | data = dict["data"] |
| | | client = dict["client"] |
| | | code = data["code"] |
| | | channel = data["channel"] |
| | | capture_time = data["captureTime"] |
| | | process_time = data["processTime"] |
| | | data = data["data"] |
| | | datas = [] |
| | | dataIndexs = {} |
| | | |
| | | # 获取涨停价 |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | same_time_num = {} |
| | | for item in data: |
| | | # 解析数据 |
| | | time = item["time"] |
| | | if time in same_time_num: |
| | | same_time_num[time] = same_time_num[time] + 1 |
| | | else: |
| | | same_time_num[time] = 1 |
| | | |
| | | price = float(item["price"]) |
| | | num = item["num"] |
| | | limitPrice = item["limitPrice"] |
| | | # 涨停价 |
| | | if limit_up_price is not None and limit_up_price == tool.to_price(decimal.Decimal(price)): |
| | | limitPrice = 1 |
| | | item["limitPrice"] = "{}".format(limitPrice) |
| | | operateType = item["operateType"] |
| | | cancelTime = item["cancelTime"] |
| | | cancelTimeUnit = item["cancelTimeUnit"] |
| | | key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime, |
| | | cancelTimeUnit) |
| | | if key in dataIndexs: |
| | | # 数据重复次数+1 |
| | | datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1 |
| | | else: |
| | | # 数据重复次数默认为1 |
| | | datas.append({"key": key, "val": item, "re": 1}) |
| | | dataIndexs.setdefault(key, len(datas) - 1) |
| | | for key in same_time_num: |
| | | if same_time_num[key] > 50: |
| | | # 只能保存近3s的数据 |
| | | ts1 = l2_data_util.get_time_as_seconds(datas[-1]["val"]["time"]) |
| | | ts_now = l2_data_util.get_time_as_seconds(datetime.now().strftime("%H:%M:%S")) |
| | | if abs(ts1 - ts_now) <= 3: |
| | | # TODO 保存数据 |
| | | redis = _redisManager.getRedis() |
| | | redis.set("big_data-{}-{}".format(code, int(round(t.time() * 1000))), str) |
| | | |
| | | return day, client, channel, code, datas |
| | | |
| | | |
| | | # 纠正数据,将re字段替换为较大值 |
| | | def correct_data(code, _datas): |
| | | latest_data = local_latest_datas.get(code) |
| | | if latest_data is None: |
| | | latest_data = [] |
| | | |
| | | for data in _datas: |
| | | for _ldata in latest_data: |
| | | if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]: |
| | | max_re = max(_ldata["re"], data["re"]) |
| | | _ldata["re"] = max_re |
| | | data["re"] = max_re |
| | | return _datas |
| | | datas = L2DataUtil.format_l2_data(data, code, limit_up_price) |
| | | # 获取涨停价 |
| | | return day, client, channel, code, capture_time, process_time, datas |
| | | |
| | | |
| | | # 保存l2数据 |
| | |
| | | saveL2Data(code, add_datas) |
| | | |
| | | |
| | | # 获取增量数据 |
| | | def get_add_data(code, datas): |
| | | if datas is not None and len(datas) < 1: |
| | | return [] |
| | | last_key = "" |
| | | __latest_datas = local_latest_datas.get(code) |
| | | if __latest_datas is not None and len(__latest_datas) > 0: |
| | | last_key = __latest_datas[-1]["key"] |
| | | count = 0 |
| | | start_index = -1 |
| | | # 如果原来没有数据 |
| | | |
| | | for n in reversed(datas): |
| | | count += 1 |
| | | if n["key"] == last_key: |
| | | start_index = len(datas) - count |
| | | break |
| | | if len(last_key) > 0: |
| | | if start_index < 0 or start_index + 1 >= len(datas): |
| | | return [] |
class L2DataUtil:
    """Helpers for formatting, comparing and classifying captured L2 rows.

    A formatted row is a dict ``{"key": str, "val": item, "re": int}`` built
    by :meth:`format_l2_data`; :meth:`get_add_data` additionally stamps an
    ``"index"`` on every row it returns.
    """

    @staticmethod
    def get_time_as_second(time_str):
        """Convert an ``HH:MM:SS`` string to seconds since midnight.

        Declared as a static method: the previous ``@classmethod`` form had no
        ``cls`` parameter, so every ``L2DataUtil.get_time_as_second(x)`` call
        raised ``TypeError``.
        """
        parts = time_str.split(":")
        return int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])

    @classmethod
    def is_same_time(cls, time1, time2):
        """Return True when two ``HH:MM:SS`` times differ by less than 3 seconds."""
        first = cls.get_time_as_second(time1)
        second = cls.get_time_as_second(time2)
        return abs(second - first) < 3

    @classmethod
    def get_add_data(cls, code, datas, _start_index):
        """Return the rows of ``datas`` that come after the last known row.

        The last row of ``local_latest_datas[code]`` is searched for in
        ``datas`` from the end. Rows after the match are returned, each
        stamped with ``"index" = _start_index + offset``. If there is no
        previous capture, all of ``datas`` is returned; if the previous last
        row cannot be found, or is the final row, an empty list is returned.
        ``None`` or empty ``datas`` yields an empty list.
        """
        if not datas:
            return []
        latest = local_latest_datas.get(code)
        last_key = latest[-1]["key"] if latest else ""

        # Search backwards so the most recent occurrence of the key wins.
        match_pos = -1
        for pos in range(len(datas) - 1, -1, -1):
            if datas[pos]["key"] == last_key:
                match_pos = pos
                break

        if last_key and match_pos < 0:
            return []
        added = datas[match_pos + 1:]
        for offset, row in enumerate(added):
            row["index"] = _start_index + offset
        return added

    @classmethod
    def correct_data(cls, code, _datas):
        """Align the ``re`` (repeat count) of rows shared with the last capture.

        For every row of ``_datas`` whose key also exists in
        ``local_latest_datas[code]`` with a different ``re``, both rows are
        set to the larger value. The corrected rows of the previous capture
        are then passed to ``saveL2Data``. Returns ``_datas``.
        """
        latest_data = local_latest_datas.get(code)
        if latest_data is None:
            latest_data = []
        corrected = []
        for data in _datas:
            for previous in latest_data:
                if previous["key"] == data["key"] and previous["re"] != data["re"]:
                    max_re = max(previous["re"], data["re"])
                    previous["re"] = max_re
                    data["re"] = max_re
                    # Rows whose re changed are written back to storage.
                    corrected.append(previous)
        if corrected:
            saveL2Data(code, corrected, "保存纠正数据")
        return _datas

    @classmethod
    def format_l2_data(cls, data, code, limit_up_price):
        """Turn raw capture items into de-duplicated rows.

        Each distinct item becomes ``{"key": key, "val": item, "re": 1}``;
        an item producing an already seen key only increments ``re`` of the
        existing row. Items priced at ``limit_up_price`` get
        ``limitPrice`` forced to ``"1"``. The per-timestamp item counts are
        handed to ``l2_data_util.save_big_data`` before returning the rows.
        """
        datas = []
        key_positions = {}
        same_time_num = {}
        for item in data:
            row_time = item["time"]
            same_time_num[row_time] = same_time_num.get(row_time, 0) + 1

            price = float(item["price"])
            num = item["num"]
            limit_price = item["limitPrice"]
            if limit_up_price is not None and limit_up_price == tool.to_price(decimal.Decimal(price)):
                # The row sits at the limit-up price.
                limit_price = 1
                # NOTE(review): the source lost its indentation; this write-back
                # is assumed to belong to the limit-up branch - confirm.
                item["limitPrice"] = "{}".format(limit_price)
            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(
                code, item["operateType"], row_time, num, price, limit_price,
                item["cancelTime"], item["cancelTimeUnit"])
            if key in key_positions:
                # Seen before in this capture: bump the repeat count.
                datas[key_positions[key]]["re"] += 1
            else:
                datas.append({"key": key, "val": item, "re": 1})
                key_positions[key] = len(datas) - 1
        l2_data_util.save_big_data(code, same_time_num, data)
        return datas

    @staticmethod
    def _is_big_limit_up_order(val, operate_type):
        """Shared test: limit-up row of the given operate type worth at least 500,000."""
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != operate_type:
            return False
        price = float(val["price"])
        num = int(val["num"])
        if price * num * 100 < 50 * 10000:
            return False
        return True

    @staticmethod
    def is_limit_up_price_buy(val):
        """Return True for a limit-up buy (operateType 0) worth at least 500,000."""
        return L2DataUtil._is_big_limit_up_order(val, 0)

    @staticmethod
    def is_limit_up_price_buy_cancel(val):
        """Return True for a limit-up buy cancel (operateType 1) worth at least 500,000."""
        return L2DataUtil._is_big_limit_up_order(val, 1)
| | | |
| | | |
| | | def process_data(code, datas): |
| | | # L2交易数据处理器 |
| | | class L2TradeDataProcessor: |
| | | unreal_buy_dict = {} |
| | | |
| | | @classmethod |
| | | # Data-processing entry point for one capture of L2 rows belonging to `code` |
| | | # datas: the L2 rows of the current screenshot capture |
| | | # capture_timestamp: timestamp of the screenshot capture |
| | | def process(cls, code, datas, capture_timestamp): |
| | | now_time_str = datetime.now().strftime("%H:%M:%S") |
| | | __start_time = round(t.time() * 1000) |
| | | try: |
| | | if len(datas) > 0: |
| | | # Check that the first row's price is consistent with this code; L2DataException is raised otherwise |
| | | if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])): |
| | | raise L2DataException(L2DataException.CODE_PRICE_ERROR, |
| | | "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"])) |
| | | # Load the historical data for this code |
| | | load_l2_data(code) |
| | | # Correct the data (see L2DataUtil.correct_data) |
| | | datas = L2DataUtil.correct_data(code, datas) |
| | | _start_index = 0 |
| | | if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0: |
| | | _start_index = local_today_datas[code][-1]["index"] |
| | | add_datas = L2DataUtil.get_add_data(code, datas, _start_index) |
| | | if len(add_datas) > 0: |
| | | # Append the new rows to today's rows. NOTE(review): _start_index is the index of the last stored row and is passed on unchanged - confirm the first new row is not meant to start at last index + 1 |
| | | local_today_datas[code].extend(add_datas) |
| | | l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas) |
| | | total_datas = local_today_datas[code] |
| | | # Process the buy-confirmation position using all rows, the last row and the new rows |
| | | TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas, |
| | | total_datas[-1], |
| | | add_datas) |
| | | if len(add_datas) > 0: |
| | | latest_time = add_datas[len(add_datas) - 1]["val"]["time"] |
| | | # Only act when the newest row's time is close to the current wall-clock time |
| | | if L2DataUtil.is_same_time(now_time_str, latest_time): |
| | | # Check whether a buy order has already been placed |
| | | state = trade_manager.get_trade_state(code) |
| | | if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: |
| | | # Order already placed |
| | | cls.process_order(code, add_datas) |
| | | else: |
| | | # No order placed yet |
| | | cls.process_not_order(code, add_datas) |
| | | # Save the data |
| | | save_l2_data(code, datas, add_datas) |
| | | finally: |
| | | if code in cls.unreal_buy_dict: |
| | | cls.unreal_buy_dict.pop(code) |
| | | |
| | | # 处理未挂单 |
| | | @classmethod |
| | | def process_not_order(cls, code, add_datas): |
| | | |
| | | |
| | | # 处理已挂单 |
@classmethod
def process_order(cls, code, add_datas):
    """Handle new rows for a code whose buy order has already been placed.

    Uses the stored cancel-signal start if there is one; otherwise looks for
    a fresh cancel signal in the newest rows. When a signal exists, the
    cancel computation is started with a fixed 10,000,000 threshold; when
    none exists nothing happens.
    """
    cancel_index, buy_num_for_cancel, computed_index = cls.has_order_cancel_begin_pos(code)
    buy_index, _ = cls.get_order_begin_pos(code)

    if cancel_index is None:
        # Nothing recorded yet: scan the new rows (plus 3 earlier ones).
        cancel_index = cls.compute_order_cancel_begin_single(code, len(add_datas) + 3, 3)
        buy_num_for_cancel = 0
        computed_index = buy_index

    if cancel_index is None:
        # No cancel signal at all: stop here.
        return

    threshold_money = 10000000
    cls.start_compute_cancel(code, cancel_index, computed_index, buy_num_for_cancel, threshold_money)
| | | |
| | | # Start computing the cancel signal: sum the net buy amount from compute_start_index, then either cancel the buy or record the computation progress |
| | | @classmethod |
| | | def start_compute_cancel(cls,code,cancel_index, compute_start_index,origin_num,threshold_money): |
| | | # sure_type: 0 - virtual order position, 1 - real order position |
| | | computed_index , buy_num_for_cancel,sure_type = cls.sum_buy_num_for_cancel_order(code,compute_start_index,origin_num,threshold_money) |
| | | total_datas = local_today_datas[code] |
| | | if computed_index is not None: |
| | | # Cancel signal fired: the buy has to be cancelled |
| | | if cls.unreal_buy_dict.get(code) is not None: |
| | | # A virtual order exists |
| | | # Remove the virtual-order marker |
| | | cls.unreal_buy_dict.pop(code) |
| | | # TODO remove the stored order-marker position |
| | | pass |
| | | else: |
| | | # No virtual order: a real cancel has to be executed |
| | | logger_l2_trade.info( |
| | | "执行撤销:{} - {}".format(code, json.dumps(total_datas[computed_index]))) |
| | | try: |
| | | trade_manager.start_cancel_buy(code) |
| | | # Clear the buy markers. NOTE(review): the except below silently discards any exception from the cancel call and the marker deletion |
| | | TradePointManager.delete_buy_point(code) |
| | | TradePointManager.delete_buy_cancel_point(code) |
| | | except Exception as e: |
| | | pass |
| | | |
| | | if computed_index < len(local_today_datas[code])-1: |
| | | # TODO rows remain unprocessed: re-enter the buy computation from the next row |
| | | cls.start_compute_buy(code,computed_index+1,0,threshold_money) |
| | | pass |
| | | else: |
| | | # No cancel needed: record the cancel-computation progress |
| | | TradePointManager.set_buy_cancel_compute_start_data(code,buy_num_for_cancel,len(total_datas)-1,cancel_index) |
| | | # Check whether there is a virtual order |
| | | unreal_buy_info=cls.unreal_buy_dict.get(code) |
| | | if unreal_buy_info is not None: |
| | | # unreal_buy_info has the form (index of the row that triggered the buy, capture time) |
| | | # Place the real order. NOTE(review): exceptions raised by start_buy are silently discarded below |
| | | logger_l2_trade.info( |
| | | "执行买入:{} ".format(code)) |
| | | try: |
| | | trade_manager.start_buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]], |
| | | unreal_buy_info[0]) |
| | | TradePointManager.delete_buy_cancel_point(code) |
| | | except Exception as e: |
| | | pass |
| | | pass |
| | | else: |
| | | # Stop here |
| | | pass |
| | | |
| | | |
| | | |
@classmethod
def start_compute_buy(cls, code, compute_start_index, origin_num, threshold_money, capture_time=None):
    """Run the buy computation from ``compute_start_index`` and mark a virtual order.

    The stored buy-signal start is used when present; otherwise a new signal
    is searched for in the rows from ``compute_start_index`` onwards. The net
    buy amount is then summed and, once it reaches ``threshold_money``, a
    virtual order ``(row index, capture_time)`` is stored in
    ``cls.unreal_buy_dict``.

    Args:
        code: stock code.
        compute_start_index: first row index to sum from.
        origin_num: accepted for interface compatibility; not used here.
        threshold_money: money threshold handed to ``sum_buy_num_for_order``.
        capture_time: capture time stored with the virtual order. Optional
            because the visible caller does not supply one; the previous code
            read an undefined ``capture_time`` name and raised ``NameError``.

    Returns:
        None in every case.
    """
    total_datas = local_today_datas[code]
    index, num = cls.get_order_begin_pos(code)
    if index is None:
        # compute_order_begin_pos returns a row index or None (not a pair).
        index = cls.compute_order_begin_pos(code, len(total_datas) - compute_start_index, 3)
        if index is None:
            # No buy signal found: stop.
            return None
        num = 0
        # TODO: record the position of the newly found buy signal.

    compute_index, buy_nums = cls.sum_buy_num_for_order(code, compute_start_index, num, threshold_money)
    if compute_index is not None:
        # Order condition reached: place a virtual order.
        cls.unreal_buy_dict[code] = (compute_index, capture_time)
    # TODO: when the condition is not reached, persist buy_nums.
    return None
| | | |
| | | |
| | | |
| | | # 获取下单起始信号 |
@classmethod
def get_order_begin_pos(cls, code):
    """Return the stored ``(index, num)`` buy-computation start data for ``code``."""
    (start_index, net_num) = TradePointManager.get_buy_compute_start_data(code)
    return (start_index, net_num)
| | | |
| | | # 获取撤单起始位置 |
@classmethod
def has_order_cancel_begin_pos(cls, code):
    """Return the stored cancel-computation record for ``code``.

    Returns:
        ``(cancel_index, buy_num_for_cancel, computed_index)``: the start of
        the cancel signal, the net buy amount summed so far, and the last
        row index already computed.

    ``code`` is now a real parameter: the body always used it and the caller
    always passed it, but the signature only declared ``cls``.
    """
    cancel_index, buy_num_for_cancel, computed_index = \
        TradePointManager.get_buy_cancel_compute_start_data(code)
    return cancel_index, buy_num_for_cancel, computed_index
| | | |
| | | # 计算下单起始信号 |
| | | # compute_data_count 用于计算的l2数据数量 |
@classmethod
def compute_order_begin_pos(cls, code, compute_data_count, continue_count):
    """Find the row index at which buy-amount accumulation should start.

    Scans the last ``compute_data_count`` rows of ``local_today_datas[code]``
    (rows timed before ``second_930`` are skipped) for either pattern:

    * three physically consecutive limit-up buys - returns the index of the
      first of them;
    * four limit-up buys sharing one timestamp, the count restarting on a
      buy-cancel row - returns the index of the first of the four.

    Args:
        code: stock code.
        compute_data_count: number of most recent rows to scan.
        continue_count: minimum number of rows required; the last
            ``continue_count - 1`` rows are not used as a starting row.

    Returns:
        The starting row index, or None when no pattern is found.

    Changes from the previous version: declared as a classmethod (it is
    invoked through ``cls``); the same-second tracker used two different
    names (``__time`` / ``_time``) so its count never exceeded 1; and
    ``operateType`` is converted with ``int()`` like everywhere else.
    """
    datas = local_today_datas[code]
    total = len(datas)
    if total < continue_count:
        return None
    start_index = max(total - compute_data_count, 0)

    same_second_time = None
    same_second_count = 0
    same_second_start = -1
    is_buy = L2DataUtil.is_limit_up_price_buy

    for i in range(start_index, total - (continue_count - 1)):
        _val = datas[i]["val"]
        # Only rows at or after 09:30:00 are considered.
        if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
            continue

        if is_buy(_val):
            # Pattern 1: this row and the next two are all limit-up buys.
            following = datas[i + 1:i + 3]
            if len(following) == 2 and all(is_buy(row["val"]) for row in following):
                logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i]))
                return i
            # Pattern 2: count limit-up buys that share one timestamp.
            if same_second_time == _val["time"]:
                same_second_count += 1
            else:
                same_second_time = _val["time"]
                same_second_count = 1
                same_second_start = i
        elif int(_val["operateType"]) == 1:
            # A buy cancel restarts the same-second count.
            same_second_time = None
            same_second_count = 0
            same_second_start = -1

        if same_second_count >= 4 and same_second_start > -1:
            logger_l2_trade.info(
                "找到同一秒连续涨停买 {},{},{}".format(code, same_second_start, datas[i]))
            return same_second_start

    return None
| | | |
| | | # 是否有撤销信号 |
@classmethod
def compute_order_cancel_begin_single(cls, code, compute_data_count, continue_count):
    """Return the index of the first of three consecutive limit-up buy cancels.

    Only the last ``compute_data_count`` rows of ``local_today_datas[code]``
    are scanned and rows timed before ``second_930`` are skipped. Returns
    None when fewer than ``continue_count`` rows exist or no such run is
    found.
    """
    rows = local_today_datas[code]
    total = len(rows)
    if total < continue_count:
        return None
    first_row = max(total - compute_data_count, 0)
    is_cancel = L2DataUtil.is_limit_up_price_buy_cancel

    def next_cancel_from(pos):
        # Index of the first limit-up buy cancel at or after pos, else -1.
        for j in range(pos, total):
            if is_cancel(rows[j]["val"]):
                return j
        return -1

    for i in range(first_row, total - (continue_count - 1)):
        val = rows[i]["val"]
        if L2DataUtil.get_time_as_second(val["time"]) < second_930:
            continue
        if not is_cancel(val):
            continue
        second = next_cancel_from(i + 1)
        third = next_cancel_from(second + 1) if second > 0 else -1
        # All three cancels must sit on adjacent rows.
        if second - i == 1 and third - second == 1:
            logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(rows[i])))
            return i
    return None
| | | |
| | | # 保存下单位置 |
def save_order_pos(self):
    """Placeholder for saving the order position; does nothing and returns None."""
    return None
| | | |
| | | # 是否可以下单 |
def is_can_order(self):
    """Placeholder for the can-order check; does nothing and returns None."""
    return None
| | | |
| | | # 虚拟下单 |
def unreal_order(self):
    """Placeholder for placing a virtual order; does nothing and returns None."""
    return None
| | | |
| | | # 设置虚拟挂买位 |
def set_unreal_sure_order_pos(self):
    """Placeholder for setting the virtual order position; does nothing and returns None."""
    return None
| | | |
| | | # 获取预估挂买位 |
@classmethod
def get_sure_order_pos(cls, code):
    """Return ``(sure_type, index, row)`` for the estimated order position.

    ``sure_type`` is 1 with the position stored by ``TradeBuyDataManager``
    when one exists; otherwise it is 0 with the last row of
    ``local_today_datas[code]``.
    """
    index, data = TradeBuyDataManager.get_buy_sure_position(code)
    if index is not None:
        return 1, index, data
    last_index = len(local_today_datas[code]) - 1
    return 0, last_index, local_today_datas[code][-1]
| | | |
| | | # 统计买入净买量 |
@classmethod
def sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money):
    """Accumulate the net limit-up buy amount from ``compute_start_index``.

    Limit-up buys add ``num * re`` and limit-up buy cancels subtract it,
    starting from ``origin_num``. The threshold in lots is
    ``threshold_money / (limit_up_price * 100)``.

    Returns:
        ``(index, amount)`` for the buy row at which the amount first
        reaches the threshold, or ``(None, amount)`` if it never does.

    Raises:
        Exception: if the limit-up price of ``code`` is unavailable.
    """
    total_datas = local_today_datas[code]
    running = origin_num
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    if limit_up_price is None:
        raise Exception("涨停价无法获取")
    threshold_num = threshold_money / (limit_up_price * 100)

    for pos in range(compute_start_index, len(total_datas)):
        row = total_datas[pos]
        val = row["val"]
        if L2DataUtil.is_limit_up_price_buy(val):
            running += int(val["num"]) * int(row["re"])
            # The threshold is only checked right after a buy was added.
            if running >= threshold_num:
                return pos, running
        elif L2DataUtil.is_limit_up_price_buy_cancel(val):
            running -= int(val["num"]) * int(row["re"])
    return None, running
| | | |
| | | # 同一时间买入的概率计算 |
| | | @classmethod |
| | | def get_same_time_property(cls, code): |
| | | # TODO 与板块热度有关 |
| | | return 0.5 |
| | | |
| | | # Sum the net buy amount used for the cancel decision, starting at start_index; returns (index, net amount, sure_type), where index is None if the loop ends without the amount being <= the threshold |
| | | @classmethod |
| | | def sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money): |
| | | buy_nums = origin_num |
| | | total_datas = local_today_datas[code] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if limit_up_price is None: |
| | | raise Exception("涨停价无法获取") |
| | | threshold_num = threshold_money / (limit_up_price * 100) |
| | | # Get the estimated order position; sure_type: 0 = virtual order position, 1 = real order position |
| | | sure_type, sure_pos, sure_data = cls.get_sure_order_pos(code) |
| | | same_time_property = cls.get_same_time_property(code) |
| | | # Net amount of rows that share the estimated order position's second but are not before it |
| | | property_buy_num_count = 0 |
| | | for i in range(start_index, len(total_datas)): |
| | | data = total_datas[i] |
| | | _val = data["val"] |
| | | if L2DataUtil.is_limit_up_price_buy(_val): |
| | | # Limit-up buy |
| | | if i < sure_pos: |
| | | buy_nums += int(_val["num"]) * int(data["re"]) |
| | | elif sure_data["val"]["time"] == _val["time"]: |
| | | # Bought in the same second and not before the estimated order position |
| | | property_buy_num_count += int(_val["num"]) * int(data["re"]) |
| | | |
| | | elif L2DataUtil.is_limit_up_price_buy_cancel(_val): |
| | | # Limit-up buy cancel |
| | | # Check whether the cancelled order's buy position is before the buy signal |
| | | buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, total_datas) |
| | | if buy_index is not None: |
| | | # Found the buy row that this cancel row refers to |
| | | if buy_index < sure_pos: |
| | | buy_nums -= int(_val["num"]) * int(data["re"]) |
| | | elif sure_data["val"]["time"] == _val["time"]: |
| | | # Same second and not before the estimated order position: weighted by probability |
| | | property_buy_num_count -= int(_val["num"]) * int(data["re"]) |
| | | else: |
| | | # TODO the buy row for this cancel row was not found |
| | | pass |
| | | |
| | | property_buy_num = round(property_buy_num_count * same_time_property) |
| | | if buy_nums + property_buy_num <= threshold_num: |
| | | return i, buy_nums + property_buy_num,sure_type |
| | | return None, buy_nums + round(property_buy_num_count * same_time_property),sure_type |
| | | |
| | | |
| | | def process_data(code, datas, capture_timestamp): |
| | | now_time_str = datetime.now().strftime("%H:%M:%S") |
| | | __start_time = round(t.time() * 1000) |
| | | try: |
| | | if len(datas) > 0: |
| | | # 判断价格区间是否正确 |
| | | if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])): |
| | | |
| | | raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配 code-{} price-{}".format(code,datas[0]["val"]["price"])) |
| | | raise L2DataException(L2DataException.CODE_PRICE_ERROR, |
| | | "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"])) |
| | | # 加载历史数据 |
| | | load_l2_data(code) |
| | | # 纠正数据 |
| | |
| | | # 拼接数据 |
| | | local_today_datas[code].extend(add_datas) |
| | | l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas) |
| | | |
| | | total_datas = local_today_datas[code] |
| | | # 买入确认点处理 |
| | | TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas, total_datas[-1], |
| | | add_datas) |
| | | if len(add_datas) > 0: |
| | | latest_time = add_datas[len(add_datas) - 1]["val"]["time"] |
| | | # 时间差不能太大才能处理 |
| | | if __is_same_time(now_time_str, latest_time): |
| | |
| | | # 没有计算开始点 |
| | | c_index = __get_limit_up_buy_start(code, len(add_datas) + 3, 3) |
| | | if c_index is not None: |
| | | total_datas = local_today_datas[code] |
| | | |
| | | logger_l2_trade.info("找到买点:{} - {}".format(code, json.dumps(total_datas[c_index]))) |
| | | |
| | | # 触发数据分析 ,获取连续涨停标记数据 |
| | | buy_nums = 0 |
| | | # 获取涨停价 |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | last_data_index = -1 |
| | | for i in range(c_index, len(total_datas)): |
| | | _val = total_datas[i]["val"] |
| | | # 有连续4个涨停买就标记计算起始点 |
| | |
| | | elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1: |
| | | # 涨停买撤 |
| | | buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) |
| | | if buy_nums * limit_up_price * 100 > 1000 * 10000: |
| | | last_data_index = i |
| | | break |
| | | |
| | | TradePointManager.set_buy_compute_start_data(code, buy_nums, c_index) |
| | | # 获取涨停价 |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | |
| | | if limit_up_price is not None: |
| | | if buy_nums * limit_up_price * 100 > 1000 * 10000: |
| | | if last_data_index > -1: |
| | | # 大于1000w就买 |
| | | logger_l2_trade.info( |
| | | "执行买入:{} - 计算结束点: {}".format(code, json.dumps(total_datas[-1]))) |
| | | try: |
| | | trade_manager.start_buy(code) |
| | | trade_manager.start_buy(code, capture_timestamp, total_datas[last_data_index], |
| | | last_data_index) |
| | | TradePointManager.delete_buy_cancel_point(code) |
| | | except Exception as e: |
| | | pass |
| | | else: |
| | | # 有计算开始点,计算新增的数据 |
| | | buy_nums = 0 |
| | | buy_nums = c_num |
| | | last_data = None |
| | | last_data_index = len(total_datas) - len(add_datas) - 1 |
| | | # 获取涨停价 |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | for data in add_datas: |
| | | last_data_index += 1 |
| | | _val = data["val"] |
| | | if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 0: |
| | | # 涨停买 |
| | |
| | | elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1: |
| | | # 涨停买撤 |
| | | buy_nums -= int(_val["num"]) * int(data["re"]) |
| | | if buy_nums * limit_up_price * 100 > 1000 * 10000: |
| | | last_data = data |
| | | break |
| | | |
| | | TradePointManager.set_buy_compute_start_data(code, buy_nums) |
| | | latest_num = c_num + buy_nums |
| | | # 获取涨停价 |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if limit_up_price is not None: |
| | | if latest_num * limit_up_price * 100 > 1000 * 10000: |
| | | if last_data is not None: |
| | | # 大于1000w就买 |
| | | logger_l2_trade.info("执行买入:{} - 计算结束点: {}".format(code, json.dumps(add_datas[-1]))) |
| | | try: |
| | | trade_manager.start_buy(code) |
| | | trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index) |
| | | TradePointManager.delete_buy_cancel_point(code) |
| | | except Exception as e: |
| | | pass |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | # Delete the stored big_data* keys from redis |
| | | redis = redis_manager.RedisManager(1).getRedis() |
| | | keys = redis.keys("big_data*") |
| | | for key in keys: |
| | | redis.delete(key) |
| | | # print("big_data-{}-{}".format("123", int(round(t.time() * 1000)))) |
| | | # load_l2_data("002868") |
| | | # keys= local_today_num_operate_map["002868"] |
| | | # for k in keys: |
| | | # print(len( local_today_num_operate_map["002868"][k])) |
| | | # pass |
| | | # __set_buy_compute_start_data("000000", 100, 1) |
| | | # __set_buy_compute_start_data("000000", 100) |
| | | # __set_l2_data_latest_count("000333", 20) |
| | | # print(type(get_l2_data_latest_count("000333"))) |
| | | # datas = ["2", "3", "4", "5"] |
| | | # print(datas[4:]) |
| | | # print(decimal.Decimal("19.294").quantize(decimal.Decimal("0.00"), decimal.ROUND_HALF_UP)) |
| | | |
| | | # Get the incremental (newly added) rows |
| | | # Save the data |
| | | # Append the data. NOTE(review): the replay below reads a hard-coded local directory and opens each file without a with-block |
| | | code = "000868" |
| | | local_today_datas.setdefault(code, []) |
| | | path = "C:/Users/Administrator/Desktop/demo/000868/" |
| | | for file_name in os.listdir(path): |
| | | p = "{}{}".format(path, file_name) |
| | | f = open(p) |
| | | for line in f.readlines(): # read every line in turn |
| | | line = line.strip() |
| | | data = json.loads(line) |
| | | result = __format_l2_data(data, code, 10.00) |
| | | add_datas = get_add_data(code, result) |
| | | print("增加的数量:", len(add_datas)) |
| | | if len(add_datas) > 0: |
| | | # Append the new rows to today's rows |
| | | local_today_datas[code].extend(add_datas) |
| | | if code in local_latest_datas: |
| | | local_latest_datas[code] = result |
| | | else: |
| | | local_latest_datas.setdefault(code, result) |
| | | f.close() |
| | | for d in local_today_datas[code]: |
| | | print(d["val"]["time"], d["val"]["num"], d["val"]["operateType"], d["re"]) |