admin
2023-11-02 7093bbad9379116c432e4da278a40cc86303c76f
bug修复
7个文件已修改
288 ■■■■■ 已修改文件
constant.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_server.py 182 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_api_server.py 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_server.py 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/hosting_api_util.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/huaxin_trade_record_manager.py 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -1,6 +1,6 @@
import platform
# NOTE(review): TEST is assigned twice in a row; read literally, the second
# assignment (True) is the value that takes effect — confirm which is intended.
TEST = False
TEST = True
# Buy-score tiers
BUY_SCORE_RANK_0 = 150
data_server.py
@@ -16,13 +16,12 @@
from third_data.kpl_util import KPLPlatManager, KPLDataType
from trade import trade_manager
from trade.l2_trade_util import BlackListCodeManager
from utils import tool, global_util, kp_client_msg_manager
from utils import tool, global_util, kp_client_msg_manager, hosting_api_util
from utils.history_k_data_util import HistoryKDatasUtils
import urllib.parse as urlparse
from urllib.parse import parse_qs
class DataServer(BaseHTTPRequestHandler):
@@ -39,7 +38,6 @@
    # 精选,行业数据缓存
    __jingxuan_cache_dict = {}
    __industry_cache_dict = {}
    def __get_limit_up_list(self):
        # 统计目前为止的代码涨停数量(分涨停原因)
