$91 GRAYBYTE WORDPRESS FILE MANAGER $93

SERVER : vnpttt-amd7f72-h1.vietnix.vn #1 SMP Fri May 24 12:42:50 UTC 2024
SERVER IP : 103.200.23.149 | ADMIN IP 216.73.216.22
OPTIONS : CRL = ON | WGT = ON | SDO = OFF | PKEX = OFF
DEACTIVATED : NONE

/opt/cloudlinux/venv/lib/python3.11/site-packages/xray/console_utils/

HOME
Current File : /opt/cloudlinux/venv/lib/python3.11/site-packages/xray/console_utils//validations.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains functions for performing necessary validations
"""
import ipaddress
import json
import logging
import re
from argparse import Namespace
from urllib.parse import urlparse

from schema import Schema, And, Optional, Regex, SchemaError, Use, Or
from xray.internal.utils import read_sys_id

# Module-level logger, registered under the fixed name 'validations'
logger = logging.getLogger('validations')

# --- URL validator regexps taken from Django framework ---
# The fragments below are plain strings that get concatenated into one
# pattern; only the final ``regex`` object is compiled.
ul = '\u00a1-\uffff'  # unicode letters range (must not be a raw string)
# IP patterns
# Four dot-separated decimal octets, each limited to 0-255
ipv4_re = r'(?:25[0-5]|2[0-4]\d|[0-1]?\d?\d)(?:\.(?:25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}'
# Host patterns
# First label: starts and ends with a letter/digit, dashes allowed in between
hostname_re = r'[a-z' + ul + r'0-9](?:[a-z' + ul + r'0-9-]{0,61}[a-z' + ul + r'0-9])?'
# Max length for domain name labels is 63 characters per RFC 1034 sec. 3.1
# Zero or more further labels, none of which may start or end with a dash
domain_re = r'(?:\.(?!-)[a-z' + ul + r'0-9-]{1,63}(?<!-))*'
# Mandatory top-level label (letters only, or an ``xn--`` punycode label)
tld_re = (
        r'\.'  # dot
        r'(?!-)'  # can't start with a dash
        r'(?:[a-z' + ul + '-]{2,63}'  # domain label
                          r'|xn--[a-z0-9]{1,59})'  # or punycode label
                          r'(?<!-)'  # can't end with a dash
                          r'\.?'  # may have a trailing dot
)
# A host is either a full dotted name ending in a TLD or the literal 'localhost'
host_re = '(' + hostname_re + domain_re + tld_re + '|localhost)'
# Complete URL pattern: anchored at both ends, case-insensitive, and limited
# to the http/https schemes. The port, when present, must be 2 to 5 digits.
regex = re.compile(
    r'^(?:http)s?://'  # scheme
    r'(?:[^\s:@/]+(?::[^\s:@/]*)?@)?'  # user:pass authentication
    r'(?:' + ipv4_re + '|' + host_re + ')'
                                       r'(?::\d{2,5})?'  # port
                                       r'(?:[/?#][^\s]*)?'  # resource path
                                       r'\Z', re.IGNORECASE)
# ---

# --- EMAIL validator regexp taken from https://emailregex.com/ ---
# Kept as a plain (uncompiled) pattern string: a local part (dot-separated
# atoms or a quoted string), '@', then either a dotted domain name or a
# bracketed address literal.
# NOTE(review): the pattern itself contains no ^/$ anchors, and its character
# classes list lowercase letters only; no flags are attached here. Whether it
# is applied as a full match and case-insensitively is decided by the code
# that consumes it - confirm at each use site.
email_regex = r"""(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])"""
# ---


# --- Validation helper functions
def is_ip_valid(ipaddr: str) -> bool:
    """
    Tell whether the given value is a well-formed IP address
    (anything ipaddress.ip_address accepts, except integers)
    :param ipaddr: IP address to check
    :return: True if the address is valid, False otherwise
    """
    # Integers (bool included) are rejected up front: ipaddress would
    # otherwise interpret them as a numeric address
    if not isinstance(ipaddr, int):
        try:
            ipaddress.ip_address(ipaddr)
        except ValueError:
            return False
        else:
            return True
    return False


def is_ipv4_valid(ipaddr: str) -> bool:
    """
    Tell whether the given value is a well-formed IPv4 address
    The wildcard '*' is accepted as well
    :param ipaddr: IPv4 address (or '*') to check
    :return: True if the value is '*' or a valid IPv4 address,
             False otherwise
    """
    # Integers (bool included) are never accepted, even though
    # ipaddress.IPv4Address could interpret them
    if isinstance(ipaddr, int):
        return False

    is_wildcard = ipaddr == '*'
    if not is_wildcard:
        try:
            ipaddress.IPv4Address(ipaddr)
        except ValueError:
            return False
    return True


def url_cast(orig_url: str) -> str:
    """
    Add http scheme if original url is missing it

    The scheme is treated as present only when urlparse reports both
    a scheme and a network location, i.e. the value has
    the ``scheme://host`` shape. Checking the scheme alone is not enough:
    for inputs such as 'example.com:8080/path', 'localhost:8080' or
    'user:pass@example.com' urlparse takes the text before the first colon
    as the scheme, so such URLs would be left without 'http://'.
    :param orig_url: URL as given by the caller
    :return: orig_url unchanged if it already has ``scheme://host`` form,
             otherwise orig_url prefixed with 'http://'
    """
    fragments = urlparse(orig_url)
    # netloc is only filled in when the URL contains '//' after the scheme
    if fragments.scheme and fragments.netloc:
        return orig_url
    return f'http://{orig_url}'
# ---


# --- Validation Schemas
# Schema for the full command set; 'command' and 'system_id' are the only
# mandatory keys, every other key is optional.
validation_schema = Schema({
    'command': And(str, lambda c: c in ('start', 'stop', 'continue',
                                        'complete', 'delete',
                                        'autocomplete-tasks',
                                        'enable-continuous',
                                        'disable-continuous',
                                        'start-continuous',
                                        'stop-continuous',
                                        'continuous-tracing-list',
                                        'tasks-list', 'requests-list',
                                        'request-data',
                                        'enable-user-agent',
                                        'disable-user-agent',
                                        'user-agent-status',
                                        'advanced-metrics',
                                        'enable-serverwide-mode',
                                        'disable-serverwide-mode'),
                   error='Invalid command'),
    Optional('lang'): Or(str, None),
    # read_sys_id() is called at validation time, not at import time
    'system_id': And(str, lambda s: s == read_sys_id(),
                     error='system_id is invalid'),
    Optional('tracing_task_id'): str,
    # url_cast runs first, so the (anchored) URL regex always sees a scheme
    Optional('url'): And(Use(url_cast), Regex(regex, error='URL is invalid')),
    # email_regex has no anchors and lists lowercase letters only, while
    # schema's Regex matches by searching inside the value. Without explicit
    # anchors any string that merely contains an address would be accepted,
    # so the pattern is wrapped in \A...\Z and matched case-insensitively
    # (same approach as the URL regex).
    Optional('email'): And(str, Regex(r'\A(?:' + email_regex + r')\Z',
                                      flags=re.IGNORECASE,
                                      error='EMAIL is invalid')),
    # is_ipv4_valid also accepts the '*' wildcard
    Optional('client_ip'): And(str, is_ipv4_valid,
                               error='IP is invalid'),
    Optional('time'): And(int, lambda t: 0 < t <= 2880,
                          error='minimum time count is 1 minute, maximum - 48 hours (2880 minutes)'),
    Optional('request_qty'): And(int, lambda q: 0 < q <= 100,
                                 error='minimum request_qty count is 1, maximum - 100'),
    Optional('request_id'): And(int, lambda q: q > 0,
                                error='minimum request_id is 1'),
    Optional('enable'): Or(bool, None),
    Optional('disable'): Or(bool, None),
    Optional('status'): Or(bool, None),
})

# Commands accepted by validation_user_schema
_USER_COMMANDS = (
    'start', 'stop', 'continue',
    'complete', 'delete',
    'tasks-list', 'requests-list',
    'request-data', 'user-agent-status',
)

# Schema for the user command set: only 'command' is mandatory and,
# unlike validation_schema, no 'system_id' or 'email' keys are accepted.
validation_user_schema = Schema({
    'command': And(
        str,
        lambda cmd: cmd in _USER_COMMANDS,
        error='Invalid user command',
    ),
    Optional('lang'): Or(str, None),
    Optional('tracing_task_id'): str,
    # url_cast runs before the regex check
    Optional('url'): And(
        Use(url_cast),
        Regex(regex, error='URL is invalid'),
    ),
    Optional('client_ip'): And(
        str,
        is_ipv4_valid,
        error='IP is invalid',
    ),
    Optional('time'): And(
        int,
        lambda minutes: 0 < minutes <= 2880,
        error='minimum time count is 1 minute, maximum - 48 hours (2880 minutes)',
    ),
    Optional('request_qty'): And(
        int,
        lambda qty: 0 < qty <= 100,
        error='minimum request_qty count is 1, maximum - 100',
    ),
    Optional('request_id'): And(
        int,
        lambda req_id: req_id > 0,
        error='minimum request_id is 1',
    ),
})

# Commands accepted by validation_adviser_schema
_ADVISER_COMMANDS = (
    'list',
    'details',
    'apply',
    'rollback',
    'subscription',
    'agreement',
    'counters',
    'status',
    'sites-status',
    'wordpress-plugin-install',
    'wordpress-plugin-uninstall',
    'get-options',
    'get-limits',
    'get-usage',
    'update-advices-metadata',

    'awp-cdn-get-pullzone',
    'awp-cdn-remove-pullzone',
    'awp-cdn-purge',
    'awp-sync',
    'get-cdn-usage',
    'report-analytics',

    'wp-plugin-data',
    'wp-plugin-copy',
)

# Schema for the adviser command set: 'command' is the only mandatory key
validation_adviser_schema = Schema({
    'command': And(
        str,
        lambda cmd: cmd in _ADVISER_COMMANDS,
        error='Invalid command',
    ),
    Optional('api_version'): Or(str, None),
    Optional('lang'): Or(str, None),

    Optional('advice_id'): Or(str, None),
    Optional('ignore_errors'): bool,
    Optional('async_mode'): bool,
    Optional('source'): Or(str, None),
    Optional('reason'): Or(str, None),
    Optional('extends'): bool,
    Optional('listen'): bool,
    Optional('username'): Or(str, None),

    Optional('account_id'): Or(str, None),
    Optional('domain'): str,
    Optional('website'): str,
    Optional('text'): str,
    Optional('accept_license_terms'): bool,

    Optional('analytics_data'): Or(str, None),

    # for analytics reporting
    Optional('feature'): Or(str, None),
    Optional('event'): Or(str, None),
    Optional('user_hash'): Or(str, None),
    Optional('journey_id'): Or(str, None),
    Optional('variant_id'): Or(str, None),

    # for wp plugin manager
    Optional('plugin_name'): str,
    Optional('plugin_version'): str,
    Optional('tmp_dir'): str,
})

# ---


def _validate(input_args: dict, _scheme: Schema) -> Namespace:
    """
    Check the given input against a schema
    Input arguments are expected in a dict form
    :param input_args: dict with input data
    :param _scheme: schema for validation
    :return: Namespace whose attributes are the validated items
    :raises SystemExit: if validation fails; the exit payload is a JSON
                        string with the error text under the 'result' key
    """
    try:
        checked = _scheme.validate(input_args)
    except SchemaError as e:
        reason = str(e)
        logger.error('Input validation error', extra={'err': reason})
        failure = {'result': f'Input validation error: {reason}'}
        raise SystemExit(json.dumps(failure))
    return Namespace(**checked)


def validate(input_args: dict) -> Namespace:
    """
    Validate given input against validation_schema
    Input arguments are expected in a dict form
    :param input_args: dict with input data
    :return: whatever _validate returns for validation_schema
             (a Namespace with the validated data)
    """
    return _validate(input_args, validation_schema)


def validate_user(input_args: dict) -> Namespace:
    """
    Validate given input against validation_user_schema
    Input arguments are expected in a dict form
    :param input_args: dict with input data
    :return: whatever _validate returns for validation_user_schema
             (a Namespace with the validated data)
    """
    return _validate(input_args, validation_user_schema)


def validate_adviser(input_args: dict) -> Namespace:
    """
    Validate given input against validation_adviser_schema
    Input arguments are expected in a dict form
    :param input_args: dict with input data
    :return: whatever _validate returns for validation_adviser_schema
             (a Namespace with the validated data)
    """
    return _validate(input_args, validation_adviser_schema)

Current_dir [ NOT WRITEABLE ] Document_root [ WRITEABLE ]


[ Back ]
NAME
SIZE
LAST TOUCH
USER
CAN-I?
FUNCTIONS
..
--
16 Dec 2025 9.31 PM
root / root
0755
__pycache__
--
17 Dec 2025 2.54 AM
root / root
0755
run_user
--
16 Dec 2025 9.31 PM
root / root
0755
__init__.py
0 KB
20 Oct 2025 7.04 PM
root / root
0644
base_app.py
0.921 KB
20 Oct 2025 7.04 PM
root / root
0644
cmdline_parser.py
23.708 KB
20 Oct 2025 7.04 PM
root / root
0644
run_agent.py
1.715 KB
20 Oct 2025 7.04 PM
root / root
0644
run_continuous.py
0.618 KB
20 Oct 2025 7.04 PM
root / root
0644
run_manager.py
3.946 KB
20 Oct 2025 7.04 PM
root / root
0644
run_smart_advice.py
7.438 KB
20 Oct 2025 7.04 PM
root / root
0644
validations.py
10.066 KB
20 Oct 2025 7.04 PM
root / root
0644

GRAYBYTE WORDPRESS FILE MANAGER @ 2026 CONTACT ME
Static GIF