| | |
| | | import xmdapi |
| | | from huaxin_client import tool, constant |
| | | from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript, logger_debug |
| | | from third_data import custom_block_in_money_manager |
| | | from third_data.custom_block_in_money_manager import BlockInMoneyRankManager, CodeInMoneyManager |
| | | from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi |
| | | from utils import tool as out_tool |
| | | |
| | | ################B类################## |
| | |
| | | # (代码, 现价, 涨幅, 量, 当前时间, 买1价, 买1量, 买2价, 买2量, 更新时间) |
| | | level1_data_dict[pMarketDataField.SecurityID] = ( |
| | | pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time(), |
| | | pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2, pMarketDataField.BidVolume2, pMarketDataField.UpdateTime) |
| | | pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2, |
| | | pMarketDataField.BidVolume2, pMarketDataField.UpdateTime) |
| | | try: |
| | | custom_block_in_money_manager.CodeInMoneyManager.set_market_info(pMarketDataField.SecurityID, lastPrice, |
| | | close_price, pMarketDataField.Turnover) |
| | | except: |
| | | pass |
| | | |
| | | |
| | | __latest_subscript_codes = set() |
| | |
| | | finally: |
| | | time.sleep(3) |
| | | |
def __test_block_in_money():
    """Seed ``CodeInMoneyManager`` with a market snapshot for every tracked code.

    The codes returned by ``BlockInMoneyRankManager().get_codes()`` are processed
    in pages of 200. For each page the previous close (``pre_close``) and the
    current price / cumulative amount (``price`` / ``cum_amount``) are fetched,
    and every code that has both is forwarded to
    ``CodeInMoneyManager.set_market_info(code, price, pre_close, cum_amount)``.

    @return: None
    """
    # Materialise as a list so that paging by slice works for any iterable.
    codes = list(BlockInMoneyRankManager().get_codes())
    page_size = 200
    # Stepping by page_size never yields an empty trailing page; the previous
    # ``len(codes) // page_size + 1`` did whenever the count was an exact
    # multiple of the page size (including zero codes).
    for start in range(0, len(codes), page_size):
        temp_codes = codes[start:start + page_size]

        # Latest static info: previous close per code.
        # get_gp_latest_info may return None (other callers check for it).
        latest_infos = HistoryKDatasUtils.get_gp_latest_info(temp_codes, "sec_id,pre_close")
        if not latest_infos:
            continue
        pre_close_dict = {x["sec_id"]: x["pre_close"] for x in latest_infos}

        # Current quote: price and cumulative traded amount per code.
        current_infos = JueJinApi.get_gp_current_info(temp_codes, "symbol,price,cum_amount")
        if not current_infos:
            continue
        # Symbols look like "<exchange>.<code>"; keep only the code part.
        current_infos = {x["symbol"].split(".")[1]: (x["price"], x["cum_amount"]) for x in current_infos}

        for code, (price, cum_amount) in current_infos.items():
            if code not in pre_close_dict:
                # No previous close available for this code.
                continue
            CodeInMoneyManager.set_market_info(code, price, pre_close_dict[code], cum_amount)
| | | |
| | | |
| | | def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w, queue_custom_block_in_money, fixed_codes=None): |
| | | """ |
| | | 运行l1订阅任务 |
| | | |
| | | @param queue_l1_w_strategy_r: L1方写,策略方读 |
| | | @param queue_l1_r_strategy_w: L1方读,策略方写 |
| | | @param queue_custom_block_in_money: 板块流入流出计算结果 |
| | | @param fixed_codes: 固定要返回数据的代码 |
| | | @return: |
| | | """ |
| | |
| | | |
| | | threading.Thread(target=__run_subscript_task, args=(spi,), daemon=True).start() |
| | | |
| | | # TODO 测试 |
| | | __test_block_in_money() |
| | | |
| | | # 等待程序结束 |
| | | while True: |
| | | print("数量", len(level1_data_dict)) |
| | |
| | | if len(datas) > 0: |
| | | logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas)) |
| | | __upload_codes_info(queue_l1_w_strategy_r, datas) |
# Compute block money inflow/outflow and publish the result.
custom_block_in_money_manager.BlockInMoneyRankManager().compute()
# Queue.put_nowait() accepts exactly ONE object; passing the two lists as
# separate positional arguments raises TypeError and nothing is enqueued.
# Send them as a single (in_list, out_list) tuple, which is the shape the
# receiving side indexes as val[0] / val[1].
queue_custom_block_in_money.put_nowait(
    (custom_block_in_money_manager.BlockInMoneyRankManager().get_in_list(),
     custom_block_in_money_manager.BlockInMoneyRankManager().get_out_list()))
