Administrator
2023-06-25 7bfdbe969da0b3a132d805cd15e9e83e7f2a8f8a
将掘金接口抽象出单独的类
1个文件已删除
19个文件已修改
5个文件已添加
1514 ■■■■■ 已修改文件
constant.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
init.py 278 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 522 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_huaxin_util.py 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_test.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/code_info_output.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 38 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/block_info.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/data_server.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/history_k_data_util.py 163 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/hot_block_data_process.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/xgb.py 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/current_price_process_manager.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_factor.py 127 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_juejin.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_server.py 184 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_server_processor.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -42,5 +42,7 @@
# 开盘啦
KPL_INVALID_BLOCKS = ["一季报增长", "二季报增长", "三季报增长", "四季报增长", "业绩增长", "中报增长", "年报增长", "年报预增", "无", "次新股", "ST摘帽", "超跌",
                      "股权转让", "并购重组", "再融资", "年报预增", " 专精特新", "壳资源", "行业龙头", "参股金融", "科创板"]
# 是否开启掘金交易
JUEJIN_TRADE_ENABLE = True
# 是否开启API交易
API_TRADE_ENABLE = True
# 每只票买的金额
BUY_MONEY_PER_CODE = 8000
data_export_util.py
@@ -207,4 +207,4 @@
if __name__ == "__main__":
    export_l2_excel("603999")
    export_l2_excel("002343")
db/redis_manager.py
@@ -23,8 +23,9 @@
if __name__ == "__main__":
    _redisManager = RedisManager(0)
    redis = _redisManager.getRedis()
    keys = redis.keys("under_water_seconds-*")
    keys = redis.keys("volumn_max60-*")
    for k in keys:
        # print(k)
        redis.delete(k)
    pass
gui.py
@@ -16,7 +16,6 @@
from db import mysql_data, redis_manager
import server
import settings
from juejin import JueJinManager
from ths.l2_code_operate import L2CodeOperate
from trade.l2_trade_factor import L2TradeFactorUtil
from ocr import ocr_server
@@ -83,8 +82,6 @@
    tcpserver.serve_forever()
def startJueJin(pipe):
    juejin.JueJinManager(pipe).start()
class GUI:
@@ -94,7 +91,6 @@
        gs_gui_pipe, gs_server_pipe = multiprocessing.Pipe()
        self.serverProcess = multiprocessing.Process(target=createServer, args=(p1, gs_server_pipe,))
        self.jueJinProcess = multiprocessing.Process(target=startJueJin, args=(p2,))
        self.jueJinTradeProcess = multiprocessing.Process(target=trade_juejin.run)
        self.ocrServerProcess = multiprocessing.Process(target=createOCRServer)
@@ -128,8 +124,6 @@
            time.sleep(0.1)
    def run(self):
        # TODO
        self.jueJinProcess.start()
        self.serverProcess.start()
        self.ocrServerProcess.start()
        self.jueJinTradeProcess.start()
@@ -230,7 +224,7 @@
            sv_num.set("获取到收盘价数量:{}".format(count))
        def re_get_close_price():
            juejin.re_set_price_pres(gpcode_manager.get_gp_list())
            init.re_set_price_pres(gpcode_manager.get_gp_list())
        def get_limit_up_codes_win():
            width = 500
@@ -346,7 +340,7 @@
            try:
                if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") > 0:
                    raise Exception("只能9:30之前重新分配窗口")
                datas = JueJinManager.get_codes_limit_rate(gpcode_manager.get_gp_list())
                datas = HistoryKDatasUtils.get_codes_limit_rate(gpcode_manager.get_gp_list())
                matrix = numpy.array(datas)
                codes = matrix[:, 0].tolist()
                trade_gui.re_distribute_buy_win(codes)
@@ -376,7 +370,7 @@
                cl_win.configure(text="异常:{}".format(str(e)), foreground="#FF7F27")
            try:
                juejin_length = JueJinManager.get_listen_codes_lenth()
                juejin_length = 0
                codes_length = len(gpcode_manager.get_gp_list())
                cl_codes.configure(text="{}/{}".format(juejin_length, codes_length), foreground="#008000")
            except Exception as e:
@@ -605,7 +599,7 @@
            win.mainloop()
        def init():
            juejin.everyday_init()
            init.everyday_init()
        def set_accept_l2():
            settings.set_accept_l2(accept_l2.get())
@@ -870,7 +864,7 @@
            if len(account.strip()) < 1 or len(sid.strip()) < 1 or len(token.strip()) < 1:
                showinfo('提示', "数据不完整")
                return
            juejin.setAccountInfo(account, sid, token)
            init.setAccountInfo(account, sid, token)
            showinfo('提示', "设置成功")
        # 设置掘金信息
@@ -902,7 +896,7 @@
        frame.grid(row=0, column=2, pady=5, padx=5)
        # 设置参数
        account, sid, token = juejin.getAccountInfo()
        account, sid, token = init.getAccountInfo()
        account_var.set(account)
        sid_var.set(sid)
        token_var.set(token)
init.py
New file
@@ -0,0 +1,278 @@
"""
掘金
"""
from __future__ import print_function, absolute_import
import time as t
import schedule
import gm.api as gmapi
import big_money_num_manager
import client_manager
import constant
import global_data_loader
import global_util
import gpcode_first_screen_manager
import gpcode_manager
import threading
import server
import tool
from db import redis_manager
import authority
import decimal
from third_data.history_k_data_util import HistoryKDatasUtils
from trade import trade_gui, l2_trade_util, trade_manager
from l2.cancel_buy_strategy import L2LimitUpSellStatisticUtil
from log import logger_juejin_tick, logger_system
from trade.trade_data_manager import CodeActualPriceProcessor
from trade.trade_queue_manager import JueJinBuy1VolumnManager
redisManager = redis_manager.RedisManager(0)
__jueJinBuy1VolumnManager = JueJinBuy1VolumnManager()
__actualPriceProcessor = CodeActualPriceProcessor()
# 设置账户信息
def setAccountInfo(accountId, strategyId, token):
    redis = redisManager.getRedis()
    redis.set("juejin-account-id", accountId)
    redis.set("juejin-strategy-id", strategyId)
    redis.set("juejin-token", token)
def getAccountInfo():
    redis = redisManager.getRedis()
    account_id = redis.get("juejin-account-id")
    strategy_id = redis.get("juejin-strategy-id")
    token = redis.get("juejin-token")
    return account_id, strategy_id, token
