Administrator
2024-07-12 dd4636adb21cba5eec0b217b14b1d2bd09d0a09c
检测/修复数据服务
4个文件已修改
1个文件已添加
79 ■■■■ 已修改文件
huaxin_client/socket_util.py 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/outside_api_command_callback.py 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/server_util.py 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/socket_util.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/socket_util.py
@@ -5,6 +5,7 @@
# 添加数据头
import json
import socket
from huaxin_client import crypt
@@ -73,3 +74,23 @@
        return True
    else:
        return False
def is_port_open(host, port, timeout=1):
    try:
        # 创建一个socket对象
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # 设置超时时间为1秒钟
        s.settimeout(timeout)
        # 尝试连接到指定的主机和端口
        s.connect((host, port))
        # 如果连接成功,则端口是打开的
        s.close()
        return True
    except socket.error:
        # 如果连接出现异常,则端口是关闭的
        return False
if __name__ == "__main__":
    print(is_port_open("127.0.0.1",8080))
main.py
@@ -24,7 +24,7 @@
from trade.huaxin import huaxin_trade_server, huaxin_trade_api_server
# from huaxin_api import trade_client, l2_client, l1_client
from utils import tool
from utils import tool, server_util
def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue,
@@ -50,7 +50,7 @@
    server.global_data_loader.init()
    # # 数据服务
    t1 = threading.Thread(target=createDataServer, name="createDataServer", daemon=True)
    t1 = threading.Thread(target=server_util.run_data_server, name="createDataServer", daemon=True)
    t1.start()
    #
    # 交易接口服务
@@ -81,17 +81,6 @@
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{laddr[1]} 启动失败")
def createDataServer():
    logger_system.info("create DataServer")
    logger_system.info(f"createDataServer 线程ID:{tool.get_thread_id()}")
    tcpserver = data_server.run("", 9004)
    tcpserver.serve_forever()
    try:
        tcpserver.serve_forever()
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{9004} 启动失败")
if __name__ == '__main__1':
trade/huaxin/outside_api_command_callback.py
@@ -49,7 +49,7 @@
from trade.sell.sell_rule_manager import TradeRuleManager, SellRule
from trade.trade_manager import TradeTargetCodeModeManager, AutoCancelSellModeManager
from settings.trade_setting import MarketSituationManager
from utils import socket_util, data_export_util, tool, huaxin_util, output_util, global_util
from utils import socket_util, data_export_util, tool, huaxin_util, output_util, global_util, server_util
class OutsideApiCommandCallback(outside_api_command_manager.ActionCallback):
@@ -509,6 +509,14 @@
            except Exception as e:
                logger_debug.exception(e)
                fdata["today_history_k_bar_count"] = -1
            # 获取数据服务器是否联通
            try:
                is_data_server_open = socket_util.is_port_bind(9004)
                fdata["data_server_open"] = 1 if is_data_server_open else 0
            except Exception as e:
                logger_debug.exception(e)
                fdata["data_server_open"] = -1
            # 获取交易通道
            result = {"code": 0, "data": fdata, "msg": ""}
@@ -1133,8 +1141,10 @@
            elif ctype == "get_per_code_buy_money":
                self.send_response({"code": 0, "data": {"money": constant.BUY_MONEY_PER_CODE}}, client_id, request_id)
            elif ctype == "repaire_task":
                # 修复任务
                # 修复开盘啦任务
                kpl_data_manager.PullTask.repaire_pull_task()
                # 修复数据服务
                server_util.repaire_data_server()
                self.send_response({"code": 0, "data": {}}, client_id, request_id)
            elif ctype == "get_trade_queue":
                code = data["code"]
utils/server_util.py
New file
@@ -0,0 +1,23 @@
import threading
from log_module.log import logger_system
from third_data import data_server
from utils import tool, socket_util
DATA_SERVER_PORT = 9004
def run_data_server():
    logger_system.info("create DataServer")
    logger_system.info(f"createDataServer 线程ID:{tool.get_thread_id()}")
    tcpserver = data_server.run("", DATA_SERVER_PORT)
    tcpserver.serve_forever()
    try:
        tcpserver.serve_forever()
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{DATA_SERVER_PORT} 启动失败")
def repaire_data_server():
    if not socket_util.is_port_bind(DATA_SERVER_PORT):
        threading.Thread(target=run_data_server, daemon=True).start()
utils/socket_util.py
@@ -82,9 +82,11 @@
# 端口是否被占用
def is_port_bind(port):
def is_port_bind(port, timeout=1):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    result = sock.connect_ex(('127.0.0.1', port))
    sock.close()
    if result == 0:
        return True
    else:
@@ -92,4 +94,4 @@
if __name__ == "__main__":
    pass
    print(is_port_bind(9004))