@@ -261,185 +259,17 @@
    def do_GET(self):
        path = self.path
        url = urlparse.urlparse(path)
        response_data = ""
        if url.path == "/get_kpl_data":
            best_feng_kou = self.__kplDataManager.get_data(kpl_util.KPLDataType.BEST_FENG_KOU)
            if not best_feng_kou:
                best_feng_kou = []
            best_feng_kou = best_feng_kou[:22]
            feng_kou = self.__kplDataManager.get_data(kpl_util.KPLDataType.FENG_KOU)
            if not feng_kou:
                feng_kou = []
            feng_kou = feng_kou[:22]
            industry_rank = self.__kplDataManager.get_data(kpl_util.KPLDataType.INDUSTRY_RANK)
            if not industry_rank:
                industry_rank = []
            industry_rank = industry_rank[:22]
            feng_xiang = self.__kplDataManager.get_data(kpl_util.KPLDataType.FENG_XIANG)
            if not feng_xiang:
                feng_xiang = []
            feng_xiang = feng_xiang[:22]
            response_data = json.dumps({"code": 0, "data": {"best_feng_kou": best_feng_kou, "feng_kou": feng_kou,
                                                            "industry_rank": industry_rank, "feng_xiang": feng_xiang}})
        elif url.path == "/get_score_info":
            start_time = time.time()
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict['code']
            name = ps_dict.get('name')
            data = code_info_output.get_output_params(code, self.__jingxuan_cache_dict, self.__industry_cache_dict)
            if data["code_name"].find("None") > -1 and name:
                data["code_name"] = f"{name} {code}"
            self.__history_plates_dict[code] = (time.time(), data["kpl_code_info"]["code_records"])
            if "plate" in data["kpl_code_info"]:
                self.__blocks_dict[code] = (time.time(), data["kpl_code_info"]["plate"])
            response_data = json.dumps({"code": 0, "data": data})
            print("get_score_info 耗时:", time.time() - start_time)
            # 获取评分信息
            pass
        elif url.path == "/kpl/get_limit_up_list":
        if url.path == "/kpl/get_limit_up_list":
            response_data = self.__get_limit_up_list()
        elif url.path == "/kpl/get_plate_info":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            response_data = self.__get_plate_info(ps_dict)
        elif url.path == "/kpl/get_market_data":
            # 获取板块信息
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            type_ = int(ps_dict['type'])
            result = []
            if type_ == 0:
                # 行业,主力净额倒序
                result = kpl_api.getMarketIndustryRealRankingInfo(True)
                result = kpl_util.parseMarketIndustry(result)
            elif type_ == 1:
                # 行业,主力净额顺序
                result = kpl_api.getMarketIndustryRealRankingInfo(False)
                result = kpl_util.parseMarketIndustry(result)
            elif type_ == 2:
                # 精选,主力净额倒序
                result = kpl_api.getMarketJingXuanRealRankingInfo(True)
                result = kpl_util.parseMarketJingXuan(result)
            elif type_ == 3:
                # 精选,主力净额顺序
                result = kpl_api.getMarketJingXuanRealRankingInfo(False)
                result = kpl_util.parseMarketJingXuan(result)
            forbidden_plates = self.__KPLPlateForbiddenManager.list_all()
            fresult = []
            for d in result:
                if type_ == 2 or type_ == 3:
                    self.__jingxuan_cache_dict[d[1]] = d
                elif type_ == 0 or type_ == 1:
                    self.__industry_cache_dict[d[1]] = d
                d = list(d)
                d.append(1 if d[1] in forbidden_plates else 0)
                fresult.append(d)
            response_data = json.dumps({"code": 0, "data": fresult})
        elif url.path == "/kpl/add_ignore_code":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict['code']
            type_ = ps_dict['type']
            self.__IgnoreCodeManager.ignore_code(type_, code)
            response_data = json.dumps({"code": 0})
        elif url.path == "/kpl/forbidden_plate":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            plate = ps_dict["plate"]
            # 加入禁止
            self.__KPLPlateForbiddenManager.save_plate(plate)
            response_data = json.dumps({"code": 0})
        elif url.path == "/kpl/get_plate_codes":
            # 获取涨停原因下面的代码
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            plate = ps_dict["plate"]
            # 获取板块下的代码
            # 统计目前为止的代码涨停数量(分涨停原因)
            now_limit_up_codes_info = self.__kplDataManager.get_data(KPLDataType.LIMIT_UP)
            now_limit_up_codes = set([d[0] for d in now_limit_up_codes_info])
            # 获取历史涨停
            record_limit_up_datas = KPLLimitUpDataRecordManager.total_datas
            if not record_limit_up_datas:
                KPLLimitUpDataRecordManager.load_total_datas()
                record_limit_up_datas = KPLLimitUpDataRecordManager.total_datas
            codes_info = []
            for d in record_limit_up_datas:
                if d[2] != plate:
                    continue
                # 代码,名称,涨停时间,是否炸板,是否想买,是否已经下过单
                codes_info.append(
                    [d[3], d[4], tool.to_time_str(int(d[5])), 1 if d[3] not in now_limit_up_codes else 0, 0, 0])
            codes_info.sort(key=lambda x: x[2])
            # 查询是否为想买单
            want_codes = gpcode_manager.WantBuyCodesManager().list_code()
            for code_info in codes_info:
                code_info[4] = 1 if code_info[0] in want_codes else 0
                # 获取代码状态
                if trade_manager.CodesTradeStateManager().get_trade_state(code_info[0]) != trade_manager.TRADE_STATE_NOT_TRADE:
                    code_info[5] = 1
            response_data = json.dumps({"code": 0, "data": codes_info})
        elif url.path == "/get_h_cancel_data":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict["code"]
            if code:
                trade_state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED or trade_state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # TODO 获取H撤数据
                    response_data = json.dumps({"code": 1, "msg": "无H撤数据"})
                else:
                    response_data = json.dumps({"code": 1, "msg": "无H撤数据"})
            else:
                response_data = json.dumps({"code": 1, "msg": "请上传code"})
        elif url.path == "/get_last_trade_day_reasons":
            # 获取上个交易日的相同涨停原因的代码信息
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict["code"]
            day = HistoryKDatasUtils.get_previous_trading_date(tool.get_now_date_str())
            # 获取涨停数据
            # 获取代码的原因
            reasons = kpl_data_manager.KPLLimitUpDataRecordManager.list_by_code(code, day)
            if reasons:
                reasons = list(reasons)
                reasons.sort(key=lambda x: x[9])
                reason = reasons[-1][2]
                datas = self.__kplDataManager.get_from_file(kpl_util.KPLDataType.LIMIT_UP, day)
                # (代码,名称,首次涨停时间,最近涨停时间,几板,涨停原因,板块,实际流通,主力净额,涨停原因代码,涨停原因代码数量)
                result_list = []
                if datas:
                    for d in datas:
                        if d[5] == reason and d[0] != code:
                            # (代码,名称)
                            result_list.append((d[0], d[1]))
                response_data = json.dumps({"code": 0, "data": {"reason": reason, "data": result_list}})
            else:
                response_data = json.dumps({"code": 1, "msg": "昨日未涨停"})
        elif url.path == "/pull_kp_client_msg":
            # 拉取客户端消息
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            client = ps_dict["client"]
            msg = kp_client_msg_manager.read_msg(client)
            if msg:
                response_data = json.dumps({"code": 0, "data": msg})
            else:
                response_data = json.dumps({"code": 1, "msg": "暂无消息"})
        elif url.path == "/list_kp_client_msg":
            msg_list = kp_client_msg_manager.list_msg_from_local()
            msg_list.reverse()
            msg_list = [f"{msg.split('|')[0]}{msg.split('|')[-1].split('-')[1].strip()}" for msg in msg_list]
            response_data = json.dumps({"code": 0, "data": msg_list})
        self.send_response(200)
        # 发给请求客户端的响应数据
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(response_data.encode())
        else:
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            result = hosting_api_util.get_from_data_server(url.path, ps_dict)
            self.__send_response(result)
    def do_POST(self):
        path = self.path