def init_data():
    # 删除所有的涨停卖数据
    L2LimitUpSellStatisticUtil.clear()
    # 重置所有的大单数据
    big_money_num_manager.reset_all()
    # 清除水下捞数据
    __actualPriceProcessor.clear_under_water_data()
    # 载入行业股票代码
    global_data_loader.load_industry()
    # 载入代码自由流通市值
    global_data_loader.load_zyltgb()
    # 载入量
    global_data_loader.load_volumn()
    # 9点25之前删除所有代码
    if tool.trade_time_sub(tool.get_now_time_str(), "09:25:00") <= 0:
        # 删除L2监听代码
        gpcode_manager.clear_listen_codes()
        # 删除首板代码
        gpcode_manager.clear_first_codes()
        # 删除首板未筛选代码
        gpcode_first_screen_manager.clear_first_no_screen_codes()
        # 删除禁止代码
        l2_trade_util.init_forbidden_trade_codes()
        # 清空白名单
        l2_trade_util.WhiteListCodeManager.clear()
        # 清空想要买
        gpcode_manager.WantBuyCodesManager.clear()
        # 清空分数禁止代码
        trade_manager.ForbiddenBuyCodeByScoreManager.clear()
        # 清空暂停交易代码
        gpcode_manager.PauseBuyCodesManager.clear()
# 每日初始化
def everyday_init():
    # 交易時間不能做初始化
    if not tool.is_init_time():
        raise Exception("交易时间不能初始化")
    init_data()
    codes = gpcode_manager.get_gp_list()
    logger_system.info("每日初始化")
    # 今日实时涨停
    global_data_loader.add_limit_up_codes([], True)
    # 主要获取收盘价
    __get_latest_info(None)
    # 获取60天最大量与昨日量
    global_util.today_volumn.clear()
    global_util.max60_volumn.clear()
    global_util.yesterday_volumn.clear()
    # 清除大单数据
    global_util.big_money_num.clear()
    # 初始化大单数据
    for code in codes:
        big_money_num_manager.add_num(code, 0)
        big_money_num_manager.expire(code)
    # 清除涨停时间
    global_util.limit_up_time.clear()
    # 初始化同花顺主站
    l2_clients = client_manager.getValidL2Clients()
    for client in l2_clients:
        try:
            server.repair_ths_main_site(client)
        except Exception as e:
            pass
def __run_schedule():
    while True:
        schedule.run_pending()
def init(context):
    # gmapi.subscribe(symbols="SZSE.002529", frequency="1d", count=30)
    # 订阅浦发银行, bar频率为一天和一分钟
    # 订阅订阅多个频率的数据,可多次调用subscribe
    # 获取需要监听的股票
    init_data()
    logger_system.info("掘金初始化")
    schedule.every().day.at("09:15:00").do(everyday_init)
    t1 = threading.Thread(target=lambda: __run_schedule())
    # 后台运行
    t1.setDaemon(True)
    t1.start()
    # 多个时间点获取收盘价
    gmapi.schedule(schedule_func=__get_latest_info, date_rule='1d', time_rule='09:28:00')
    # 初始化内容
    clients = authority.get_l2_clients()
    for client in clients:
        for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
            gpcode_manager.init_listen_code_by_pos(client, i)
def __get_latest_info(context):
    # 初始化内容
    clients = authority.get_l2_clients()
    for c in clients:
        for i in range(0, constant.L2_CODE_COUNT_PER_DEVICE):
            gpcode_manager.init_listen_code_by_pos(int(c), i)
    codes = gpcode_manager.get_gp_list()
    result = HistoryKDatasUtils.get_gp_latest_info(codes)
    for item in result:
        sec_level = item['sec_level']
        symbol = item['symbol']
        symbol = symbol.split(".")[1]
        pre_close = tool.to_price(decimal.Decimal(str(item['pre_close'])))
        if sec_level == 1:
            if symbol in codes:
                gpcode_manager.set_price_pre(symbol, pre_close)
        else:
            gpcode_manager.rm_gp(symbol)
# 获取最新的信息
def __get_current_info():
    data = gpcode_manager.get_gp_list()
    results = HistoryKDatasUtils.get_gp_current_info(data)
    logger_juejin_tick.debug("定时获取:{}", results)
    for result in results:
        price = result["price"]
        symbol = result['symbol']
        # 保存最新价
        symbol = symbol.split(".")[1]
# 设置收盘价
def re_set_price_pre(code):
    codes = [code]
    re_set_price_pres(codes)
def re_set_price_pres(codes, force=False):
    result = HistoryKDatasUtils.get_gp_latest_info(codes)
    for item in result:
        symbol = item['symbol']
        symbol = symbol.split(".")[1]
        pre_close = tool.to_price(decimal.Decimal(str(item['pre_close'])))
        gpcode_manager.set_price_pre(symbol, pre_close, force)
__prices_now = {}
# 获取近90天的最大量与最近的量
# 获取最近一次涨停/涨停下一个交易日的最大值
def get_volumns_by_code(code, count=60) -> object:
    datas = HistoryKDatasUtils.get_history_tick_n(code, count, "open,high,low,close,volume,pre_close,bob")
    # 计算
    datas.sort(key=lambda x: x["bob"], reverse=True)
    return datas
# 解析最大量
def parse_max_volume(datas, is_new_top=False):
    max_volume = 0
    max_volume_date = None
    if is_new_top:
        # 如果是突破前高就取最大量
        for item in datas:
            if max_volume < item["volume"]:
                max_volume = item["volume"]
                max_volume_date = item["bob"]
        return max_volume, max_volume, max_volume_date.strftime("%Y-%m-%d")
    else:
        date = None
        target_volume = None
        for i in range(len(datas)):
            # 查询涨停
            item = datas[i]
            volume = item["volume"]
            if max_volume < volume:
                max_volume = volume
                max_volume_date = item['bob']
            # 是否有涨停
            limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
            if abs(limit_up_price - item["high"]) < 0.01:
                # 涨停
                next_volume = 0
                if i > 0:
                    next_volume = datas[i - 1]["volume"]
                date = datas[i]["bob"]
                if volume < next_volume:
                    volume = next_volume
                    date = datas[i - 1]["bob"]
                target_volume = (volume, date)
                break
        if not target_volume:
            target_volume = (max_volume, max_volume_date)
        # --判断近60天无涨停的最大量
        max_60_volume_info = [0, None]
        # 60天内是否有涨停
        has_60_limit_up = False
        for i in range(60):
            if i >= len(datas):
                break
            item = datas[i]
            volume = item["volume"]
            if max_60_volume_info[0] < volume:
                max_60_volume_info = [volume, item["bob"]]
            limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"]))
            if abs(limit_up_price - item["high"]) < 0.01:
                has_60_limit_up = True
                break
        if not has_60_limit_up and target_volume[0] > max_60_volume_info[0] * 3:
            # 60天内无涨停,且60天内最大量小于最大量的1/3,判断为地量,返回近60个交易日的最大量
            return max_60_volume_info[0], max_60_volume_info[0], max_60_volume_info[1].strftime("%Y-%m-%d")
        else:
            return target_volume[0], target_volume[0], target_volume[1].strftime("%Y-%m-%d")
