Administrator
2024-11-05 1bd4e57aada5b4d3decf0d4088fc949c819d7864
刪除同花顺时代的代码
31个文件已删除
11个文件已修改
2317 ■■■■■ 已修改文件
alert.mp3 补丁 | 查看 | 原始文档 | blame | 历史
api/outside_api_command_callback.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/code_data_util.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/first_target_code_data_processor.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/gpcode_manager.py 141 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/mysql_data_delegate.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager_delegate.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/01524cdfb33d0fa1b5a69a5f60c0856c 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/01524cdfb33d0fa1b5a69a5f60c0856c.inf 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/479fdde70427c212421e408d94a30892 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/479fdde70427c212421e408d94a30892.inf 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/5f02c16d1706a5857d1166a0279c4477 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/5f02c16d1706a5857d1166a0279c4477.inf 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/79eda687b206cf9338a7498f287b9df1 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/79eda687b206cf9338a7498f287b9df1.inf 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/831c0d379641e75a009c8571b789d91e 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/831c0d379641e75a009c8571b789d91e.inf 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/97f026b9573053ae5dd8daacc7e75d88 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/97f026b9573053ae5dd8daacc7e75d88.inf 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/a3c899eb9dc5faac0ebba63e45c476fd 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/a3c899eb9dc5faac0ebba63e45c476fd.inf 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/b3bf755a9732c235ef49c86d82db9841 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/b3bf755a9732c235ef49c86d82db9841.inf 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/cc43a7195e37a97c7c85fdc5b7c37337 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/cc43a7195e37a97c7c85fdc5b7c37337.inf 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/dd002e0132e87679024d8fed93933042 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/dd002e0132e87679024d8fed93933042.inf 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/ec282109624ef7eedeba39f23faa3787 补丁 | 查看 | 原始文档 | blame | 历史
gmcache/ec282109624ef7eedeba39f23faa3787.inf 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 1054 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
inited_data.py 142 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/server.py 248 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_l1.py 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_l1.spec 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths/client_manager.py 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths/l2_code_operate.py 313 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths/l2_listen_pos_health_manager.py 67 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/current_price_process_manager.py 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_util.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
user/authority.py 87 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/alert_util.py 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
alert.mp3
Binary files differ
api/outside_api_command_callback.py
@@ -36,7 +36,7 @@
    logger_real_place_order_position, logger_device
from output import l2_output_util
from third_data import kpl_util, history_k_data_manager, huaxin_l1_data_manager, third_blocks_manager, kpl_data_manager
from third_data.code_plate_key_manager import CodePlateKeyBuyManager, KPLCodeJXBlockManager, RealTimeKplMarketData, \
from third_data.code_plate_key_manager import  KPLCodeJXBlockManager, \
    RadicalBuyBlockManager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import JueJinApi, HistoryKDatasUtils
code_attribute/code_data_util.py
@@ -4,14 +4,12 @@
# 股票代码相关的参数
import decimal
import time
from code_attribute import gpcode_manager
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from db import redis_manager_delegate as redis_manager
from db.mysql_data_delegate import Mysqldb
from db.redis_manager_delegate import RedisUtils
from utils import tool
from utils.tool import async_call
__db = 0
_redisManager = redis_manager.RedisManager(0)
code_attribute/first_target_code_data_processor.py
@@ -16,8 +16,7 @@
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi
from ths import l2_code_operate
from trade import trade_data_manager, l2_trade_util
from trade import l2_trade_util
from settings.trade_setting import MarketSituationManager
from utils import global_util, tool, init_data_util, buy_condition_util
code_attribute/gpcode_manager.py
@@ -5,7 +5,6 @@
import json
import time
import constant
from db import redis_manager_delegate as redis_manager
from db.mysql_data_delegate import Mysqldb
from db.redis_manager_delegate import RedisUtils
@@ -13,8 +12,6 @@
from log_module.log import logger_pre_close_price
from utils import tool
import decimal
from ths import l2_listen_pos_health_manager, client_manager
__redisManager = redis_manager.RedisManager(0)
__db = 0
@@ -873,141 +870,3 @@
    finally:
        RedisUtils.realse(redis_instance)
