Administrator
2023-01-06 59fba698b03a51a8da5b56a919ebbf94d4784f74
juejin.py
@@ -1,21 +1,44 @@
"""
掘金
"""
from __future__ import print_function, absolute_import
import datetime
import json
import logging
import time as t
import schedule
import gm.api as gmapi
import big_money_num_manager
import client_manager
import code_volumn_manager
import constant
import global_data_loader
import global_util
import gpcode_manager
import threading
import l2_trade_util
import server
import tool
import redis_manager
import authority
import decimal
from l2_code_operate import L2CodeOperate
from log import logger_juejin_tick
redisManager = redis_manager.RedisManager()
import trade_gui
from l2_code_operate import L2CodeOperate
import l2_data_manager_new
from log import logger_juejin_tick, logger_system
from trade_data_manager import CodeActualPriceProcessor
from trade_queue_manager import JueJinBuy1VolumnManager
# Shared Redis manager used by JueJinManager below.
# NOTE(review): the argument 0 is presumably the Redis DB index — confirm against redis_manager.
redisManager = redis_manager.RedisManager(0)
# Buy-1 volume manager; in the visible part of this file its only use is commented out.
__jueJinBuy1VolumnManager = JueJinBuy1VolumnManager()
# Processor for current-price data; used by init_data() and accpt_prices().
__actualPriceProcessor = CodeActualPriceProcessor()
# 设置账户信息
@@ -34,13 +57,81 @@
    return account_id, strategy_id, token
def init_data():
    """Clear cached per-day statistics and reload reference data.

    The calls run in a fixed order: first the various statistic stores are
    cleared/reset, then industry, free-float market value and volume data are
    loaded through ``global_data_loader``.
    """
    # Delete the previous minute-level big-order cancel data
    l2_data_manager_new.SecondAverageBigNumComputer.clear_data()
    l2_data_manager_new.AverageBigNumComputer.clear_data()
    # Delete all limit-up sell data
    l2_data_manager_new.L2LimitUpSellStatisticUtil.clear()
    # Reset all big-order data
    big_money_num_manager.reset_all()
    # Clear the "under water" price data
    __actualPriceProcessor.clear_under_water_data()
    # Load the industry stock codes
    global_data_loader.load_industry()
    # Load the free-float market value of each code
    global_data_loader.load_zyltgb()
    # Load the volume data
    global_data_loader.load_volumn()
# 每日初始化
def everyday_init():
    """Run the once-a-day initialisation of prices, volumes and big-order data.

    Steps, in order: refresh limit-up codes and latest info, rebuild the
    historical volume cache, reset the big-order counters for every code,
    clear the limit-up times, call ``init_data()`` and finally ask every valid
    L2 client to repair its THS main site.

    Raises:
        Exception: if ``tool.is_init_time()`` returns a falsy value, i.e. the
            initialisation is attempted outside the allowed time window.
    """
    # Initialisation must not run during trading hours
    if not tool.is_init_time():
        raise Exception("交易时间不能初始化")
    codes = gpcode_manager.get_gp_list()
    logger_system.info("每日初始化")
    # Today's real-time limit-up codes
    global_data_loader.add_limit_up_codes([], True)
    # Mainly used to fetch the closing prices
    get_latest_info(None)
    # Rebuild the 60-day max volume and yesterday's volume
    global_util.today_volumn.clear()
    global_util.max60_volumn.clear()
    global_util.yesterday_volumn.clear()
    for code, volumes in get_volumns(codes).items():
        # volumes[0] / volumes[1] are passed through in the order get_volumns stores them
        code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1])
    # Clear the big-order data
    global_util.big_money_num.clear()
    # Initialise the big-order counters for every code
    for code in codes:
        big_money_num_manager.add_num(code, 0)
        big_money_num_manager.expire(code)
    # Clear the limit-up times
    global_util.limit_up_time.clear()
    init_data()
    # Initialise the THS main site on every valid L2 client
    for client in client_manager.getValidL2Clients():
        try:
            server.repair_ths_main_site(client)
        except Exception:
            # Still best effort (one failing client must not abort the others),
            # but record the failure instead of silently discarding it.
            logging.exception("repair_ths_main_site failed for client %s", client)
def __run_schedule():
    """Run pending ``schedule`` jobs forever, polling once per second.

    Intended to be the target of a background thread; it never returns.
    """
    while True:
        schedule.run_pending()
        # Pause between polls: without it this loop busy-spins and pins a CPU core.
        t.sleep(1)
def init(context):
    # gmapi.subscribe(symbols="SZSE.002529", frequency="1d", count=30)
    # 订阅浦发银行, bar频率为一天和一分钟
    # 订阅订阅多个频率的数据,可多次调用subscribe
    # 获取需要监听的股票
    print("掘金初始化")
    init_data()
    logger_system.info("掘金初始化")
    schedule.every().day.at("09:15:00").do(everyday_init)
    t1 = threading.Thread(target=lambda: __run_schedule())
    # 后台运行
    t1.setDaemon(True)
    t1.start()
    # 多个时间点获取收盘价
    gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='08:30:00')
    gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='08:50:00')