if __name__ == '__main__':
    # init_data()+
    pass
juejin.py
File was deleted
l2/cancel_buy_strategy.py
@@ -17,6 +17,7 @@
import tool
from l2.safe_count_manager import BuyL2SafeCountManager
from l2.transaction_progress import TradeBuyQueue
from output import kp_client_msg_manager
from trade import trade_data_manager, trade_queue_manager, l2_trade_factor
from l2 import l2_log, l2_data_log, l2_data_source_util
from l2.l2_data_util import L2DataUtil, local_today_num_operate_map, local_today_datas
@@ -380,6 +381,8 @@
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        cancel_rate_threshold = cls.__hCancelParamsManager.get_cancel_rate(volume_index)
        process_index = start_index
        # 是否有观测的数据撤单
        has_watch_canceled = False
        try:
            for i in range(start_index, end_index + 1):
                if i <= processed_index:
@@ -393,10 +396,11 @@
                    buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                     local_today_num_operate_map)
                    if buy_index is not None and buy_index in watch_indexs_dict:
                        has_watch_canceled = True
                        cancel_num += data["re"] * val["num"]
                        # 加入
                        cls.__add_watch_canceled_index(code, f"{buy_index}-{val['num']}")
                        rate__ = round(cancel_num / total_nums, 2)
                        rate__ = round(cancel_num / total_nums, 4)
                        if rate__ > cancel_rate_threshold:
                            indexs__ = list(watch_indexs_dict.keys())
                            indexs__.sort()
@@ -406,6 +410,7 @@
                            l2_log.trade_record(code, "H撤", "'index':{} , 'rate':{} ,'target_rate':{}", i, rate__,
                                                cancel_rate_threshold)
                            return True, data
        finally:
            l2_log.cancel_debug(code, "H级撤单计算结果 范围:{}-{} 处理进度:{} 取消计算结果:{}/{} 目标撤单比例:{}", start_index, end_index,
                                process_index, cancel_num,
@@ -416,6 +421,13 @@
            logger_l2_h_cancel.info(f"code-{code} H撤已撤订单:{cls.__get_watch_canceled_index(code)}")
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, cancel_num)
            # 有观测数据撤单
            if has_watch_canceled:
                now_rate = round(cancel_num / total_nums, 4)
                if now_rate < cancel_rate_threshold and cancel_rate_threshold - now_rate < 0.1:
                    # 距离撤单在5%以内
                    kp_client_msg_manager.add_msg(code, f"逼近H撤({now_rate*100}%/{cancel_rate_threshold*100}%),需要人工判别。")
        return False, None
    # 下单成功
@@ -568,11 +580,15 @@
                    big_num_count += data["re"]
                # 判断是否达到阈值
                if total_count >= MIN_H_COUNT and big_num_count >= constant.H_CANCEL_MIN_BIG_NUM_COUNT:  # and total_num >= threshold_num
                    finished = True
                    l2_log.cancel_debug(code, "获取到H撤监听数据:{},计算截至位置:{},目标计算数量:{}", json.dumps(list(watch_set)),
                                        total_data[-1]["index"], MIN_H_COUNT)
                    break
                    if len(total_data) <= i + 1 or (len(total_data) > i + 1 and total_data[i + 1]["val"]["time"] !=
                                                    total_data[buy_exec_index]["val"]["time"]):
                        # 至少囊括执行位本秒的数据
                        finished = True
                        l2_log.cancel_debug(code, "获取到H撤监听数据:{},计算截至位置:{},目标计算数量:{}", json.dumps(list(watch_set)),
                                            total_data[-1]["index"], MIN_H_COUNT)
                        break
        final_watch_list = list(watch_set)
        final_watch_list.sort(key=lambda x: x[0])
l2/l2_data_manager_new.py
@@ -1004,12 +1004,18 @@
        last_index = None
        count = 0
        start = None
        now_time_s = tool.get_time_as_second(tool.get_now_time_str())
        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            time_s = L2DataUtil.get_time_as_second(_val["time"])
            # 时间要>=09:30:00
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
            if time_s < second_930:
                continue
            if not constant.TEST:
                if abs(now_time_s - time_s) > 2:
                    # 正式环境下不处理2s外的数据
                    continue
            if L2DataUtil.is_limit_up_price_buy(_val):
@@ -1177,6 +1183,5 @@
    #                                                                  local_today_num_operate_map.get(
    #                                                                      "600213"))
    # print(buy_index, buy_data)
    dict_ = {"code": 0}
    dict_.clear()
    print(dict_)
    volume_rate = code_volumn_manager.get_volume_rate("002343")
    print(volume_rate)
l2/l2_huaxin_util.py
New file
@@ -0,0 +1,35 @@
"""
华鑫LV2处理工具类
"""
# 处理逐笔委托
def __convert_order(item, limit_up_price):
    time_str = f"{item['OrderTime']}"
    if time_str.startswith("9"):
        time_str = f"0{time_str}"
    time_ = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}"
    price = item["Price"]
    if price <= 0:
        # TODO 深证的买撤无价格数据,需要去查找价格数据
        pass
    limitPrice = 1 if abs(limit_up_price - price) < 0.001 else 0
    operateType = 0
    if item[''] == 'D':
        if item['Side'] == '1':
            # 买撤
            operateType = 1
        else:
            # 卖撤
            operateType = 3
    else:
        if item['Side'] == '1':
            # 买
            operateType = 0
        else:
            # 卖
            operateType = 2
    return {"time": time_, "price": price, "num": item["Volume"] // 100, "limitPrice": limitPrice,
            "operateType": operateType, "cancelTime": 0, "cancelTimeUnit": 0, "orderNo": item["OrderNO"],
            "mainSeq": item["MainSeq"], "subSeq": item["SubSeq"]}
l2_trade_test.py
@@ -49,6 +49,7 @@
    transaction_progress.TradeBuyQueue().set_traded_index(code, 0)
class VirtualTrade(unittest.TestCase):
    def __process_buy_queue(self, code, buy_queue, time_):
@@ -90,7 +91,7 @@
    # @unittest.skip("跳过此单元测试")
    def test_trade(self):
        code = "002989"
        code = "002387"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
log.py
@@ -429,7 +429,6 @@
                    continue
                if line.find(f"H级撤单计算结果") < 0:
                    continue
                print(line)
                target_rate = line.split("目标比例:")[1].split(" ")[0].strip()
                cancel_num = line.split("取消计算结果")[1][1:].split("/")[0].strip()
                total_num = line.split("取消计算结果")[1][1:].split("/")[1].split(" ")[0].strip()
output/code_info_output.py
@@ -15,7 +15,6 @@
import global_data_loader
import global_util
import gpcode_manager
import juejin
import limit_up_time_manager
import log
import tool
server.py
@@ -21,7 +21,7 @@
import gpcode_first_screen_manager
import gpcode_manager
import authority
import juejin
import init
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log, l2_log, code_price_manager
import l2_data_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil
@@ -33,6 +33,7 @@
from output import code_info_output
from third_data import hot_block_data_process, block_info, kpl_api
from third_data.code_plate_key_manager import CodesHisReasonAndBlocksManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_manager import KPLCodeLimitUpReasonManager, KPLLimitUpDataRecordManager
from ths import l2_listen_pos_health_manager, l2_code_operate
from trade import trade_gui, trade_data_manager, trade_manager, l2_trade_util, deal_big_money_manager, \
@@ -225,7 +226,7 @@
                    for data in data_list:
                        code_list.append(data["code"])
                    # 获取基本信息
                    code_datas = juejin.JueJinManager.get_gp_latest_info(code_list)
                    code_datas = HistoryKDatasUtils.get_gp_latest_info(code_list)
                    # if is_add:
                    #     gpcode_manager.add_gp_list(code_datas)
                    # else:
@@ -294,7 +295,7 @@
                                        # 获取涨停价
                                        _limit_up_price = gpcode_manager.get_limit_up_price(code)
                                        if not _limit_up_price:
                                            juejin.re_set_price_pres([code],True)
                                            init.re_set_price_pres([code], True)
                                            # 再次获取涨停价
                                            _limit_up_price = gpcode_manager.get_limit_up_price(code)
                                        if _limit_up_price:
@@ -333,7 +334,7 @@
                        for code in codes:
                            # 如果涨停价是空值就需要设置昨日收盘价格
                            if gpcode_manager.get_limit_up_price(code) is None:
                                juejin.re_set_price_pres([code], True)
                                init.re_set_price_pres([code], True)
                        # 板块关键字准备
                        for code in codes:
@@ -359,8 +360,8 @@
                                    code) is None:
                                need_get_volumn = True
                            if need_get_volumn:
                                volumes_data = juejin.get_volumns_by_code(code, 150)
                                volumes = juejin.parse_max_volume(volumes_data[:90], code_nature_analyse.is_new_top(
                                volumes_data = init.get_volumns_by_code(code, 150)
                                volumes = init.parse_max_volume(volumes_data[:90], code_nature_analyse.is_new_top(
                                    gpcode_manager.get_limit_up_price(code), volumes_data[:90]))
                                logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes)
                                code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2])
@@ -378,8 +379,7 @@
                                                                     volumes_data)
                        gpcode_manager.FirstCodeManager.add_record(codes)
                        if new_add_codes:
                            gpcode_manager.set_first_gp_codes_with_data(juejin.JueJinManager.get_gp_latest_info(codes,
                                                                                                                fields="symbol,sec_name,sec_type,sec_level"))
                            gpcode_manager.set_first_gp_codes_with_data(HistoryKDatasUtils.get_gp_latest_info(codes,fields = "symbol,sec_name,sec_type,sec_level"))
                            # 加入首板历史记录
                            logger_first_code_record.info("新增首板:{}", new_add_codes)