# 获取可以操作的位置
def get_can_listen_pos(client_id=0):
    client_ids = []
    if client_id <= 0:
        client_ids = client_manager.getValidL2Clients()
    else:
        client_ids.append(client_id)
    # random.shuffle(client_ids)
    available_positions = []
    for client_id in client_ids:
        redis_instance = __redisManager.getRedis()
        k = "listen_code-{}-*".format(client_id)
        keys = RedisUtils.keys(redis_instance, k, auto_free=False)
        # random.shuffle(keys)
        codes = []
        for key in keys:
            index = key.split("-")[-1]
            if int(index) + 1 > constant.L2_CODE_COUNT_PER_DEVICE:
                continue
            result = RedisUtils.get(redis_instance, key, auto_free=False)
            if result is None or len(result) == 0:
                available_positions.append((client_id, int(key.replace("listen_code-{}-".format(client_id), ""))))
            else:
                codes.append((key, result))
        RedisUtils.realse(redis_instance)
        # 查询是否有重复的代码
        codes_set = set()
        count = 0
        for code in codes:
            count = count + 1
            codes_set.add(code[1])
            if len(codes_set) < count:
                return client_id, int(code[0].replace("listen_code-{}-".format(client_id), ""))
    if available_positions:
        # 获取健康状态
        available_positions_health_states = l2_listen_pos_health_manager.list_health_state(available_positions)
        # 尽量不分配第一个位置
        available_positions_new = sorted(available_positions,
                                         key=lambda x: (available_positions_health_states[x], 0 if x[1] == 0 else 1),
                                         reverse=True)
        # available_positions.sort(key=lambda x: available_positions_health_states[x], reverse=True)
        # 取第1个数据
        return available_positions_new[0][0], available_positions_new[0][1]
    return None, None
# 获取可以操作的位置
def get_free_listen_pos_count():
    client_ids = client_manager.getValidL2Clients()
    free_count = 0
    for client_id in client_ids:
        redis_instance = __redisManager.getRedis()
        try:
            k = "listen_code-{}-*".format(client_id)
            keys = RedisUtils.keys(redis_instance, k, auto_free=False)
            for key in keys:
                code = RedisUtils.get(redis_instance, key, auto_free=False)
                if not code:
                    free_count += 1
        finally:
            RedisUtils.realse(redis_instance)
    return free_count
# 获取正在监听的代码的位置
def get_listen_code_pos(code):
    val = RedisUtils.get(__redisManager.getRedis(), "code_listen_pos-{}".format(code))
    if val is None:
        return None, None
    val = json.loads(val)
    cid, pid = val[0], val[1]
    code_ = get_listen_code_by_pos(cid, pid)
    # 校验代码
    if code_ == code:
        return cid, pid
    else:
        return None, None
# 是否正在监听
def is_listen(code):
    val = RedisUtils.get(__redisManager.getRedis(), "code_listen_pos-{}".format(code))
    if val is None:
        return False
    else:
        return True
    # codes = get_listen_codes()
    # return codes.__contains__(code)
def is_listen_old(code):
    codes = get_listen_codes()
    return codes.__contains__(code)
# 监听是否满了
def is_listen_full():
    clients = client_manager.getValidL2Clients()
    codes = get_listen_codes()
    return len(codes) >= constant.L2_CODE_COUNT_PER_DEVICE * len(clients)
# 是否正在操作
def is_operate(code):
    return RedisUtils.get(__redisManager.getRedis(), "gp_operate-{}".format(code)) is not None
# 设置正在操作的代码
def set_operate(code):
    RedisUtils.setex(__redisManager.getRedis(), "gp_operate-{}".format(code), 30, "1")
# 批量设置正在操作的代码
def set_operates(codes):
    for code in codes:
        RedisUtils.setex(__redisManager.getRedis(), "gp_operate-{}".format(code), 30, "1")
# 移除正在操作的代码
def rm_operate(code):
    RedisUtils.delete(__redisManager.getRedis(), "gp_operate-{}".format(code))
# 批量移除正在操作的代码
def rm_operates(codes):
    redis_instance = __redisManager.getRedis()
    try:
        for code in codes:
            RedisUtils.delete(redis_instance, "gp_operate-{}".format(code), auto_free=False)
    finally:
        RedisUtils.realse(redis_instance)