| | | except Exception as e: |
| | | logging.exception(e) |
| | | logger_debug.exception(e) |
| | |
| | | filter=lambda record: record["extra"].get("name") == "kpl_jx_out", |
| | | rotation="00:00", compression="zip", enqueue=True) |
| | | |
| | | logger.add(self.get_path("kpl", "kpl_jx_in"), |
| | | filter=lambda record: record["extra"].get("name") == "kpl_jx_in", |
| | | rotation="00:00", compression="zip", enqueue=True) |
| | | |
| | | # 看盘日志 |
| | | logger.add(self.get_path("kp", "kp_msg"), |
| | | filter=lambda record: record["extra"].get("name") == "kp_msg", |
| | |
| | | |
| | | logger_kpl_jx_out = __mylogger.get_logger("kpl_jx_out") |
| | | |
| | | logger_kpl_jx_in = __mylogger.get_logger("kpl_jx_in") |
| | | |
| | | |
| | | |
| | | logger_kp_msg = __mylogger.get_logger("kp_msg") |
| | |
| | | queue_l1_w_strategy_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_, |
| | | queue_l1_trade_w_strategy_r_, trade_ipc_addr): |
| | | queue_l1_trade_w_strategy_r_, trade_ipc_addr, queue_custom_block_in_money): |
| | | """ |
| | | 策略进程 |
| | | |
| | | @param pipe_server: |
| | | @param queue_strategy_r_trade_w_: |
| | | @param queue_l1_w_strategy_r_: |
| | |
| | | @param queue_l1_trade_r_strategy_w_: |
| | | @param queue_l1_trade_w_strategy_r_: |
| | | @param trade_ipc_addr: 交易ipc地址(下单地址, 撤单地址) |
| | | @param queue_custom_block_in_money: 接收板块流入流出 |
| | | @return: |
| | | """ |
| | | logger_system.info("策略进程ID:{}", os.getpid()) |
| | |
| | | # redis后台服务 |
| | | t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True) |
| | | t1.start() |
| | | |
| | | |
| | | |
| | | # |
| | | # 启动华鑫交易服务 |
| | | huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, |
| | | queue_strategy_w_trade_r_for_read_, |
| | | queue_l1_trade_w_strategy_r_, trade_ipc_addr) |
| | | queue_l1_trade_w_strategy_r_, trade_ipc_addr, queue_custom_block_in_money) |
| | | |
| | | |
| | | # 主服务 |
| | |
| | | # l1 |
| | | queue_l1_w_strategy_r = multiprocessing.Queue() |
| | | queue_l1_r_strategy_w = multiprocessing.Queue() |
| | | queue_custom_block_in_money = multiprocessing.Queue() # 接收板块流入流出 |
| | | # l1交易 |
| | | queue_l1_trade_w_strategy_r = multiprocessing.Queue() |
| | | queue_l1_trade_r_strategy_w = multiprocessing.Queue() |
| | |
| | | # L1订阅数据 |
| | | l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, |
| | | args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w, |
| | | queue_custom_block_in_money, |
| | | gpcode_manager.BuyOpenLimitUpCodeManager().get_codes(),)) |
| | | l1Process.start() |
| | | |
| | |
| | | # 主进程 |
| | | createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w, |
| | | queue_l1_trade_w_strategy_r, (order_ipc_addr, cancel_order_ipc_addr)) |
| | | queue_l1_trade_w_strategy_r, (order_ipc_addr, cancel_order_ipc_addr), queue_custom_block_in_money) |
| | | |
| | | # 将tradeServer作为主进程 |
| | | l1Process.join() |
| | |
| | | hx_logger_l2_orderdetail, hx_logger_l2_market_data, logger_l2_g_cancel, logger_debug, \ |
| | | logger_system, logger_trade, logger_local_huaxin_l1_trade_info, logger_l2_codes_subscript, logger_l2_radical_buy |
| | | from third_data import block_info, kpl_data_manager, history_k_data_manager, huaxin_l1_data_manager |
| | | from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager |
| | | from third_data.code_plate_key_manager import KPLCodeJXBlockManager, CodePlateKeyBuyManager, RealTimeKplMarketData |
| | | from third_data.history_k_data_util import JueJinApi |
| | | from trade import l2_trade_util, \ |
| | | trade_data_manager, trade_constant, buy_open_limit_up_strategy |
| | |
| | | logger_debug.exception(e) |
| | | |
| | | |
def __recv_pipe_block_in_money(queue_custom_block_in_money: multiprocessing.Queue):
    """Consume block money-flow results from the queue and store them.

    Logs the current thread id, then (when a queue was supplied) loops forever:
    each truthy item read from the queue is treated as ``(in_list, out_list)``
    and handed to ``RealTimeKplMarketData.set_market_jingxuan_blocks_from_custom``
    and ``RealTimeKplMarketData.set_market_jingxuan_out_blocks_from_custom``.
    Any exception raised while handling an item is logged and the loop goes on.

    @param queue_custom_block_in_money: queue carrying the inflow/outflow lists; may be None
    @return: None (never returns when a queue is supplied)
    """
    logger_system.info(f"trade_server __recv_pipe_block_in_money 线程ID:{tool.get_thread_id()}")
    if queue_custom_block_in_money is None:
        # Nothing to listen to.
        return
    while True:
        try:
            payload = queue_custom_block_in_money.get()
            if not payload:
                continue
            inflow_blocks, outflow_blocks = payload[0], payload[1]
            RealTimeKplMarketData.set_market_jingxuan_blocks_from_custom(inflow_blocks)
            RealTimeKplMarketData.set_market_jingxuan_out_blocks_from_custom(outflow_blocks)
        except Exception as e:
            logger_debug.exception(e)