@@ -406,7 +406,7 @@
                        # 获取涨停价
                        if temp_codes:
                            # 获取涨停价
                            juejin.re_set_price_pres(temp_codes)
                            init.re_set_price_pres(temp_codes)
                            # 重新获取涨停价
                            for code in temp_codes:
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -435,7 +435,7 @@
                                gpcode_manager.FirstCodeManager.add_limited_up_record([code])
                            pricePre = gpcode_manager.get_price_pre(code)
                            if pricePre is None:
                                juejin.re_set_price_pres([code])
                                init.re_set_price_pres([code])
                            rate = round((float(price) - pricePre) * 100 / pricePre, 1)
                            prices.append(
@@ -515,7 +515,7 @@
                    # 根据代码获取代码名称
                    codes_name = {}
                    if codes:
                        codes_name = juejin.JueJinManager.get_gp_codes_names(codes)
                        codes_name = HistoryKDatasUtils.get_gp_codes_names(codes)
                    ths_industry_util.save_industry_code(dataList, codes_name)
                elif type == 6:
                    # 可用金额
@@ -775,7 +775,7 @@
                    day = tool.get_now_date_str()
                    data_dict = {}
                    for i in range(0, 2):
                        day = juejin.JueJinManager.get_previous_trading_date(day)
                        day = HistoryKDatasUtils.get_previous_trading_date(day)
                        data_list = list(block_info.KPLLimitUpDataRecordManager.list_all(day))
                        codes_set = set()
                        if data_list:
@@ -808,7 +808,7 @@
                        l2_trade_util.forbidden_trade(code)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = juejin.JueJinManager.get_gp_codes_names([code])
                            results = HistoryKDatasUtils.get_gp_codes_names([code])
                            if results:
                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
@@ -833,7 +833,7 @@
                            l2_trade_util.WhiteListCodeManager.add_code(code)
                            name = gpcode_manager.get_code_name(code)
                            if not name:
                                results = juejin.JueJinManager.get_gp_codes_names([code])
                                results = HistoryKDatasUtils.get_gp_codes_names([code])
                                if results:
                                    gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                        return_str = json.dumps({"code": 0})
@@ -878,7 +878,7 @@
                        gpcode_manager.WantBuyCodesManager.add_code(code)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = juejin.JueJinManager.get_gp_codes_names([code])
                            results = HistoryKDatasUtils.get_gp_codes_names([code])
                            if results:
                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                    if "plates" in data["data"]:
@@ -920,7 +920,7 @@
                        gpcode_manager.PauseBuyCodesManager.add_code(code)
                        name = gpcode_manager.get_code_name(code)
                        if not name:
                            results = juejin.JueJinManager.get_gp_codes_names([code])
                            results = HistoryKDatasUtils.get_gp_codes_names([code])
                            if results:
                                gpcode_manager.CodesNameManager.add_first_code_name(code, results[code])
                    return_str = json.dumps({"code": 0})
@@ -1120,10 +1120,10 @@
        try:
            global_data_loader.load_zyltgb()
            limit_up_price = float(gpcode_manager.get_limit_up_price(code))
            volumes_data = juejin.get_volumns_by_code(code, 150)
            volumes_data = init.get_volumns_by_code(code, 150)
            volumes_data = volumes_data[1:]
            volumes = juejin.parse_max_volume(volumes_data[:60],
                                              code_nature_analyse.is_new_top(limit_up_price,
            volumes = init.parse_max_volume(volumes_data[:60],
                                            code_nature_analyse.is_new_top(limit_up_price,
                                                                             volumes_data[:60]))
            logger_first_code_record.info("{} 获取到首板60天最大量:{}", code, volumes)
            code_volumn_manager.set_histry_volumn(code, volumes[0], volumes[1], volumes[2])
third_data/block_info.py
@@ -4,9 +4,10 @@
import datetime
import constant
import juejin
import init
import tool
from third_data import kpl_util, kpl_data_manager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager, KPLDataManager
__before_block_dict = {}
@@ -57,10 +58,10 @@
    if now_day in __blocks_dict:
        return __blocks_dict[now_day]
    now_date = datetime.datetime.now()
    end_date = juejin.JueJinManager.get_previous_trading_date(tool.get_now_date_str())
    end_date = HistoryKDatasUtils.get_previous_trading_date(tool.get_now_date_str())
    start_date = now_date - datetime.timedelta(days=(day_count * 2 + 10))
    start_date = start_date.strftime("%Y-%m-%d")
    days = juejin.JueJinManager.get_trading_dates(start_date, end_date)
    days = HistoryKDatasUtils.get_trading_dates(start_date, end_date)
    days = days[0 - day_count:]
    results = KPLLimitUpDataRecordManager.list_blocks_with_day(days)
    __blocks_dict[now_day] = results
third_data/code_plate_key_manager.py
@@ -189,8 +189,8 @@
            return 0, "不买该板块"
        # 10:30以前可以挂2个单
        if int(tool.get_now_time_str().replace(':', '')) < int("103000"):
            return 2, "10:30以前可以挂2个单"
        if int(tool.get_now_time_str().replace(':', '')) < int("100000"):
            return 2, "10:00以前可以挂2个单"
        # 10:30以后
        if key not in cls.top_5_key_dict:
            return 0, "净流入没在前5"
third_data/data_server.py
@@ -10,7 +10,7 @@
import global_util
import gpcode_manager
import juejin
import init
import log
import log_analyse
import tool
@@ -19,6 +19,7 @@
from output.limit_up_data_filter import IgnoreCodeManager
from third_data import kpl_util, kpl_data_manager, kpl_api
from third_data.code_plate_key_manager import RealTimeKplMarketData, KPLPlateForbiddenManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_manager import KPLDataManager, KPLLimitUpDataRecordManager, \
    KPLCodeLimitUpReasonManager
from third_data.kpl_util import KPLDataType, KPLPlatManager
@@ -49,10 +50,11 @@
        # 统计目前为止的代码涨停数量(分涨停原因)
        now_limit_up_codes_info = self.__kplDataManager.get_data(KPLDataType.LIMIT_UP)
        limit_up_reason_dict = {}
        for d in now_limit_up_codes_info:
            if d[5] not in limit_up_reason_dict:
                limit_up_reason_dict[d[5]] = [0, 0]
            limit_up_reason_dict[d[5]][0] += 1
        if now_limit_up_codes_info:
            for d in now_limit_up_codes_info:
                if d[5] not in limit_up_reason_dict:
                    limit_up_reason_dict[d[5]] = [0, 0]
                limit_up_reason_dict[d[5]][0] += 1
        # 获取想买原因想买单的代码数量
        reason_map = self.__KPLCodeLimitUpReasonManager.list_all()
        want_codes = gpcode_manager.WantBuyCodesManager.list_code()
@@ -413,7 +415,7 @@
            # 获取上个交易日的相同涨停原因的代码信息
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict["code"]
            day = juejin.JueJinManager.get_previous_trading_date(tool.get_now_date_str())
            day = HistoryKDatasUtils.get_previous_trading_date(tool.get_now_date_str())
            # 获取涨停数据
            # 获取代码的原因
third_data/history_k_data_util.py
New file
@@ -0,0 +1,163 @@
"""
历史K线服务
"""
import decimal
import tool
from db import redis_manager
import gm.api as gmapi
class JueJinApi:
    __redisManager = redis_manager.RedisManager(0)
    # 获取掘金参数
    @classmethod
    def getJueJinAccountInfo(cls):
        redis = cls.__redisManager.getRedis()
        account_id = redis.get("juejin-account-id")
        strategy_id = redis.get("juejin-strategy-id")
        token = redis.get("juejin-token")
        return account_id, strategy_id, token
    @classmethod
    def get_juejin_code_list_with_prefix(cls, codes):
        list = []
        for d in codes:
            if d[0:2] == '00':
                list.append("SZSE.{}".format(d))
            elif d[0:2] == '60':
                list.append("SHSE.{}".format(d))
        return list
    @classmethod
    def get_gp_latest_info(cls, codes, fields=None):
        if not codes:
            return []
        account_id, s_id, token = cls.getJueJinAccountInfo()
        symbols = cls.get_juejin_code_list_with_prefix(codes)
        gmapi.set_token(token)
        data = gmapi.get_instruments(symbols=",".join(symbols), fields=fields)
        print(data)
        return data
    @classmethod
    def get_history_tick_n(cls, code, count, fields=None):
        account_id, s_id, token = cls.getJueJinAccountInfo()
        symbols = cls.get_juejin_code_list_with_prefix([code])
        gmapi.set_token(token)
        # 前除权
        results = gmapi.history_n(symbol=symbols[0], frequency="1d", count=count, adjust=1, fields=fields)
        return results
    @classmethod
    def get_gp_current_info(cls, codes):
        if not codes:
            return []
        account_id, s_id, token = cls.getJueJinAccountInfo()
        symbols = cls.get_juejin_code_list_with_prefix(codes)
        gmapi.set_token(token)
        data = gmapi.current(symbols=",".join(symbols))
        print(data)
        return data
        # 返回指定日期的上个交易日
    @classmethod
    def get_previous_trading_date(cls, date):
        account_id, s_id, token = cls.getJueJinAccountInfo()
        gmapi.set_token(token)
        return gmapi.get_previous_trading_date("SHSE", date)
    # 返回指定日期的下个交易日
    @classmethod
    def get_next_trading_date(cls, date):
        account_id, s_id, token = cls.getJueJinAccountInfo()
        gmapi.set_token(token)
        return gmapi.get_previous_trading_date("SHSE", date)
    @classmethod
    def get_trading_dates(cls, start_date, end_date):
        account_id, s_id, token = cls.getJueJinAccountInfo()
        gmapi.set_token(token)
        return gmapi.get_trading_dates("SHSE", start_date, end_date)
class HistoryKDatasUtils(object):
    @classmethod
    def get_gp_latest_info(cls, codes, fields=None):
        return JueJinApi.get_gp_latest_info(codes, fields)
    @classmethod
    def get_history_tick_n(cls, code, count, fields=None):
        return JueJinApi.get_history_tick_n(code, count, fields)
    @classmethod
    def get_gp_current_info(cls, codes):
        return JueJinApi.get_gp_current_info(codes)
    # 返回指定日期的上个交易日
    @classmethod
    def get_previous_trading_date(cls, date):
        return JueJinApi.get_previous_trading_date(date)
    # 返回指定日期的下个交易日
    @classmethod
    def get_next_trading_date(cls, date):
        return JueJinApi.get_next_trading_date(date)
    @classmethod
    def get_trading_dates(cls, start_date, end_date):
        return JueJinApi.get_trading_dates(start_date, end_date)
    @classmethod
    def get_now_price(cls, codes):
        data = cls.get_gp_current_info(codes)
        prices = []
        for item in data:
            code = item["symbol"].split(".")[1]
            price = item["price"]
            prices.append((code, price))
        return prices
    # 获取代码的涨幅
    @classmethod
    def get_codes_limit_rate(cls, codes):
        datas = cls.get_gp_latest_info(codes)
        pre_price_dict = {}
        for data in datas:
            code = data["sec_id"]
            pre_close = tool.to_price(decimal.Decimal(str(data['pre_close'])))
            pre_price_dict[code] = pre_close
        now_prices = cls.get_now_price(codes)
        f_results = []
        for data in now_prices:
            code = data[0]
            price = data[1]
            pre_price = float(pre_price_dict.get(code))
            rate = round((price - pre_price) * 100 / pre_price, 2)
            f_results.append((code, rate))
        f_results.sort(key=lambda tup: tup[1])
        f_results.reverse()
        return f_results
    @classmethod
    def get_lowest_price_rate(cls, code, count):
        datas = cls.get_history_tick_n(code, count)
        low_price = datas[0]["close"]
        for data in datas:
            if low_price > data["close"]:
                low_price = data["close"]
        return (datas[-1]["close"] - low_price) / low_price
    @classmethod
    def get_gp_codes_names(cls, codes):
        datas = cls.get_gp_latest_info(codes)
        results = {}
        for data in datas:
            code = data["symbol"].split(".")[1]
            code_name = data['sec_name']
            results[code] = code_name
        return results
third_data/hot_block_data_process.py
@@ -4,13 +4,14 @@
import datetime
import json
import juejin
import init
import tool
from db import redis_manager
from db import mysql_data
import limit_up_time_manager
import gpcode_manager
from l2 import code_price_manager
from third_data.history_k_data_util import HistoryKDatasUtils
__redisManager = redis_manager.RedisManager(0)
INVALID_BLOCKS = ["其他", "ST股", "ST摘帽", "业绩增长", "业绩预增", "公告", "次新股"]
@@ -159,10 +160,10 @@
    if now_day in __blocks_dict:
        return __blocks_dict[now_day]
    now_date = datetime.datetime.now()
    end_date = juejin.JueJinManager.get_previous_trading_date(tool.get_now_date_str())
    end_date = HistoryKDatasUtils.get_previous_trading_date(tool.get_now_date_str())
    start_date = now_date - datetime.timedelta(days=(day_count * 2 + 10))
    start_date = start_date.strftime("%Y-%m-%d")
    days = juejin.JueJinManager.get_trading_dates(start_date, end_date)
    days = HistoryKDatasUtils.get_trading_dates(start_date, end_date)
    days = days[0 - day_count:]
    results = XGBHotBlockDataManager.list_blocks_with_day(days)
    __blocks_dict[now_day] = results
third_data/xgb.py
@@ -3,7 +3,7 @@
from datetime import datetime
import requests
import juejin
import init
from third_data import hot_block
@@ -40,18 +40,4 @@
if __name__ == '__main__':
    date = '2022-07-22'
    for i in range(100):
        date = juejin.JueJinManager.get_previous_trading_date(date)
        print(date)
        results = get_data(date)
        fresults = []
        for key in results:
            #print("--------",key)
            #for data in results[key]:
            #    print(data)
            fresults.append((key,None,results[key]))
        print(fresults)
        hot_block.upload_data(date,fresults)
        time.sleep(5)
    pass
trade/current_price_process_manager.py
@@ -72,7 +72,7 @@
        for d in _delete_list:
            _buy_win_codes.append(d[1])
        try:
            if not constant.JUEJIN_TRADE_ENABLE:
            if not constant.API_TRADE_ENABLE:
                trade_gui.THSBuyWinManagerNew.fill_codes(_buy_win_codes)
        except Exception as e:
            logging.exception(e)
@@ -110,6 +110,10 @@
        # 先删除应该删除的代码
        for code in del_code_list:
            if gpcode_manager.is_listen_old(code):
                cid, pid = gpcode_manager.get_listen_code_pos(code)
                # 强制移除
                if cid and pid:
                    gpcode_manager.set_listen_code_by_pos(cid, pid, "")
                # 判断是否在监听里面
                L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
        # 增加应该增加的代码
trade/l2_trade_factor.py
@@ -22,15 +22,18 @@
    def get_buy_rank_desc(self):
        continue_count = self.get_begin_continue_buy_count()
        time_range = self.get_time_range()
        m = self.get_m_val()[0]
        m = m//10000
        desc = ""
        if self.buy_rank == 0:
            desc = f"买入信号({continue_count})"
        elif self.buy_rank == 1:
            desc = f"买入信号({continue_count})+M值≥1000万"
            desc = f"买入信号({continue_count})+M值≥{m}万"
        elif self.buy_rank == 2:
            desc = f"买入信号({continue_count})+M值≥1000万+至少含1笔大单"
            desc = f"买入信号({continue_count})+M值≥{m}万+至少含1笔大单"
        elif self.buy_rank == 3:
            desc = f"买入信号({continue_count})+M值≥1000万+至少含2笔大单"
            desc = f"买入信号({continue_count})+M值≥{m}万+至少含2笔大单"
        else:
            desc = "常规买入"
        desc += f"+囊括时间{time_range}s"
@@ -43,37 +46,46 @@
    # 3:买入信号+M1000+2笔大单
    # 100:执行之前固有方案
    def get_buy_rank(self):
        # 判断有没有炸开
        if code_price_manager.Buy1PriceManager.is_can_buy(self.code):
            # 回封
            if self.score_index == 0:
                return 0
            elif self.score_index == 1:
                return 1
            elif self.score_index == 2:
                return 2
            else:
                return 100
        if self.score_index == 0:
            return 0
        elif self.score_index == 1:
            return 1
        elif self.score_index == 2:
            return 2
        else:
            # 首封
            if tool.trade_time_sub(self.now_time, "10:30:00") < 0 or tool.trade_time_sub(self.now_time, "14:00:00") > 0:
                if self.score_index == 0:
                    return 1
                elif self.score_index == 1:
                    return 2
                elif self.score_index == 2:
                    return 3
                else:
                    return 100
            else:
                if self.score_index == 0:
                    return 2
                elif self.score_index == 1:
                    return 3
                elif self.score_index == 2:
                    return 100
                else:
                    return 100
            return 100
        # 暂时不需要次此中策略
        # # 判断有没有炸开
        # if code_price_manager.Buy1PriceManager.is_can_buy(self.code):
        #     # 回封
        #     if self.score_index == 0:
        #         return 0
        #     elif self.score_index == 1:
        #         return 1
        #     elif self.score_index == 2:
        #         return 2
        #     else:
        #         return 100
        # else:
        #     # 首封
        #     if tool.trade_time_sub(self.now_time, "10:30:00") < 0 or tool.trade_time_sub(self.now_time, "14:00:00") > 0:
        #         if self.score_index == 0:
        #             return 1
        #         elif self.score_index == 1:
        #             return 2
        #         elif self.score_index == 2:
        #             return 3
        #         else:
        #             return 100
        #     else:
        #         if self.score_index == 0:
        #             return 2
        #         elif self.score_index == 1:
        #             return 3
        #         elif self.score_index == 2:
        #             return 100
        #         else:
        #             return 100
    # 得分
    def __init__(self, code, is_first_code, volume_rate, volume_rate_index, score, now_time=tool.get_now_time_str()):
@@ -132,8 +144,9 @@
    def get_time_range(self):
        # ts = [pow(3, 1), pow(3, 1), pow(3, 1), pow(3, 2), pow(3, 2), pow(3, 3), pow(3, 3), pow(3, 3)]
        ts = [pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1), pow(2, 1)]
        if -1 < self.score_index < 3:
            return ts[0]
        # 暂时去除分的影响
        # if -1 < self.score_index < 3:
        #     return ts[0]
        volume_rate_index = self.volume_rate_index
        if self.volume_rate_index >= len(ts):
            volume_rate_index = -1
@@ -141,14 +154,14 @@
    # 获取需要的大单个数
    def get_big_num_count(self):
        if self.is_first_code:
            if self.buy_rank < 2:
                return 0
            elif self.buy_rank == 2:
                return 1
            elif self.buy_rank == 3:
                return 2
        counts = [3, 1, 1, 1, 0, 0, 0, 0]
        # if self.is_first_code:
        #     if self.buy_rank < 2:
        #         return 0
        #     elif self.buy_rank == 2:
        #         return 1
        #     elif self.buy_rank == 3:
        #         return 2
        counts = [2, 1, 1, 1, 0, 0, 0, 0]
        volume_rate_index = self.volume_rate_index
        if self.volume_rate_index >= len(counts):
            volume_rate_index = -1
@@ -156,16 +169,16 @@
    # 获取安全笔数影响比例
    def get_safe_count_rate(self):
        rates = [0, -0.1, -0.2, -0.4, -0.6, -0.8, -0.8, -0.8]
        rates = [0, -0.1, -0.2, -0.3, -0.4, -0.5, -0.6, -0.7]
        volume_rate_index = self.volume_rate_index
        if self.volume_rate_index >= len(rates):
            volume_rate_index = -1
        return rates[volume_rate_index]
    def get_safe_count(self):
        if self.is_first_code:
            if self.buy_rank < 4:
                return 2
        # if self.is_first_code:
        #     if self.buy_rank < 4:
        #         return 2
        base_count, min_count, max_count = L2TradeFactorUtil.get_safe_buy_count(self.code, True)
        rate = self.get_safe_count_rate()
        count = int(round(base_count * (1 + rate)))
@@ -187,14 +200,14 @@
            global_data_loader.load_zyltgb()
            zyltgb = global_util.zyltgb_map.get(self.code)
        if self.is_first_code:
            if self.buy_rank == 0:
                return 0, ""
            elif self.is_want_buy and zyltgb and zyltgb < 20 * 10000 * 10000:
                # 小于20亿的想买单
                return 500 * 10000, ""
            elif self.buy_rank < 4:
                return 1000 * 10000, ""
        # if self.is_first_code:
        #     if self.buy_rank == 0:
        #         return 0, ""
        #     elif self.is_want_buy and zyltgb and zyltgb < 20 * 10000 * 10000:
        #         # 小于20亿的想买单
        #         return 500 * 10000, ""
        #     elif self.buy_rank < 4:
        #         return 1000 * 10000, ""
        base_m = L2TradeFactorUtil.get_base_safe_val(zyltgb)
        rate = self.get_m_val_rate(self.volume_rate_index)
@@ -244,7 +257,7 @@
    # 获取撤销比例
    @staticmethod
    def get_cancel_rate(volume_rate_index):
        rates = [0.39, 0.49, 0.59, 0.69, 0.69, 0.79, 0.79, 0.79]
        rates = [0.35, 0.45, 0.55, 0.65, 0.65, 0.75, 0.75, 0.75]
        if volume_rate_index >= len(rates):
            volume_rate_index = -1
        return rates[volume_rate_index]
@@ -524,7 +537,7 @@
    # print(L2TradeFactorUtil.get_safe_buy_count("003005"))
    # print(L2TradeFactorUtil.get_rate_factors("003004"))
    # print(L2TradeFactorUtil.factors_to_string("003004"))
    for i in range(2, 15):
    for i in range(2, 50):
        print(i, L2TradeFactorUtil.get_base_safe_val(100000000 * i))
    # print(L2TradeFactorUtil.get_limit_up_time_rate("11:30:00"))
    # print(L2TradeFactorUtil.get_limit_up_time_rate("13:00:00"))
trade/trade_juejin.py
@@ -16,8 +16,8 @@
__context_dict = {}
account_id = "8099a935-a991-4871-977f-206c6d3e04ca"
token = "a2eed2b159e9238dc0353fc3e73734d7677f7baf"
account_id = "77916efb-b856-46ee-9680-71be0fe18a42"
token = "38fb624832c1949708c7600abaf1e863d27663b3"
gmapi.set_token(token)
trade/trade_manager.py
@@ -346,9 +346,8 @@
# @tool.async_call
def __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index):
    try:
        if constant.JUEJIN_TRADE_ENABLE:
            # 每笔5000元
            count = (5000 // int(round(float(price) * 100))) * 100
        if constant.API_TRADE_ENABLE:
            count = (constant.BUY_MONEY_PER_CODE // int(round(float(price) * 100))) * 100
            trade_juejin.order_volume(code, price, count)
        else:
            guiTrade.buy(code, price)
@@ -394,16 +393,17 @@
        logger_trade.info("{}开始撤单".format(code))
        set_trade_state(code, TRADE_STATE_BUY_CANCEL_ING)
        logger_trade.info("{}撤单方法开始".format(code))
        if constant.JUEJIN_TRADE_ENABLE:
        if constant.API_TRADE_ENABLE:
            trade_juejin.cancel_order(code)
        else:
            guiTrade.cancel_buy(code)
        logger_trade.info("{}撤单方法结束".format(code))
        __cancel_success(code)
        try:
            cancel_buy_again(code)
        except Exception as e1:
            pass
        # 不需要再次撤单了
        # try:
        #     cancel_buy_again(code)
        # except Exception as e1:
        #     pass
    except Exception as e:
        # 状态还原
        set_trade_state(code, trade_state)
trade/trade_server.py
New file
@@ -0,0 +1,184 @@
import hashlib
import json
import random
import socket
import socketserver
import threading
import time
import trade_server_processor
class ClientSocketManager:
    # 客户端类型
    CLIENT_TYPE_TRADE = "trade"
    CLIENT_TYPE_DELEGATE_LIST = "delegate_list"
    CLIENT_TYPE_DEAL_LIST = "deal_list"
    CLIENT_TYPE_POSITION_LIST = "position_list"
    CLIENT_TYPE_MONEY = "money"
    CLIENT_TYPE_DEAL = "deal"
    CLIENT_TYPE_CMD_L2 = "l2_cmd"
    socket_client_dict = {}
    socket_client_lock_dict = {}
    @classmethod
    def add_client(cls, _type, rid, sk):
        if _type == cls.CLIENT_TYPE_TRADE:
            # 交易列表
            if _type not in cls.socket_client_dict:
                cls.socket_client_dict[_type] = []
            cls.socket_client_dict[_type].append((rid, sk))
            cls.socket_client_lock_dict[rid] = threading.Lock()
        else:
            cls.socket_client_dict[_type] = (rid, sk)
            cls.socket_client_lock_dict[rid] = threading.Lock()
        print(cls.socket_client_dict)
    @classmethod
    def acquire_client(cls, _type):
        if _type == cls.CLIENT_TYPE_TRADE:
            if _type in cls.socket_client_dict:
                for d in cls.socket_client_dict[_type]:
                    if d[0] in cls.socket_client_lock_dict:
                        try:
                            if cls.socket_client_lock_dict[d[0]].acquire(blocking=False):
                                return d
                        except threading.TimeoutError:
                            pass
        else:
            if _type in cls.socket_client_dict:
                try:
                    d = cls.socket_client_dict[_type]
                    if d[0] in cls.socket_client_lock_dict:
                        if cls.socket_client_lock_dict[d[0]].acquire(blocking=False):
                            return d
                except threading.TimeoutError:
                    pass
        return None
    @classmethod
    def release_client(cls, rid):
        if rid in cls.socket_client_lock_dict:
            # 释放锁
            cls.socket_client_lock_dict[rid].release()
    @classmethod
    def del_client(cls, rid):
        # 删除线程锁
        if rid in cls.socket_client_lock_dict:
            cls.socket_client_lock_dict.pop(rid)
        # 删除sk
        for t in cls.socket_client_dict:
            if type(cls.socket_client_dict[t]) == list:
                for d in cls.socket_client_dict[t]:
                    if d[0] == rid:
                        cls.socket_client_dict[t].remove(d)
                        break
            elif type(cls.socket_client_dict[t]) == tuple:
                if cls.socket_client_dict[t][0] == rid:
                    cls.socket_client_dict.pop(t)
                    break
class MyTCPServer(socketserver.TCPServer):
    def __init__(self, server_address, RequestHandlerClass):
        socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=True)
# 如果使用异步的形式则需要再重写ThreadingTCPServer
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    __inited = False
    def setup(self):
        self.__init()
    @classmethod
    def __init(cls):
        if cls.__inited:
            return True
        cls.__inited = True
        cls.__req_socket_dict = {}
    def __is_sign_right(self, data_json):
        list_str = []
        sign = data_json["sign"]
        data_json.pop("sign")
        for k in data_json:
            list_str.append(f"{k}={data_json[k]}")
        list_str.sort()
        __str = "&".join(list_str) + "JiaBei@!*."
        md5 = hashlib.md5(__str.encode(encoding='utf-8')).hexdigest()
        if md5 != sign:
            raise Exception("签名出错")
    def handle(self):
        host = self.client_address[0]
        super().handle()
        sk: socket.socket = self.request
        while True:
            try:
                data = sk.recv(1024 * 100)
                if data:
                    data_str = str(data, encoding="utf-8")
                    print("收到数据------", data_str)
                    data_json = json.loads(data_str)
                    if data_json["type"] == 'register':
                        client_type = data_json["data"]["client_type"]
                        rid = data_json["rid"]
                        ClientSocketManager.add_client(client_type, rid, sk)
                        sk.send(json.dumps({"type": "register"}).encode(encoding='utf-8'))
                        sk.recv(1024 * 100)
                        break
                    else:
                        result = trade_server_processor.process(data_json["data"])
                        sk.send(json.dumps({"code": 0}).encode(encoding='utf-8'))
                # sk.close()
            except:
                pass
    def finish(self):
        super().finish()
def run():
    laddr = "", 10008
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)  # 注意:参数是MyBaseRequestHandle
    # tcpserver.handle_request()  # 只接受一个客户端连接
    tcpserver.serve_forever()
def test1():
    r = (ClientSocketManager.acquire_client(ClientSocketManager.CLIENT_TYPE_TRADE))
    print("test1", r)
    time.sleep(random.randint(0, 3))
    if r:
        ClientSocketManager.release_client(r[0])
def test2():
    time.sleep(random.randint(0, 3))
    print("test2", ClientSocketManager.acquire_client(ClientSocketManager.CLIENT_TYPE_TRADE))
if __name__ == "__main__":
    run()
    # ClientSocketManager.add_client(ClientSocketManager.CLIENT_TYPE_TRADE, "1", None)
    # ClientSocketManager.add_client(ClientSocketManager.CLIENT_TYPE_TRADE, "2", None)
    # ClientSocketManager.add_client(ClientSocketManager.CLIENT_TYPE_TRADE, "3", None)
    #
    # for i in range(0, 3):
    #     t1 = threading.Thread(target=lambda: test1())
    #     t1.setDaemon(True)
    #     t1.start()
    #
    # for i in range(0, 3):
    #     t1 = threading.Thread(target=lambda: test2())
    #     t1.setDaemon(True)
    #     t1.start()
    #
    input()
trade/trade_server_processor.py
New file
@@ -0,0 +1,16 @@
"""
服务端处理器
"""
def process(data):
    ctype = data["type"]
    if ctype == 100:
        # L2逐笔委托数据
        pass
    elif ctype == 101:
        # L2逐笔成交数据
        pass
    elif ctype == 102:
        # 买卖10档
        pass