middle_api_server.py
@@ -12,6 +12,7 @@
from db.redis_manager import RedisUtils
from utils import socket_util, hosting_api_util, huaxin_trade_record_manager, huaxin_util, tool, global_data_cache_util
from utils.history_k_data_util import HistoryKDatasUtils, JueJinApi
from utils.huaxin_trade_record_manager import PositionManager
class MyTCPServer(socketserver.TCPServer):
@@ -109,7 +110,8 @@
                        orderSysID = codes_data.get("orderSysID")
                        accountId = codes_data.get("accountId")
                        if code:
                            result = hosting_api_util.trade_cancel_order(hosting_api_util.TRADE_DIRECTION_BUY, code,accountId,
                            result = hosting_api_util.trade_cancel_order(hosting_api_util.TRADE_DIRECTION_BUY, code,
                                                                         accountId,
                                                                         orderSysID, True)
                            print("---撤单结果----")
                            print(result)
@@ -147,6 +149,16 @@
                            raise Exception(result["msg"])
                        print("---卖出结果----")
                        print(result)
                        break
                    elif type_ == 'get_cost_price':
                        # 获取成本价
                        codes_data = data_json["data"]
                        code = codes_data["code"]
                        try:
                            price = PositionManager.get_cost_price(code)
                            return_str = json.dumps({"code": 0, "data": {"price": price}})
                        except Exception as e:
                            return_str = json.dumps({"code": 1, "msg": str(e)})
                        break
                    elif type_ == 'delegate_list':
                        # 委托列表
@@ -255,10 +267,17 @@
                                {"code": 0, "data": times, "msg": ""})
                        finally:
                            redis.close()
                    elif type_ == 'get_code_trade_info':
                        # Get the trade info for the requested code
                        code = data_json["data"]["code"]
                        result = hosting_api_util.get_code_trade_info(code)
                        return_str = json.dumps(result)
                    elif type_ == 'get_l2_listen_active_count':
                        result = hosting_api_util.get_l2_listen_active_count()
                        return_str = json.dumps(result)
                    elif type_ == "trade_server_channels":
                        channels = socket_manager.ClientSocketManager.list_client()
                        return_str = json.dumps({"code": 0, "data": channels})
                break
                # sk.close()
            except Exception as e:
middle_server.py
@@ -142,12 +142,13 @@
                        try:
                            data = data_json["data"]
                            ctype = data["ctype"]
                            data = data["data"]
                            result_str = ''
                            if ctype == "queue_size":
                                # TODO 设置队列大小
                                result_str = json.dumps({"code": 0})
                            elif ctype == "cmd":
                                data = data["data"]
                                db = data["db"]
                                cmd = data["cmd"]
                                key = data["key"]
@@ -167,6 +168,30 @@
                                if type(result) == set:
                                    result = list(result)
                                result_str = json.dumps({"code": 0, "data": result})
                            elif ctype == "cmds":
                                datas = data["data"]
                                result_list=[]
                                for d in datas:
                                    db = d["db"]
                                    cmd = d["cmd"]
                                    key = d["key"]
                                    args = d.get("args")
                                    redis = RedisManager(db).getRedis()
                                    method = getattr(RedisUtils, cmd)
                                    args_ = [redis, key]
                                    if args is not None:
                                        if type(args) == tuple or type(args) == list:
                                            args = list(args)
                                            for a in args:
                                                args_.append(a)
                                        else:
                                            args_.append(args)
                                    args_ = tuple(args_)
                                    result = method(*args_)
                                    if type(result) == set:
                                        result = list(result)
                                    result_list.append(result)
                                result_str = json.dumps({"code": 0, "data": result_list})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        except Exception as e:
                            logger_debug.exception(e)
third_data/kpl_data_manager.py
@@ -373,5 +373,10 @@
if __name__ == "__main__":
    # NOTE(review): run_pull_task()/input() appear both as live statements and
    # as commented-out lines below — confirm which variant is intended.
    run_pull_task()
    input()
    # run_pull_task()
    # input()
    # Call kpl_api.daBanList with DABAN_TYPE_LIMIT_UP, decode the returned JSON
    # text, and print every item found under its "list" key.
    results = kpl_api.daBanList(kpl_api.DABAN_TYPE_LIMIT_UP)
    results = json.loads(results)
    results = results["list"]
    for result in results:
        print(result)