@@ -81,7 +172,7 @@
def get_current_info():
    data = gpcode_manager.get_gp_list();
    results = JueJinManager.get_gp_current_info(data);
    logger_juejin_tick.debug("定时获取:{}",results)
    logger_juejin_tick.debug("定时获取:{}", results)
    for result in results:
        price = result["price"]
        symbol = result['symbol']
@@ -112,13 +203,12 @@
    # print(tick["created_at"])
    relative_timestamp = t.time() % (24 * 60 * 60) + 8 * 60 * 60
    # 9点20-15:05接受数据
    start1 = 60 * 60 * 9 + 24 * 60;
    end1 = 60 * 60 * 11 + 35 * 60;
    start2 = 60 * 60 * 12 + 50 * 60;
    end2 = 60 * 60 * 15 + 5 * 60;
    start1 = 60 * 60 * 9 + 31 * 60
    end1 = 60 * 60 * 11 + 35 * 60
    start2 = 60 * 60 * 12 + 50 * 60
    end2 = 60 * 60 * 15 + 5 * 60
    # TODO 测试
    test = False
    if (start1 < relative_timestamp < end1 or start2 < relative_timestamp < end2) or test:
    if (start1 < relative_timestamp < end1 or start2 < relative_timestamp < end2) or constant.TEST:
        symbol = tick['symbol']
        price = tick['price']
        # print(symbol,price)
@@ -128,28 +218,135 @@
        # 保存最新价
        symbol = symbol.split(".")[1]
        logger_juejin_tick.info("{}   {}    {}".format(symbol, price, tick["created_at"]))
        JueJinManager.add_listen_code(symbol)
        time_ = tick["created_at"].strftime("%H:%M:%S")
        data_ = (symbol, time_, tick["quotes"][0]["bid_v"], tick["quotes"][0]["bid_p"])
        logger_juejin_tick.info("买1量 {},{},{},{}", data_[1], data_[0], data_[2],
                                data_[3])
        # 暂时不采用
        # need_sync = __jueJinBuy1VolumnManager.save(data_[0], data_[1], data_[2], data_[3])
        # if need_sync:
        #     # 同步数据
        #     L2LimitUpMoneyStatisticUtil.verify_num(data_[0], data_[2], data_[1])
        # print(tick["created_at"],tick["quotes"][0]["bid_v"])
        accpt_price(symbol, price)
        __prices_now[symbol] = price
# 获取到现价
def accpt_price(code, price, price_from="juejin"):
    """Handle one current price for ``code``; currently a deliberate no-op.

    Args:
        code: stock code the price belongs to.
        price: current price.
        price_from: label of the price source, only used in the operate
            description string.

    Returns:
        None. The function returns immediately, so nothing below the
        ``return`` is executed.
    """
    # NOTE(review): this early return disables the per-code logic below —
    # presumably superseded by accpt_prices(); confirm before removing either.
    return
    gpcode_manager.set_price(code, price)
    # Fetch the previous closing price
    pricePre = gpcode_manager.get_price_pre(code)
    if pricePre is not None:
        rate = round((price - pricePre) * 100 / pricePre, 1)
        if rate >= 7:
            logger_juejin_tick.info("{}-{}-{}", code, price, rate)
            # Start listening when not already listened, not being operated and there is room
            if not gpcode_manager.is_listen_old(code) and not gpcode_manager.is_operate(
                    code) and not gpcode_manager.is_listen_full():
                L2CodeOperate.get_instance().add_operate(1, code, "现价变化,rate-{} from-{}".format(rate, price_from))
        elif rate < 5:
            # Stop listening
            if gpcode_manager.is_listen_old(code) and not gpcode_manager.is_operate(code):
                L2CodeOperate.get_instance().add_operate(0, code, "现价变化,rate-{} from-{}".format(rate, price_from))