| | | |
| | | |
| | | # 排得太远撤单 |
| | | def __cancel_buy_for_too_far(): |
| | | while True: |
| | |
| | | |
| | | |
| | | def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, |
| | | queue_l1_trade_w_strategy_r, trade_ipc_addr): |
| | | queue_l1_trade_w_strategy_r, trade_ipc_addr, queue_custom_block_in_money): |
| | | """ |
| | | |
| | | @param queue_strategy_r_trade_w: |
| | |
| | | @param queue_strategy_w_trade_r_for_read: |
| | | @param queue_l1_trade_w_strategy_r: |
| | | @param trade_ipc_addr: 交易IPC地址:(下单ipc地址,撤单ipc地址) |
| | | @param queue_custom_block_in_money: 接收板块流入流出队列 |
| | | @return: |
| | | """ |
| | | logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}") |
| | |
| | | t1 = threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True) |
| | | t1.start() |
| | | |
| | | # 读取板块资金流入 |
| | | t1 = threading.Thread(target=lambda: __recv_pipe_block_in_money(queue_custom_block_in_money), daemon=True) |
| | | t1.start() |
| | | |
| | | logger_system.info("create TradeServer") |
| | | t1 = threading.Thread(target=lambda: clear_invalid_client(), daemon=True) |
| | | t1.start() |
| | |
| | | from log_module import async_log_util |
| | | from db import redis_manager_delegate as redis_manager |
| | | |
| | | from log_module.log import logger_kpl_block_can_buy, logger_kpl_jx_out |
| | | from log_module.log import logger_kpl_block_can_buy, logger_kpl_jx_out, logger_kpl_jx_in |
| | | from third_data.kpl_util import KPLPlatManager |
| | | from trade import trade_manager, l2_trade_util, trade_constant |
| | | |
| | |
| | | # break |
| | | # blocks.add(data[1]) |
| | | # cls.__top_jx_blocks = blocks |
| | | if True: |
| | | return |
| | | blocks = set() |
| | | for data in datas: |
| | | if data[1] in constant.KPL_INVALID_BLOCKS: |
| | |
| | | break |
| | | blocks.add(kpl_util.filter_block(data[1])) |
| | | # 记录精选流出日志 |
| | | async_log_util.info(logger_kpl_jx_in, f"原数据:{datas[:20]} 板块:{blocks}") |
| | | cls.__top_jx_blocks = BlockMapManager().filter_blocks(blocks) |
| | | |
@classmethod
def set_market_jingxuan_blocks_from_custom(cls, datas):
    """
    Store the custom-computed "jingxuan" inflow blocks.

    Entries whose block name is in ``constant.KPL_INVALID_BLOCKS`` or whose
    amount is below 5e7 are skipped. Collection stops once 10 names have
    been gathered. The result is passed through
    ``BlockMapManager().filter_blocks`` before being stored on the class.
    @param datas: [(block name, inflow amount)]
    @return: None
    """
    blocks = set()
    for item in datas:
        block_name = item[0]
        # Skip invalid blocks, and blocks with less than 50 million inflow.
        if block_name in constant.KPL_INVALID_BLOCKS or item[1] < 5e7:
            continue
        blocks.add(block_name)
        if len(blocks) >= 10:
            break
        blocks.add(kpl_util.filter_block(block_name))
    # Record the selected inflow blocks in the inflow log.
    async_log_util.info(logger_kpl_jx_in, f"原数据:{datas[:20]} 板块:{blocks}")
    filtered_blocks = BlockMapManager().filter_blocks(blocks)
    cls.__top_jx_blocks = filtered_blocks
