code_attribute/first_target_code_data_processor.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
l2/huaxin/huaxin_target_codes_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
server.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
code_attribute/first_target_code_data_processor.py
New file @@ -0,0 +1,205 @@ """ 首板目标代码处理器 """ # 处理首板代码信息 import logging import constant import inited_data from code_attribute import gpcode_manager, gpcode_first_screen_manager, global_data_loader, code_nature_analyse, \ code_volumn_manager from code_attribute.code_data_util import ZYLTGBUtil from log_module.log import logger_first_code_record from third_data import kpl_api, block_info from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager from third_data.history_k_data_util import HistoryKDatasUtils from third_data.kpl_data_manager import KPLLimitUpDataRecordManager from ths import l2_code_operate from trade import trade_data_manager from utils import global_util, tool __CodesPlateKeysManager = CodesHisReasonAndBlocksManager() def process_first_codes_datas(dataList): print("首板代码数量:", len(dataList)) limit_up_price_dict = {} temp_codes = [] codes = [] tick_datas = [] if dataList: for data in dataList: code = data["code"] codes.append(code) # ---查询想买单,如果没有在列表中就需要强行加入列表 want_codes = gpcode_manager.WantBuyCodesManager.list_code() if want_codes: # 没有在现价采集中的想买代码 diff_codes = set(want_codes) - set(codes) if diff_codes: # 想买单的代码还没有在目标代码中 zyltgb_list = [] for code in diff_codes: # 获取涨停价 _limit_up_price = gpcode_manager.get_limit_up_price(code) if not _limit_up_price: inited_data.re_set_price_pres([code], True) # 再次获取涨停价 _limit_up_price = gpcode_manager.get_limit_up_price(code) if _limit_up_price: # 成功获取到了涨停价,构造虚拟的现价信息 codes.append(code) dataList.append({"code": code, "price": f"{_limit_up_price}", "volume": "0", "volumeUnit": 0, "time": "00:00:00", "zyltgb": "100", "zyltgbUnit": 0}) # 强制更新自由流通股本 if zyltgb_list: ZYLTGBUtil.save_list(zyltgb_list) # 将保存的数据更新到内存中 for z in zyltgb_list: val = ZYLTGBUtil.get(z["code"]) if val: global_util.zyltgb_map[z["code"]] = val # ---保存未筛选的首板代码 new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes) # 保存自由流通股本 if dataList: zyltgb_list = [] for data in dataList: code = data["code"] if code in global_util.zyltgb_map: continue zyltgb_list.append( {"code": code, "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgbUnit"]}) if zyltgb_list: ZYLTGBUtil.save_list(zyltgb_list) global_data_loader.load_zyltgb() # 获取昨日收盘价 for code in codes: # 如果涨停价是空值就需要设置昨日收盘价格 if gpcode_manager.get_limit_up_price(code) is None: inited_data.re_set_price_pres([code], True) # 板块关键字准备 for code in codes: if __CodesPlateKeysManager.get_history_limit_up_reason(code) is None: # 从数据库加载历史涨停原因 __CodesPlateKeysManager.set_history_limit_up_reason(code, KPLLimitUpDataRecordManager.get_latest_blocks_set( code)) if __CodesPlateKeysManager.get_blocks(code) is None: try: results = kpl_api.getStockIDPlate(code) bs = [r[1] for r in results] __CodesPlateKeysManager.set_blocks(code, bs) except Exception as e: logging.exception(e) pass # 获取60天最大记录 for code in codes: need_get_volumn = False if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None: need_get_volumn = True if not need_get_volumn and code_nature_analyse.CodeNatureRecordManager.get_nature( code) is None: need_get_volumn = True if need_get_volumn: volumes_data = inited_data.get_volumns_by_code(code, 150) volumes = inited_data.parse_max_volume(volumes_data[:90], code_nature_analyse.is_new_top( gpcode_manager.get_limit_up_price(code), volumes_data[:90])) logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes) code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2]) # 判断K线形态 # is_has_k_format, msg = code_nature_analyse.is_has_k_format( # gpcode_manager.get_limit_up_price(code), volumes_data) # if not is_has_k_format: # logger_first_code_record.info("{}首板K线形态不好,{}", code, msg) # # 股性不好,就不要加入 # bad_codes.add(code) # # 加入禁止交易代码 # l2_trade_util.forbidden_trade(code) code_nature_analyse.set_record_datas(code, gpcode_manager.get_limit_up_price(code), volumes_data) gpcode_manager.FirstCodeManager.add_record(codes) # 初始化板块信息 for code in codes: block_info.init_code(code) if new_add_codes: gpcode_manager.set_first_gp_codes_with_data(HistoryKDatasUtils.get_gp_latest_info(codes, fields="symbol,sec_name,sec_type,sec_level")) # 加入首板历史记录 logger_first_code_record.info("新增首板:{}", new_add_codes) # 移除代码 if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS: listen_codes = gpcode_manager.get_listen_codes() for lc in listen_codes: if not gpcode_manager.is_in_gp_pool(lc): # 移除代码 l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除") # 保存现价 if dataList: for data in dataList: code = data["code"] codes.append(code) limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is not None: limit_up_price_dict[code] = limit_up_price else: temp_codes.append(code) tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"], "volumeUnit": data["volumeUnit"]}) # 获取涨停价 if temp_codes: # 获取涨停价 inited_data.re_set_price_pres(temp_codes) # 重新获取涨停价 for code in temp_codes: limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is not None: limit_up_price_dict[code] = limit_up_price # 首板数据加工 prices = [] for data in dataList: code = data["code"] price = data["price"] limit_up_time = data["time"] if limit_up_time == "00:00:00": limit_up_time = None if code not in limit_up_price_dict: continue is_limit_up = abs(float(limit_up_price_dict[code]) - float(price)) < 0.01 # 纠正数据 if is_limit_up and limit_up_time is None: limit_up_time = tool.get_now_time_str() if is_limit_up: # 加入首板涨停 gpcode_manager.FirstCodeManager.add_limited_up_record([code]) pricePre = gpcode_manager.get_price_pre(code) if pricePre is None: inited_data.re_set_price_pres([code]) rate = round((float(price) - pricePre) * 100 / pricePre, 1) prices.append( {"code": code, "time": limit_up_time, "rate": rate, "limit_up": is_limit_up}) if code in new_add_codes: if is_limit_up: place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count( code) if place_order_count == 0: trade_data_manager.placeordercountmanager.place_order(code) gpcode_first_screen_manager.process_ticks(prices) return tick_datas l2/huaxin/huaxin_target_codes_manager.py
@@ -8,6 +8,7 @@ from code_attribute.code_data_util import ZYLTGBUtil from db import redis_manager from third_data import kpl_data_manager, kpl_api from trade import current_price_process_manager from utils import tool, global_util, socket_util redisManager = redis_manager.RedisManager(0) @@ -89,23 +90,8 @@ fitem = {"code": code, "price": d[1], "volume": d[3] // (10000*100), "volumeUnit": 1, "time": "00:00:00", "zyltgb": zyltgb // 10000, "zyltgbUnit": 1} flist.append(fitem) fdata = {"type": 22, "data": flist} print("首板代码数量", len(flist)) addr, port = "127.0.0.1", 9001 sk = socket.socket() # 生成socket,连接server sk.connect((addr, port)) try: sk.sendall(socket_util.load_header(json.dumps(fdata).encode('utf-8'))) finally: sk.close() sk = socket.socket() # 生成socket,连接server sk.connect((addr, port)) try: sk.sendall(socket_util.load_header(json.dumps({"type": 40, "data": []}).encode('utf-8'))) finally: sk.close() current_price_process_manager.accept_prices(flist) if __name__ == "__main__": server.py
@@ -12,7 +12,7 @@ from utils import alert_util, data_process, global_util, ths_industry_util, tool, import_util, socket_util from code_attribute import code_volumn_manager, code_nature_analyse, global_data_loader, gpcode_manager, \ gpcode_first_screen_manager gpcode_first_screen_manager, first_target_code_data_processor import constant from user import authority import inited_data @@ -275,197 +275,10 @@ raise Exception('未到接受时间') # 首板代码 dataList, is_add = data_process.parseGPCode(_str) print("首板代码数量:", len(dataList)) limit_up_price_dict = {} temp_codes = [] codes = [] tick_datas = [] if dataList: for data in dataList: code = data["code"] codes.append(code) # ---查询想买单,如果没有在列表中就需要强行加入列表 want_codes = gpcode_manager.WantBuyCodesManager.list_code() if want_codes: # 没有在现价采集中的想买代码 diff_codes = set(want_codes) - set(codes) if diff_codes: zyltgb_list = [] for code in diff_codes: # 查询是否在L2现价中 if code in self.__l2_current_price_data: item = self.__l2_current_price_data.get(code) codes.append(code) dataList.append(item) # 保存自由流通股本 zyltgb_list.append( {"code": code, "zyltgb": item["zyltgb"], "zyltgb_unit": item["zyltgbUnit"]}) else: # 获取涨停价 _limit_up_price = gpcode_manager.get_limit_up_price(code) if not _limit_up_price: inited_data.re_set_price_pres([code], True) # 再次获取涨停价 _limit_up_price = gpcode_manager.get_limit_up_price(code) if _limit_up_price: # 成功获取到了涨停价,构造虚拟的现价信息 codes.append(code) dataList.append({"code": code, "price": f"{_limit_up_price}", "volume": "0", "volumeUnit": 0, "time": "00:00:00", "zyltgb": "100", "zyltgbUnit": 0}) # 强制更新自由流通股本 if zyltgb_list: ZYLTGBUtil.save_list(zyltgb_list) # 将保存的数据更新到内存中 for z in zyltgb_list: val = ZYLTGBUtil.get(z["code"]) if val: global_util.zyltgb_map[z["code"]] = val # ---保存未筛选的首板代码 new_add_codes = gpcode_first_screen_manager.set_target_no_screen_codes(codes) # 保存自由流通股本 if dataList: zyltgb_list = [] for data in dataList: code = data["code"] if code in global_util.zyltgb_map: continue zyltgb_list.append( {"code": code, "zyltgb": data["zyltgb"], "zyltgb_unit": data["zyltgbUnit"]}) if zyltgb_list: ZYLTGBUtil.save_list(zyltgb_list) global_data_loader.load_zyltgb() bad_codes = set() # 获取昨日收盘价 for code in codes: # 如果涨停价是空值就需要设置昨日收盘价格 if gpcode_manager.get_limit_up_price(code) is None: inited_data.re_set_price_pres([code], True) # 板块关键字准备 for code in codes: if self.__CodesPlateKeysManager.get_history_limit_up_reason(code) is None: self.__CodesPlateKeysManager.set_history_limit_up_reason(code, KPLLimitUpDataRecordManager.get_latest_blocks_set( code)) if self.__CodesPlateKeysManager.get_blocks(code) is None: try: results = kpl_api.getStockIDPlate(code) bs = [r[1] for r in results] self.__CodesPlateKeysManager.set_blocks(code, bs) except Exception as e: logging.exception(e) pass # 获取60天最大记录 for code in codes: need_get_volumn = False if code not in global_util.max60_volumn or global_util.max60_volumn.get(code) is None: need_get_volumn = True if not need_get_volumn and code_nature_analyse.CodeNatureRecordManager.get_nature( code) is None: need_get_volumn = True if need_get_volumn: volumes_data = inited_data.get_volumns_by_code(code, 150) volumes = inited_data.parse_max_volume(volumes_data[:90], code_nature_analyse.is_new_top( gpcode_manager.get_limit_up_price(code), volumes_data[:90])) logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes) code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2]) # 判断K线形态 is_has_k_format, msg = code_nature_analyse.is_has_k_format( gpcode_manager.get_limit_up_price(code), volumes_data) if not is_has_k_format: logger_first_code_record.info("{}首板K线形态不好,{}", code, msg) # 股性不好,就不要加入 bad_codes.add(code) # 加入禁止交易代码 l2_trade_util.forbidden_trade(code) code_nature_analyse.set_record_datas(code, gpcode_manager.get_limit_up_price(code), volumes_data) gpcode_manager.FirstCodeManager.add_record(codes) # 初始化板块信息 for code in codes: block_info.init_code(code) if new_add_codes: gpcode_manager.set_first_gp_codes_with_data(HistoryKDatasUtils.get_gp_latest_info(codes, fields="symbol,sec_name,sec_type,sec_level")) # 加入首板历史记录 logger_first_code_record.info("新增首板:{}", new_add_codes) # 移除代码 listen_codes = gpcode_manager.get_listen_codes() for lc in listen_codes: if not gpcode_manager.is_in_gp_pool(lc): # 移除代码 l2_code_operate.L2CodeOperate.get_instance().add_operate(0, lc, "代码被移除") # 保存现价 if dataList: for data in dataList: code = data["code"] codes.append(code) limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is not None: limit_up_price_dict[code] = limit_up_price else: temp_codes.append(code) tick_datas.append({"code": code, "price": data["price"], "volume": data["volume"], "volumeUnit": data["volumeUnit"]}) # 获取涨停价 if temp_codes: # 获取涨停价 inited_data.re_set_price_pres(temp_codes) # 重新获取涨停价 for code in temp_codes: limit_up_price = gpcode_manager.get_limit_up_price(code) if limit_up_price is not None: limit_up_price_dict[code] = limit_up_price tick_datas = first_target_code_data_processor.process_first_codes_datas(dataList) # 保存现价 self.first_tick_datas.clear() self.first_tick_datas.extend(tick_datas) # 首板数据加工 prices = [] for data in dataList: code = data["code"] price = data["price"] limit_up_time = data["time"] if limit_up_time == "00:00:00": limit_up_time = None if code not in limit_up_price_dict: continue is_limit_up = abs(float(limit_up_price_dict[code]) - float(price)) < 0.01 # 纠正数据 if is_limit_up and limit_up_time is None: limit_up_time = tool.get_now_time_str() if is_limit_up: # 加入首板涨停 gpcode_manager.FirstCodeManager.add_limited_up_record([code]) pricePre = gpcode_manager.get_price_pre(code) if pricePre is None: inited_data.re_set_price_pres([code]) rate = round((float(price) - pricePre) * 100 / pricePre, 1) prices.append( {"code": code, "time": limit_up_time, "rate": rate, "limit_up": is_limit_up}) if code in new_add_codes: if is_limit_up: place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count( code) if place_order_count == 0: trade_data_manager.placeordercountmanager.place_order(code) gpcode_first_screen_manager.process_ticks(prices) except Exception as e: logging.exception(e) finally: