admin
2025-01-16 9d2b0ab3967761b2d4b4e1c92034c3c0ea6705f5
添加低吸中间服务器
7个文件已修改
3个文件已添加
831 ■■■■■ 已修改文件
code_attribute/code_price_manager.py 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
low_suction_proxy_server.py 332 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_api_server.py 99 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_l1_data_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_server.py 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/block_web_api.py 217 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/history_k_data_util.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/hosting_api_util.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/tool.py 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/code_price_manager.py
@@ -2,9 +2,10 @@
代码价格管理
"""
import json
import time
from db.redis_manager import RedisUtils
from utils import tool
from utils import tool, history_k_data_util
from db import redis_manager as redis_manager
@@ -152,5 +153,58 @@
            self.__save_buy1_price_info(code, time_str, None)
class CodesLimitRateManager:
    __pre_close_dict = {}
    __current_price_dict = {}
    """
    涨幅管理
    """
    @classmethod
    def __load_pre_close_prices(cls, codes):
        """
        获取昨天的收盘价格
        :param codes:
        :return:
        """
        fcodes = set()
        for code in codes:
            if code in cls.__pre_close_dict:
                continue
            fcodes.add(code)
        if not fcodes:
            return
        results = history_k_data_util.HistoryKDatasUtils.get_gp_latest_info(list(fcodes), fields="sec_id, pre_close")
        for result in results:
            cls.__pre_close_dict[result["sec_id"]] = round(result["pre_close"], 2)
    @classmethod
    def get_price_rates(cls, codes):
        """
        获取代码的价格涨幅
        :param codes:
        :return:
        """
        cls.__load_pre_close_prices(codes)
        # 获取现价
        now_time = time.time()
        price_codes = set()
        for code in codes:
            if code not in cls.__current_price_dict or now_time - cls.__current_price_dict[code][1] > 5:
                price_codes.add(code)
        if price_codes:
            results = history_k_data_util.HistoryKDatasUtils.get_now_price(price_codes)
            for r in results:
                cls.__current_price_dict[r[0]] = (r[1], time.time())
        rate_dict = {}
        for code in codes:
            pre_close = cls.__pre_close_dict.get(code)
            now_price_info = cls.__current_price_dict.get(code)
            if pre_close and now_price_info:
                rate = round((now_price_info[0] - pre_close) * 100 / pre_close, 2)
                rate_dict[code] = rate
        return rate_dict
if __name__ == "__main__":
    print(Buy1PriceManager().get_limit_up_info("002777"))
low_suction_proxy_server.py
New file
@@ -0,0 +1,332 @@
"""
低吸代理服务器
"""
import http
import json
import logging
import socket
import socketserver
import threading
import time
from http.server import BaseHTTPRequestHandler
from urllib.parse import parse_qs
# 禁用http.server的日志输出
import requests
from utils import socket_util, tool
logger = logging.getLogger("http.server")
logger.setLevel(logging.CRITICAL)
# 183.234.94.164/125.93.72.196
REAL_HOST, REAL_PORT = "183.234.94.164", 12881
class DataServer(BaseHTTPRequestHandler):
    # 禁用日志输出
    def log_message(self, format, *args):
        pass
    def __get_params(self, url):
        ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
        return ps_dict
    def do_GET(self):
        path = self.path
        whole_path = f"http://{REAL_HOST}:{REAL_PORT}{path}"
        response = requests.get(whole_path)
        self.send_response(response.status_code)
        if response.status_code == 200:
            # 发给请求客户端的响应数据
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(response.text.encode())
        else:
            self.wfile.write(response.text.encode())
    def do_POST(self):
        path = self.path
        whole_path = f"http://{REAL_HOST}:{REAL_PORT}{path}"
        params = self.__parse_request()
        response = requests.post(whole_path, json=params)
        self.__send_response(response.text)
    def __send_response(self, data):
        # 发给请求客户端的响应数据
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())
    def __parse_request(self):
        params = {}
        datas = self.rfile.read(int(self.headers['content-length']))
        _str = str(datas, encoding="gbk")
        # print(_str)
        try:
            params = json.loads(_str)
        except:
            pass
        return params
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    pass
class ClientSocketManager:
    # 客户端类型
    CLIENT_TYPE_COMMON = "common"
    socket_client_dict = {}
    socket_client_lock_dict = {}
    active_client_dict = {}
    @classmethod
    def add_client(cls, _type, rid, sk):
        if _type == cls.CLIENT_TYPE_COMMON:
            # 交易列表
            if _type not in cls.socket_client_dict:
                cls.socket_client_dict[_type] = []
            cls.socket_client_dict[_type].append((rid, sk))
            cls.socket_client_lock_dict[rid] = threading.Lock()
        else:
            cls.socket_client_dict[_type] = (rid, sk)
            cls.socket_client_lock_dict[rid] = threading.Lock()
    @classmethod
    def acquire_client(cls, _type):
        if _type == cls.CLIENT_TYPE_COMMON:
            if _type in cls.socket_client_dict:
                # 根据排序活跃时间排序
                client_list = sorted(cls.socket_client_dict[_type], key=lambda x: cls.active_client_dict.get(x[0]) if x[
                                                                                                                          0] in cls.active_client_dict else 0,
                                     reverse=True)
                for d in client_list:
                    if d[0] in cls.socket_client_lock_dict:
                        try:
                            if cls.socket_client_lock_dict[d[0]].acquire(blocking=False):
                                return d
                        except threading.TimeoutError:
                            pass
        else:
            if _type in cls.socket_client_dict:
                try:
                    d = cls.socket_client_dict[_type]
                    if d[0] in cls.socket_client_lock_dict:
                        if cls.socket_client_lock_dict[d[0]].acquire(blocking=False):
                            return d
                except threading.TimeoutError:
                    pass
        return None
    @classmethod
    def release_client(cls, rid):
        if rid in cls.socket_client_lock_dict:
            # 释放锁
            cls.socket_client_lock_dict[rid].release()
    @classmethod
    def del_client(cls, rid):
        # 删除线程锁
        if rid in cls.socket_client_lock_dict:
            cls.socket_client_lock_dict.pop(rid)
        # 删除sk
        for t in cls.socket_client_dict:
            if type(cls.socket_client_dict[t]) == list:
                for d in cls.socket_client_dict[t]:
                    if d[0] == rid:
                        cls.socket_client_dict[t].remove(d)
                        try:
                            d[1].close()
                        except:
                            pass
                        break
            elif type(cls.socket_client_dict[t]) == tuple:
                if cls.socket_client_dict[t][0] == rid:
                    cls.socket_client_dict.pop(t)
                    try:
                        t[1].close()
                    except:
                        pass
                    break
    # 心跳信息
    @classmethod
    def heart(cls, rid):
        cls.active_client_dict[rid] = time.time()
    @classmethod
    def del_invalid_clients(cls):
        # 清除长时间无心跳的客户端通道
        for k in cls.active_client_dict.keys():
            if time.time() - cls.active_client_dict[k] > 20:
                # 心跳时间间隔20s以上视为无效
                cls.del_client(k)
    @classmethod
    def list_client(cls, type_=None):
        """
        :param type_:
        :return:[(客户端ID, 是否锁定, 活跃时间)]
        """
        _type = type_
        if not _type:
            _type = cls.CLIENT_TYPE_COMMON
        client_list = sorted(cls.socket_client_dict[_type],
                             key=lambda x: cls.active_client_dict.get(x[0]) if x[0] in cls.active_client_dict else 0,
                             reverse=True)
        fdata = []
        for client in client_list:
            active_time = cls.active_client_dict.get(client[0])
            if active_time is None:
                active_time = 0
            active_time = tool.to_time_str(int(active_time))
            fdata.append(
                (client[0], cls.socket_client_lock_dict[client[0]].locked(), active_time))
        return fdata
class MyTCPServer(socketserver.TCPServer):
    def __init__(self, server_address, RequestHandlerClass):
        socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=True)
# 如果使用异步的形式则需要再重写ThreadingTCPServer
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer): pass
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    __inited = False
    def setup(self):
        self.__init()
    @classmethod
    def __init(cls):
        if cls.__inited:
            return True
        cls.__inited = True
        cls.__req_socket_dict = {}
    @classmethod
    def getRecvData(cls, skk):
        data = ""
        header_size = 10
        buf = skk.recv(header_size)
        header_str = buf
        if buf:
            start_time = time.time()
            buf = buf.decode('utf-8')
            if buf.startswith("##"):
                content_length = int(buf[2:10])
                received_size = 0
                while not received_size == content_length:
                    r_data = skk.recv(10240)
                    received_size += len(r_data)
                    data += r_data.decode('utf-8')
            else:
                data = skk.recv(1024 * 1024)
                data = buf + data.decode('utf-8')
        return data, header_str
    def handle(self):
        host = self.client_address[0]
        super().handle()
        sk: socket.socket = self.request
        while True:
            try:
                data, header = self.getRecvData(sk)
                if data:
                    data_str = data
                    # print("收到数据------", f"{data_str[:20]}......{data_str[-20:]}")
                    data_json = None
                    try:
                        data_json = json.loads(data_str)
                    except json.decoder.JSONDecodeError as e:
                        # JSON解析失败
                        sk.sendall(socket_util.load_header(json.dumps(
                            {"code": 100, "msg": f"JSON解析失败"}).encode(
                            encoding='utf-8')))
                        continue
                    type_ = data_json["type"]
                    __start_time = time.time()
                    try:
                        if type_ == 'register':
                            client_type = ClientSocketManager.CLIENT_TYPE_COMMON
                            rid = data_json["rid"]
                            ClientSocketManager.add_client(client_type, rid, sk)
                            sk.sendall(json.dumps({"type": "register"}).encode(encoding='utf-8'))
                            try:
                                # print("客户端", ClientSocketManager.socket_client_dict)
                                while True:
                                    result, header = self.getRecvData(sk)
                                    try:
                                        resultJSON = json.loads(result)
                                        if resultJSON["type"] == 'heart':
                                            # 记录活跃客户端
                                            ClientSocketManager.heart(resultJSON['client_id'])
                                    except json.decoder.JSONDecodeError as e:
                                        print("JSON解析出错", result, header)
                                        if not result:
                                            sk.close()
                                            break
                                    time.sleep(1)
                            except ConnectionResetError as ee:
                                ClientSocketManager.del_client(rid)
                            except Exception as e:
                                logging.exception(e)
                        elif data_json["type"] == "push_msg":
                            # 华鑫内部服务器推送过来的信息,需要推送到注册的客户端上面
                            client_info_list = ClientSocketManager.socket_client_dict.get(ClientSocketManager.CLIENT_TYPE_COMMON)
                            clients = [x[1] for x in client_info_list]
                            if client_info_list:
                                for client in client_info_list:
                                    try:
                                        client[1].sendall(
                                            socket_util.load_header(
                                                json.dumps(data_json).encode(encoding='utf-8')))
                                    except Exception as e:
                                        logging.exception(e)
                                        ClientSocketManager.del_client(client[0])
                            break
                    except Exception as e:
                        pass
                    finally:
                        sk.sendall(
                            socket_util.load_header(
                                json.dumps({"code": 0}).encode(encoding='utf-8')))
                else:
                    # 断开连接
                    break
                # sk.close()
            except Exception as e:
                # log.logger_tuoguan_request_debug.exception(e)
                logging.exception(e)
                break
    def finish(self):
        super().finish()
def run_socket_server(port):
    laddr = "0.0.0.0", port
    print("SocketServer is at: http://%s:%d/" % (laddr))
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)  # 注意:参数是MyBaseRequestHandle
    tcpserver.serve_forever()
def run(addr, port):
    handler = DataServer
    try:
        httpd = ThreadedHTTPServer((addr, port), handler)
        print("HTTP server is at: http://%s:%d/" % (addr, port))
        httpd.serve_forever()
    except Exception as e:
        logging.exception(e)
if __name__ == "__main__":
    threading.Thread(target=run_socket_server, args=(10009,), daemon=True).start()
    run("", 12008)
middle_api_server.py
@@ -10,6 +10,7 @@
import constant
import socket_manager
import trade_manager
from code_attribute.code_price_manager import CodesLimitRateManager
from db import mysql_data, redis_manager
from db.redis_manager import RedisUtils
from log_module import log, request_log_util
@@ -83,19 +84,56 @@
                                raise Exception("签名错误")
                            codes_data = data_json["data"]
                            code = codes_data["code"]
                            volume = codes_data["volume"]
                            price = codes_data["price"]
                            money = codes_data.get("money")
                            volume = codes_data.get("volume")
                            price = codes_data.get("price")
                            price_type = codes_data.get("price_type")
                            try:
                                if not code:
                                    raise Exception("请上传code")
                                if not volume:
                                    raise Exception("请上传volume")
                                # if round(float(price), 2) <= 0:
                                #     prices = HistoryKDatasUtils.get_now_price([code])
                                #     if not prices:
                                #         raise Exception("现价获取失败")
                                #     price = prices[0][1]
                                if not price or round(float(price), 2) <= 0:
                                    if price_type is None:
                                        price_type = 0
                                        # 默认为笼子价
                                    pre_close = HistoryKDatasUtils.get_gp_latest_info([code], "sec_id,pre_close")[0][
                                        "pre_close"]
                                    if price_type == 0:  # 价格笼子
                                        # 获取现价
                                        prices = HistoryKDatasUtils.get_now_price([code])
                                        if not prices:
                                            raise Exception("现价获取失败")
                                        now_price = prices[0][1]
                                        limit_up_price = round(
                                            float(tool.get_limit_up_price_by_preprice(code, pre_close)),
                                            2)
                                        price = min(tool.get_buy_max_price(now_price), limit_up_price)
                                    elif price_type == 1:  # 跌停价
                                        limit_down_price = round(
                                            float(tool.get_limit_down_price_by_preprice(code, pre_close)),
                                            2)
                                        price = limit_down_price
                                    elif price_type == 2:  # 涨停价
                                        limit_up_price = round(
                                            float(tool.get_limit_up_price_by_preprice(code, pre_close)),
                                            2)
                                        price = limit_up_price
                                    elif price_type == 3:  # 现价
                                        prices = HistoryKDatasUtils.get_now_price([code])
                                        if not prices:
                                            raise Exception("现价获取失败")
                                        now_price = prices[0][1]
                                        price = now_price
                                    elif price_type == 4:  # 买5价
                                        prices = HistoryKDatasUtils.get_now_price([code])
                                        if not prices:
                                            raise Exception("现价获取失败")
                                        now_price = prices[0][1]
                                        price = now_price - 0.04
                                if not volume and money:
                                    volume = (money // int(round(float(price) * 100))) * 100
                                    if volume < 100:
                                        volume = 100
                                # 下单
                                result = hosting_api_util.trade_order(hosting_api_util.TRADE_DIRECTION_BUY, code,
                                                                      volume, price)
@@ -161,7 +199,8 @@
                        elif type_ == 'common':
                            params = data_json["data"]
                            ctype = params.get("ctype")
                            trade_sell_types = {"get_current_l1_codes", "get_positions", "get_l2_deal_price"}
                            trade_sell_types = {"get_current_l1_codes", "get_positions", "get_l2_deal_price",
                                                "buy_cb_for_commission", "sell_cb_for_commission", "get_deal_queue"}
                            if ctype in trade_sell_types:
                                result = hosting_api_util.common_request(params,
                                                                         client_type=socket_manager.ClientSocketManager.CLIENT_TYPE_TRADE_SELL)
@@ -224,12 +263,23 @@
                        elif type_ == "get_huaxin_subscript_codes":
                            # 获取华鑫订阅的代码
                            fresults = global_data_cache_util.huaxin_subscript_codes
                            fdata = []
                            try:
                                # 获取当前涨停比例
                                rate_results_dict = CodesLimitRateManager.get_price_rates(set([r[0] for r in fresults]))
                                for r in fresults:
                                    fdata.append(
                                        (r[0], r[1], rate_results_dict.get(r[0]) if r[0] in rate_results_dict else 0,
                                         r[2]))
                                fdata.sort(key=lambda r: r[2], reverse=True)
                            except:
                                fdata = fresults
                            update_time = global_data_cache_util.huaxin_subscript_codes_update_time
                            if update_time is None:
                                update_time = ''
                            return_str = json.dumps(
                                {"code": 0,
                                 "data": {"count": len(fresults), "list": fresults, "update_time": update_time},
                                 "data": {"count": len(fresults), "list": fdata, "update_time": update_time},
                                 "msg": ""})
                            pass
                        elif type_ == "get_huaxin_position_subscript_codes":
@@ -516,7 +566,7 @@
                    return_str = json.dumps({"code": 1, "msg": "不可以取消"})
            elif type == 421:
                # 加入暂不买
                # 加红
                data = json.loads(_str)
                codes = data["data"]["codes"]
                for code in codes:
@@ -524,7 +574,7 @@
                return_str = json.dumps({"code": 0})
            elif type == 422:
                # 移除暂不买
                # 移红
                data = json.loads(_str)
                codes = data["data"]["codes"]
                for code in codes:
@@ -532,10 +582,31 @@
                return_str = json.dumps({"code": 0})
            elif type == 423:
                # 暂不买列表
                # 红单列表
                result = hosting_api_util.get_code_list(hosting_api_util.CODE_LIST_MUST_BUY)
                return_str = json.dumps(result)
            elif type == 441:
                # 加绿
                data = json.loads(_str)
                codes = data["data"]["codes"]
                for code in codes:
                    hosting_api_util.add_code_list(code, hosting_api_util.CODE_LIST_GREEN)
                return_str = json.dumps({"code": 0})
            elif type == 442:
                # 移绿
                data = json.loads(_str)
                codes = data["data"]["codes"]
                for code in codes:
                    hosting_api_util.remove_code_list(code, hosting_api_util.CODE_LIST_GREEN)
                return_str = json.dumps({"code": 0})
            elif type == 443:
                # 绿单列表
                result = hosting_api_util.get_code_list(hosting_api_util.CODE_LIST_GREEN)
                return_str = json.dumps(result)
            elif type == 430:
                # 查询代码属性
middle_l1_data_server.py
@@ -169,7 +169,7 @@
                f.write(f"{code}\n")
    def get_target_codes(self):
        return  self.__target_codes
        return self.__target_codes
def run(port):
@@ -181,4 +181,4 @@
if __name__ == "__main__":
    print( L1DataManager().get_target_codes())
    print(L1DataManager().get_target_codes())
middle_server.py
@@ -18,7 +18,8 @@
from log_module.log import logger_debug
from middle_l1_data_server import L1DataManager
from output import push_msg_manager
from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool
from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool, \
    block_web_api
from utils.juejin_util import JueJinHttpApi
trade_data_request_queue = queue.Queue()
@@ -155,7 +156,7 @@
                            try:
                                data = data_json["data"]
                                datas = data["data"]
                                print("l2_subscript_codes", data_json)
                                # print("l2_subscript_codes", data_json)
                                global_data_cache_util.huaxin_subscript_codes = datas
                                global_data_cache_util.huaxin_subscript_codes_update_time = tool.get_now_time_str()
                            finally:
@@ -318,6 +319,47 @@
                            datas = data_json["data"]
                            L1DataManager().add_datas(datas)
                            break
                        elif data_json["type"] == 'get_l1_target_codes':
                            # 获取目标代码
                            codes = L1DataManager().get_target_codes()
                            result_str = json.dumps({"code": 0, "data": list(codes)})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            break
                        # 获取三方板块
                        elif data_json["type"] == 'get_third_blocks':
                            data = data_json["data"]
                            data = data["data"]
                            source = data["source"]
                            code = data["code"]
                            result_str = json.dumps({"code": 1, "msg": "source不匹配"})
                            if source == 2:
                                # 通达信
                                try:
                                    blocks = block_web_api.get_tdx_blocks(code)
                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
                                except Exception as e:
                                    result_str = json.dumps({"code": 1, "msg": str(e)})
                            elif source == 3:
                                # 同花顺
                                try:
                                    blocks = block_web_api.THSBlocksApi().get_ths_blocks(code)
                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
                                except Exception as e:
                                    try:
                                        block_web_api.THSBlocksApi.load_hexin_v()
                                        blocks = block_web_api.THSBlocksApi().get_ths_blocks(code)
                                        result_str = json.dumps({"code": 0, "data": list(blocks)})
                                    except Exception as e1:
                                        result_str = json.dumps({"code": 1, "msg": str(e1)})
                            elif source == 4:
                                # 东方财富
                                try:
                                    blocks = block_web_api.get_eastmoney_block(code)
                                    result_str = json.dumps({"code": 0, "data": list(blocks)})
                                except Exception as e:
                                    result_str = json.dumps({"code": 1, "msg": str(e)})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                            break
                    except Exception as e:
                        log.logger_tuoguan_request_debug.exception(e)
                    finally:
test.py
New file
@@ -0,0 +1,17 @@
import json
import socket
import requests
from code_attribute.code_price_manager import CodesLimitRateManager
from utils import tool, socket_util
from utils.block_web_api import THSBlocksApi
from utils.history_k_data_util import HistoryKDatasUtils
if __name__ == "__main__":
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 生成socket,连接server
    # client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
    # client.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 60 * 1000, 30 * 1000))
    client.connect(("192.168.3.122", 10009))
    client.sendall(socket_util.load_header(
        json.dumps({"type": "push_msg", "data": {"type": "update_position"}}).encode("utf-8")))
utils/block_web_api.py
New file
@@ -0,0 +1,217 @@
"""
代码的板块网页接口
"""
import json
import logging
import urllib
import requests
from utils import tool
__tdx_session_data = {}
def get_tdx_blocks(code):
    """
    通达信板块:https://wenda.tdx.com.cn/site/wenda/stock_index.html?message=%E4%B8%9C%E6%96%B9%E9%80%9A%E4%BF%A1%E6%89%80%E5%B1%9E%E6%A6%82%E5%BF%B5
    @param code:
    @return:
    """
    if not __tdx_session_data.get("session"):
        __tdx_session_data["session"] = requests.Session()
        __tdx_session_data["session"].get(
            "https://wenda.tdx.com.cn/TOUCH?Device=Browser&Ip=0.0.0.0&Mac=00-00-00-00-00-00-00-00&Build=WEB&Type=41&Ver=1.0.0&EP=0")
    session = __tdx_session_data.get("session")
    try:
        headers = {
            "User-Agent":
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 "
                "Safari/537.36",
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
            # "Cookie": f"LST=10; ASPSessionID="
        }
        session.headers.update(headers)
        data = [{"message": f"{code}的所属概念", "TDXID": "", "wdbk": "", "RANG": "AG", "forward": "1"}]
        response = session.post("https://wenda.tdx.com.cn/TQL?Entry=NLPSE.StockSelect&RI=", data=json.dumps(data))
        response_headers = response.headers
        if response.status_code == 200:
            text = response.text
            result = json.loads(text)
            # print(result)
            data = [
                {"nlpse_id": result[3][0], "POS": 0, "COUNT": 30, "order_field": "", "dynamic_order": "",
                 "order_flag": "",
                 "timestamps": 0, "op_flag": 1, "screen_type": 1, "RANG": "AG", "forward": "1"}]
            response = session.post(f"https://wenda.tdx.com.cn/TQL?Entry=NLPSE.NLPQuery&RI={result[0][3]}",
                                    data=json.dumps(data))
            response.encoding = 'utf-8'
            if response.status_code == 200:
                text = response.text
                print(text)
                result = json.loads(text)
                fset = set()
                if len(result) > 3 and len(result[3]) > 8:
                    fset |= set([x[2:-2] for x in result[3][8].split(";")])
                data = [{"sec_code": code, "RANG": "AG"}]
                response = session.post(f"https://wenda.tdx.com.cn/TQL?Entry=NLPSE.SSTheme&RI=",
                                        data=json.dumps(data))
                response.encoding = 'utf-8'
                if response.status_code == 200:
                    text = response.text
                    arr = json.loads(text)
                    if len(arr) > 3:
                        for i in range(3, len(arr)):
                            fset.add(arr[i][2][1:-1])
                return fset
            else:
                raise Exception(response.text)
        else:
            raise Exception(response.text)
    finally:
        pass
from playwright.sync_api import sync_playwright
class THSBlocksApi:
    __ths_session_data = {}
    __ths_hexin_v = "AwY2VNcinP6Nq0ia5LnmR52uV_eIZ0ohHKt-hfAv8ikE86itWPeaMew7zofD"
    def get_ths_blocks(self, code):
        """
        同花顺板块
        @param code:
        @return:
        """
        if not self.__ths_session_data.get("session"):
            self.__ths_session_data["session"] = requests.Session()
        self.__ths_session_data["session"].headers.update({
            "content-type": "application/json",
            "sec-ch-ua":
                "\"Chromium\";v=\"128\", \"Not;A=Brand\";v=\"24\", \"Google Chrome\";v=\"128\"",
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": "\"Windows\"",
            "sec-fetch-dest":
                "document",
            "sec-fetch-mode":
                "navigate",
            "sec-fetch-site":
                "none",
            "sec-fetch-user":
                "?1",
            "upgrade-insecure-requests": "1",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/128.0.0.0 Safari/537.36 ",
            "cookie": "other_uid=Ths_iwencai_Xuangu_lqs6bw1f7wshz8rx2c6zrag86ie1r401; ta_random_userid=vtmc00tyg9; "
                      "cid=f35cdfcfd72e8bb1c8e6dbd66fd5df9d1726938491; v=" + self.__ths_hexin_v + "; "
                                                                                                  "cid"
                                                                                                  "=f35cdfcfd72e8bb1c8e6dbd66fd5df9d1726938491; ComputerID=f35cdfcfd72e8bb1c8e6dbd66fd5df9d1726938491; WafStatus=0; PHPSESSID=5766f6a83a4ade56ae767161ee9c2990 "
        })
        session = self.__ths_session_data["session"]
        params = {"source": "Ths_iwencai_Xuangu", "version": "2.0", "query_area": "", "block_list": "",
                  "add_info": "{\"urp"
                              "\":{"
                              "\"scene\":1,\"company\":1,\"business\":1},\"contentType\":\"json\",\"searchInfo\":true}",
                  "question": f"{code}所属板块", "perpage": 50, "page": 1, "secondary_intent": "stock",
                  "log_info": "{\"input_type\":\"typewrite\"}", "rsh": "null"}
        url = "https://www.iwencai.com/customized/chart/get-robot-data"
        print(url)
        response = session.post(url, data=json.dumps(params))
        text = response.text
        index = text.find("\"所属概念\":")
        if index >= 0:
            start = text.rfind("{", 0, index)
            end = text.find("}", index)
            text = text[start:end + 1]
            result = json.loads(text)
            return set(result["所属概念"].split(";"))
        raise Exception("没有获取到内容:" + text)
    @classmethod
    def load_hexin_v(cls):
        with sync_playwright() as p:
            # 启动浏览器
            browser = p.chromium.launch(headless=True, args=['--disable-blink-features=AutomationControlled'])
            context = browser.new_context(
                bypass_csp=True,  # 绕过内容安全策略
                accept_downloads=True,  # 接受下载
                ignore_https_errors=True,
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                           "Chrome/129.0.0.0 Safari/537.38",
                viewport={"width": 2000, "height": 720}
            )
            page = context.new_page()
            page.set_extra_http_headers({
                "Accept-Language": "zh-CN,zh;q=0.9",
                "Accept-Encoding": "gzip, deflate, br, zstd",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,"
                          "*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"
            })
            def intercept(route, request):
                route.continue_()
                headers = request.headers
                cookie = headers.get("cookie")
                if cookie:
                    params = {v.split("=")[0].strip(): v.split("=")[1].strip() for v in cookie.split(";")}
                    try:
                        cls.__ths_hexin_v = params.get("v")
                        print("获取到hexin_v:", cls.__ths_hexin_v)
                    except Exception as e:
                        logging.exception(e)
            page.route("**/block-detail*", intercept)
            # 打开网页
            page.goto(
                "https://www.iwencai.com/unifiedwap/result?w=%E7%BE%8E%E7%9A%84%E9%9B%86%E5%9B%A2%E6%89%80%E5%B1%9E%E6%A6%82%E5%BF%B5&querytype=stock")
            page.wait_for_timeout(4000)
            # 关闭浏览器
            browser.close()
            # input()
        pass
def get_eastmoney_block(code):
    """
    获取东方财富板块
    @param code:
    @return:
    """
    params = {
        "type": "RPT_F10_CORETHEME_BOARDTYPE",
        "sty": "BOARD_CODE,BOARD_NAME,IS_PRECISE,BOARD_RANK,BOARD_TYPE",
        "filter": f"(SECUCODE=\"{code}.{'SH' if tool.is_sh_code(code) else 'SZ'}\")",
        "p": "1",
        "ps": "",
        "sr": "1", "st": "BOARD_RANK", "source": "HSF10", "client": "PC", "v": "0734332486357663"
    }
    list = []
    for k in params:
        list.append(f"{k}={urllib.parse.quote(params[k])}")
    url = "https://datacenter.eastmoney.com/securities/api/data/get?" + "&".join(list)
    response = requests.get(url)
    if response.status_code == 200:
        text = response.text
        print(text)
        data = json.loads(text)
        if data["success"]:
            blocks = []
            for x in data["result"]["data"]:
                # if x["IS_PRECISE"] == "1":
                blocks.append(x["BOARD_NAME"])
            return blocks
    return None
if __name__ == "__main__":
   pass
utils/history_k_data_util.py
@@ -23,6 +23,7 @@
                    '+') > -1:
                return datetime.datetime.fromisoformat(val)
            return val
        fdata = None
        if True:
            url = f'{cls.__BASE_URL}{path_str}'
@@ -98,9 +99,9 @@
    def get_juejin_code_list_with_prefix(cls, codes):
        list = []
        for d in codes:
            if d[0:2] == '00':
            if tool.is_sz_code(d):
                list.append("SZSE.{}".format(d))
            elif d[0:2] == '60':
            elif tool.is_sh_code(d):
                list.append("SHSE.{}".format(d))
        return list
@@ -131,8 +132,8 @@
    @classmethod
    def get_exchanges_codes(cls, exchanges):
        return JueJinHttpApi.get_exchanges_codes(exchanges=exchanges, sec_types=[1], skip_suspended=True,
                                                     skip_st=True,
                                                     fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level,is_suspended,pre_close")
                                                 skip_st=True,
                                                 fields="symbol,sec_type,sec_id,sec_name,listed_date,sec_level,is_suspended,pre_close")
    @classmethod
    def get_previous_trading_date(cls, date):
@@ -231,4 +232,4 @@
if __name__ == "__main__":
    constant.JUEJIN_LOCAL_API = False
    print(HistoryKDatasUtils.get_lowest_price_rate("000725", 30))
    print(HistoryKDatasUtils.get_gp_latest_info(["000333"], "sec_id,pre_close,adj_factor"))
utils/hosting_api_util.py
@@ -24,6 +24,7 @@
CODE_LIST_WANT = "want"
CODE_LIST_PAUSE_BUY = "pause_buy"
CODE_LIST_MUST_BUY = "must_buy"
CODE_LIST_GREEN = "green"
# 类型
API_TYPE_TRADE = "trade"  # 交易
API_TYPE_TRADE_STATE = "trade_state"  # 交易状态
utils/tool.py
@@ -3,6 +3,7 @@
"""
import ctypes
import decimal
import math
import random
import threading
import time
@@ -180,7 +181,8 @@
def time_sub_as_ms(time_str_1, time_str_2):
    time_1 = get_time_as_second(time_str_1[:8])
    time_2 = get_time_as_second(time_str_2[:8])
    return (time_1 - time_2) * 1000 + (int(time_str_1[9:]) - int(time_str_2[9:])) if len(time_str_1)>8 and len(time_str_2)>8 else 0
    return (time_1 - time_2) * 1000 + (int(time_str_1[9:]) - int(time_str_2[9:])) if len(time_str_1) > 8 and len(
        time_str_2) > 8 else 0