# 获取到现价
def accpt_prices(prices):
    """Process a batch of current prices and rebalance the listened codes.

    For every entry the price is stored, the change rate against the previous
    close is computed and handed to the actual-price processor. Codes are then
    ranked by rate: the best ones (8 per valid L2 client, at least 8) are added
    to the listen list, all others are removed from it.

    Args:
        prices: sized iterable of dicts with ``"code"`` and ``"price"`` keys.
            The batch is ignored when it is more than 2 codes short of
            ``gpcode_manager.get_gp_list()``.
    """
    print("价格代码数量:", len(prices))
    __actualPriceProcessor.save_current_price_codes_count(len(prices))
    # The number of collected codes is wrong: skip this batch
    if len(gpcode_manager.get_gp_list()) - len(prices) > 2:
        return
    now_str = datetime.datetime.now().strftime("%H:%M:%S")

    rising = []   # (rate, code) pairs with rate >= 0
    falling = []  # (rate, code) pairs with rate < 0
    for d in prices:
        code, price = d["code"], float(d["price"])
        gpcode_manager.set_price(code, price)
        # Fetch the previous closing price
        pricePre = gpcode_manager.get_price_pre(code)
        if pricePre is None:
            continue
        rate = round((price - pricePre) * 100 / pricePre, 2)
        if rate >= 0:
            rising.append((rate, code))
        else:
            falling.append((rate, code))
        try:
            __actualPriceProcessor.process_rate(code, rate, now_str)
        except Exception as e:
            logging.exception(e)
        try:
            # The flag tells whether the current price equals the limit-up price
            is_limit_up = gpcode_manager.get_limit_up_price_by_preprice(pricePre) == tool.to_price(
                decimal.Decimal(d["price"]))
            __actualPriceProcessor.save_current_price(code, price, is_limit_up)
        except Exception as e:
            logging.exception(e)

    # Highest rate first; the sort is stable, so ties keep their input order
    rising.sort(key=lambda item: item[0], reverse=True)

    # Pre-fill the buy window: rising codes by rate, then the falling ones
    _buy_win_codes = [code for _, code in rising] + [code for _, code in falling]
    try:
        trade_gui.THSBuyWinManagerNew.fill_codes(_buy_win_codes)
    except Exception as e:
        logging.exception(e)

    client_ids = client_manager.getValidL2Clients()
    # Maximum number of codes to listen to: 8 per client, 8 when there is no client
    max_count = len(client_ids) * 8
    if max_count == 0:
        max_count = 8
    # The top codes are added; falling codes and the overflow are removed
    add_code_list = [code for _, code in rising[:max_count]]
    del_list = [code for _, code in falling] + [code for _, code in rising[max_count:]]

    # First remove the codes that should no longer be listened to
    for code in del_list:
        if gpcode_manager.is_listen_old(code):
            L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
    # Then add the codes that should be listened to
    for code in add_code_list:
        if not gpcode_manager.is_listen_old(code):
            if not l2_trade_util.is_in_forbidden_trade_codes(code):
                L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
        elif l2_trade_util.is_in_forbidden_trade_codes(code):
            # Already listened but trading is forbidden: drop it
            L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
    print(add_code_list, del_list)
def on_bar(context, bars):
@@ -176,20 +373,18 @@
def recieve_msg(pipe):
    """Consume JSON messages from ``pipe`` forever and dispatch them by type.

    Each message is a JSON string with a ``"type"`` key:
    ``"resub"`` triggers ``re_subscribe_tick()``; ``"accpt_price"`` forwards
    every entry of ``"data"`` (dicts with ``"code"`` and ``"price"``) to
    ``accpt_price``. Other types are ignored.

    Args:
        pipe: object exposing a blocking ``recv()`` that returns the message.
    """
    while True:
        value = pipe.recv()
        # Each message is logged, parsed and dispatched exactly once
        print("跨进程通信:", value)
        jsonValue = json.loads(value)
        action = jsonValue["type"]
        if action == 'resub':
            re_subscribe_tick()
        elif action == 'accpt_price':
            try:
                datas = jsonValue["data"]
                for data in datas:
                    accpt_price(data["code"], float(data["price"]))
            except Exception as e:
                # A bad payload must not kill the receive loop
                print(str(e))
class JueJinManager:
@@ -231,6 +426,17 @@
    def stop(self):
        """Delegate to ``gmapi.stop()``."""
        gmapi.stop()
    @classmethod
    def add_listen_code(cls, code):
        redis = redisManager.getRedis()
        redis.setex("juejin_listen_code-{}".format(code), 20, "1")
    @classmethod
    def get_listen_codes_lenth(cls):
        redis = redisManager.getRedis()
        keys = redis.keys("juejin_listen_code-*")
        return len(keys)
def trade(code, volume):
    account_id, s_id, token = getAccountInfo()
@@ -241,6 +447,7 @@
    print(result)
# 获取近90天的最大量与最近的量
def get_volumns(codes):
    end = datetime.datetime.now()
    # 获取近90天的历史数据
@@ -260,23 +467,15 @@
        code = result["symbol"].split(".")[1]
        volumn = int(result["volume"])
        day = "{:%Y-%m-%d}".format(result["eob"])
        if _fresult.get(code) is None:
            _fresult[code] = {"max_volumn": volumn, "latest_volumn": volumn}
        if volumn > _fresult[code]["max_volumn"]:
            _fresult[code]["max_volumn"] = volumn;
        _fresult[code]["latest_volumn"] = volumn;
        if _fresult.get(code) is None:
            _fresult[code] = (volumn, volumn)
        if volumn > _fresult[code][0]:
            _fresult[code] = (volumn, _fresult[code][1])
        _fresult[code] = (_fresult[code][0], volumn)
    return _fresult
if __name__ == '__main__':
    # Manual entry point: calls get_current_info() and then everyday_init().
    # Note that everyday_init() raises when tool.is_init_time() is falsy.
    # The commented-out lines are kept as earlier ad-hoc experiments.
    # trade("SZSE.000521", 100)
    # print("")
    # JueJinManager.get_gp_latest_info(["000592","002808"])
    get_current_info()
    # data_process.saveCodeVolumn(get_volumns(["000333","002911"]))
    # _redis_manager = redis_manager.RedisManager()
    # redis = _redis_manager.getRedis()
    # keys = redis.keys("test-inrec")
    # print(keys)
    everyday_init()