Administrator
2024-04-26 5574d5002c50cbe9199d9192a2db37af7811179b
开盘啦任务修复/bug修复
4个文件已修改
115 ■■■■■ 已修改文件
main.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/data_server.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 107 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py
@@ -154,7 +154,7 @@
        # 主进程
        createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r,
                          queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w,
                          queue_l1_trade_w_strategy_r,trade_ipc_addr)
                          queue_l1_trade_w_strategy_r,(order_ipc_addr, cancel_order_ipc_addr))
        # 将tradeServer作为主进程
        l1Process.join()
third_data/data_server.py
@@ -850,7 +850,7 @@
def run(addr, port):
    # 运行看盘消息采集
    # kp_client_msg_manager.run_capture()
    kpl_data_manager.run_pull_task()
    kpl_data_manager.PullTask.run_pull_task()
    handler = DataServer
    # httpd = socketserver.TCPServer((addr, port), handler)
third_data/kpl_data_manager.py
@@ -95,7 +95,8 @@
        if records:
            cls.latest_origin_datas = records
            cls.__LimitUpCodesPlateKeyManager.set_today_limit_up([(r[0], r[5], r[6].split('、') if r[6] else []) for r in records])
            cls.__LimitUpCodesPlateKeyManager.set_today_limit_up(
                [(r[0], r[5], r[6].split('、') if r[6] else []) for r in records])
        code_reasons_dict = {}
        reason_codes_dict = {}
@@ -159,7 +160,8 @@
    @classmethod
    def load_total_datas(cls):
        cls.total_datas = KPLLimitUpDataRecordManager.list_all(tool.get_now_date_str())
        cls.__LimitUpCodesPlateKeyManager.set_today_total_limit_up([(r[3], r[2], r[6].split("、") if r[6] else []) for r in cls.total_datas])
        cls.__LimitUpCodesPlateKeyManager.set_today_total_limit_up(
            [(r[3], r[2], r[6].split("、") if r[6] else []) for r in cls.total_datas])
        for d in cls.total_datas:
            cls.__load_hist_and_blocks(d[3])
@@ -406,23 +408,40 @@
    return __latest_current_limit_up_records.get(day)
# 运行拉取任务
def run_pull_task():
    def __upload_data(type, datas):
class PullTask:
    # 最近更新时间
    __latest_update_time_dict = {}
    @classmethod
    def __upload_data(cls, type, datas):
        root_data = {
            "type": type,
            "data": datas
        }
        requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data))
    def get_limit_up():
    @classmethod
    def repaire_pull_task(cls):
        """
        修复拉取任务
        @return:
        """
        # 修复涨停
        key = "limit_up"
        if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
            logger_debug.info("任务修复-开盘啦:涨停列表")
            # 大于20s就需要更新
            threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
    @classmethod
    def run_limit_up_task(cls):
        while True:
            try:
                if (tool.is_trade_time() and int(tool.get_now_time_str().replace(':', '')) > int("092530")):
                    results = kpl_api.getLimitUpInfoNew()
                    result = json.loads(results)
                    start_time = time.time()
                    __upload_data("limit_up", result)
                    cls.__upload_data("limit_up", result)
            except Exception as e:
                try:
                    logging.exception(e)
@@ -432,46 +451,50 @@
            except:
                pass
            finally:
                cls.__latest_update_time_dict["limit_up"] = time.time()
                time.sleep(3)
    def get_bidding_money():
        # 竞价数据上传
        while True:
            if int("092600") < int(tool.get_now_time_str().replace(":", "")) < int("092700"):
                try:
                    results = kpl_api.daBanList(kpl_api.DABAN_TYPE_BIDDING)
                    result = json.loads(results)
                    __upload_data("biddings", result)
                except Exception as e:
                    pass
            time.sleep(3)
    @classmethod
    # 运行拉取任务
    def run_pull_task(cls):
        def get_bidding_money():
            # 竞价数据上传
            while True:
                if int("092600") < int(tool.get_now_time_str().replace(":", "")) < int("092700"):
                    try:
                        results = kpl_api.daBanList(kpl_api.DABAN_TYPE_BIDDING)
                        result = json.loads(results)
                        cls.__upload_data("biddings", result)
                    except Exception as e:
                        pass
                time.sleep(3)
    def get_market_industry():
        while True:
            if tool.is_trade_time():
                try:
                    results = kpl_api.getMarketIndustryRealRankingInfo()
                    result = json.loads(results)
                    __upload_data("industry_rank", result)
                except:
                    pass
            time.sleep(3)
        def get_market_industry():
            while True:
                if tool.is_trade_time():
                    try:
                        results = kpl_api.getMarketIndustryRealRankingInfo()
                        result = json.loads(results)
                        cls.__upload_data("industry_rank", result)
                    except:
                        pass
                time.sleep(3)
    def get_market_jingxuan():
        while True:
            if tool.is_trade_time():
                try:
                    results = kpl_api.getMarketJingXuanRealRankingInfo()
                    result = json.loads(results)
                    __upload_data("jingxuan_rank", result)
                except:
                    pass
            time.sleep(3)
        def get_market_jingxuan():
            while True:
                if tool.is_trade_time():
                    try:
                        results = kpl_api.getMarketJingXuanRealRankingInfo()
                        result = json.loads(results)
                        cls.__upload_data("jingxuan_rank", result)
                    except:
                        pass
                time.sleep(3)
    threading.Thread(target=get_limit_up, daemon=True).start()
    # threading.Thread(target=get_bidding_money, daemon=True).start()
    # threading.Thread(target=get_market_industry, daemon=True).start()
    # threading.Thread(target=get_market_jingxuan, daemon=True).start()
        threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
        # threading.Thread(target=get_bidding_money, daemon=True).start()
        # threading.Thread(target=get_market_industry, daemon=True).start()
        # threading.Thread(target=get_market_jingxuan, daemon=True).start()
if __name__ == "__main__":
trade/huaxin/huaxin_trade_server.py
@@ -1649,6 +1649,10 @@
                self.send_response({"code": 0, "data": {"money": constant.BUY_MONEY_PER_CODE}}, client_id, request_id)
            elif ctype == "get_per_code_buy_money":
                self.send_response({"code": 0, "data": {"money": constant.BUY_MONEY_PER_CODE}}, client_id, request_id)
            elif ctype == "repaire_task":
                # 修复任务
                kpl_data_manager.PullTask.repaire_pull_task()
                self.send_response({"code": 0, "data": {}}, client_id, request_id)
        except Exception as e:
            logging.exception(e)
            self.send_response({"code": 1, "msg": f"数据处理出错:{e}"}, client_id, request_id)