alert.mp3Binary files differ
api/outside_api_command_callback.py
@@ -36,7 +36,7 @@ logger_real_place_order_position, logger_device from output import l2_output_util from third_data import kpl_util, history_k_data_manager, huaxin_l1_data_manager, third_blocks_manager, kpl_data_manager from third_data.code_plate_key_manager import CodePlateKeyBuyManager, KPLCodeJXBlockManager, RealTimeKplMarketData, \ from third_data.code_plate_key_manager import KPLCodeJXBlockManager, \ RadicalBuyBlockManager from third_data.history_k_data_manager import HistoryKDataManager from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils code_attribute/code_data_util.py
@@ -4,14 +4,12 @@ # 股票代码相关的参数 import decimal import time from code_attribute import gpcode_manager from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager from db import redis_manager_delegate as redis_manager from db.mysql_data_delegate import Mysqldb from db.redis_manager_delegate import RedisUtils from utils import tool from utils.tool import async_call __db = 0 _redisManager = redis_manager.RedisManager(0) code_attribute/first_target_code_data_processor.py
@@ -16,8 +16,7 @@ from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager from third_data.history_k_data_manager import HistoryKDataManager from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi from ths import l2_code_operate from trade import trade_data_manager, l2_trade_util from trade import l2_trade_util from settings.trade_setting import MarketSituationManager from utils import global_util, tool, init_data_util, buy_condition_util code_attribute/gpcode_manager.py
@@ -5,7 +5,6 @@ import json import time import constant from db import redis_manager_delegate as redis_manager from db.mysql_data_delegate import Mysqldb from db.redis_manager_delegate import RedisUtils @@ -13,8 +12,6 @@ from log_module.log import logger_pre_close_price from utils import tool import decimal from ths import l2_listen_pos_health_manager, client_manager __redisManager = redis_manager.RedisManager(0) __db = 0 @@ -873,141 +870,3 @@ finally: RedisUtils.realse(redis_instance) # 获取可以操作的位置 def get_can_listen_pos(client_id=0): client_ids = [] if client_id <= 0: client_ids = client_manager.getValidL2Clients() else: client_ids.append(client_id) # random.shuffle(client_ids) available_positions = [] for client_id in client_ids: redis_instance = __redisManager.getRedis() k = "listen_code-{}-*".format(client_id) keys = RedisUtils.keys(redis_instance, k, auto_free=False) # random.shuffle(keys) codes = [] for key in keys: index = key.split("-")[-1] if int(index) + 1 > constant.L2_CODE_COUNT_PER_DEVICE: continue result = RedisUtils.get(redis_instance, key, auto_free=False) if result is None or len(result) == 0: available_positions.append((client_id, int(key.replace("listen_code-{}-".format(client_id), "")))) else: codes.append((key, result)) RedisUtils.realse(redis_instance) # 查询是否有重复的代码 codes_set = set() count = 0 for code in codes: count = count + 1 codes_set.add(code[1]) if len(codes_set) < count: return client_id, int(code[0].replace("listen_code-{}-".format(client_id), "")) if available_positions: # 获取健康状态 available_positions_health_states = l2_listen_pos_health_manager.list_health_state(available_positions) # 尽量不分配第一个位置 available_positions_new = sorted(available_positions, key=lambda x: (available_positions_health_states[x], 0 if x[1] == 0 else 1), reverse=True) # available_positions.sort(key=lambda x: available_positions_health_states[x], reverse=True) # 取第1个数据 return available_positions_new[0][0], available_positions_new[0][1] return None, None # 获取可以操作的位置 def get_free_listen_pos_count(): client_ids = client_manager.getValidL2Clients() free_count = 0 for client_id in client_ids: redis_instance = __redisManager.getRedis() try: k = "listen_code-{}-*".format(client_id) keys = RedisUtils.keys(redis_instance, k, auto_free=False) for key in keys: code = RedisUtils.get(redis_instance, key, auto_free=False) if not code: free_count += 1 finally: RedisUtils.realse(redis_instance) return free_count # 获取正在监听的代码的位置 def get_listen_code_pos(code): val = RedisUtils.get(__redisManager.getRedis(), "code_listen_pos-{}".format(code)) if val is None: return None, None val = json.loads(val) cid, pid = val[0], val[1] code_ = get_listen_code_by_pos(cid, pid) # 校验代码 if code_ == code: return cid, pid else: return None, None # 是否正在监听 def is_listen(code): val = RedisUtils.get(__redisManager.getRedis(), "code_listen_pos-{}".format(code)) if val is None: return False else: return True # codes = get_listen_codes() # return codes.__contains__(code) def is_listen_old(code): codes = get_listen_codes() return codes.__contains__(code) # 监听是否满了 def is_listen_full(): clients = client_manager.getValidL2Clients() codes = get_listen_codes() return len(codes) >= constant.L2_CODE_COUNT_PER_DEVICE * len(clients) # 是否正在操作 def is_operate(code): return RedisUtils.get(__redisManager.getRedis(), "gp_operate-{}".format(code)) is not None # 设置正在操作的代码 def set_operate(code): RedisUtils.setex(__redisManager.getRedis(), "gp_operate-{}".format(code), 30, "1") # 批量设置正在操作的代码 def set_operates(codes): for code in codes: RedisUtils.setex(__redisManager.getRedis(), "gp_operate-{}".format(code), 30, "1") # 移除正在操作的代码 def rm_operate(code): RedisUtils.delete(__redisManager.getRedis(), "gp_operate-{}".format(code)) # 批量移除正在操作的代码 def rm_operates(codes): redis_instance = __redisManager.getRedis() try: for code in codes: RedisUtils.delete(redis_instance, "gp_operate-{}".format(code), auto_free=False) finally: RedisUtils.realse(redis_instance) if __name__ == '__main__': get_can_listen_pos() db/mysql_data_delegate.py
@@ -1,7 +1,3 @@ # 网络代理 import logging import pymysql # 把连接参数定义成字典 import constant db/redis_manager.py
@@ -9,7 +9,6 @@ import constant from log_module.log import logger_redis_debug from utils import tool config = constant.REDIS_CONFIG db/redis_manager_delegate.py
@@ -3,7 +3,6 @@ """ import logging import queue import threading import time import redis gmcache/01524cdfb33d0fa1b5a69a5f60c0856cBinary files differ
gmcache/01524cdfb33d0fa1b5a69a5f60c0856c.inf
File was deleted gmcache/479fdde70427c212421e408d94a30892Binary files differ
gmcache/479fdde70427c212421e408d94a30892.inf
File was deleted gmcache/5f02c16d1706a5857d1166a0279c4477Binary files differ
gmcache/5f02c16d1706a5857d1166a0279c4477.inf
File was deleted gmcache/79eda687b206cf9338a7498f287b9df1Binary files differ
gmcache/79eda687b206cf9338a7498f287b9df1.inf
File was deleted gmcache/831c0d379641e75a009c8571b789d91eBinary files differ
gmcache/831c0d379641e75a009c8571b789d91e.inf
File was deleted gmcache/97f026b9573053ae5dd8daacc7e75d88Binary files differ
gmcache/97f026b9573053ae5dd8daacc7e75d88.inf
File was deleted gmcache/a3c899eb9dc5faac0ebba63e45c476fdBinary files differ
gmcache/a3c899eb9dc5faac0ebba63e45c476fd.inf
File was deleted gmcache/b3bf755a9732c235ef49c86d82db9841Binary files differ
gmcache/b3bf755a9732c235ef49c86d82db9841.inf
File was deleted gmcache/cc43a7195e37a97c7c85fdc5b7c37337Binary files differ
gmcache/cc43a7195e37a97c7c85fdc5b7c37337.inf
File was deleted gmcache/dd002e0132e87679024d8fed93933042Binary files differ
gmcache/dd002e0132e87679024d8fed93933042.inf
File was deleted gmcache/ec282109624ef7eedeba39f23faa3787Binary files differ
gmcache/ec282109624ef7eedeba39f23faa3787.inf
File was deleted gui.py
File was deleted inited_data.py
@@ -4,62 +4,21 @@ from __future__ import print_function, absolute_import import schedule # import gm.api as gmapi from cancel_strategy.s_l_h_cancel_strategy import LCancelBigNumComputer from code_attribute import big_money_num_manager, global_data_loader, gpcode_manager, gpcode_first_screen_manager from code_attribute.code_nature_analyse import LatestMaxVolumeManager, HighIncreaseCodeManager, CodeNatureRecordManager from db.redis_manager_delegate import RedisUtils from l2.l2_sell_manager import L2MarketSellManager from l2.l2_transaction_data_manager import HuaXinBuyOrderManager from third_data.third_blocks_manager import InvalidBlockManager, CodeThirdBlocksManager from ths import client_manager import constant from trade.deal_big_money_manager import DealOrderNoManager from trade.trade_manager import AutoCancelSellModeManager from utils import global_util, tool import threading from trade.trade_manager import AutoCancelSellModeManager from utils import tool from servers import server from db import redis_manager_delegate as redis_manager from user import authority import decimal from third_data.history_k_data_util import HistoryKDatasUtils from trade import l2_trade_util, trade_manager from trade import l2_trade_util from l2.cancel_buy_strategy import L2LimitUpSellStatisticUtil from log_module.log import logger_juejin_tick, logger_system from trade.trade_data_manager import CodeActualPriceProcessor, PlaceOrderCountManager, AccountMoneyManager from trade.trade_queue_manager import JueJinBuy1VolumnManager from log_module.log import logger_system from trade.trade_data_manager import PlaceOrderCountManager, AccountMoneyManager from utils.ths_industry_util import ThsCodeIndustryManager redisManager = redis_manager.RedisManager(0) __jueJinBuy1VolumnManager = JueJinBuy1VolumnManager() __actualPriceProcessor = CodeActualPriceProcessor() # 设置账户信息 def setAccountInfo(accountId, strategyId, token): redis = redisManager.getRedis() try: RedisUtils.set(redis, "juejin-account-id", accountId, auto_free=False) RedisUtils.set(redis, "juejin-strategy-id", strategyId, auto_free=False) RedisUtils.set(redis, "juejin-token", token, auto_free=False) finally: RedisUtils.realse(redis) def getAccountInfo(): redis = redisManager.getRedis() try: account_id = RedisUtils.get(redis, "juejin-account-id", auto_free=False) strategy_id = RedisUtils.get(redis, "juejin-strategy-id", auto_free=False) token = RedisUtils.get(redis, "juejin-token", auto_free=False) return account_id, strategy_id, token finally: RedisUtils.realse(redis) def init_data(): @@ -116,104 +75,13 @@ PlaceOrderCountManager().clear() # 每日初始化 def everyday_init(): # 交易時間不能做初始化 # if not tool.is_init_time() or True: # raise Exception("交易时间不能初始化") init_data() codes = gpcode_manager.get_gp_list() logger_system.info("每日初始化") # 今日实时涨停 global_data_loader.add_limit_up_codes([], True) # 主要获取收盘价 __get_latest_info(None) # 获取60天最大量与昨日量 global_util.today_volumn.clear() global_util.max60_volumn.clear() global_util.yesterday_volumn.clear() # 清除大单数据 global_util.big_money_num.clear() # 初始化大单数据 for code in codes: big_money_num_manager.add_num(code, 0) big_money_num_manager.expire(code) # 清除涨停时间 global_util.limit_up_time.clear() # 初始化同花顺主站 if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS: l2_clients = client_manager.getValidL2Clients() for client in l2_clients: try: server.repair_ths_main_site(client) except Exception as e: pass def __run_schedule(): while True: schedule.run_pending() def init(context): # gmapi.subscribe(symbols="SZSE.002529", frequency="1d", count=30) # 订阅浦发银行, bar频率为一天和一分钟 # 订阅订阅多个频率的数据,可多次调用subscribe # 获取需要监听的股票 init_data() logger_system.info("掘金初始化") schedule.every().day.at("09:15:00").do(everyday_init) t1 = threading.Thread(target=lambda: __run_schedule()) # 后台运行 t1.setDaemon(True) t1.start() # 多个时间点获取收盘价 gmapi.schedule(schedule_func=__get_latest_info, date_rule='1d', time_rule='09:28:00') # 初始化内容 clients = authority.get_l2_clients() for client in clients: for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE): gpcode_manager.init_listen_code_by_pos(client, i) def __get_latest_info(context): # 初始化内容 clients = authority.get_l2_clients() for c in clients: for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE): gpcode_manager.init_listen_code_by_pos(int(c), i) codes = gpcode_manager.get_gp_list() result = HistoryKDatasUtils.get_gp_latest_info(codes) for item in result: sec_level = item['sec_level'] symbol = item['symbol'] symbol = symbol.split(".")[1] pre_close = tool.to_price(decimal.Decimal(str(item['pre_close']))) if sec_level == 1: if symbol in codes: gpcode_manager.CodePrePriceManager.set_price_pre(symbol, pre_close) else: gpcode_manager.rm_gp(symbol) # 获取最新的信息 def __get_current_info(): data = gpcode_manager.get_gp_list() results = HistoryKDatasUtils.get_gp_current_info(data) logger_juejin_tick.debug("定时获取:{}", results) for result in results: price = result["price"] symbol = result['symbol'] # 保存最新价 symbol = symbol.split(".")[1] __prices_now = {} # 保存运行时数据 servers/server.py
@@ -14,23 +14,19 @@ from utils import alert_util, data_process, global_util, ths_industry_util, tool, import_util, socket_util from code_attribute import code_volumn_manager, global_data_loader, gpcode_manager, first_target_code_data_processor import constant from user import authority from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager import l2_data_util from l2 import l2_data_manager_new, l2_data_manager, code_price_manager import l2.l2_data_util from third_data import block_info from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager from third_data.history_k_data_util import HistoryKDatasUtils from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager from ths import l2_listen_pos_health_manager, l2_code_operate, client_manager from trade import trade_data_manager, trade_manager, l2_trade_util, \ current_price_process_manager, trade_juejin, trade_constant from code_attribute.code_data_util import ZYLTGBUtil import l2.transaction_progress from log_module.log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \ logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_debug from log_module.log import logger_trade_delegate, logger_buy_1_volumn_record, \ logger_l2_trade_queue, logger_l2_trade_buy_queue, logger_debug from trade.huaxin import huaxin_trade_record_manager from trade.trade_manager import TradeTargetCodeModeManager from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager @@ -79,7 +75,6 @@ super().setup() # 可以不调用父类的setup()方法,父类的setup方法什么都没做 # print("----setup方法被执行-----") # print("打印传入的参数:", self.server.pipe_trade) self.l2CodeOperate = l2_code_operate.L2CodeOperate.get_instance() def __notify_trade(self, type_): if self.server.pipe_trade: @@ -124,107 +119,7 @@ else: print(_str) return_str = "OK" if type == 0: try: origin_start_time = round(time.time() * 1000) __start_time = round(time.time() * 1000) # level2盘口数据 day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = l2.l2_data_util.parseL2Data( _str) last_health_time = self.last_l2_listen_health_time.get((client, channel)) # --------------------------------设置L2健康状态-------------------------------- if last_health_time is None or __start_time - last_health_time > 1000: self.last_l2_listen_health_time[(client, channel)] = __start_time # 更新监听位健康状态 if origin_datas_count == 0: l2_listen_pos_health_manager.set_unhealthy(client, channel) else: l2_listen_pos_health_manager.set_healthy(client, channel) l2_log.threadIds[code] = random.randint(0, 100000) if True: # 间隔1s保存一条l2的最后一条数据 if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[ code] >= 1000 and len(origin_datas) > 0: self.l2_save_time_dict[code] = origin_start_time logger_l2_latest_data.info("{}#{}#{}", code, capture_time, origin_datas[-1]) # 10ms的网络传输延时 capture_timestamp = __start_time - process_time - 10 # print("截图时间:", process_time) __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "截图时间:{} 数据解析时间".format(process_time)) cid, pid = gpcode_manager.get_listen_code_pos(code) __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "l2获取代码位置耗时") # 判断目标代码位置是否与上传数据位置一致 if cid is not None and pid is not None and client == int(cid) and channel == int(pid): # l2.l2_data_util.set_l2_data_latest_count(code, len(origin_datas)) l2_data_util.save_l2_latest_data_number(code, origin_datas_count) # 保存l2数据条数 if not origin_datas: # or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)): raise Exception("无新增数据") # 保存最近的数据 self.latest_oringin_data[code] = origin_datas limit_up_price = gpcode_manager.get_limit_up_price(code) datas = l2.l2_data_util.L2DataUtil.format_l2_data(origin_datas, code, limit_up_price) try: # 校验客户端代码 l2_code_operate.verify_with_l2_data_pos_info(code, client, channel) __start_time = round(time.time() * 1000) if gpcode_manager.is_listen(code): __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "l2外部数据预处理耗时") l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp) __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "l2数据有效处理外部耗时", False) # 保存原始数据数量 # l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) # if round(time.time() * 1000) - __start_time > 20: # l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, # "异步保存原始数据条数耗时", # False) except l2_data_manager.L2DataException as l: # 单价不符 if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: key = "{}-{}-{}".format(client, channel, code) if key not in self.l2_data_error_dict or round( time.time() * 1000) - self.l2_data_error_dict[key] > 10000: # self.l2CodeOperate.repaire_l2_data(code) logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg) # 单价不一致时需要移除代码重新添加 l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误") self.l2_data_error_dict[key] = round(time.time() * 1000) except Exception as e: print("异常", str(e), code) logging.exception(e) logger_l2_error.error("出错:{}".format(str(e))) logger_l2_error.error("内容:{}".format(_str)) finally: __end_time = round(time.time() * 1000) # 只记录大于40ms的数据 if __end_time - origin_start_time > 100: l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time, "l2数据处理总耗时", True) except Exception as e: if str(e).find("新增数据"): pass else: logger_l2_error.exception(e) elif type == 2: if type == 2: # 涨停代码 dataList, is_add = data_process.parseGPCode(_str) # 设置涨停时间 @@ -447,16 +342,6 @@ space = time.time() - __start_time if space > 0.1: logger_debug.info("{}成交队列处理时间:{},{}", code, space, msg) elif type == 20: # 登录 data = data_process.parse(_str)["data"] try: client_id, _authoritys = authority.login(data["account"], data["pwd"]) return_str = data_process.toJson( {"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}}) except Exception as e: return_str = data_process.toJson({"code": 1, "msg": str(e)}) # 现价更新 elif type == 40: datas = data_process.parse(_str)["data"] @@ -509,56 +394,6 @@ # if need_sync: # # 同步数据 # L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_) elif type == 30: # 心跳信息 data = data_process.parse(_str)["data"] client_id = data["client"] thsDead = data.get("thsDead") logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data))) client_manager.saveClientActive(int(client_id), host, thsDead) if constant.is_windows(): # 动态导入 if ths_util.is_ths_dead(client_id): # TODO 重启同花顺 # 报警 l2_clients = authority.get_l2_clients() if client_id in l2_clients: alert_util.alarm() elif type == 60: # L2自启动成功 data = data_process.parse(_str)["data"] client_id = data["client"] print("L2自启动成功", client_id) now_str = tool.get_now_time_str() ts = tool.get_time_as_second(now_str) # 9点25到9点28之间的自启动就需要批量设置代码,目前永远不执行 if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00") and False: # 准备批量设置代码 return_json = {"code": 1, "msg": "等待批量设置代码"} return_str = json.dumps(return_json) # 获取排名前16位的代码 codes = trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16) codes = sorted(codes) if client_id == 2: codes = codes[:constant.L2_CODE_COUNT_PER_DEVICE] else: codes = codes[constant.L2_CODE_COUNT_PER_DEVICE:] codes_datas = [] for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE): if i >= len(codes): break codes_datas.append((i, codes[i])) # 如果设置失败需要重试2次 for i in range(0, 3): set_success = l2_code_operate.betch_set_client_codes(client_id, codes_datas) if set_success: break else: time.sleep(3) else: return_json = {"code": 0, "msg": "开启在线状态"} return_str = json.dumps(return_json) elif type == 70: # 选股宝热门概念 data_json = data_process.parse(_str) @@ -833,81 +668,6 @@ def finish(self): super().finish() # 可以不调用父类的finish(),方法,父类的finish方法什么都没做 # print("--------finish方法被执行---") def send_msg(client_id, data): _ip = client_manager.getActiveClientIP(client_id) # print("ip", client_id, _ip) if _ip is None or len(_ip) <= 0: raise Exception("客户端IP为空") socketClient = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socketClient.connect((_ip, 9006)) # 连接socket try: socketClient.send(json.dumps(data).encode()) recv = socketClient.recv(1024) result = str(recv, encoding="gbk") return result finally: socketClient.close() # 客户端心跳机制 def test_client_server(): while True: clients = authority.get_l2_clients() for client in clients: # print("心跳", client) try: send_msg(client, {"action": "test"}) except: pass # 矫正客户端代码 l2_code_operate.correct_client_codes() time.sleep(5) # 获取采集客户端的状态 def get_client_env_state(client): result = send_msg(client, {"action": "getEnvState"}) result = json.loads(result) if result["code"] == 0: return json.loads(result["data"]) else: raise Exception(result["msg"]) # 修复采集客户端 def repair_client_env(client): result = send_msg(client, {"action": "repairEnv"}) result = json.loads(result) if result["code"] != 0: raise Exception(result["msg"]) # 同步目标标的到同花顺 def sync_target_codes_to_ths(): codes = gpcode_manager.get_second_gp_list() code_list = [] for code in codes: code_list.append(code) client = authority._get_client_ids_by_rule("data-maintain") result = send_msg(client[0], {"action": "syncTargetCodes", "data": code_list}) return result # 修复同花顺主站 def repair_ths_main_site(client): result = send_msg(client, {"action": "updateTHSSite"}) result = json.loads(result) if result["code"] != 0: raise Exception(result["msg"]) else: # 测速成功 client_infos = [] for index in range(0, constant.L2_CODE_COUNT_PER_DEVICE): client_infos.append((client, index)) l2_listen_pos_health_manager.init_all(client_infos) if __name__ == "__main__1": test_l1.py
File was deleted test_l1.spec
File was deleted ths/client_manager.py
File was deleted ths/l2_code_operate.py
File was deleted ths/l2_listen_pos_health_manager.py
File was deleted trade/current_price_process_manager.py
@@ -11,7 +11,6 @@ import constant from code_attribute import gpcode_manager from utils import tool, import_util from ths.l2_code_operate import L2CodeOperate from trade import trade_manager, l2_trade_util, trade_constant from trade.trade_data_manager import CodeActualPriceProcessor @@ -131,33 +130,7 @@ except Exception as e: logging.exception(e) else: # 后面的代码数量 # 先删除应该删除的代码 for code in del_code_list: if gpcode_manager.is_listen_old(code): cid, pid = gpcode_manager.get_listen_code_pos(code) # 强制移除 if cid and pid: gpcode_manager.set_listen_code_by_pos(cid, pid, "") # 判断是否在监听里面 L2CodeOperate.get_instance().add_operate(0, code, "现价变化") # 增加应该增加的代码 for code in add_code_list: if not gpcode_manager.is_listen_old(code): L2CodeOperate.get_instance().add_operate(1, code, "现价变化") # 获取卡位数量 free_count = gpcode_manager.get_free_listen_pos_count() if free_count < 2: # 空闲位置不足 listen_codes = gpcode_manager.get_listen_codes() for code in listen_codes: if not gpcode_manager.is_in_gp_pool(code): client_id, pos = gpcode_manager.get_listen_code_pos(code) gpcode_manager.set_listen_code_by_pos(client_id, pos, "") free_count += 1 if free_count > 2: break pass __trade_price_dict = {} trade/l2_trade_util.py
@@ -41,8 +41,7 @@ trade_record_log_util.add_forbidden_buy_log(code, msg) add_to_forbidden_trade_codes(code) async_log_util.warning(logger_trade, f"{code}加入黑名单原因:{msg}") # l2_data_manager.remove_from_l2_fixed_codes(code) # l2_code_operate.L2CodeOperate.get_instance().remove_l2_listen(code, "禁止代码交易") def is_in_forbidden_trade_codes(code): user/authority.py
File was deleted utils/alert_util.py
File was deleted