big_money_num_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
gpcode_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
juejin.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
l2_data_log.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
l2_data_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
l2_data_manager_new.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
l2_data_util.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
l2_trade_factor.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
redis_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
server.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
tool.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
trade_data_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
trade_gui.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
big_money_num_manager.py
@@ -9,6 +9,14 @@ __redisManager = redis_manager.RedisManager(0) # 是否为大单 def is_big_num(val): if int(val["num"]) >= 8000 or int(val["num"]) * float(val["price"]) >= 30000: return True else: return False def add_num(code, num): redis = __redisManager.getRedis() redis.incrby("big_money-{}".format(code), num) @@ -30,7 +38,14 @@ num = redis.get("big_money-{}".format(code)) if num is None: return 0 return round(int(num)/1000/4) return round(int(num) / 1000 / 4) def reset_all(): redis = __redisManager.getRedis() keys = redis.keys("big_money-*") for k in keys: redis.setex(k, tool.get_expire(), 0) if __name__ == "__main__": gpcode_manager.py
@@ -133,6 +133,7 @@ return None limit_up_price = tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal("1.1")) __limit_up_price_dict[code] = limit_up_price return limit_up_price def get_limit_up_price_by_preprice(price): juejin.py
@@ -58,6 +58,10 @@ def init_data(): # 重置所有的大单数据 big_money_num_manager.reset_all() # 清除水下捞数据 __actualPriceProcessor.clear_under_water_data() # 载入行业股票代码 global_data_loader.load_industry() # 载入代码自由流通市值 @@ -68,6 +72,10 @@ # 每日初始化 def everyday_init(): # 交易時間不能做初始化 if not tool.is_init_time(): raise Exception("交易时间不能初始化") codes = gpcode_manager.get_gp_list() logger_system.info("每日初始化") @@ -246,6 +254,9 @@ def accpt_prices(prices): print("价格代码数量:", len(prices)) __actualPriceProcessor.save_current_price_codes_count(len(prices)) # 采集的代码数量不对 if len(gpcode_manager.get_gp_list()) - len(prices) > 2: return now_str = datetime.datetime.now().strftime("%H:%M:%S") now_strs = now_str.split(":") now_second = int(now_strs[0]) * 60 * 60 + int(now_strs[1]) * 60 + int(now_strs[2]) @@ -276,7 +287,10 @@ logging.exception(e) try: __actualPriceProcessor.save_current_price(code, price, gpcode_manager.get_limit_up_price_by_preprice(pricePre) == tool.to_price(decimal.Decimal(d["price"]))) __actualPriceProcessor.save_current_price(code, price, gpcode_manager.get_limit_up_price_by_preprice( pricePre) == tool.to_price( decimal.Decimal(d["price"]))) except Exception as e: logging.exception(e) l2_data_log.py
@@ -6,7 +6,9 @@ def l2_time(code, time_, description, new_line=False): timestamp = int(time.time() * 1000) log.logger_l2_process_time.info("{} {}: {}-{}{}",timestamp, description, code, time_, "\n" if new_line else "") # 只记录耗时较长的信息 if time_ > 50: log.logger_l2_process_time.info("{} {}: {}-{}{}", timestamp, description, code, time_, "\n" if new_line else "") return timestamp @@ -22,4 +24,4 @@ log.logger_l2_trade_buy.debug(("thread-id={} code={} ".format(self.key, code) + content).format(*args)) def trade_cancel(self, code, content, *args): log.logger_l2_trade_cancel.debug(("thread-id={} code={} ".format(self.key, code) + content).format(*args)) log.logger_l2_trade_cancel.debug(("thread-id={} code={} ".format(self.key, code) + content).format(*args)) l2_data_manager.py
@@ -424,6 +424,7 @@ return False return True # 是否为涨停卖 @classmethod def is_limit_up_price_sell(cls, val): if int(val["limitPrice"]) != 1: l2_data_manager_new.py
@@ -217,14 +217,15 @@ state = trade_manager.get_trade_state(code) start_index = len(total_datas) - len(add_datas) end_index = len(total_datas) - 1 if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER: if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS: # 已挂单 cls.__process_order(code, start_index, end_index, capture_timestamp) else: # 未挂单 cls.__process_not_order(code, start_index, end_index, capture_timestamp) logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"], add_datas[-1]["index"], round(t.time() * 1000) - __start_time) logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"], add_datas[-1]["index"], round(t.time() * 1000) - __start_time, capture_timestamp) __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间") # 保存数据 l2_data_manager.save_l2_data(code, datas, add_datas) @@ -258,7 +259,7 @@ buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code) # 撤单计算,只看买1 cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index, buy_single_index) buy_single_index, buy_exec_index) # 计算m值大单 cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index, @@ -368,7 +369,7 @@ return False, "同一板块中老三,老四,...不能买" if cls.__codeActualPriceProcessor.is_under_water(code): # 水下捞且板块中的票小于21不能买 # 水下捞且板块中的票小于16不能买 if global_util.industry_hot_num.get(industry) is not None and global_util.industry_hot_num.get( industry) <= 16: return False, "水下捞,板块中的票小于2只,为{}".format(global_util.industry_hot_num.get(industry)) @@ -518,7 +519,8 @@ l2_data_manager.TradePointManager.delete_buy_cancel_point(code) # 涨停封单额计算 L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, compute_index, buy_single_index, False) L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, compute_index, buy_single_index, buy_exec_index, False) _start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "记录执行买入数据") @@ -628,7 +630,13 @@ count = threshold_count - sub_threshold_count if count < 3: count = 3 return round(count*buy1_factor) count = round(count * buy1_factor) # 最高30笔,最低8笔 if count > 30: count = 30 if count < 8: count = 8 return count _start_time = t.time() total_datas = local_today_datas[code] @@ -846,6 +854,17 @@ return True, "9:30涨停的老大,老二可以下单" return False, "老大非9:30涨停,老二不能下单" @classmethod def test3(cls): code = "002693" load_l2_data(code, True) start_index = 334 end_index = 341 buy_single_index = 152 cls.random_key[code] = random.randint(0, 100000) L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index, buy_single_index) # 涨停封单额统计 class L2LimitUpMoneyStatisticUtil: @@ -969,12 +988,13 @@ # 返回取消的标志数据 # with_cancel 是否需要判断是否撤销 @classmethod def process_data(cls, code, start_index, end_index, buy_single_begin_index, with_cancel=True): def process_data(cls, code, start_index, end_index, buy_single_begin_index, buy_exec_index, with_cancel=True): start_time = round(t.time() * 1000) total_datas = local_today_datas[code] time_dict_num = {} # 记录计算的坐标 time_dict_num_index = {} # 坐标-量的map num_dict = {} # 统计时间分布 time_dict = {} @@ -1022,8 +1042,39 @@ time_list = [] # 到当前时间累积的买1量 time_total_num_dict = {} # 大单撤销笔数 cancel_big_num_count = 0 buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index]) # 从同花顺买1矫正过后的位置开始计算,到end_index结束 for i in range(index + 1, end_index + 1): data = total_datas[i] # 统计撤销数量 if big_money_num_manager.is_big_num(data["val"]): if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]): cancel_big_num_count += int(data["re"]) # TODO 大量重复的工作需要处理,可以暂存在内存中,从而减少计算 # 获取是否在买入执行信号周围2s buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data["val"], local_today_num_operate_map.get( code)) if buy_index is not None and buy_data is not None: # 相差1s buy_time = buy_data["val"]["time"] if abs(buy_exec_time - tool.get_time_as_second(buy_time)) < 2: cancel_big_num_count += int(data["re"]) elif L2DataUtil.is_limit_up_price_buy(data["val"]): cancel_big_num_count -= int(data["re"]) threshold_rate = 0.5 if cancel_big_num_count >= 0: if cancel_big_num_count < 10: threshold_rate = threshold_rate - cancel_big_num_count * 0.01 else: threshold_rate = threshold_rate - 10 * 0.01 time_ = data["val"]["time"] if time_ not in time_start_index_dict: # 记录每一秒的开始位置 @@ -1048,12 +1099,27 @@ # 上1s的总数 last_second_total_volumn = time_total_num_dict.get(time_list[-1]) if last_second_total_volumn > 0 and ( last_second_total_volumn - total_num) / last_second_total_volumn >= 0.5: last_second_total_volumn - total_num) / last_second_total_volumn >= threshold_rate: # 相邻2s内的数据减小50% cancel_index = i cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn, total_num) break # 记录中有上2个数据 if len(time_list) >= 2: # 倒数第2个数据 last_2_second_total_volumn = time_total_num_dict.get(time_list[-2]) if last_2_second_total_volumn > 0: if last_2_second_total_volumn > last_second_total_volumn > total_num: dif = last_2_second_total_volumn - total_num if dif / last_2_second_total_volumn >= threshold_rate: cancel_index = i cancel_msg = "相邻3s({})内的封单量(第3秒 与 第1的 减小比例)减小50%({}->{}->{})".format(time_, last_2_second_total_volumn, last_second_total_volumn, total_num) break if not with_cancel: cancel_index = None @@ -1072,7 +1138,82 @@ return None, None # 涨停卖统计 class L2LimitUpSellStatisticUtil: _redisManager = redis_manager.RedisManager(0) @classmethod def __get_redis(cls): return cls._redisManager.getRedis() # 新增卖数据 @classmethod def __incre_sell_data(cls, code, num): key = "limit_up_sell_num-{}".format(code) cls.__get_redis().incrby(key, num) @classmethod def __get_sell_data(cls, code): key = "limit_up_sell_num-{}".format(code) val = cls.__get_redis().get(key) if val is None: return 0 return int(val) @classmethod def __save_process_index(cls, code, index): key = "limit_up_sell_index-{}".format(code) cls.__get_redis().setex(key, tool.get_expire(), index) @classmethod def __get_process_index(cls, code): key = "limit_up_sell_index-{}".format(code) val = cls.__get_redis().get(key) if val is None: return -1 return int(val) # 清除数据 @classmethod def delete(cls, code): key = "limit_up_sell_num-{}".format(code) cls.__get_redis().delete(key) key = "limit_up_sell_index-{}".format(code) cls.__get_redis().delete(key) # 处理数据,返回是否需要撤单 @classmethod def process(cls, code, start_index, end_index, buy_exec_index): # 获取涨停卖的阈值 limit_up_price = gpcode_manager.get_limit_up_price(code) zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code) threshold_num = zyltgb * 0.015 // (limit_up_price * 100) total_num = cls.__get_sell_data(code) cancel_index = None process_index = cls.__get_process_index(code) for i in range(start_index, end_index + 1): if i < buy_exec_index: continue if i <= process_index: continue total_datas = local_today_datas.get(code) if L2DataUtil.is_limit_up_price_sell(total_datas[i]["val"]): num = int(total_datas[i]["val"]["num"]) cls.__incre_sell_data(code, num) total_num += num if total_num > threshold_num: cancel_index = i break if cancel_index is not None: process_index = cancel_index else: process_index = end_index # 保存处理的位置 cls.__save_process_index(code, process_index) return cancel_index if __name__ == "__main__": L2TradeDataProcessor.test2() L2TradeDataProcessor.test3() print("----------------------") # L2TradeDataProcessor.test() l2_data_util.py
@@ -96,9 +96,18 @@ return __time * 3600, (__time + 1) * 3600 # 获取买入时间范围 def get_buy_time_range(cancel_data): # 计算时间区间 min_space, max_space = compute_time_space_as_second(cancel_data["val"]["cancelTime"], cancel_data["val"]["cancelTimeUnit"]) max_time = __sub_time(cancel_data["val"]["time"], min_space) min_time = __sub_time(cancel_data["val"]["time"], max_space) return min_time, max_time # 根据买撤数据(与今日总的数据)计算买入数据 def get_buy_data_with_cancel_data(cancel_data, local_today_num_operate_map): # 计算时间区间 min_space, max_space = compute_time_space_as_second(cancel_data["val"]["cancelTime"], cancel_data["val"]["cancelTimeUnit"]) max_time = __sub_time(cancel_data["val"]["time"], min_space) l2_trade_factor.py
@@ -194,7 +194,7 @@ return "zyltgb:%s, total_industry_limit_percent:%s, volumn_day60_max:%s, volumn_yest:%s, volumn_today:%s,limit_up_time:%s, big_money_num:%s" % vals @classmethod def __get_zyltgb(cls, code): def get_zyltgb(cls, code): zyltgb = global_util.zyltgb_map.get(code) if zyltgb is None: global_data_loader.load_zyltgb() @@ -218,7 +218,7 @@ # 获取安全笔数 @classmethod def get_safe_buy_count(cls, code): gb = cls.__get_zyltgb(code) gb = cls.get_zyltgb(code) if not gb: # 默认10笔 return 8 redis_manager.py
@@ -21,9 +21,9 @@ if __name__ == "__main__": _redisManager = RedisManager(1) _redisManager = RedisManager(0) redis = _redisManager.getRedis() keys = redis.keys("*601975*") keys = redis.keys("under_water_seconds-*") for k in keys: redis.delete(k) server.py
@@ -26,6 +26,7 @@ import ths_industry_util import ths_util import tool import trade_gui import trade_manager import l2_code_operate from code_data_util import ZYLTGBUtil @@ -49,7 +50,7 @@ l2_data_error_dict = {} last_trade_delegate_data = None buy1_volumn_manager = THSBuy1VolumnManager() latest_buy1_volumn_dict={} latest_buy1_volumn_dict = {} buy1_price_manager = Buy1PriceManager() def setup(self): @@ -186,7 +187,6 @@ if limit_up_time_manager.get_limit_up_time(d["code"]) is None: limit_up_time_manager.save_limit_up_time(d["code"], d["time"]) elif type == 3: # 交易成功信息 dataList = data_process.parseList(_str) @@ -197,6 +197,7 @@ trade_manager.save_trade_success_data(dataList) elif type == 5: logger_trade_delegate.debug("接收到委托信息") # 交易委托信息 dataList = data_process.parseList(_str) if self.last_trade_delegate_data != _str: @@ -208,6 +209,8 @@ except Exception as e: logging.exception(e) trade_manager.save_trade_delegate_data(dataList) # 刷新交易界面 trade_gui.THSGuiTrade().refresh_data() elif type == 4: # 行业代码信息 @@ -242,8 +245,9 @@ elif type == 50: data = data_process.parse(_str)["data"] if data is not None: print(data) index = data["index"] code_name = data["codeName"] code_name = data["codeName"].replace(" ", "") volumn = data["volumn"] price = data["price"] time_ = data["time"] @@ -253,12 +257,12 @@ code = global_util.name_codes.get(code_name) if code is not None: # 记录日志 if self.latest_buy1_volumn_dict.get(code) != "{}-{}".format(volumn,price): if self.latest_buy1_volumn_dict.get(code) != "{}-{}".format(volumn, price): # 记录数据 logger_buy_1_volumn_record.info("{}-{}",code,data) self.latest_buy1_volumn_dict[code] = "{}-{}".format(volumn,price) logger_buy_1_volumn_record.info("{}-{}", code, data) self.latest_buy1_volumn_dict[code] = "{}-{}".format(volumn, price) # 保存买1价格 self.buy1_price_manager.save(code,price) self.buy1_price_manager.save(code, price) # 校正时间 time_ = tool.compute_buy1_real_time(time_) # 保存数据 tool.py
@@ -38,10 +38,10 @@ date = datetime.datetime.now().strftime("%Y-%m-%d") return date def get_now_time_str(): time_str = datetime.datetime.now().strftime("%H:%M:%S") return time_str # 转为价格,四舍五入保留2位小数 @@ -90,6 +90,17 @@ return True else: return False # 是否为初始化时间 def is_init_time(): relative_timestamp = t.time() % (24 * 60 * 60) + 8 * 60 * 60 start1 = 60 * 60 * 9 + 30 * 60 end1 = 60 * 60 * 15 + 1 * 60 if start1 < relative_timestamp < end1: return False else: return True def is_set_code_time(): @@ -152,6 +163,7 @@ cha = (s - 2) % 3 return time_seconds_format(s - 2 - cha) if __name__ == "__main__": print(trade_time_sub("11:29:59", "13:00:00")) print(trade_time_sub("11:29:59", "14:00:00")) trade_data_manager.py
@@ -183,6 +183,8 @@ def __increment_down_price_time(self, code, seconds): key = "under_water_seconds-{}".format(code) self.__get_redis().incrby(key, seconds) # 设置个失效时间 self.__get_redis().expire(key, tool.get_expire()) def __get_down_price_time_as_seconds(self, code): key = "under_water_seconds-{}".format(code) @@ -192,14 +194,24 @@ else: return int(val) def __save_current_price_codes_count(self,count): # 清除所有的水下捞数据 def clear_under_water_data(self): key = "under_water_*" keys = self.__get_redis().keys(key) for k in keys: self.__get_redis().delete(k) def __save_current_price_codes_count(self, count): key = "current_price_codes_count" self.__get_redis().setex(key,10,count) self.__get_redis().setex(key, 10, count) def __get_current_price_codes_count(self): key = "current_price_codes_count" count = self.__get_redis().get(key) return 0 if count is None else count def process_rate(self, code, rate, time_str): # 9点半之前的数据不处理 @@ -232,7 +244,6 @@ def get_current_price_codes_count(self): return self.__get_current_price_codes_count() # 是否为水下捞 def is_under_water(self, code): trade_gui.py
@@ -771,7 +771,10 @@ if trade_win is None: return None code_name_win = win32gui.GetDlgItem(trade_win, 0x000005C2) return THSGuiUtil.getText(code_name_win) name = THSGuiUtil.getText(code_name_win) if name is not None: name=name.replace(" ","") return name @classmethod def fill_codes(cls, codes):