if __name__ == '__main__':
    get_can_listen_pos()
db/mysql_data_delegate.py
@@ -1,7 +1,3 @@
# 网络代理
import logging
import pymysql
# 把连接参数定义成字典
import constant
db/redis_manager.py
@@ -9,7 +9,6 @@
import constant
from log_module.log import logger_redis_debug
from utils import tool
config = constant.REDIS_CONFIG
db/redis_manager_delegate.py
@@ -3,7 +3,6 @@
"""
import logging
import queue
import threading
import time
import redis
gmcache/01524cdfb33d0fa1b5a69a5f60c0856c
Binary files differ
gmcache/01524cdfb33d0fa1b5a69a5f60c0856c.inf
File was deleted
gmcache/479fdde70427c212421e408d94a30892
Binary files differ
gmcache/479fdde70427c212421e408d94a30892.inf
File was deleted
gmcache/5f02c16d1706a5857d1166a0279c4477
Binary files differ
gmcache/5f02c16d1706a5857d1166a0279c4477.inf
File was deleted
gmcache/79eda687b206cf9338a7498f287b9df1
Binary files differ
gmcache/79eda687b206cf9338a7498f287b9df1.inf
File was deleted
gmcache/831c0d379641e75a009c8571b789d91e
Binary files differ
gmcache/831c0d379641e75a009c8571b789d91e.inf
File was deleted
gmcache/97f026b9573053ae5dd8daacc7e75d88
Binary files differ
gmcache/97f026b9573053ae5dd8daacc7e75d88.inf
File was deleted
gmcache/a3c899eb9dc5faac0ebba63e45c476fd
Binary files differ
gmcache/a3c899eb9dc5faac0ebba63e45c476fd.inf
File was deleted
gmcache/b3bf755a9732c235ef49c86d82db9841
Binary files differ
gmcache/b3bf755a9732c235ef49c86d82db9841.inf
File was deleted
gmcache/cc43a7195e37a97c7c85fdc5b7c37337
Binary files differ
gmcache/cc43a7195e37a97c7c85fdc5b7c37337.inf
File was deleted
gmcache/dd002e0132e87679024d8fed93933042
Binary files differ
gmcache/dd002e0132e87679024d8fed93933042.inf
File was deleted
gmcache/ec282109624ef7eedeba39f23faa3787
Binary files differ
gmcache/ec282109624ef7eedeba39f23faa3787.inf
File was deleted
gui.py
File was deleted
inited_data.py
@@ -4,62 +4,21 @@
from __future__ import print_function, absolute_import
import schedule
# import gm.api as gmapi
from cancel_strategy.s_l_h_cancel_strategy import LCancelBigNumComputer
from code_attribute import big_money_num_manager, global_data_loader, gpcode_manager, gpcode_first_screen_manager
from code_attribute.code_nature_analyse import LatestMaxVolumeManager, HighIncreaseCodeManager, CodeNatureRecordManager
from db.redis_manager_delegate import RedisUtils
from l2.l2_sell_manager import L2MarketSellManager
from l2.l2_transaction_data_manager import HuaXinBuyOrderManager
from third_data.third_blocks_manager import InvalidBlockManager, CodeThirdBlocksManager
from ths import client_manager
import constant
from trade.deal_big_money_manager import DealOrderNoManager
from trade.trade_manager import  AutoCancelSellModeManager
from utils import global_util, tool
import threading
from trade.trade_manager import AutoCancelSellModeManager
from utils import tool
from servers import server
from db import redis_manager_delegate as redis_manager
from user import authority
import decimal
from third_data.history_k_data_util import HistoryKDatasUtils
from trade import l2_trade_util, trade_manager
from trade import l2_trade_util
from l2.cancel_buy_strategy import L2LimitUpSellStatisticUtil
from log_module.log import logger_juejin_tick, logger_system
from trade.trade_data_manager import CodeActualPriceProcessor, PlaceOrderCountManager, AccountMoneyManager
from trade.trade_queue_manager import JueJinBuy1VolumnManager
from log_module.log import logger_system
from trade.trade_data_manager import PlaceOrderCountManager, AccountMoneyManager
from utils.ths_industry_util import ThsCodeIndustryManager
redisManager = redis_manager.RedisManager(0)
__jueJinBuy1VolumnManager = JueJinBuy1VolumnManager()
__actualPriceProcessor = CodeActualPriceProcessor()
# 设置账户信息
def setAccountInfo(accountId, strategyId, token):
    redis = redisManager.getRedis()
    try:
        RedisUtils.set(redis, "juejin-account-id", accountId, auto_free=False)
        RedisUtils.set(redis, "juejin-strategy-id", strategyId, auto_free=False)
        RedisUtils.set(redis, "juejin-token", token, auto_free=False)
    finally:
        RedisUtils.realse(redis)
def getAccountInfo():
    redis = redisManager.getRedis()
    try:
        account_id = RedisUtils.get(redis, "juejin-account-id", auto_free=False)
        strategy_id = RedisUtils.get(redis, "juejin-strategy-id", auto_free=False)
        token = RedisUtils.get(redis, "juejin-token", auto_free=False)
        return account_id, strategy_id, token
    finally:
        RedisUtils.realse(redis)
def init_data():
@@ -116,104 +75,13 @@
        PlaceOrderCountManager().clear()
# 每日初始化
def everyday_init():
    # 交易時間不能做初始化
    # if not tool.is_init_time() or True:
    #     raise Exception("交易时间不能初始化")
    init_data()
    codes = gpcode_manager.get_gp_list()
    logger_system.info("每日初始化")
    # 今日实时涨停
    global_data_loader.add_limit_up_codes([], True)
    # 主要获取收盘价
    __get_latest_info(None)
    # 获取60天最大量与昨日量
    global_util.today_volumn.clear()
    global_util.max60_volumn.clear()
    global_util.yesterday_volumn.clear()
    # 清除大单数据
    global_util.big_money_num.clear()
    # 初始化大单数据
    for code in codes:
        big_money_num_manager.add_num(code, 0)
        big_money_num_manager.expire(code)
    # 清除涨停时间
    global_util.limit_up_time.clear()
    # 初始化同花顺主站
    if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS:
        l2_clients = client_manager.getValidL2Clients()
        for client in l2_clients:
            try:
                server.repair_ths_main_site(client)
            except Exception as e:
                pass
def __run_schedule():
    while True:
        schedule.run_pending()
def init(context):
    # gmapi.subscribe(symbols="SZSE.002529", frequency="1d", count=30)
    # 订阅浦发银行, bar频率为一天和一分钟
    # 订阅订阅多个频率的数据,可多次调用subscribe
    # 获取需要监听的股票
    init_data()
    logger_system.info("掘金初始化")
    schedule.every().day.at("09:15:00").do(everyday_init)
    t1 = threading.Thread(target=lambda: __run_schedule())
    # 后台运行
    t1.setDaemon(True)
    t1.start()
    # 多个时间点获取收盘价
    gmapi.schedule(schedule_func=__get_latest_info, date_rule='1d', time_rule='09:28:00')
    # 初始化内容
    clients = authority.get_l2_clients()
    for client in clients:
        for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
            gpcode_manager.init_listen_code_by_pos(client, i)
def __get_latest_info(context):
    # 初始化内容
    clients = authority.get_l2_clients()
    for c in clients:
        for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
            gpcode_manager.init_listen_code_by_pos(int(c), i)
    codes = gpcode_manager.get_gp_list()
    result = HistoryKDatasUtils.get_gp_latest_info(codes)
    for item in result:
        sec_level = item['sec_level']
        symbol = item['symbol']
        symbol = symbol.split(".")[1]
        pre_close = tool.to_price(decimal.Decimal(str(item['pre_close'])))
        if sec_level == 1:
            if symbol in codes:
                gpcode_manager.CodePrePriceManager.set_price_pre(symbol, pre_close)
        else:
            gpcode_manager.rm_gp(symbol)
# 获取最新的信息
def __get_current_info():
    data = gpcode_manager.get_gp_list()
    results = HistoryKDatasUtils.get_gp_current_info(data)
    logger_juejin_tick.debug("定时获取:{}", results)
    for result in results:
        price = result["price"]
        symbol = result['symbol']
        # 保存最新价
        symbol = symbol.split(".")[1]
__prices_now = {}
# 保存运行时数据
servers/server.py
@@ -14,23 +14,19 @@
from utils import alert_util, data_process, global_util, ths_industry_util, tool, import_util, socket_util
from code_attribute import code_volumn_manager, global_data_loader, gpcode_manager, first_target_code_data_processor
import constant
from user import authority
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager
import l2_data_util
from l2 import l2_data_manager_new, l2_data_manager, code_price_manager
import l2.l2_data_util
from third_data import block_info
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager
from ths import l2_listen_pos_health_manager, l2_code_operate, client_manager
from trade import trade_data_manager, trade_manager, l2_trade_util, \
    current_price_process_manager, trade_juejin, trade_constant
from code_attribute.code_data_util import ZYLTGBUtil
import l2.transaction_progress
from log_module.log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data, logger_l2_trade_buy_queue, logger_debug
from log_module.log import  logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_trade_buy_queue, logger_debug
from trade.huaxin import huaxin_trade_record_manager
from trade.trade_manager import TradeTargetCodeModeManager
from trade.trade_queue_manager import THSBuy1VolumnManager, thsl2tradequeuemanager
@@ -79,7 +75,6 @@
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
        # print("----setup方法被执行-----")
        # print("打印传入的参数:", self.server.pipe_trade)
        self.l2CodeOperate = l2_code_operate.L2CodeOperate.get_instance()
    def __notify_trade(self, type_):
        if self.server.pipe_trade:
@@ -124,107 +119,7 @@
                    else:
                        print(_str)
                return_str = "OK"
                if type == 0:
                    try:
                        origin_start_time = round(time.time() * 1000)
                        __start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, origin_datas, origin_datas_count = l2.l2_data_util.parseL2Data(
                            _str)
                        last_health_time = self.last_l2_listen_health_time.get((client, channel))
                        # --------------------------------设置L2健康状态--------------------------------
                        if last_health_time is None or __start_time - last_health_time > 1000:
                            self.last_l2_listen_health_time[(client, channel)] = __start_time
                            # 更新监听位健康状态
                            if origin_datas_count == 0:
                                l2_listen_pos_health_manager.set_unhealthy(client, channel)
                            else:
                                l2_listen_pos_health_manager.set_healthy(client, channel)
                        l2_log.threadIds[code] = random.randint(0, 100000)
                        if True:
                            # 间隔1s保存一条l2的最后一条数据
                            if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
                                code] >= 1000 and len(origin_datas) > 0:
                                self.l2_save_time_dict[code] = origin_start_time
                                logger_l2_latest_data.info("{}#{}#{}", code, capture_time, origin_datas[-1])
                            # 10ms的网络传输延时
                            capture_timestamp = __start_time - process_time - 10
                            # print("截图时间:", process_time)
                            __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                               "截图时间:{} 数据解析时间".format(process_time))
                            cid, pid = gpcode_manager.get_listen_code_pos(code)
                            __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                               "l2获取代码位置耗时")
                            # 判断目标代码位置是否与上传数据位置一致
                            if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                                # l2.l2_data_util.set_l2_data_latest_count(code, len(origin_datas))
                                l2_data_util.save_l2_latest_data_number(code, origin_datas_count)
                                # 保存l2数据条数
                                if not origin_datas:
                                    # or not l2.l2_data_util.is_origin_data_diffrent(origin_datas,self.latest_oringin_data.get(code)):
                                    raise Exception("无新增数据")
                                # 保存最近的数据
                                self.latest_oringin_data[code] = origin_datas
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                datas = l2.l2_data_util.L2DataUtil.format_l2_data(origin_datas, code, limit_up_price)
                                try:
                                    # 校验客户端代码
                                    l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                    __start_time = round(time.time() * 1000)
                                    if gpcode_manager.is_listen(code):
                                        __start_time = l2_data_log.l2_time(code,
                                                                           round(time.time() * 1000) - __start_time,
                                                                           "l2外部数据预处理耗时")
                                        l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                                        __start_time = l2_data_log.l2_time(code,
                                                                           round(time.time() * 1000) - __start_time,
                                                                           "l2数据有效处理外部耗时",
                                                                           False)
                                        # 保存原始数据数量
                                        # l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                        # if round(time.time() * 1000) - __start_time > 20:
                                        #     l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                        #                         "异步保存原始数据条数耗时",
                                        #                         False)
                                except l2_data_manager.L2DataException as l:
                                    # 单价不符
                                    if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                                        key = "{}-{}-{}".format(client, channel, code)
                                        if key not in self.l2_data_error_dict or round(
                                                time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
                                            # self.l2CodeOperate.repaire_l2_data(code)
                                            logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg)
                                            # 单价不一致时需要移除代码重新添加
                                            l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误")
                                            self.l2_data_error_dict[key] = round(time.time() * 1000)
                                except Exception as e:
                                    print("异常", str(e), code)
                                    logging.exception(e)
                                    logger_l2_error.error("出错:{}".format(str(e)))
                                    logger_l2_error.error("内容:{}".format(_str))
                                finally:
                                    __end_time = round(time.time() * 1000)
                                    # 只记录大于40ms的数据
                                    if __end_time - origin_start_time > 100:
                                        l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time,
                                                            "l2数据处理总耗时",
                                                            True)
                    except Exception as e:
                        if str(e).find("新增数据"):
                            pass
                        else:
                            logger_l2_error.exception(e)
                elif type == 2:
                if type == 2:
                    # 涨停代码
                    dataList, is_add = data_process.parseGPCode(_str)
                    # 设置涨停时间
@@ -447,16 +342,6 @@
                        space = time.time() - __start_time
                        if space > 0.1:
                            logger_debug.info("{}成交队列处理时间:{},{}", code, space, msg)
                elif type == 20:
                    # 登录
                    data = data_process.parse(_str)["data"]
                    try:
                        client_id, _authoritys = authority.login(data["account"], data["pwd"])
                        return_str = data_process.toJson(
                            {"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}})
                    except Exception as e:
                        return_str = data_process.toJson({"code": 1, "msg": str(e)})
                # 现价更新
                elif type == 40:
                    datas = data_process.parse(_str)["data"]
@@ -509,56 +394,6 @@
                            # if need_sync:
                            #     # 同步数据
                            #     L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                elif type == 30:
                    # 心跳信息
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
                    thsDead = data.get("thsDead")
                    logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data)))
                    client_manager.saveClientActive(int(client_id), host, thsDead)
                    if constant.is_windows():
                        # 动态导入
                        if ths_util.is_ths_dead(client_id):
                            # TODO 重启同花顺
                            # 报警
                            l2_clients = authority.get_l2_clients()
                            if client_id in l2_clients:
                                alert_util.alarm()
                elif type == 60:
                    # L2自启动成功
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
                    print("L2自启动成功", client_id)
                    now_str = tool.get_now_time_str()
                    ts = tool.get_time_as_second(now_str)
                    # 9点25到9点28之间的自启动就需要批量设置代码,目前永远不执行
                    if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00") and False:
                        # 准备批量设置代码
                        return_json = {"code": 1, "msg": "等待批量设置代码"}
                        return_str = json.dumps(return_json)
                        # 获取排名前16位的代码
                        codes = trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16)
                        codes = sorted(codes)
                        if client_id == 2:
                            codes = codes[:constant.L2_CODE_COUNT_PER_DEVICE]
                        else:
                            codes = codes[constant.L2_CODE_COUNT_PER_DEVICE:]
                        codes_datas = []
                        for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
                            if i >= len(codes):
                                break
                            codes_datas.append((i, codes[i]))
                        # 如果设置失败需要重试2次
                        for i in range(0, 3):
                            set_success = l2_code_operate.betch_set_client_codes(client_id, codes_datas)
                            if set_success:
                                break
                            else:
                                time.sleep(3)
                    else:
                        return_json = {"code": 0, "msg": "开启在线状态"}
                        return_str = json.dumps(return_json)
                elif type == 70:
                    # 选股宝热门概念
                    data_json = data_process.parse(_str)
@@ -833,81 +668,6 @@
    def finish(self):
        super().finish()  # 可以不调用父类的finish(),方法,父类的finish方法什么都没做
        # print("--------finish方法被执行---")
def send_msg(client_id, data):
    _ip = client_manager.getActiveClientIP(client_id)
    # print("ip", client_id, _ip)
    if _ip is None or len(_ip) <= 0:
        raise Exception("客户端IP为空")
    socketClient = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socketClient.connect((_ip, 9006))
    # 连接socket
    try:
        socketClient.send(json.dumps(data).encode())
        recv = socketClient.recv(1024)
        result = str(recv, encoding="gbk")
        return result
    finally:
        socketClient.close()
# 客户端心跳机制
def test_client_server():
    while True:
        clients = authority.get_l2_clients()
        for client in clients:
            # print("心跳", client)
            try:
                send_msg(client, {"action": "test"})
            except:
                pass
        # 矫正客户端代码
        l2_code_operate.correct_client_codes()
        time.sleep(5)
# 获取采集客户端的状态
def get_client_env_state(client):
    result = send_msg(client, {"action": "getEnvState"})
    result = json.loads(result)
    if result["code"] == 0:
        return json.loads(result["data"])
    else:
        raise Exception(result["msg"])
# 修复采集客户端
def repair_client_env(client):
    result = send_msg(client, {"action": "repairEnv"})
    result = json.loads(result)
    if result["code"] != 0:
        raise Exception(result["msg"])
# 同步目标标的到同花顺
def sync_target_codes_to_ths():
    codes = gpcode_manager.get_second_gp_list()
    code_list = []
    for code in codes:
        code_list.append(code)
    client = authority._get_client_ids_by_rule("data-maintain")
    result = send_msg(client[0], {"action": "syncTargetCodes", "data": code_list})
    return result
# 修复同花顺主站
def repair_ths_main_site(client):
    result = send_msg(client, {"action": "updateTHSSite"})
    result = json.loads(result)
    if result["code"] != 0:
        raise Exception(result["msg"])
    else:
        # 测速成功
        client_infos = []
        for index in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
            client_infos.append((client, index))
        l2_listen_pos_health_manager.init_all(client_infos)
if __name__ == "__main__1":
test_l1.py
File was deleted
test_l1.spec
File was deleted
ths/client_manager.py
File was deleted
ths/l2_code_operate.py
File was deleted
ths/l2_listen_pos_health_manager.py
File was deleted
trade/current_price_process_manager.py
@@ -11,7 +11,6 @@
import constant
from code_attribute import gpcode_manager
from utils import tool, import_util
from ths.l2_code_operate import L2CodeOperate
from trade import trade_manager, l2_trade_util, trade_constant
from trade.trade_data_manager import CodeActualPriceProcessor
@@ -131,33 +130,7 @@
            except Exception as e:
                logging.exception(e)
        else:
            # 后面的代码数量
            # 先删除应该删除的代码
            for code in del_code_list:
                if gpcode_manager.is_listen_old(code):
                    cid, pid = gpcode_manager.get_listen_code_pos(code)
                    # 强制移除
                    if cid and pid:
                        gpcode_manager.set_listen_code_by_pos(cid, pid, "")
                    # 判断是否在监听里面
                    L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
            # 增加应该增加的代码
            for code in add_code_list:
                if not gpcode_manager.is_listen_old(code):
                    L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
            # 获取卡位数量
            free_count = gpcode_manager.get_free_listen_pos_count()
            if free_count < 2:
                # 空闲位置不足
                listen_codes = gpcode_manager.get_listen_codes()
                for code in listen_codes:
                    if not gpcode_manager.is_in_gp_pool(code):
                        client_id, pos = gpcode_manager.get_listen_code_pos(code)
                        gpcode_manager.set_listen_code_by_pos(client_id, pos, "")
                        free_count += 1
                        if free_count > 2:
                            break
            pass
__trade_price_dict = {}
trade/l2_trade_util.py
@@ -41,8 +41,7 @@
            trade_record_log_util.add_forbidden_buy_log(code, msg)
        add_to_forbidden_trade_codes(code)
        async_log_util.warning(logger_trade, f"{code}加入黑名单原因:{msg}")
    # l2_data_manager.remove_from_l2_fixed_codes(code)
    # l2_code_operate.L2CodeOperate.get_instance().remove_l2_listen(code, "禁止代码交易")
def is_in_forbidden_trade_codes(code):
user/authority.py
File was deleted
utils/alert_util.py
File was deleted