| | | |
| | | @classmethod |
| | |
| | | @param datas: |
| | | @return: |
| | | """ |
| | | if True: |
| | | return |
| | | blocks = set() |
| | | for data in datas: |
| | | if data[1] in constant.KPL_INVALID_BLOCKS: |
| | |
| | | cls.__top_jx_out_blocks = BlockMapManager().filter_blocks(blocks) |
| | | |
@classmethod
def set_market_jingxuan_out_blocks_from_custom(cls, datas):
    """
    Store the custom-computed "jingxuan" outflow blocks.

    Entries whose block name is in ``constant.KPL_INVALID_BLOCKS`` are
    skipped. Iteration stops at the first remaining entry whose amount is
    greater than -5e7, or once 10 names have been gathered. The result is
    passed through ``BlockMapManager().filter_blocks`` before being stored
    on the class.
    @param datas: [(block name, flow amount)]
    @return: None
    """
    blocks = set()
    for item in datas:
        block_name = item[0]
        if block_name in constant.KPL_INVALID_BLOCKS:
            continue
        if item[1] > -5e7:
            # Outflow smaller than 50 million: stop collecting here.
            break
        blocks.add(block_name)
        if len(blocks) >= 10:
            break
        blocks.add(kpl_util.filter_block(block_name))
    # Record the selected outflow blocks in the outflow log.
    async_log_util.info(logger_kpl_jx_out, f"原数据:{datas[:10]} 板块:{blocks}")
    filtered_blocks = BlockMapManager().filter_blocks(blocks)
    cls.__top_jx_out_blocks = filtered_blocks
| | | |
@classmethod
def get_top_market_jingxuan_blocks(cls):
    """
    Return the currently stored top "jingxuan" inflow blocks.
    @return: the value last assigned to the class-level inflow block cache
    """
    top_blocks = cls.__top_jx_blocks
    return top_blocks
| | | |
| | |
| | | """ |
| | | 板块辨识度票管理 |
| | | """ |
| | | import constant |
| | | from db import mysql_data_delegate as mysql_data |
| | | from third_data.history_k_data_util import HistoryKDatasUtils |
| | | from utils import tool |
| | | |
| | | |
| | |
| | | self.mysql.execute(sql) |
| | | |
| | | |
class AnalysisBlockSpecialCodesManager:
    """
    Works out, per block, which codes stand out by their limit-up history.

    Data sources visible in this class: the ``kpl_limit_up_record`` table, the
    block-mapping text file under ``constant.get_path_prefix()`` and
    ``HistoryKDatasUtils.get_gp_latest_info``.

    NOTE: every query below interpolates its arguments directly into the SQL
    text, so only trusted values may be passed in.
    """
    __mysql = mysql_data.Mysqldb()

    def __get_code_blocks(self, min_day, max_day):
        """
        Query limit-up counts grouped by (block, code).

        Only records with ``min_day < _day < max_day`` are counted and codes
        starting with ``68`` are excluded.
        @param min_day: exclusive lower bound of ``_day``
        @param max_day: exclusive upper bound of ``_day``
        @return: rows of (block name, code, limit-up count)
        """
        return self.__mysql.select_all(
            f"SELECT r.`_hot_block_name`, r.`_code`, COUNT(*) FROM kpl_limit_up_record r WHERE r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%' group by r.`_hot_block_name`, r.`_code`")

    def __get_limit_up_info(self, min_day):
        """
        Query per-code limit-up statistics for records after ``min_day``.
        @param min_day: exclusive lower bound of ``_day``
        @return: {code: (limit-up count, code name, ``_zylt_val`` / 1e8, or 1 when it is null)}
        """
        sql = f"SELECT r.`_code`, COUNT(r.`_code`),r.`_code_name`,IF( r.`_zylt_val` is null, 1, r.`_zylt_val`/100000000 ) FROM (SELECT * FROM kpl_limit_up_record r ORDER BY r.`_create_time` DESC) r WHERE r.`_day`>'{min_day}' GROUP BY r.`_code`"
        results = self.__mysql.select_all(sql)
        return {x[0]: (x[1], x[2], x[3]) for x in results}

    def __get_top(self, block, min_day, max_day):
        """
        Return the codes of ``block`` that rank near the top on two measures.

        A code is returned when it is among the first ``min(10, rows // 2)``
        rows both by limit-up count and by ``_zylt_val`` (each sorted
        descending).
        @param block: block name to query
        @param min_day: exclusive lower bound of ``_day``
        @param max_day: exclusive upper bound of ``_day``
        @return: set of codes
        """
        sql = f"SELECT r.`_code`,r.`_code_name`, COUNT(*), IF( r.`_zylt_val` is null, 1, r.`_zylt_val`/100000000 ) FROM (SELECT * FROM kpl_limit_up_record r ORDER BY r.`_create_time` DESC) r WHERE r.`_hot_block_name`='{block}' AND r.`_day`>'{min_day}' and r.`_day`<'{max_day}' and r._code not like '68%' GROUP BY r.`_code`"
        results = list(self.__mysql.select_all(sql))
        max_count = min(10, len(results) // 2)
        results.sort(key=lambda x: x[2], reverse=True)
        top_by_count = {x[0] for x in results[:max_count]}
        results.sort(key=lambda x: x[3], reverse=True)
        top_by_value = {x[0] for x in results[:max_count]}
        return top_by_count & top_by_value

    @staticmethod
    def _split_names(text):
        """Split a comma separated list into stripped, non-empty names."""
        return [name.strip() for name in text.split(",") if name.strip()]

    @classmethod
    def _parse_block_map_lines(cls, lines):
        """
        Build ``{block: {parent blocks}}`` from the lines of the mapping file.

        Accepted line shapes (full-width ``,`` and ``丨`` are normalised to
        ``,`` and ``|`` first):
            ``parent1,parent2|child1,child2``
            ``child1,child2``  (the first child then acts as the parent)
        Every child maps to all parents of its line. When a line names more
        than one parent, each parent additionally maps to itself.
        @param lines: iterable of text lines
        @return: dict of block name -> set of parent block names
        """
        block_map = {}
        for raw_line in lines:
            line = raw_line.strip()
            if not line:
                continue
            # "\uff0c" is the full-width comma. The previous replace(",", ",")
            # was a no-op, so lines written with it were never split.
            line = line.replace("\uff0c", ",").replace("丨", "|")
            parent_blocks = set()
            if "|" in line:
                sts = line.split("|")
                # Strip parent names the same way child names are stripped,
                # otherwise values such as " AI" never match a real block.
                parent_blocks.update(cls._split_names(sts[0]))
                line = sts[1]
            blocks = cls._split_names(line)
            if not blocks:
                continue
            if not parent_blocks:
                parent_blocks.add(blocks[0])
            for b in blocks:
                block_map.setdefault(b, set()).update(parent_blocks)
            if len(parent_blocks) > 1:
                # Register every parent as its own block as well.
                for parent in parent_blocks:
                    block_map.setdefault(parent, set()).add(parent)
        return block_map

    def __get_block_map(self):
        """
        Load the block mapping ("block fission") from ``板块对应.txt``.
        @return: dict of block name -> set of parent block names
        """
        with open(f"{constant.get_path_prefix()}/板块对应.txt", encoding="utf-8") as f:
            lines = f.readlines()
        return self._parse_block_map_lines(lines)

    def test_block_map(self):
        """Print the parsed block mapping (debug helper)."""
        print(self.__get_block_map())

    def get_block_special_codes(self):
        """
        Collect the codes that stand out within each block.

        The window runs from 365 days before the last date returned by
        ``HistoryKDatasUtils.get_latest_trading_date(15)`` up to that date.
        For every block not listed in ``constant.KPL_INVALID_BLOCKS`` the
        first third of its codes is examined and a code is kept when:
            - it has more than 3 limit-ups within the block,
            - latest info is available for it,
            - its ``sec_level`` equals 1,
            - its ``upper_limit`` is at least 3.
        At most 10 codes are kept per block.
        @return: [(block, code name, code, limit-up count in block, int of ``_zylt_val`` / 1e8)]
        """
        trading_dates = HistoryKDatasUtils.get_latest_trading_date(15)
        max_day = trading_dates[-1]
        min_day = tool.date_sub(max_day, 365)

        block_map = self.__get_block_map()
        # Rows of (block name, code, limit-up count within the block).
        code_block_infos = self.__get_code_blocks(min_day, max_day)
        code_block_dict = {}  # {code: {block: limit-up count}}
        for block_name, code, count in code_block_infos:
            per_block = code_block_dict.setdefault(code, {})
            # Credit the count to the mapped parent blocks, or to the block
            # itself when the mapping file does not mention it.
            for mapped_block in block_map.get(block_name) or {block_name}:
                per_block[mapped_block] = per_block.get(mapped_block, 0) + count

        block_codes_dict = {}  # {block: [(code, limit-up count)]}
        for code, per_block in code_block_dict.items():
            for block_name, count in per_block.items():
                block_codes_dict.setdefault(block_name, []).append((code, count))

        limit_up_info_map = self.__get_limit_up_info(min_day)
        fdatas = []
        for b, code_info_list in block_codes_dict.items():
            if b in constant.KPL_INVALID_BLOCKS:
                continue

            # NOTE: no ranking is applied here; the first third is taken in
            # the order the codes were collected.
            max_count = len(code_info_list) // 3
            zylt_list = [x[0] for x in code_info_list[:max_count]]
            if not zylt_list:
                continue
            # Fetch security level and upper limit price for the candidates.
            juejin_results = HistoryKDatasUtils.get_gp_latest_info(zylt_list, fields="sec_id,sec_level,upper_limit")
            if juejin_results is None:
                continue

            juejin_result_dict = {x['sec_id']: (x['sec_level'], x['upper_limit']) for x in juejin_results}
            kept = 0
            for code in zylt_list:
                if code_block_dict[code][b] <= 3:
                    # 3 or fewer limit-ups in this block.
                    continue
                if code not in juejin_result_dict:
                    # No latest info available.
                    continue
                sec_level, upper_limit = juejin_result_dict[code]
                if sec_level != 1:
                    # Not a normal security.
                    continue
                if upper_limit < 3:
                    # Upper limit price below 3.
                    continue
                kept += 1
                fdatas.append(
                    (b, limit_up_info_map[code][1], code, code_block_dict[code][b],
                     int(float(limit_up_info_map[code][2]))))
                if kept >= 10:
                    break
        return fdatas
| | | |
| | | |
if __name__ == "__main__":
    # Manual check: print the stored codes of one block, then every analysed
    # row on its own line followed by the whole result list.
    print(BlockSpecialCodesManager().get_block_codes("证券"))
    special_codes = AnalysisBlockSpecialCodesManager().get_block_special_codes()
    for row in special_codes:
        print(row)
    print(special_codes)