| | |
| | | import gpcode_manager |
| | | import mongo_data |
| | | |
| | | |
| | | # 统计今日卖出 |
| | | # 统计今日买入 |
| | | import tool |
| | |
| | | code = data["code"] |
| | | trade_data = data["data"] |
| | | return code, trade_data |
| | | |
| | | |
| | | # 代码对应的价格是否正确 |
| | | def is_same_code_with_price(code, price): |
| | |
| | | mongo_data.save("ths-zylt", _list) |
| | | |
| | | |
| | | def saveClientActive(client_id, host): |
| | | def saveClientActive(client_id, host, thsDead): |
| | | if client_id <= 0: |
| | | return |
| | | redis = __redisManager.getRedis(); |
| | | redis.setex("client-active-{}".format(client_id), 10, host) |
| | | redis.setex("client-active-{}".format(client_id), 10, json.dumps((host, thsDead))) |
| | | |
| | | |
| | | def getValidL2Clients(): |
| | |
| | | return list(set(client_ids).intersection(set(l2_clients))) |
| | | |
| | | |
| | | # 获取客户端IP |
| | | def getActiveClientIP(client_id): |
| | | redis = __redisManager.getRedis(); |
| | | return redis.get("client-active-{}".format(client_id)) |
| | | val = redis.get("client-active-{}".format(client_id)) |
| | | if val is None: |
| | | return None |
| | | val=json.loads(val) |
| | | return val[0] |
| | | |
| | | |
| | | # 获取客户端同花顺状态 |
| | | def getTHSState(client_id): |
| | | redis = __redisManager.getRedis(); |
| | | val = redis.get("client-active-{}".format(client_id)) |
| | | if val is None: |
| | | return None |
| | | val = json.loads(val) |
| | | return val[1] |
| | | |
| | | |
| | | # 保存量能 |
| | |
| | | print("refresh-l2-data") |
| | | for client_id in code_sv_map: |
| | | ip = data_process.getActiveClientIP(client_id) |
| | | ths_dead=data_process.getTHSState(client_id) |
| | | if ip is not None and len(ip) > 0: |
| | | if ths_dead: |
| | | client_state[client_id].configure(text="(在线:{})".format(ip), foreground="#FF7F27") |
| | | else: |
| | | client_state[client_id].configure(text="(在线:{})".format(ip), foreground="#008000") |
| | | else: |
| | | client_state[client_id].configure(text="(离线:未知IP)", foreground="#999999") |
| | |
| | | |
| | | # 保存最新价 |
| | | symbol = symbol.split(".")[1] |
| | | logger_juejin_tick.info("{} {} {}".format(symbol, price, tick["created_at"])) |
| | | |
| | | accpt_price(symbol, price) |
| | | __prices_now[symbol] = price |
| | | |
| | |
| | | if pricePre is not None: |
| | | rate = round((price - pricePre) * 100 / pricePre, 1) |
| | | if rate >= 7: |
| | | print(code, price, rate) |
| | | logger_juejin_tick.info("{}-{}-{}",code, price, rate) |
| | | if not gpcode_manager.is_listen(code) and not gpcode_manager.is_operate( |
| | | code) and not gpcode_manager.is_listen_full(): |
| | | L2CodeOperate.get_instance().add_operate(1, code) |
| | | L2CodeOperate.get_instance().add_operate(1, code,"现价变化") |
| | | # 进入监控 |
| | | elif rate < 5: |
| | | # 移除监控 |
| | | if gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(code): |
| | | L2CodeOperate.get_instance().add_operate(0, code) |
| | | L2CodeOperate.get_instance().add_operate(0, code,"现价变化") |
| | | |
| | | |
| | | def on_bar(context, bars): |
| | |
| | | print("读取操作队列", data, redis.llen("code_operate_queue")) |
| | | if data is not None: |
| | | data = json.loads(data) |
| | | logger_code_operate.info("读取操作队列:{}", data) |
| | | type, code = data["type"], data["code"] |
| | | |
| | | if type == 0: |
| | | # 是否在固定库 |
| | | if l2_data_manager.is_in_l2_fixed_codes(code): |
| | |
| | | logging.exception(e) |
| | | print("发送操作异常:", str(e)) |
| | | |
| | | def add_operate(self, type, code, client=None, pos=None): |
| | | def add_operate(self, type, code, msg="", client=None, pos=None): |
| | | redis = self.redis_manager_.getRedis() |
| | | print("add_operate", type, code) |
| | | redis.rpush("code_operate_queue", json.dumps({"type": type, "code": code, "client": client, "pos": pos})) |
| | | redis.rpush("code_operate_queue", |
| | | json.dumps({"type": type, "msg": msg, "code": code, "client": client, "pos": pos})) |
| | | |
| | | def repaire_operate(self, client, pos, code): |
| | | # 如果本来该位置代码为空则不用修复 |
| | |
| | | if code_ == "" or code_ is None: |
| | | return |
| | | logger_code_operate.info("客户端位置代码修复:client-{},pos-{},code-{}", client, pos, code) |
| | | |
| | | redis = self.redis_manager_.getRedis() |
| | | redis.rpush("code_operate_queue", json.dumps({"type": 2, "client": client, "pos": pos, "code": code})) |
| | | self.add_operate(2, code, "", client, pos) |
| | | |
| | | # 修复l2的数据错误 |
| | | def repaire_l2_data(self, code): |
| | |
| | | redis.rpush("code_operate_queue", json.dumps({"type": 3, "code": code, "client": client_id, "data": data})) |
| | | |
| | | # 移除监控 |
| | | def remove_l2_listen(self, code): |
| | | def remove_l2_listen(self, code, msg): |
| | | # 是否正在监听 |
| | | if gpcode_manager.is_listen(code): |
| | | self.add_operate(0, code) |
| | | self.add_operate(0, code, msg=msg) |
| | | |
| | | # 设置代码操作状态,服务器保存的代码是否与实际设置的代码保持一致 |
| | | @classmethod |
| | | def set_operate_code_state(cls, client_id, channel, state): |
| | | cls.getRedis().setex("code-operate_state-{}-{}".format(client_id, channel), tool.get_expire(), state) |
| | | cls.getRedis().setex("code-operate_state-{}-{}".format(client_id, channel), 10, state) |
| | | |
| | | def get_operate_code_state(self, client_id, channel): |
| | | value = self.getRedis().get("code-operate_state-{}-{}".format(client_id, channel)) |
| | |
| | | data = json.loads(result["data"]) |
| | | codes = data["data"] |
| | | result_list = {} |
| | | if codes is not None: |
| | | for d in codes: |
| | | result_list[d["index"]]=d["code"] |
| | | return result_list |
| | | else: |
| | | raise Exception("获取客户端监听代码出错") |
| | | raise Exception("获取客户端监听代码出错:{}".format(result)) |
| | | |
| | | |
| | | # 矫正客户端代码 |
| | |
| | | for index in range(0, 8): |
| | | code = gpcode_manager.get_listen_code_by_pos(client_id, index) |
| | | if code is not None and len(code) > 0 and index_codes.get(index) != code: |
| | | # 修复代码 |
| | | # 交易时间才修复代码 |
| | | if tool.is_trade_time(): |
| | | L2CodeOperate().repaire_operate(client_id, index, code) |
| | | elif code is None or len(code) == 0 and index_codes.get(index) is not None: |
| | | # 删除前端代码位 |
| | | L2CodeOperate().add_operate(4, "", client_id, index) |
| | | # L2CodeOperate().add_operate(4, "", client_id, index) |
| | | pass |
| | | except Exception as e: |
| | | logger_code_operate.error("client:{} msg:{}".format(client_id, str(e))) |
| | | |
| | |
| | | import decimal |
| | | import json |
| | | import os |
| | | import time as t |
| | | from datetime import datetime |
| | | |
| | | import data_process |
| | | import l2_data_util |
| | | import mysql |
| | | |
| | | import gpcode_manager |
| | | import mongo_data |
| | | |
| | | import redis_manager |
| | | import tool |
| | | import trade_manager |
| | | from log import logger_l2_trade |
| | | from trade_data_manager import TradeBuyDataManager |
| | | |
| | | _redisManager = redis_manager.RedisManager(1) |
| | | # l2数据管理 |
| | |
| | | @staticmethod |
| | | def get_buy_cancel_compute_start_data(code): |
| | | redis = TradePointManager.__get_redis() |
| | | index = redis.get("buy_cancel_compute_index-{}".format(code)) |
| | | total_num = redis.get("buy_cancel_compute_num-{}".format(code)) |
| | | if index is None: |
| | | return None, 0 |
| | | info = redis.get("buy_cancel_compute_info-{}".format(code)) |
| | | if info is None: |
| | | return None, None , None |
| | | else: |
| | | return int(index), int(total_num) |
| | | info=json.loads(info) |
| | | return info[0],info[1],info[2] |
| | | |
| | | # 设置买撤点信息 |
| | | @staticmethod |
| | | def set_buy_cancel_compute_start_data(code, num_add, index=None): |
| | | # buy_num 纯买额 computed_index计算到的下标 index撤买信号起点 |
| | | |
| | | @classmethod |
| | | def set_buy_cancel_compute_start_data(cls,code, buy_num,computed_index, index): |
| | | redis = TradePointManager.__get_redis() |
| | | expire = tool.get_expire() |
| | | if index is not None: |
| | | redis.setex("buy_cancel_compute_index-{}".format(code), expire, index) |
| | | key = "buy_cancel_compute_num-{}".format(code) |
| | | if redis.get(key) is None: |
| | | redis.setex(key, expire, num_add) |
| | | else: |
| | | redis.incrby(key, num_add) |
| | | redis.setex("buy_cancel_compute_info-{}".format(code), expire, json.dumps((index,buy_num,computed_index))) |
| | | |
| | | # 增加撤买的纯买额 |
| | | @classmethod |
| | | def add_buy_nums_for_cancel(cls,code,num_add,computed_index): |
| | | cancel_index,nums,c_index= cls.get_buy_cancel_compute_start_data(code) |
| | | if cancel_index is None: |
| | | raise Exception("无撤买信号记录") |
| | | nums+=num_add |
| | | cls.set_buy_cancel_compute_start_data(code,nums,computed_index) |
| | | |
| | | |
| | | def load_l2_data(code, force=False): |
| | |
| | | l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force) |
| | | |
| | | |
| | | def saveL2Data(code, datas): |
| | | def saveL2Data(code, datas, msg=""): |
| | | start_time = round(t.time() * 1000) |
| | | # 查询票是否在待监听的票里面 |
| | | if not gpcode_manager.is_in_gp_pool(code): |
| | | return None |
| | |
| | | |
| | | # 计算保留的时间 |
| | | expire = tool.get_expire() |
| | | index = 0 |
| | | start_index = redis_instance.get("l2-maxindex-{}".format(code)) |
| | | if start_index is None: |
| | | start_index = 0 |
| | | start_index = -1 |
| | | else: |
| | | start_index = int(start_index) |
| | | max_index = start_index |
| | | i = 0 |
| | | for _data in datas: |
| | | index = index + 1 |
| | | |
| | | i += 1 |
| | | key = "l2-" + _data["key"] |
| | | value = redis_instance.get(key) |
| | | if value is None: |
| | | # 新增 |
| | | max_index = start_index + index |
| | | value = {"index": start_index + index, "re": _data['re']} |
| | | max_index = start_index + i |
| | | value = {"index": start_index + i, "re": _data["re"]} |
| | | redis_instance.setex(key, expire, json.dumps(value)) |
| | | else: |
| | | json_value = json.loads(value) |
| | |
| | | finally: |
| | | redis_instance.delete("l2-save-{}".format(code)) |
| | | |
| | | print("保存新数据用时:", msg, round(t.time() * 1000) - start_time) |
| | | return datas |
| | | |
| | | |
| | | # TODO 获取l2的数据 |
| | | def get_l2_data_index(code, key): |
| | | pass |
| | | |
| | | |
| | | def parseL2Data(str): |
| | | now = int(t.time()) |
| | | day = datetime.now().strftime("%Y%m%d") |
| | | dict = json.loads(str) |
| | | data = dict["data"] |
| | | client = dict["client"] |
| | | code = data["code"] |
| | | channel = data["channel"] |
| | | capture_time = data["captureTime"] |
| | | process_time = data["processTime"] |
| | | data = data["data"] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | datas = L2DataUtil.format_l2_data(data, code, limit_up_price) |
| | | # 获取涨停价 |
| | | return day, client, channel, code, capture_time, process_time, datas |
| | | |
| | | |
| | | # 保存l2数据 |
| | | def save_l2_data(code, datas, add_datas): |
| | | redis = _redisManager.getRedis() |
| | | # 保存最近的数据 |
| | | redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas)) |
| | | # 设置进内存 |
| | | if code in local_latest_datas: |
| | | local_latest_datas[code] = datas |
| | | else: |
| | | local_latest_datas.setdefault(code, datas) |
| | | __set_l2_data_latest_count(code, len(datas)) |
| | | if len(add_datas) > 0: |
| | | saveL2Data(code, add_datas) |
| | | |
| | | |
| | | class L2DataUtil: |
| | | @classmethod |
| | | def is_same_time(cls, time1, time2): |
| | | # TODO 测试 |
| | | # if 1 > 0: |
| | | # return True |
| | | time1_s = time1.split(":") |
| | | time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2]) |
| | | time2_s = time2.split(":") |
| | | time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2]) |
| | | if abs(time2_second - time1_second) < 3: |
| | | return True |
| | | else: |
| | | return False |
| | | |
| | | # 获取增量数据 |
| | | @classmethod |
| | | def get_add_data(cls, code, datas, _start_index): |
| | | if datas is not None and len(datas) < 1: |
| | | return [] |
| | | last_key = "" |
| | | __latest_datas = local_latest_datas.get(code) |
| | | if __latest_datas is not None and len(__latest_datas) > 0: |
| | | last_key = __latest_datas[-1]["key"] |
| | | count = 0 |
| | | start_index = -1 |
| | | # 如果原来没有数据 |
| | | # TODO 设置add_data的序号 |
| | | for n in reversed(datas): |
| | | count += 1 |
| | | if n["key"] == last_key: |
| | | start_index = len(datas) - count |
| | | break |
| | | |
| | | _add_datas = [] |
| | | if len(last_key) > 0: |
| | | if start_index < 0 or start_index + 1 >= len(datas): |
| | | _add_datas = [] |
| | | else: |
| | | _add_datas = datas[start_index + 1:] |
| | | else: |
| | | _add_datas = datas[start_index + 1:] |
| | | for i in range(0, len(_add_datas)): |
| | | _add_datas[i]["index"] = _start_index + i |
| | | |
| | | return _add_datas |
| | | |
| | | # 纠正数据,将re字段替换为较大值 |
| | | @classmethod |
| | | def correct_data(cls, code, _datas): |
| | | latest_data = local_latest_datas.get(code) |
| | | if latest_data is None: |
| | | latest_data = [] |
| | | save_list = [] |
| | | for data in _datas: |
| | | for _ldata in latest_data: |
| | | if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]: |
| | | max_re = max(_ldata["re"], data["re"]) |
| | | _ldata["re"] = max_re |
| | | data["re"] = max_re |
| | | # 保存到数据库,更新re的数据 |
| | | save_list.append(_ldata) |
| | | if len(save_list) > 0: |
| | | saveL2Data(code, save_list, "保存纠正数据") |
| | | return _datas |
| | | |
| | | # 处理l2数据 |
| | | @classmethod |
| | | def format_l2_data(cls, data, code, limit_up_price): |
| | | datas = [] |
| | | dataIndexs = {} |
| | | |
| | | # 获取涨停价 |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | same_time_num = {} |
| | | for item in data: |
| | | # 解析数据 |
| | |
| | | # 数据重复次数默认为1 |
| | | datas.append({"key": key, "val": item, "re": 1}) |
| | | dataIndexs.setdefault(key, len(datas) - 1) |
| | | for key in same_time_num: |
| | | if same_time_num[key] > 50: |
| | | # 只能保存近3s的数据 |
| | | ts1 = l2_data_util.get_time_as_seconds(datas[-1]["val"]["time"]) |
| | | ts_now = l2_data_util.get_time_as_seconds(datetime.now().strftime("%H:%M:%S")) |
| | | if abs(ts1 - ts_now) <= 3: |
| | | # TODO 保存数据 |
| | | redis = _redisManager.getRedis() |
| | | redis.set("big_data-{}-{}".format(code, int(round(t.time() * 1000))), str) |
| | | l2_data_util.save_big_data(code, same_time_num, data) |
| | | return datas |
| | | |
| | | return day, client, channel, code, datas |
| | | @classmethod |
| | | def get_time_as_second(time_str): |
| | | ts = time_str.split(":") |
| | | return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) |
| | | |
| | | |
| | | # 纠正数据,将re字段替换为较大值 |
| | | def correct_data(code, _datas): |
| | | latest_data = local_latest_datas.get(code) |
| | | if latest_data is None: |
| | | latest_data = [] |
| | | |
| | | for data in _datas: |
| | | for _ldata in latest_data: |
| | | if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]: |
| | | max_re = max(_ldata["re"], data["re"]) |
| | | _ldata["re"] = max_re |
| | | data["re"] = max_re |
| | | return _datas |
| | | |
| | | |
| | | # 保存l2数据 |
| | | def save_l2_data(code, datas, add_datas): |
| | | redis = _redisManager.getRedis() |
| | | # 保存最近的数据 |
| | | redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas)) |
| | | # 设置进内存 |
| | | if code in local_latest_datas: |
| | | local_latest_datas[code] = datas |
| | | else: |
| | | local_latest_datas.setdefault(code, datas) |
| | | __set_l2_data_latest_count(code, len(datas)) |
| | | if len(add_datas) > 0: |
| | | saveL2Data(code, add_datas) |
| | | |
| | | |
| | | # 获取增量数据 |
| | | def get_add_data(code, datas): |
| | | if datas is not None and len(datas) < 1: |
| | | return [] |
| | | last_key = "" |
| | | __latest_datas = local_latest_datas.get(code) |
| | | if __latest_datas is not None and len(__latest_datas) > 0: |
| | | last_key = __latest_datas[-1]["key"] |
| | | count = 0 |
| | | start_index = -1 |
| | | # 如果原来没有数据 |
| | | |
| | | for n in reversed(datas): |
| | | count += 1 |
| | | if n["key"] == last_key: |
| | | start_index = len(datas) - count |
| | | break |
| | | if len(last_key) > 0: |
| | | if start_index < 0 or start_index + 1 >= len(datas): |
| | | return [] |
| | | else: |
| | | return datas[start_index + 1:] |
| | | else: |
| | | return datas[start_index + 1:] |
| | | |
| | | |
| | | def __is_same_time(time1, time2): |
| | | # TODO 测试 |
| | | # if 1 > 0: |
| | | # return True |
| | | time1_s = time1.split(":") |
| | | time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2]) |
| | | time2_s = time2.split(":") |
| | | time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2]) |
| | | if abs(time2_second - time1_second) < 3: |
| | | return True |
| | | else: |
| | | # 是否是涨停价买 |
| | | def is_limit_up_price_buy(val): |
| | | if int(val["limitPrice"]) != 1: |
| | | return False |
| | | |
| | | if int(val["operateType"]) != 0: |
| | | return False |
| | | |
| | | def process_data(code, datas): |
| | | price = float(val["price"]) |
| | | num = int(val["num"]) |
| | | if price * num * 100 < 50 * 10000: |
| | | return False |
| | | return True |
| | | |
| | | # 是否涨停买撤 |
| | | def is_limit_up_price_buy_cancel(val): |
| | | if int(val["limitPrice"]) != 1: |
| | | return False |
| | | |
| | | if int(val["operateType"]) != 1: |
| | | return False |
| | | |
| | | price = float(val["price"]) |
| | | num = int(val["num"]) |
| | | if price * num * 100 < 50 * 10000: |
| | | return False |
| | | return True |
| | | |
| | | |
| | | # L2交易数据处理器 |
| | | class L2TradeDataProcessor: |
| | | unreal_buy_dict = {} |
| | | |
| | | @classmethod |
| | | # 数据处理入口 |
| | | # datas: 本次截图数据 |
| | | # capture_timestamp:截图时间戳 |
| | | def process(cls, code, datas, capture_timestamp): |
| | | now_time_str = datetime.now().strftime("%H:%M:%S") |
| | | __start_time = round(t.time() * 1000) |
| | | try: |
| | | if len(datas) > 0: |
| | | # 判断价格区间是否正确 |
| | | if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])): |
| | | raise L2DataException(L2DataException.CODE_PRICE_ERROR, |
| | | "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"])) |
| | | # 加载历史数据 |
| | | load_l2_data(code) |
| | | # 纠正数据 |
| | | datas = L2DataUtil.correct_data(code, datas) |
| | | _start_index = 0 |
| | | if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0: |
| | | _start_index = local_today_datas[code][-1]["index"] |
| | | add_datas = L2DataUtil.get_add_data(code, datas, _start_index) |
| | | if len(add_datas) > 0: |
| | | # 拼接数据 |
| | | local_today_datas[code].extend(add_datas) |
| | | l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas) |
| | | total_datas = local_today_datas[code] |
| | | # 买入确认点处理 |
| | | TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas, |
| | | total_datas[-1], |
| | | add_datas) |
| | | if len(add_datas) > 0: |
| | | latest_time = add_datas[len(add_datas) - 1]["val"]["time"] |
| | | # 时间差不能太大才能处理 |
| | | if L2DataUtil.is_same_time(now_time_str, latest_time): |
| | | # 判断是否已经挂单 |
| | | state = trade_manager.get_trade_state(code) |
| | | if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: |
| | | # 已挂单 |
| | | cls.process_order(code, add_datas) |
| | | else: |
| | | # 未挂单 |
| | | cls.process_not_order(code, add_datas) |
| | | # 保存数据 |
| | | save_l2_data(code, datas, add_datas) |
| | | finally: |
| | | if code in cls.unreal_buy_dict: |
| | | cls.unreal_buy_dict.pop(code) |
| | | |
| | | raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配 code-{} price-{}".format(code,datas[0]["val"]["price"])) |
| | | # 处理未挂单 |
| | | @classmethod |
| | | def process_not_order(cls, code, add_datas): |
| | | |
| | | |
| | | # 处理已挂单 |
| | | @classmethod |
| | | def process_order(cls, code, add_datas): |
| | | # 获取之前是否有记录的撤买信号 |
| | | cancel_index, buy_num_for_cancel,computed_index= cls.has_order_cancel_begin_pos(code) |
| | | buy_index, buy_num = cls.get_order_begin_pos(code) |
| | | if cancel_index is None: |
| | | # 无撤单信号起始点记录 |
| | | cancel_index = cls.compute_order_cancel_begin_single(code, len(add_datas) + 3, 3) |
| | | buy_num_for_cancel = 0 |
| | | computed_index=buy_index |
| | | if cancel_index is not None: |
| | | # 获取阈值 有买撤信号,统计撤买纯买额 |
| | | threshold_money=10000000 |
| | | cls.start_compute_cancel(code,cancel_index,computed_index,buy_num_for_cancel,threshold_money) |
| | | else: |
| | | # 无买撤信号,终止执行 |
| | | pass |
| | | |
| | | #开始计算撤的信号 |
| | | @classmethod |
| | | def start_compute_cancel(cls,code,cancel_index, compute_start_index,origin_num,threshold_money): |
| | | # sure_type 0-虚拟挂买位 1-真实挂买位 |
| | | computed_index , buy_num_for_cancel,sure_type = cls.sum_buy_num_for_cancel_order(code,compute_start_index,origin_num,threshold_money) |
| | | total_datas = local_today_datas[code] |
| | | if computed_index is not None: |
| | | # 发出撤买信号,需要撤买 |
| | | if cls.unreal_buy_dict.get(code) is not None: |
| | | # 有虚拟下单 |
| | | # 删除虚拟下单标记 |
| | | cls.unreal_buy_dict.pop(code) |
| | | # TODO 删除下单标记位置 |
| | | pass |
| | | else: |
| | | # 无虚拟下单,需要执行撤单 |
| | | logger_l2_trade.info( |
| | | "执行撤销:{} - {}".format(code, json.dumps(total_datas[computed_index]))) |
| | | try: |
| | | trade_manager.start_cancel_buy(code) |
| | | # 取消买入标识 |
| | | TradePointManager.delete_buy_point(code) |
| | | TradePointManager.delete_buy_cancel_point(code) |
| | | except Exception as e: |
| | | pass |
| | | |
| | | if computed_index < len(local_today_datas[code])-1: |
| | | # TODO数据尚未处理完,重新进入下单计算流程 |
| | | cls.start_compute_buy(code,computed_index+1,0,threshold_money) |
| | | pass |
| | | else: |
| | | #无需撤买,记录撤买信号 |
| | | TradePointManager.set_buy_cancel_compute_start_data(code,buy_num_for_cancel,len(total_datas)-1,cancel_index) |
| | | # 判断是否有虚拟下单 |
| | | unreal_buy_info=cls.unreal_buy_dict.get(code) |
| | | if unreal_buy_info is not None: |
| | | # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间) |
| | | # 真实下单 |
| | | logger_l2_trade.info( |
| | | "执行买入:{} ".format(code)) |
| | | try: |
| | | trade_manager.start_buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]], |
| | | unreal_buy_info[0]) |
| | | TradePointManager.delete_buy_cancel_point(code) |
| | | except Exception as e: |
| | | pass |
| | | pass |
| | | else: |
| | | #终止执行 |
| | | pass |
| | | |
| | | |
| | | |
| | | @classmethod |
| | | def start_compute_buy(cls,code,compute_start_index,origin_num,threshold_money): |
| | | total_datas=local_today_datas[code] |
| | | # 获取买入信号计算起始位置 |
| | | index, num = cls.get_order_begin_pos(code) |
| | | # 是否为新获取到的位置 |
| | | new_get_pos = False |
| | | if index is None: |
| | | # 有买入信号 |
| | | has_single, index = cls.compute_order_begin_pos(code, len(total_datas) - compute_start_index , 3) |
| | | if has_single: |
| | | num = 0 |
| | | new_get_pos = True |
| | | # TODO 记录买入信号位置 |
| | | if index is None: |
| | | # 未获取到买入信号,终止程序 |
| | | return None |
| | | |
| | | |
| | | # 买入纯买额统计 |
| | | # TODO 获取阈值 |
| | | threshold_money=10000000 |
| | | compute_index,buy_nums = cls.sum_buy_num_for_order(code,compute_start_index,num,threshold_money) |
| | | if compute_index is not None: |
| | | # 达到下单条件 |
| | | # 虚拟下单 |
| | | cls.unreal_buy_dict[code]=(compute_index,capture_time) |
| | | else: |
| | | # TODO 未达到下单条件,保存纯买额,设置纯买额 |
| | | |
| | | |
| | | pass |
| | | |
| | | |
| | | |
| | | # 获取下单起始信号 |
| | | @classmethod |
| | | def get_order_begin_pos(cls, code): |
| | | index, num = TradePointManager.get_buy_compute_start_data(code) |
| | | return index, num |
| | | |
| | | # 获取撤单起始位置 |
| | | @classmethod |
| | | def has_order_cancel_begin_pos(cls): |
| | | # cancel_index:撤单信号起点 |
| | | # buy_num_for_cancel:从挂入点计算的纯买额 |
| | | # computed_index 计算的最后位置 |
| | | cancel_index, buy_num_for_cancel,computed_index = TradePointManager.get_buy_cancel_compute_start_data(code) |
| | | return cancel_index, buy_num_for_cancel,computed_index |
| | | |
| | | # 计算下单起始信号 |
| | | # compute_data_count 用于计算的l2数据数量 |
| | | def compute_order_begin_pos(self, code, compute_data_count, continue_count): |
| | | # 倒数100条数据查询 |
| | | datas = local_today_datas[code] |
| | | __len = len(datas) |
| | | if __len < continue_count: |
| | | return None |
| | | start_index = 0 |
| | | if compute_data_count > __len: |
| | | compute_data_count = __len |
| | | |
| | | if __len > compute_data_count: |
| | | start_index = __len - compute_data_count |
| | | __time = None |
| | | _limit_up_count_1s = 0 |
| | | _limit_up_count_1s_start_index = -1 |
| | | |
| | | for i in range(start_index, __len - (continue_count - 1)): |
| | | _val = datas[i]["val"] |
| | | # 时间要>=09:30:00 |
| | | if L2DataUtil.get_time_as_second(_val["time"]) < second_930: |
| | | continue |
| | | |
| | | # 有连续4个涨停买就标记计算起始点 |
| | | if L2DataUtil.is_limit_up_price_buy(_val): |
| | | index_0 = i |
| | | index_1 = -1 |
| | | index_2 = -1 |
| | | # index_3 = -1 |
| | | for j in range(index_0 + 1, __len): |
| | | # 涨停买 |
| | | if L2DataUtil.is_limit_up_price_buy(datas[j]["val"]): |
| | | index_1 = j |
| | | break |
| | | |
| | | if index_1 > 0: |
| | | for j in range(index_1 + 1, __len): |
| | | # 涨停买 |
| | | if L2DataUtil.is_limit_up_price_buy(datas[j]["val"]): |
| | | index_2 = j |
| | | break |
| | | # if index_2 > 0: |
| | | # for j in range(index_2 + 1, __len): |
| | | # # 涨停买 |
| | | # if datas[j]["val"]["limitPrice"] == 1 and datas[j]["val"]["operateType"] == 0: |
| | | # index_3 = j |
| | | if index_1 - index_0 == 1 and index_2 - index_1 == 1: # and index_3 - index_2 == 1 |
| | | logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i])) |
| | | return i |
| | | # 同1s内有不连续的4个涨停买(如果遇买撤就重新计算,中间可间隔不涨停买)标记计算起始点 |
| | | if L2DataUtil.is_limit_up_price_buy(_val): |
| | | # 涨停买 |
| | | if __time is None: |
| | | _time = datas[i]["val"]["time"] |
| | | _limit_up_count_1s = 1 |
| | | _limit_up_count_1s_start_index = i |
| | | elif _time == _val["time"]: |
| | | _limit_up_count_1s += 1 |
| | | else: |
| | | _time = datas[i]["val"]["time"] |
| | | _limit_up_count_1s = 1 |
| | | _limit_up_count_1s_start_index = i |
| | | elif _val["operateType"] == 1: |
| | | # 买撤 |
| | | _time = None |
| | | _limit_up_count_1s = 0 |
| | | _limit_up_count_1s_start_index = -1 |
| | | |
| | | if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1: |
| | | logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i])) |
| | | return _limit_up_count_1s_start_index |
| | | |
| | | return None |
| | | |
| | | # 是否有撤销信号 |
| | | @classmethod |
| | | def compute_order_cancel_begin_single(cls, code, compute_data_count, continue_count): |
| | | datas = local_today_datas[code] |
| | | __len = len(datas) |
| | | if __len < continue_count: |
| | | return None |
| | | start_index = 0 |
| | | if compute_data_count > __len: |
| | | compute_data_count = __len |
| | | |
| | | if __len > compute_data_count: |
| | | start_index = __len - compute_data_count |
| | | for i in range(start_index, __len - (continue_count - 1)): |
| | | _val = datas[i]["val"] |
| | | if L2DataUtil.get_time_as_second(_val["time"]) < second_930: |
| | | continue |
| | | # 有连续3个买撤 |
| | | if L2DataUtil.is_limit_up_price_buy_cancel(_val): |
| | | index_0 = i |
| | | index_1 = -1 |
| | | index_2 = -1 |
| | | for j in range(index_0 + 1, __len): |
| | | # 涨停买 |
| | | if L2DataUtil.is_limit_up_price_buy_cancel(datas[j]["val"]): |
| | | index_1 = j |
| | | break |
| | | |
| | | if index_1 > 0: |
| | | for j in range(index_1 + 1, __len): |
| | | # 涨停买 |
| | | if L2DataUtil.is_limit_up_price_buy_cancel(datas[j]["val"]): |
| | | index_2 = j |
| | | break |
| | | if index_1 - index_0 == 1 and index_2 - index_1 == 1: |
| | | logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i]))) |
| | | return i |
| | | return None |
| | | |
| | | # 保存下单位置 |
| | | def save_order_pos(self): |
| | | pass |
| | | |
| | | # 是否可以下单 |
| | | def is_can_order(self): |
| | | pass |
| | | |
| | | # 虚拟下单 |
| | | def unreal_order(self): |
| | | pass |
| | | |
| | | # 设置虚拟挂买位 |
| | | def set_unreal_sure_order_pos(self): |
| | | pass |
| | | |
| | | # 获取预估挂买位 |
| | | @classmethod |
| | | def get_sure_order_pos(cls, code): |
| | | index, data = TradeBuyDataManager.get_buy_sure_position(code) |
| | | if index is None: |
| | | return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1] |
| | | else: |
| | | return 1, index, data |
| | | |
| | | # 统计买入净买量 |
| | | @classmethod |
| | | def sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money): |
| | | total_datas = local_today_datas[code] |
| | | buy_nums = origin_num |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if limit_up_price is None: |
| | | raise Exception("涨停价无法获取") |
| | | threshold_num = threshold_money / (limit_up_price * 100) |
| | | for i in range(compute_start_index, len(total_datas)): |
| | | _val = total_datas[i]["val"] |
| | | # 有连续4个涨停买就标记计算起始点 |
| | | if L2DataUtil.is_limit_up_price_buy(_val): |
| | | # 涨停买 |
| | | buy_nums += int(_val["num"]) * int(total_datas[i]["re"]) |
| | | if buy_nums >= threshold_num: |
| | | return i, buy_nums |
| | | elif L2DataUtil.is_limit_up_price_buy_cancel(_val): |
| | | # 涨停买撤 |
| | | buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) |
| | | return None, buy_nums |
| | | |
| | | # 同一时间买入的概率计算 |
| | | @classmethod |
| | | def get_same_time_property(cls, code): |
| | | # TODO 与板块热度有关 |
| | | return 0.5 |
| | | |
| | | # 统计买撤净买量 |
| | | @classmethod |
| | | def sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money): |
| | | buy_nums = origin_num |
| | | total_datas = local_today_datas[code] |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if limit_up_price is None: |
| | | raise Exception("涨停价无法获取") |
| | | threshold_num = threshold_money / (limit_up_price * 100) |
| | | # 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买 |
| | | sure_type, sure_pos, sure_data = cls.get_sure_order_pos(code) |
| | | same_time_property = cls.get_same_time_property(code) |
| | | # 同一秒,在预估买入位之后的数据之和 |
| | | property_buy_num_count = 0 |
| | | for i in range(start_index, len(total_datas)): |
| | | data = total_datas[i] |
| | | _val = data["val"] |
| | | if L2DataUtil.is_limit_up_price_buy(_val): |
| | | # 涨停买 |
| | | if i < sure_pos: |
| | | buy_nums += int(_val["num"]) * int(data["re"]) |
| | | elif sure_data["val"]["time"] == _val["time"]: |
| | | # 同一秒买入,而且还在预估买入位之后 |
| | | property_buy_num_count += int(_val["num"]) * int(data["re"]) |
| | | |
| | | elif L2DataUtil.is_limit_up_price_buy_cancel(_val): |
| | | # 涨停撤买 |
| | | # 判断买入位置是否在买入信号之前 |
| | | buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, total_datas) |
| | | if buy_index is not None: |
| | | # 找到买撤数据的买入点 |
| | | if buy_index < sure_pos: |
| | | buy_nums -= int(_val["num"]) * int(data["re"]) |
| | | elif sure_data["val"]["time"] == _val["time"]: |
| | | # 同一秒,而且还在预估买入位之后按概率计算 |
| | | property_buy_num_count -= int(_val["num"]) * int(data["re"]) |
| | | else: |
| | | # TODO 未找到买撤数据的买入点 |
| | | pass |
| | | |
| | | property_buy_num = round(property_buy_num_count * same_time_property) |
| | | if buy_nums + property_buy_num <= threshold_num: |
| | | return i, buy_nums + property_buy_num,sure_type |
| | | return None, buy_nums + round(property_buy_num_count * same_time_property),sure_type |
| | | |
| | | |
| | | def process_data(code, datas, capture_timestamp): |
| | | now_time_str = datetime.now().strftime("%H:%M:%S") |
| | | __start_time = round(t.time() * 1000) |
| | | try: |
| | | if len(datas) > 0: |
| | | # 判断价格区间是否正确 |
| | | if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])): |
| | | raise L2DataException(L2DataException.CODE_PRICE_ERROR, |
| | | "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"])) |
| | | # 加载历史数据 |
| | | load_l2_data(code) |
| | | # 纠正数据 |
| | |
| | | # 拼接数据 |
| | | local_today_datas[code].extend(add_datas) |
| | | l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas) |
| | | |
| | | total_datas = local_today_datas[code] |
| | | # 买入确认点处理 |
| | | TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas, total_datas[-1], |
| | | add_datas) |
| | | if len(add_datas) > 0: |
| | | latest_time = add_datas[len(add_datas) - 1]["val"]["time"] |
| | | # 时间差不能太大才能处理 |
| | | if __is_same_time(now_time_str, latest_time): |
| | |
| | | # 没有计算开始点 |
| | | c_index = __get_limit_up_buy_start(code, len(add_datas) + 3, 3) |
| | | if c_index is not None: |
| | | total_datas = local_today_datas[code] |
| | | |
| | | logger_l2_trade.info("找到买点:{} - {}".format(code, json.dumps(total_datas[c_index]))) |
| | | |
| | | # 触发数据分析 ,获取连续涨停标记数据 |
| | | buy_nums = 0 |
| | | # 获取涨停价 |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | last_data_index = -1 |
| | | for i in range(c_index, len(total_datas)): |
| | | _val = total_datas[i]["val"] |
| | | # 有连续4个涨停买就标记计算起始点 |
| | |
| | | elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1: |
| | | # 涨停买撤 |
| | | buy_nums -= int(_val["num"]) * int(total_datas[i]["re"]) |
| | | if buy_nums * limit_up_price * 100 > 1000 * 10000: |
| | | last_data_index = i |
| | | break |
| | | |
| | | TradePointManager.set_buy_compute_start_data(code, buy_nums, c_index) |
| | | # 获取涨停价 |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | |
| | | if limit_up_price is not None: |
| | | if buy_nums * limit_up_price * 100 > 1000 * 10000: |
| | | if last_data_index > -1: |
| | | # 大于1000w就买 |
| | | logger_l2_trade.info( |
| | | "执行买入:{} - 计算结束点: {}".format(code, json.dumps(total_datas[-1]))) |
| | | try: |
| | | trade_manager.start_buy(code) |
| | | trade_manager.start_buy(code, capture_timestamp, total_datas[last_data_index], |
| | | last_data_index) |
| | | TradePointManager.delete_buy_cancel_point(code) |
| | | except Exception as e: |
| | | pass |
| | | else: |
| | | # 有计算开始点,计算新增的数据 |
| | | buy_nums = 0 |
| | | buy_nums = c_num |
| | | last_data = None |
| | | last_data_index = len(total_datas) - len(add_datas) - 1 |
| | | # 获取涨停价 |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | for data in add_datas: |
| | | last_data_index += 1 |
| | | _val = data["val"] |
| | | if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 0: |
| | | # 涨停买 |
| | |
| | | elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1: |
| | | # 涨停买撤 |
| | | buy_nums -= int(_val["num"]) * int(data["re"]) |
| | | if buy_nums * limit_up_price * 100 > 1000 * 10000: |
| | | last_data = data |
| | | break |
| | | |
| | | TradePointManager.set_buy_compute_start_data(code, buy_nums) |
| | | latest_num = c_num + buy_nums |
| | | # 获取涨停价 |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if limit_up_price is not None: |
| | | if latest_num * limit_up_price * 100 > 1000 * 10000: |
| | | if last_data is not None: |
| | | # 大于1000w就买 |
| | | logger_l2_trade.info("执行买入:{} - 计算结束点: {}".format(code, json.dumps(add_datas[-1]))) |
| | | try: |
| | | trade_manager.start_buy(code) |
| | | trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index) |
| | | TradePointManager.delete_buy_cancel_point(code) |
| | | except Exception as e: |
| | | pass |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | # 删除大数据 |
| | | redis = redis_manager.RedisManager(1).getRedis() |
| | | keys = redis.keys("big_data*") |
| | | for key in keys: |
| | | redis.delete(key) |
| | | # print("big_data-{}-{}".format("123", int(round(t.time() * 1000)))) |
| | | # load_l2_data("002868") |
| | | # keys= local_today_num_operate_map["002868"] |
| | | # for k in keys: |
| | | # print(len( local_today_num_operate_map["002868"][k])) |
| | | # pass |
| | | # __set_buy_compute_start_data("000000", 100, 1) |
| | | # __set_buy_compute_start_data("000000", 100) |
| | | # __set_l2_data_latest_count("000333", 20) |
| | | # print(type(get_l2_data_latest_count("000333"))) |
| | | # datas = ["2", "3", "4", "5"] |
| | | # print(datas[4:]) |
| | | # print(decimal.Decimal("19.294").quantize(decimal.Decimal("0.00"), decimal.ROUND_HALF_UP)) |
| | | |
| | | # 获取增量数据 |
| | | # 保存数据 |
| | | code = "000868" |
| | | local_today_datas.setdefault(code, []) |
| | | path = "C:/Users/Administrator/Desktop/demo/000868/" |
| | | for file_name in os.listdir(path): |
| | | p = "{}{}".format(path, file_name) |
| | | f = open(p) |
| | | for line in f.readlines(): # 依次读取每行 |
| | | line = line.strip() |
| | | data = json.loads(line) |
| | | result = __format_l2_data(data, code, 10.00) |
| | | add_datas = get_add_data(code, result) |
| | | print("增加的数量:", len(add_datas)) |
| | | if len(add_datas) > 0: |
| | | # 拼接数据 |
| | | local_today_datas[code].extend(add_datas) |
| | | if code in local_latest_datas: |
| | | local_latest_datas[code] = result |
| | | else: |
| | | local_latest_datas.setdefault(code, result) |
| | | f.close() |
| | | for d in local_today_datas[code]: |
| | | print(d["val"]["time"], d["val"]["num"], d["val"]["operateType"], d["re"]) |
| | |
| | | # l2数据工具 |
| | | # 比较时间的大小 |
| | | import hashlib |
| | | import json |
| | | import time |
| | | |
| | | import l2_data_manager |
| | | import tool |
| | | from log import logger_l2_trade, logger_l2_big_data |
| | | from trade_gui import async_call |
| | | |
| | | |
| | | def compare_time(time1, time2): |
| | |
| | | return time_seconds |
| | | |
| | | |
| | | |
| | | # 计算时间的区间 |
| | | def __compute_time_space_as_second(cancel_time, cancel_time_unit): |
| | | __time = int(cancel_time) |
| | |
| | | |
| | | |
| | | # 根据买撤数据(与今日总的数据)计算买入数据 |
| | | def get_buy_data_with_cancel_data(cancel_data, today_datas): |
| | | def get_buy_data_with_cancel_data(cancel_data): |
| | | # 计算时间区间 |
| | | min_space, max_space = __compute_time_space_as_second(cancel_data["val"]["cancelTime"], |
| | | cancel_data["val"]["cancelTimeUnit"]) |
| | | max_time = __sub_time(cancel_data["val"]["time"], min_space) |
| | | min_time = __sub_time(cancel_data["val"]["time"], max_space) |
| | | |
| | | for data in today_datas: |
| | | buy_datas = l2_data_manager.local_today_num_operate_map.get("{}-{}".format(cancel_data["val"]["num"], "0")) |
| | | if buy_datas is None: |
| | | # 无数据 |
| | | return None, None |
| | | for i in range(0, len(buy_datas)): |
| | | data = buy_datas[i] |
| | | if int(data["val"]["operateType"]) != 0: |
| | | continue |
| | | if int(data["val"]["num"]) != int(cancel_data["val"]["num"]): |
| | | continue |
| | | if compare_time(data["val"]["time"], min_time) > 0 and compare_time(data["val"]["time"], max_time) <= 0: |
| | | return data |
| | | return data["index"], data |
| | | return None, None |
| | | |
| | | |
| | | __last_big_data = {} |
| | | |
| | | |
| | | @async_call |
| | | def save_big_data(code, same_time_nums, datas): |
| | | latest_datas = __last_big_data.get(code) |
| | | d1 = json.dumps(datas) |
| | | d2 = json.dumps(latest_datas) |
| | | if latest_datas is not None and d1.strip() == d2.strip(): |
| | | return None |
| | | __last_big_data[code] = datas |
| | | # 获取不一样的快照 |
| | | if latest_datas is not None: |
| | | for i in range(len(d1)): |
| | | if d1[i] != d2[i]: |
| | | # 保存快照 |
| | | logger_l2_big_data.debug("code:{} d1:{} d2:{}", code, d1[i - 60: i + 30], d2[i - 60: i + 30]) |
| | | break |
| | | |
| | | for key in same_time_nums: |
| | | if same_time_nums[key] > 20: |
| | | redis = l2_data_manager._redisManager.getRedis() |
| | | redis.setex("big_data-{}-{}".format(code, int(round(time.time() * 1000))), tool.get_expire(), d1) |
| | | break |
| | | |
| | | |
| | | def test(datas): |
| | |
| | | # today_datas=[{"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 0, "time": "09:32:30"}},{"val": {"operateType": 0, "num": 1520, "cancelTime": 0, "cancelTimeUnit": 0, "time": "09:31:31"}}] |
| | | # result= get_buy_data_with_cancel_data(cancel_data,today_datas) |
| | | # print(result) |
| | | __datas = {} |
| | | test(__datas) |
| | | print(__datas) |
| | | redis = l2_data_manager._redisManager.getRedis() |
| | | keys = redis.keys("big_data-*") |
| | | for k in keys: |
| | | redis.delete(k) |
New file |
| | |
| | | # l2交易因子 |
| | | class L2TradeFactorUtil: |
| | | # 获取基础m值,返回单位为元 |
| | | @classmethod |
| | | def get_base_safe_val(cls, zyltgb): |
| | | yi = round(zyltgb / 100000000) |
| | | if yi < 1: |
| | | yi = 1 |
| | | return 6000000 + (yi - 1) * 500000 |
| | | |
| | | # 自由流通市值影响比例 |
| | | @classmethod |
| | | def get_zylt_rate(cls, zyltgb): |
| | | yi = round(zyltgb / 100000000) |
| | | if yi < 1: |
| | | yi = 1 |
| | | if yi <= 30: |
| | | rate = -0.04 + 0.01 * (yi - 1) |
| | | if rate > 0.1: |
| | | rate = 0.1 |
| | | else: |
| | | rate = 0.09 - (yi - 31) * 0.002 |
| | | if rate < -0.1: |
| | | rate = -0.1 |
| | | return round(rate, 4) |
| | | |
| | | # 获取行业影响比例 |
| | | # total_limit_percent为统计的比例之和乘以100 |
| | | @classmethod |
| | | def get_industry_rate(cls, total_limit_percent): |
| | | t = total_limit_percent / 10 |
| | | rate = t / 0.5 * 0.04 |
| | | if rate > 0.52: |
| | | rate = 0.52 |
| | | return round(rate, 2) |
| | | |
| | | # 获取量影响比例 |
| | | @classmethod |
| | | def get_volumn_rate(cls, day60_max, yest, today): |
| | | old_volumn = yest |
| | | base_rate = 0.25 |
| | | if day60_max > yest: |
| | | old_volumn = day60_max |
| | | base_rate = 0.26 |
| | | r = round(today / old_volumn, 2) |
| | | print("比例:", r) |
| | | rate = 0 |
| | | if r < 0.11: |
| | | rate = base_rate - (r - 0.01) |
| | | elif r < 0.45: |
| | | rate = base_rate - r |
| | | elif r < 0.75: |
| | | rate = (base_rate - 0.2049) + (r - 0.74) * 0.4 |
| | | elif r < 1.38: |
| | | rate = base_rate - (r - 0.75) * 0.8 |
| | | else: |
| | | rate = base_rate - 0.5 |
| | | return round(rate, 4) |
| | | |
| | | # 当前股票首次涨停时间的影响比例 |
| | | @classmethod |
| | | def get_limit_up_time_rate(cls, time_str): |
| | | times = time_str.split(":") |
| | | start_m = 9 * 60 + 30 |
| | | m = int(times[0]) * 60 + int(times[1]) |
| | | dif = m - start_m |
| | | base_rate = 0.15 |
| | | rate = 0 |
| | | if dif < 1: |
| | | rate = base_rate |
| | | elif dif <= 5: |
| | | rate = base_rate - dif * 0.01 |
| | | elif dif <= 120: |
| | | # 11:30之前 |
| | | rate = 0.0985 - (dif - 6) * 0.0015 |
| | | else: |
| | | rate = 0.0985 - (dif - 89 - 6) * 0.0015 |
| | | if rate < -0.15: |
| | | rate = -0.15 |
| | | return round(rate, 4) |
| | | |
| | | # 纯万手哥影响值(手数》=9000 OR 金额》=500w) |
| | | @classmethod |
| | | def get_big_money_rate(cls, num): |
| | | if num >= 8: |
| | | return 0.08 |
| | | else: |
| | | return num * 0.01 |
| | | |
| | | @classmethod |
| | | def compute_rate(cls, zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today, |
| | | limit_up_time, big_money_num): |
| | | # 自由流通股本影响比例 |
| | | zyltgb_rate = cls.get_zylt_rate(zyltgb) |
| | | # 行业涨停影响比例 |
| | | industry_rate = cls.get_industry_rate(total_industry_limit_percent) |
| | | # 量影响比例 |
| | | volumn_rate=cls.get_volumn_rate(volumn_day60_max,volumn_yest,volumn_today) |
| | | # 涨停时间影响比例 |
| | | limit_up_time_rate=cls.get_limit_up_time_rate(limit_up_time) |
| | | # 万手哥影响 |
| | | big_money_rate=cls.get_big_money_rate(big_money_num) |
| | | |
| | | return 1-(zyltgb_rate+industry_rate+volumn_rate+limit_up_time_rate+big_money_rate); |
| | | |
| | | |
| | | # l2因子归因数据 |
| | | class L2TradeFactorSourceDataUtil: |
| | | # 是否为大单 |
| | | @classmethod |
| | | def is_big_money(cls, data): |
| | | if int(data["val"]["num"]) >= 9000: |
| | | return True |
| | | money = round(float(data["val"]["price"]) * int(data["val"]["num"]) * 100) |
| | | if money >= 5000000: |
| | | return True |
| | | return False |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | print(L2TradeFactorUtil.get_big_money_rate(32)) |
| | | print(L2TradeFactorUtil.get_big_money_rate(8)) |
| | | print(L2TradeFactorUtil.get_big_money_rate(0)) |
| | |
| | | logger.add(get_path("l2", "l2_trade"), filter=lambda record: record["extra"].get("name") == "l2_trade", |
| | | rotation="00:00", compression="zip", enqueue=True) |
| | | |
| | | logger.add(get_path("l2", "l2_big_data"), filter=lambda record: record["extra"].get("name") == "l2_big_data", |
| | | rotation="00:00", compression="zip", enqueue=True) |
| | | |
| | | logger.add(get_path("juejin", "juejin_tick"), filter=lambda record: record["extra"].get("name") == "juejin_tick", |
| | | rotation="00:00", compression="zip", enqueue=True) |
| | | |
| | |
| | | logger_l2_error = logger.bind(name="l2_error") |
| | | logger_l2_process = logger.bind(name="l2_process") |
| | | logger_l2_trade = logger.bind(name="l2_trade") |
| | | logger_l2_big_data = logger.bind(name="l2_big_data") |
| | | logger_juejin_tick = logger.bind(name="juejin_tick") |
| | | logger_code_operate = logger.bind(name="code_operate") |
| | | logger_device = logger.bind(name="device") |
| | |
| | | import authority |
| | | import juejin |
| | | import l2_data_manager |
| | | import l2_data_util |
| | | import tool |
| | | import trade_manager |
| | | import l2_code_operate |
| | | |
| | | from log import logger_l2_error, logger_l2_process, logger_device, logger_trade_delegate |
| | | from trade_data_manager import TradeCancelDataManager |
| | | |
| | | |
| | | class MyTCPServer(socketserver.TCPServer): |
| | |
| | | if len(data) == 0: |
| | | # print("客户端断开连接") |
| | | break; |
| | | _str = data.decode() |
| | | _str = str(data, encoding="gb2312") |
| | | if len(_str) > 0: |
| | | # print("结果:",_str) |
| | | type = data_process.parseType(_str) |
| | |
| | | |
| | | try: |
| | | __start_time = round(time.time() * 1000) |
| | | |
| | | # level2盘口数据 |
| | | day, client, channel, code, datas = l2_data_manager.parseL2Data(_str) |
| | | day, client, channel, code, capture_time, process_time, datas = l2_data_manager.parseL2Data( |
| | | _str) |
| | | # 10ms的网络传输延时 |
| | | capture_timestamp = __start_time - process_time - 10 |
| | | # 保存l2截图时间 |
| | | TradeCancelDataManager.save_l2_capture_time(client, channel, code, capture_time) |
| | | cid, pid = gpcode_manager.get_listen_code_pos(code) |
| | | # 判断目标代码位置是否与上传数据位置一致 |
| | | if cid is not None and pid is not None and client == int(cid) and channel == int(pid): |
| | |
| | | self.l2CodeOperate.set_operate_code_state(client, channel, 1) |
| | | |
| | | if gpcode_manager.is_listen(code): |
| | | l2_data_manager.process_data(code, datas) |
| | | l2_data_manager.process_data(code, datas, capture_timestamp) |
| | | except l2_data_manager.L2DataException as l: |
| | | # 单价不符 |
| | | if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: |
| | |
| | | # todo 太敏感移除代码 |
| | | logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg) |
| | | # 单价不一致时需要移除代码重新添加 |
| | | l2_code_operate.L2CodeOperate().remove_l2_listen(code) |
| | | l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误") |
| | | self.l2_data_error_dict[key] = round(time.time() * 1000) |
| | | |
| | | except Exception as e: |
| | |
| | | juejin.accpt_price(item["code"], float(item["price"])) |
| | | |
| | | elif type == 30: |
| | | # 心跳信息 |
| | | data = data_process.parse(_str)["data"] |
| | | client_id = data["client"] |
| | | thsDead = data.get("thsDead") |
| | | logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data))) |
| | | data_process.saveClientActive(int(client_id), host) |
| | | data_process.saveClientActive(int(client_id), host, thsDead) |
| | | # print("心跳:", client_id) |
| | | |
| | | sk.send(return_str.encode()) |
New file |
| | |
| | | import json |
| | | import time |
| | | |
| | | # 交易撤销数据管理器 |
| | | import l2_data_util |
| | | import redis_manager |
| | | import tool |
| | | from log import logger_trade |
| | | |
| | | |
| | | class TradeCancelDataManager: |
| | | capture_time_dict = {} |
| | | |
| | | # 保存截图时间 |
| | | @classmethod |
| | | def save_l2_capture_time(cls, client_id, pos, code, capture_time): |
| | | cls.capture_time_dict["{}-{}-{}".format(client_id, pos, code)] = {"create_time": round(time.time() * 1000), |
| | | "capture_time": capture_time} |
| | | |
| | | # 获取最近一次的截图时间 |
| | | @classmethod |
| | | def get_latest_l2_capture_time(cls, client_id, pos, code): |
| | | val = cls.capture_time_dict.get("{}-{}-{}".format(client_id, pos, code)) |
| | | if val is None: |
| | | return -1 |
| | | # 间隔时间不能大于1s |
| | | if round(time.time() * 1000) - val["create_time"] > 1000: |
| | | return -1 |
| | | return val["capture_time"] |
| | | |
| | | # 获取l2数据的增长速度 |
| | | @classmethod |
| | | def get_l2_data_grow_speed(cls, client_id, pos, code, add_datas, capture_time): |
| | | count = 0 |
| | | for data in add_datas: |
| | | count += data["re"] |
| | | lastest_capture_time = cls.get_latest_l2_capture_time(client_id, pos, code) |
| | | if lastest_capture_time < 0: |
| | | raise Exception("获取上次l2数据截图时间出错") |
| | | return count / (capture_time - lastest_capture_time) |
| | | |
| | | # 获取买入确认点的位置 |
| | | @classmethod |
| | | def get_buy_sure_position(cls, index, speed, trade_time): |
| | | return index + round(speed * trade_time) |
| | | |
| | | |
| | | class TradeBuyDataManager: |
| | | redisManager = redis_manager.RedisManager(0) |
| | | buy_sure_position_dict = {} |
| | | |
| | | # 设置买入点的信息 |
| | | # trade_time: 买入点截图时间与下单提交时间差值 |
| | | # capture_time: 买入点截图时间 |
| | | # last_data: 买入点最后一条数据 |
| | | @classmethod |
| | | def set_buy_position_info(cls, code, capture_time, trade_time, last_data, last_data_index): |
| | | redis = cls.redisManager.getRedis() |
| | | redis.setex("buy_position_info-{}".format(code), tool.get_expire(), |
| | | json.dumps((capture_time, trade_time, last_data, last_data_index))) |
| | | |
| | | # 获取买入点信息 |
| | | @classmethod |
| | | def get_buy_position_info(cls, code): |
| | | redis = cls.redisManager.getRedis() |
| | | val_str = redis.get("buy_position_info-{}".format(code)) |
| | | if val_str is None: |
| | | return None, None, None,None |
| | | else: |
| | | val = json.loads(val_str) |
| | | return val[0], val[1], val[2], val[3] |
| | | |
| | | # 删除买入点信息 |
| | | @classmethod |
| | | def remove_buy_position_info(cls, code): |
| | | redis = cls.redisManager.getRedis() |
| | | redis.delete("buy_position_info-{}".format(code)) |
| | | |
| | | # 设置买入确认点信息 |
| | | @classmethod |
| | | def __set_buy_sure_position(cls, code, index, data): |
| | | logger_trade.debug("买入确认点信息: code:{} index:{} data:{}", code, index, data) |
| | | redis = cls.redisManager.getRedis() |
| | | key = "buy_sure_position-{}".format(code) |
| | | redis.setex(key, tool.get_expire(), json.dumps((index, data))) |
| | | cls.buy_sure_position_dict[code] = (index, data) |
| | | # 移除下单信号的详细信息 |
| | | cls.remove_buy_position_info(code) |
| | | |
| | | # 清除买入确认点信息 |
| | | @classmethod |
| | | def __clear_buy_sure_position(cls, code): |
| | | redis = cls.redisManager.getRedis() |
| | | key = "buy_sure_position-{}".format(code) |
| | | redis.delete(key) |
| | | if code in cls.buy_sure_position_dict: |
| | | cls.buy_sure_position_dict.pop(code) |
| | | |
| | | # 获取买入确认点信息 |
| | | @classmethod |
| | | def get_buy_sure_position(cls, code): |
| | | temp = cls.buy_sure_position_dict.get(code) |
| | | if temp is not None: |
| | | return temp[0], temp[1] |
| | | |
| | | redis = cls.redisManager.getRedis() |
| | | key = "buy_sure_position-{}".format(code) |
| | | val = redis.get(key) |
| | | if val is None: |
| | | return None, None |
| | | else: |
| | | val = json.loads(val) |
| | | cls.buy_sure_position_dict[code] = (val[0], val[1]) |
| | | return val[0], val[1] |
| | | |
| | | # 处理买入确认点信息 |
| | | @classmethod |
| | | def process_buy_sure_position_info(cls, code, capture_time, l2_today_datas, l2_latest_data, l2_add_datas): |
| | | buy_capture_time, trade_time, l2_data, l2_data_index = cls.get_buy_position_info(code) |
| | | if buy_capture_time is None: |
| | | # 没有购买者信息 |
| | | return None |
| | | if capture_time - buy_capture_time < trade_time: |
| | | # 时间未等待足够 |
| | | return None |
| | | # 时间差是否相差2s及以上 |
| | | old_time = l2_data["val"]["time"] |
| | | new_time = l2_latest_data["val"]["time"] |
| | | old_time_int = l2_data_util.get_time_as_seconds(old_time) |
| | | new_time_int = l2_data_util.get_time_as_seconds(new_time) |
| | | if new_time_int - old_time_int >= 2: |
| | | # 间隔2s及其以上表示数据异常 |
| | | # 间隔2s以上的就以下单时间下一秒末尾作为确认点 |
| | | start_index = l2_data_index |
| | | if len(l2_today_datas)-1 > start_index: |
| | | for i in range(start_index + 1, len(l2_today_datas)): |
| | | _time = l2_today_datas[i]["val"]["time"] |
| | | if l2_data_util.get_time_as_seconds(_time) - old_time_int >= 2: |
| | | index = i - 1 |
| | | data = l2_today_datas[index] |
| | | cls.__set_buy_sure_position(code, index, data) |
| | | break |
| | | else: |
| | | cls.__set_buy_sure_position(code, l2_data_index, l2_data) |
| | | elif new_time_int - old_time_int >= 0: |
| | | # 间隔2s内表示数据正常,将其位置设置为新增数据的中间位置 |
| | | index = len(l2_today_datas)-1 - (len(l2_add_datas)) // 2 |
| | | data = l2_today_datas[index] |
| | | cls.__set_buy_sure_position(code, index, data) |
| | | else: |
| | | # 间隔时间小于0 ,一般产生原因是数据回溯产生,故不做处理 |
| | | logger_trade.warning("预估委托位置错误:数据间隔时间小于0 code-{}", code) |
| | | pass |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | TradeBuyDataManager.set_buy_capture_time("123456", 178938828, 1232) |
| | | print(TradeBuyDataManager.get_buy_capture_time("123456")) |
| | |
| | | from log import * |
| | | from threading import Thread |
| | | |
| | | |
| | | def async_call(fn): |
| | | def wrapper(*args, **kwargs): |
| | | Thread(target=fn, args=args, kwargs=kwargs).start() |
| | | |
| | | return wrapper |
| | | |
| | | |
| | | class THSGuiTrade(object): |
| | | __instance = None |
| | |
| | | # 撤买 |
| | | def cancel_buy(self, code): |
| | | self.buy_cancel_lock.acquire() |
| | | global code_input |
| | | code_input = 0 |
| | | try: |
| | | logger_trade_gui.info("开始撤单:code-{}".format(code)) |
| | | win = self.cancel_win |
| | |
| | | import l2_code_operate |
| | | import mongo_data |
| | | import tool |
| | | from trade_gui import THSGuiTrade |
| | | from trade_data_manager import TradeBuyDataManager |
| | | from trade_gui import THSGuiTrade, async_call |
| | | import time as t |
| | | from l2_code_operate import * |
| | | import l2_data_manager |
| | |
| | | def forbidden_trade(code): |
| | | add_to_forbidden_trade_codes(code) |
| | | l2_data_manager.remove_from_l2_fixed_codes(code) |
| | | l2_code_operate.L2CodeOperate.get_instance().remove_l2_listen(code) |
| | | l2_code_operate.L2CodeOperate.get_instance().remove_l2_listen(code, "禁止代码交易") |
| | | |
| | | |
| | | # 是否在禁止交易代码中 |
| | |
| | | |
| | | |
| | | # 开始交易 |
| | | def start_buy(code): |
| | | def start_buy(code, capture_timestamp,last_data,last_data_index): |
| | | # 是否禁止交易 |
| | | if is_in_forbidden_trade_codes(code): |
| | | raise Exception("禁止交易") |
| | |
| | | # 买一手的资金是否足够 |
| | | if price * 100 > money: |
| | | raise Exception("账户可用资金不足") |
| | | try: |
| | | |
| | | print("开始买入") |
| | | logger_trade.info("{}开始买入".format(code)) |
| | | set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER) |
| | | __buy(code, price, trade_state, capture_timestamp,last_data,last_data_index) |
| | | |
| | | |
| | | # 购买 |
| | | @async_call |
| | | def __buy(code, price, trade_state, capture_timestamp, last_data,last_data_index): |
| | | try: |
| | | guiTrade.buy(code, price) |
| | | __place_order_success(code, capture_timestamp, last_data,last_data_index) |
| | | except Exception as e: |
| | | __place_order_fail(code, trade_state) |
| | | logger_trade.error("{}买入异常{}".format(code, str(e))) |
| | | raise e |
| | | |
| | | |
| | | # 下单成功 |
| | | def __place_order_success(code, capture_timestamp, last_data,last_data_index): |
| | | # 买入结束点 |
| | | use_time = round(time.time() * 1000) - capture_timestamp |
| | | logger_trade.info("{}-从截图到下单成功总费时:{}".format(code, use_time)) |
| | | # 下单成功,加入固定代码库 |
| | | l2_data_manager.add_to_l2_fixed_codes(code) |
| | | # 记录下单的那一帧图片的截图时间与交易用时 |
| | | TradeBuyDataManager.set_buy_position_info(code, capture_timestamp, use_time, last_data,last_data_index) |
| | | |
| | | print("买入结束") |
| | | logger_trade.info("{}买入成功".format(code)) |
| | | except Exception as e: |
| | | |
| | | |
| | | # 下单失败 |
| | | def __place_order_fail(code, trade_state): |
| | | print("买入异常") |
| | | logger_trade.error("{}买入异常{}".format(code, str(e))) |
| | | # 状态还原 |
| | | set_trade_state(code, trade_state) |
| | | raise e |
| | | |
| | | |
| | | # 开始取消买入 |
| | |
| | | logger_trade.info("{}开始撤单".format(code)) |
| | | set_trade_state(code, TRADE_STATE_BUY_CANCEL_ING) |
| | | guiTrade.cancel_buy(code) |
| | | # 下单成功,加入固定代码库 |
| | | l2_data_manager.remove_from_l2_fixed_codes(code) |
| | | logger_trade.info("{}撤单成功".format(code)) |
| | | __cancel_success(code) |
| | | except Exception as e: |
| | | # 状态还原 |
| | | set_trade_state(code, trade_state) |
| | |
| | | raise e |
| | | |
| | | |
| | | # 取消委托成功 |
| | | def __cancel_success(code): |
| | | TradeBuyDataManager.remove_buy_capture_time(code) |
| | | # 下单成功,加入固定代码库 |
| | | l2_data_manager.remove_from_l2_fixed_codes(code) |
| | | logger_trade.info("{}撤单成功".format(code)) |
| | | |
| | | |
| | | # 处理交易成功数据 |
| | | def process_trade_success_data(datas): |
| | | if datas is None: |