# 交易时间加几s
@@ -192,6 +194,14 @@
    if s >= 11 * 3600 + 30 * 60 > s_:
        s += 90 * 60
    return time_seconds_format(s)
def is_sh_code(code):
    return code.find('60') == 0 or code.find('11') == 0 or code.find('68') == 0
def is_sz_code(code):
    return code.find('00') == 0 or code.find('12') == 0 or code.find('30') == 0
def compute_buy1_real_time(time_):
@@ -254,3 +264,39 @@
    except:
        pass
    return None
def get_limit_up_rate(code):
    # 获取涨停倍数
    if code.find("00") == 0 or code.find("60") == 0:
        return 1.1
    else:
        return 1.2
def get_limit_down_rate(code):
    # 获取涨停倍数
    if code.find("00") == 0 or code.find("60") == 0:
        return 0.9
    else:
        return 0.8
def get_limit_up_price_by_preprice(code, price):
    if price is None:
        return None
    return to_price(decimal.Decimal(str(price)) * decimal.Decimal(f"{get_limit_up_rate(code)}"))
def get_limit_down_price_by_preprice(code, price):
    if price is None:
        return None
    return to_price(decimal.Decimal(str(price)) * decimal.Decimal(f"{get_limit_down_rate(code)}"))
# 获取买入价格笼子的最高价
def get_buy_max_price(price):
    price1 = price * (1 + 0.02)
    price1 = math.ceil(price1 * 100) / 100
    price2 = price + 0.1
    return min(price1, price2)