utils/hosting_api_util.py
@@ -36,7 +36,9 @@
API_TYPE_GET_ENV = "get_env"  # get environment info
API_TYPE_SYNC_L1_TARGET_CODES = "sync_l1_subscript_codes"  # sync the codes that L1 needs to subscribe to
API_TYPE_SYSTEM_LOG = "system_log"  # system log
API_TYPE_GET_FROM_DATA_SERVER = "get_from_data_server"  # pull data from the data server
API_TYPE_CODE_TRADE_INFO = "code_trade_info"  # trade info for a code
API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT = "l2_listen_active_count"  # number of active L2 listeners
# Timeout. NOTE(review): the original comment said "2s" but the value is 5.0 —
# confirm the intended value (unit presumably seconds).
TIMEOUT = 5.0
@@ -143,7 +145,7 @@
                                    "code": code,
                                    "accountID": accountID,
                                    "orderSysID": orderSysID,
                                    "sinfo": f"cancel_order_{code}_{round(time.time() * 1000)}"})
                                    "sinfo": f"cb_{code}_{round(time.time() * 1000)}"})
    return __read_response(client, request_id, blocking)
@@ -269,6 +271,30 @@
    return __read_response(client, request_id, blocking)
def get_from_data_server(path, params, blocking=True):
    """Pull content from the data server through the trade client channel.

    Sends a request of type ``API_TYPE_GET_FROM_DATA_SERVER`` carrying
    ``path`` and ``params`` via ``__request`` and returns whatever
    ``__read_response`` yields for it.

    Args:
        path: Request path to forward (placed under the "path" key).
        params: Request parameters to forward (placed under the "params" key).
        blocking: Passed straight through to ``__read_response``.
    """
    # Millisecond timestamp keeps the "sinfo" tag distinct between calls.
    millis = round(time.time() * 1000)
    payload = {
        "type": API_TYPE_GET_FROM_DATA_SERVER,
        "path": path,
        "params": params,
        "sinfo": f"cb_{API_TYPE_GET_FROM_DATA_SERVER}_{millis}",
    }
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, payload)
    # timeout=30 overrides the default for this call (presumably seconds — confirm).
    return __read_response(client, request_id, blocking, timeout=30)
def get_code_trade_info(code, blocking=True):
    """Request the trade info of ``code`` through the trade client channel.

    Sends a request of type ``API_TYPE_CODE_TRADE_INFO`` via ``__request``
    and returns whatever ``__read_response`` yields for it.

    Args:
        code: The code to query (placed under the "code" key).
        blocking: Passed straight through to ``__read_response``.
    """
    # Millisecond timestamp keeps the "sinfo" tag distinct between calls.
    millis = round(time.time() * 1000)
    payload = {
        "type": API_TYPE_CODE_TRADE_INFO,
        "code": code,
        "sinfo": f"cb_{API_TYPE_CODE_TRADE_INFO}_{millis}",
    }
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, payload)
    # timeout=30 overrides the default for this call (presumably seconds — confirm).
    return __read_response(client, request_id, blocking, timeout=30)
def get_l2_listen_active_count(blocking=True):
    """Request the number of active L2 listeners through the trade client channel.

    Sends a request of type ``API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT`` via
    ``__request`` and returns whatever ``__read_response`` yields for it.

    Args:
        blocking: Passed straight through to ``__read_response``.
    """
    # Millisecond timestamp keeps the "sinfo" tag distinct between calls.
    millis = round(time.time() * 1000)
    payload = {
        "type": API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT,
        "sinfo": f"cb_{API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT}_{millis}",
    }
    request_id, client = __request(ClientSocketManager.CLIENT_TYPE_TRADE, payload)
    # timeout=30 overrides the default for this call (presumably seconds — confirm).
    return __read_response(client, request_id, blocking, timeout=30)
if __name__ == "__main__":
    # Scratch check: dict.pop returns the value it removes.
    sample = {"id": "123123"}
    removed = sample.pop("id")
    print(removed)
utils/huaxin_trade_record_manager.py
@@ -10,7 +10,6 @@
from utils import tool
from db import mysql_data, redis_manager
# 委托列表
from utils.history_k_data_util import HistoryKDatasUtils
@@ -230,7 +229,19 @@
    @classmethod
    def get_volume_by_code(cls, code):
        mysqldb = mysql_data.Mysqldb()
        mysqldb.select_one(f"select currentPosition from hx_trade_position where securityID='{code}'")
        return mysqldb.select_one(f"select currentPosition from hx_trade_position where securityID='{code}'")
    @classmethod
    def get_cost_price(cls, code, day=tool.get_now_date_str("%Y%m%d")):
        # 获取成本价
        mysqldb = mysql_data.Mysqldb()
        result = mysqldb.select_one(
            f"select totalPosCost,currentPosition from hx_trade_position where securityID='{code}' and tradingDay='{day}'")
        if not result:
            raise Exception("尚未持仓")
        if result[1] == 0:
            raise Exception("已经清仓")
        return round(float(result[0]) / result[1], 2)
# 成交记录