$77 GRAYBYTE WORDPRESS FILE MANAGER $43

SERVER : vnpttt-amd7f72-h1.vietnix.vn #1 SMP Fri May 24 12:42:50 UTC 2024
SERVER IP : 103.200.23.149 | ADMIN IP 216.73.216.22
OPTIONS : CRL = ON | WGT = ON | SDO = OFF | PKEX = OFF
DEACTIVATED : NONE

/opt/cloudlinux/venv/lib/python3.11/site-packages/xray/internal/

HOME
Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/xray/internal//user_manager.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains class implementing part of X-Ray Manager
related to User Agent management (enable/disable/status)
"""
import os
import subprocess
import logging
from xray import gettext as _
from .exceptions import XRayManagerError

from clcommon.utils import is_ubuntu


class ManageUserAgent:
    """
    Manager actions related to user plugin
    """
    def __init__(self):
        self.service_name = 'xray-user-agent'
        self.service_socket = 'xray-user-agent.socket'
        self.logger = logging.getLogger('user_plugin_manager')
        self.subprocess_errors = (
            OSError, ValueError, subprocess.SubprocessError
        )

    @staticmethod
    def is_systemd():
        """
        Check for systemd available
        """
        return os.path.isfile('/bin/systemctl')

    def _run_external(self, *args, capture_output=True, text=True,
                      check=False) -> subprocess.CompletedProcess:
        """
        Call subprocess described with given *args
        """
        try:
            result = subprocess.run(args,
                                    capture_output=capture_output,
                                    text=text,
                                    check=check)
            self.logger.info("Command '%s' succeeded", {' '.join(args)})
        except subprocess.CalledProcessError as e:
            self.logger.error("External command '%s' failed with code %s: %s",
                              e.cmd, e.returncode, e.stdout or e.stderr,
                              extra={'cmd': e.cmd, 'retcode': e.returncode,
                                     'stdout': e.stdout, 'stderr': e.stderr})
            raise XRayManagerError(
                _("External command '{}' failed with code {}: {}".format(str(e.cmd),
                                                                             str(e.returncode),
                                                                             str(e.stderr) or str(e.stderr))))
        except self.subprocess_errors as e:
            self.logger.error("Failed to run '%s' external command",
                              ' '.join(args),
                              extra={'err': str(e)})
            raise XRayManagerError(
                _("Failed to run '{}' external command: {}".format(' '.join(args), str(e))))
        return result

    def _autostart(self, switch=True):
        """
        Switch autostart ON or OFF
        """
        autostart_select = {
            True: ('enable', 'on'),
            False: ('disable', 'off')
        }
        if self.is_systemd():
            self._run_external('/bin/systemctl',
                               autostart_select[switch][0],
                               self.service_socket)
        else:
            self._run_external('/sbin/chkconfig',
                               self.service_name,
                               autostart_select[switch][1])

    def user_agent_service(self, command: str, socket_unit=False,
                           check_retcode=False) -> subprocess.CompletedProcess:
        """
        Run /sbin/service utility to make given operation
        with X-Ray User Agent service
        :command: command to invoke
        :check_retcode: whether to run with check or not
        :return: subprocess info about completed process
        """
        if is_ubuntu():
            srv = self.service_name
        else:
            srv = self.service_socket if socket_unit else self.service_name
        try:
            return self._run_external('/sbin/service', srv, command,
                                      check=check_retcode)
        except XRayManagerError as e:
            raise XRayManagerError(_("X-Ray User Agent '{}' failed: {}".format(command, str(e.reason))))

    def enable(self) -> 'json str':
        """
        Enable X-Ray User Agent:
            start or restart service if it is accidentally already running
        For systemd systems -- start socket unit only
        For SysV -- start the entire service
        :return: JSON encoded result of enable action
        """
        self._autostart()
        agent_status = self.user_agent_service('status')
        if agent_status.returncode:
            if self.is_systemd():
                self.user_agent_service('start', socket_unit=True,
                                        check_retcode=True)
            else:
                self.user_agent_service('start', check_retcode=True)
        else:
            self.user_agent_service('restart', check_retcode=True)

    def disable(self) -> 'json str':
        """
        Disable X-Ray User Agent:
             stop the entire service
             or do nothing if it is accidentally not running
        For systemd systems -- also check if socket unit is running
                               and stop it too
        :return: JSON encoded result of disable action
        """
        self._autostart(False)
        agent_status = self.user_agent_service('status')
        if not agent_status.returncode:
            self.user_agent_service('stop', check_retcode=True)
        if self.is_systemd():
            if not self.user_agent_service('status',
                                           socket_unit=True).returncode:
                self.user_agent_service('stop', socket_unit=True,
                                        check_retcode=True)

    def status(self) -> 'json str':
        """
        Get status of X-Ray User Agent service
        :return: JSON encoded result of status action
        """
        if self.is_systemd():
            _status = self.user_agent_service('status', socket_unit=True)
        else:
            _status = self.user_agent_service('status')
        return 'disabled' if _status.returncode else 'enabled'

Current_dir [ NOT WRITEABLE ] Document_root [ WRITEABLE ]


[ Back ]
NAME
SIZE
LAST TOUCH
USER
CAN-I?
FUNCTIONS
..
--
16 Dec 2025 9.31 PM
root / root
0755
__pycache__
--
17 Dec 2025 2.54 AM
root / root
0755
__init__.py
0.537 KB
20 Oct 2025 7.04 PM
root / root
0644
advanced_metrics.py
1.415 KB
20 Oct 2025 7.04 PM
root / root
0644
clwpos_safe_imports.py
2.213 KB
20 Oct 2025 7.04 PM
root / root
0644
constants.py
2.14 KB
20 Oct 2025 7.04 PM
root / root
0644
exceptions.py
3.693 KB
20 Oct 2025 7.04 PM
root / root
0644
fpm_utils.py
2.247 KB
20 Oct 2025 7.04 PM
root / root
0644
local_counters.py
3.369 KB
20 Oct 2025 7.04 PM
root / root
0644
nginx_utils.py
7.106 KB
20 Oct 2025 7.04 PM
root / root
0644
phpinfo_utils.py
5.998 KB
20 Oct 2025 7.04 PM
root / root
0644
types.py
23.512 KB
20 Oct 2025 7.04 PM
root / root
0644
user_limits.py
4.514 KB
20 Oct 2025 7.04 PM
root / root
0644
user_manager.py
5.854 KB
20 Oct 2025 7.04 PM
root / root
0644
user_plugin_utils.py
8.846 KB
20 Oct 2025 7.04 PM
root / root
0644
utils.py
27.327 KB
20 Oct 2025 7.04 PM
root / root
0644

GRAYBYTE WORDPRESS FILE MANAGER @ 2026 CONTACT ME
Static GIF