HEX
Server: LiteSpeed
System: Linux php-prod-1.spaceapp.ru 5.15.0-157-generic #167-Ubuntu SMP Wed Sep 17 21:35:53 UTC 2025 x86_64
User: xnsbb3110 (1041)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //proc/self/root/usr/local/CyberCP/lib64/python3.10/site-packages/docker/utils/utils.py
import base64
import collections
import json
import os
import os.path
import shlex
import string
from datetime import datetime, timezone
from functools import lru_cache
from itertools import zip_longest
from urllib.parse import urlparse, urlunparse

from .. import errors
from ..constants import (
    BYTE_UNITS,
    DEFAULT_HTTP_HOST,
    DEFAULT_NPIPE,
    DEFAULT_UNIX_SOCKET,
)
from ..tls import TLSConfig

# Lightweight record with six named fields; parse_host() passes an instance
# to urlunparse() to assemble the final URL.
URLComponents = collections.namedtuple(
    'URLComponents',
    ['scheme', 'netloc', 'url', 'params', 'query', 'fragment'],
)


def create_ipam_pool(*args, **kwargs):
    """Removed helper; kept only so that old callers fail loudly.

    Accepts any arguments and ignores them.

    Raises:
        errors.DeprecatedMethod: always.
    """
    message = (
        'utils.create_ipam_pool has been removed. Please use a '
        'docker.types.IPAMPool object instead.'
    )
    raise errors.DeprecatedMethod(message)


def create_ipam_config(*args, **kwargs):
    """Removed helper; kept only so that old callers fail loudly.

    Accepts any arguments and ignores them.

    Raises:
        errors.DeprecatedMethod: always.
    """
    message = (
        'utils.create_ipam_config has been removed. Please use a '
        'docker.types.IPAMConfig object instead.'
    )
    raise errors.DeprecatedMethod(message)


def decode_json_header(header):
    """Decode a base64-encoded, UTF-8 JSON header value.

    Args:
        header: base64 text (str or bytes) wrapping a JSON document.

    Returns:
        The deserialized JSON value.
    """
    json_text = base64.b64decode(header).decode('utf-8')
    return json.loads(json_text)


@lru_cache(maxsize=None)
def compare_version(v1, v2):
    """Compare two dotted docker version strings.

    Note the sign convention: the result is positive when ``v1`` is the
    *older* version.

    >>> compare_version('1.9', '1.10')
    1
    >>> compare_version('1.10', '1.9')
    -1
    >>> compare_version('1.10', '1.10')
    0

    Returns:
        1 if v1 < v2, -1 if v1 > v2, 0 if they are equal. Missing trailing
        components count as 0, so '1.0' equals '1'.
    """
    # Identical strings short-circuit before any integer parsing happens.
    if v1 == v2:
        return 0

    left = tuple(int(piece) for piece in v1.split('.'))
    right = tuple(int(piece) for piece in v2.split('.'))

    # Walk the components pairwise, padding the shorter version with zeros;
    # the first differing pair decides the outcome.
    for a, b in zip_longest(left, right, fillvalue=0):
        if a != b:
            return -1 if a > b else 1
    return 0


def version_lt(v1, v2):
    """Return True when version ``v1`` is strictly lower than ``v2``."""
    # compare_version() yields a positive number when v1 is the older one.
    return 0 < compare_version(v1, v2)


def version_gte(v1, v2):
    """Return True when version ``v1`` is greater than or equal to ``v2``."""
    # compare_version() is positive only when v1 is strictly lower than v2.
    return compare_version(v1, v2) <= 0


def _convert_port_binding(binding):
    """Normalize one host binding into a ``{'HostIp', 'HostPort'}`` dict.

    Accepted shapes:
      * 2-tuple ``(host_ip, host_port)``
      * any other tuple: the first item is the host IP if it is a string,
        otherwise the host port
      * dict containing ``'HostPort'`` (and optionally ``'HostIp'``)
      * anything else: taken as the host port

    ``HostPort`` is always returned as a string ('' when it is None).

    Raises:
        ValueError: if a dict binding has no ``'HostPort'`` key.
    """
    host_ip = ''
    host_port = ''

    if isinstance(binding, tuple):
        if len(binding) == 2:
            host_ip, host_port = binding
        elif isinstance(binding[0], str):
            host_ip = binding[0]
        else:
            host_port = binding[0]
    elif isinstance(binding, dict):
        if 'HostPort' not in binding:
            raise ValueError(binding)
        host_port = binding['HostPort']
        if 'HostIp' in binding:
            host_ip = binding['HostIp']
    else:
        host_port = binding

    return {
        'HostIp': host_ip,
        'HostPort': '' if host_port is None else str(host_port),
    }


def convert_port_bindings(port_bindings):
    """Convert a ``{container_port: binding(s)}`` mapping to API format.

    Keys are stringified and get a ``/tcp`` suffix when no protocol is given.
    Each value becomes a list of normalized binding dicts; a non-list value
    is treated as a single binding.
    """
    converted = {}
    for port, bindings in port_bindings.items():
        port_key = str(port)
        if '/' not in port_key:
            port_key = f'{port_key}/tcp'
        if not isinstance(bindings, list):
            bindings = [bindings]
        converted[port_key] = [
            _convert_port_binding(item) for item in bindings
        ]
    return converted


def convert_volume_binds(binds):
    """Turn a volume-bind mapping into ``host:container:mode`` strings.

    Args:
        binds: either a list (returned unchanged) or a dict mapping a host
            path to a container path, or to a dict with a required ``bind``
            key and optional ``ro``, ``mode`` and ``propagation`` keys.
            Bytes paths are decoded as UTF-8.

    Returns:
        A list of bind specification strings.

    Raises:
        ValueError: if a bind dict contains both ``ro`` and ``mode``.
    """
    if isinstance(binds, list):
        return binds

    # NOTE: this is only relevant for Linux hosts
    # (doesn't apply in Docker Desktop)
    known_propagations = (
        'rshared',
        'shared',
        'rslave',
        'slave',
        'rprivate',
        'private',
    )

    def _as_text(value):
        # Bytes are decoded as UTF-8; anything else passes through untouched.
        return value.decode('utf-8') if isinstance(value, bytes) else value

    specs = []
    for host_path, target in binds.items():
        host_path = _as_text(host_path)

        # Simple form: host path -> container path, always read-write.
        if not isinstance(target, dict):
            specs.append(f'{host_path}:{_as_text(target)}:rw')
            continue

        has_ro = 'ro' in target
        has_mode = 'mode' in target
        if has_ro and has_mode:
            raise ValueError(
                f'Binding cannot contain both "ro" and "mode": {target!r}'
            )

        container_path = _as_text(target['bind'])

        if has_ro:
            mode = 'ro' if target['ro'] else 'rw'
        elif has_mode:
            mode = target['mode']
        else:
            mode = 'rw'

        # A recognised propagation flag is appended to the mode, or replaces
        # it when the mode is empty. Unrecognised values are ignored.
        if 'propagation' in target and target['propagation'] in known_propagations:
            propagation = target['propagation']
            mode = f"{mode},{propagation}" if mode else propagation

        specs.append(f'{host_path}:{container_path}:{mode}')
    return specs


def convert_tmpfs_mounts(tmpfs):
    """Convert tmpfs mounts to a ``{path: options}`` mapping.

    Args:
        tmpfs: a dict (returned unchanged) or a list of ``"path"`` /
            ``"path:options"`` strings.

    Returns:
        A dict mapping each mount path to its option string ('' if none).

    Raises:
        ValueError: if ``tmpfs`` is neither a dict nor a list, or if a list
            item is not a string.
    """
    if isinstance(tmpfs, dict):
        return tmpfs

    if not isinstance(tmpfs, list):
        raise ValueError(
            'Expected tmpfs value to be either a list or a dict, '
            f'found: {type(tmpfs).__name__}'
        )

    mounts = {}
    for entry in tmpfs:
        if not isinstance(entry, str):
            raise ValueError(
                "Expected item in tmpfs list to be a string, "
                f"found: {type(entry).__name__}"
            )
        # Split on the first colon only; options may contain further colons.
        path, _, options = entry.partition(":")
        mounts[path] = options
    return mounts


def convert_service_networks(networks):
    """Normalize a service's network list.

    Plain string entries are wrapped as ``{'Target': name}``; every other
    entry is kept as is. A falsy argument is returned unchanged.

    Raises:
        TypeError: if ``networks`` is truthy but not a list.
    """
    if not networks:
        return networks
    if not isinstance(networks, list):
        raise TypeError('networks parameter must be a list.')

    return [
        {'Target': entry} if isinstance(entry, str) else entry
        for entry in networks
    ]


def parse_repository_tag(repo_name):
    """Split an image reference into ``(repository, tag_or_digest)``.

    A digest (text after the last ``@``) takes precedence. Otherwise the text
    after the last ``:`` is the tag, unless it contains a ``/`` (in which
    case the colon belongs to a registry ``host:port``). The second element
    is None when neither is present.
    """
    repository, at_sign, digest = repo_name.rpartition('@')
    if at_sign:
        return repository, digest

    repository, colon, tag = repo_name.rpartition(':')
    if colon and '/' not in tag:
        return repository, tag

    return repo_name, None


def parse_host(addr, is_win32=False, tls=False):
    """Normalize a Docker host address into the base URL used internally.

    Args:
        addr: host address string; may be empty/None. Accepted schemes are
            ``tcp``, ``unix``, ``npipe`` and ``ssh``, plus the aliases
            ``http``/``https`` (treated as ``tcp``) and ``http+unix``
            (treated as ``unix``). An address without a usable scheme is
            parsed as ``tcp``.
        is_win32: when true and ``addr`` is empty, ``DEFAULT_NPIPE`` is
            returned instead of ``DEFAULT_UNIX_SOCKET``.
        tls: when truthy, a ``tcp`` address is rewritten to ``https``
            rather than ``http``. Overridden when ``addr`` itself uses the
            ``http`` or ``https`` scheme.

    Returns:
        The normalized URL string with trailing slashes removed.

    Raises:
        errors.DockerException: for the ``fd`` scheme, an unsupported
            scheme, ``tcp`` without a host part, a URL carrying params,
            query, fragment or password, an ``ssh`` URL with a path, or a
            ``tcp`` URL without a port.
    """
    # NOTE(review): `parsed_url.port` below is read without a guard; urllib
    # documents that it raises ValueError for an invalid port -- confirm
    # whether callers expect DockerException in that case.

    # Sensible defaults
    if not addr and is_win32:
        return DEFAULT_NPIPE
    if not addr or addr.strip() == 'unix://':
        return DEFAULT_UNIX_SOCKET

    addr = addr.strip()

    parsed_url = urlparse(addr)
    proto = parsed_url.scheme
    # No scheme, or a "scheme" containing anything other than ASCII letters
    # and '+': re-parse the whole address as a network location, using tcp.
    if not proto or any(x not in f"{string.ascii_letters}+" for x in proto):
        # https://bugs.python.org/issue754016
        parsed_url = urlparse(f"//{addr}", 'tcp')
        proto = 'tcp'

    if proto == 'fd':
        raise errors.DockerException('fd protocol is not implemented')

    # These protos are valid aliases for our library but not for the
    # official spec
    if proto == 'http' or proto == 'https':
        # The scheme in the address wins over the `tls` argument.
        tls = proto == 'https'
        proto = 'tcp'
    elif proto == 'http+unix':
        proto = 'unix'

    if proto not in ('tcp', 'unix', 'npipe', 'ssh'):
        raise errors.DockerException(
            f"Invalid bind address protocol: {addr}"
        )

    if proto == 'tcp' and not parsed_url.netloc:
        # "tcp://" is exceptionally disallowed by convention;
        # omitting a hostname for other protocols is fine
        raise errors.DockerException(
            f'Invalid bind address format: {addr}'
        )

    # None of these URL parts is allowed in a bind address.
    if any([
        parsed_url.params, parsed_url.query, parsed_url.fragment,
        parsed_url.password
    ]):
        raise errors.DockerException(
            f'Invalid bind address format: {addr}'
        )

    if parsed_url.path and proto == 'ssh':
        raise errors.DockerException(
            f'Invalid bind address format: no path allowed for this protocol: {addr}'
        )
    else:
        path = parsed_url.path
        if proto == 'unix' and parsed_url.hostname is not None:
            # For legacy reasons, we consider unix://path
            # to be valid and equivalent to unix:///path
            path = f"{parsed_url.hostname}/{path}"

    netloc = parsed_url.netloc
    if proto in ('tcp', 'ssh'):
        port = parsed_url.port or 0
        if port <= 0:
            # A port is mandatory for tcp; ssh falls back to port 22.
            if proto != 'ssh':
                raise errors.DockerException(
                    f'Invalid bind address format: port is required: {addr}'
                )
            port = 22
            netloc = f'{parsed_url.netloc}:{port}'

        # Port given without a hostname (e.g. ":2375"): use the default host.
        if not parsed_url.hostname:
            netloc = f'{DEFAULT_HTTP_HOST}:{port}'

    # Rewrite schemes to fit library internals (requests adapters)
    if proto == 'tcp':
        proto = f"http{'s' if tls else ''}"
    elif proto == 'unix':
        proto = 'http+unix'

    # Socket/pipe addresses consist of scheme and path only.
    if proto in ('http+unix', 'npipe'):
        return f"{proto}://{path}".rstrip('/')

    return urlunparse(URLComponents(
        scheme=proto,
        netloc=netloc,
        url=path,
        params='',
        query='',
        fragment='',
    )).rstrip('/')


def parse_devices(devices):
    """Convert device specifications into API device-mapping dicts.

    Args:
        devices: iterable whose items are either dicts (passed through
            unchanged) or strings of the form
            ``host_path[:container_path[:permissions]]``.

    Returns:
        A list of dicts. For string items, the container path defaults to
        the host path and the cgroup permissions default to ``'rwm'``.

    Raises:
        errors.DockerException: if an item is neither a dict nor a string.
    """
    parsed = []
    for device in devices:
        if isinstance(device, dict):
            parsed.append(device)
            continue
        if not isinstance(device, str):
            raise errors.DockerException(
                f'Invalid device type {type(device)}'
            )

        # str.split always yields at least one field, so fields[0] exists.
        fields = device.split(':')
        host_path = fields[0]
        container_path = fields[1] if len(fields) > 1 else host_path
        cgroup_permissions = fields[2] if len(fields) > 2 else 'rwm'

        parsed.append({
            'PathOnHost': host_path,
            'PathInContainer': container_path,
            'CgroupPermissions': cgroup_permissions
        })
    return parsed


def kwargs_from_env(environment=None):
    """Build client keyword arguments from Docker environment variables.

    Reads ``DOCKER_HOST``, ``DOCKER_CERT_PATH`` and ``DOCKER_TLS_VERIFY``
    from ``environment``; ``os.environ`` is used when ``environment`` is
    falsy (None or empty).

    Returns:
        A dict that may contain ``base_url`` (when ``DOCKER_HOST`` is
        non-empty) and ``tls`` (a ``TLSConfig``, when a cert path is set or
        TLS verification is requested).
    """
    env = environment if environment else os.environ

    # empty string for cert path is the same as unset.
    cert_dir = env.get('DOCKER_CERT_PATH') or None

    # DOCKER_TLS_VERIFY: unset or empty string means "false";
    # any other value means "true".
    verify_raw = env.get('DOCKER_TLS_VERIFY')
    tls_verify = verify_raw is not None and verify_raw != ''

    params = {}
    base_url = env.get('DOCKER_HOST')
    if base_url:
        params['base_url'] = base_url

    if not (cert_dir or tls_verify):
        return params

    # Verification requested without an explicit cert path: use ~/.docker.
    if not cert_dir:
        cert_dir = os.path.join(os.path.expanduser('~'), '.docker')

    client_cert = (
        os.path.join(cert_dir, 'cert.pem'),
        os.path.join(cert_dir, 'key.pem'),
    )
    params['tls'] = TLSConfig(
        client_cert=client_cert,
        ca_cert=os.path.join(cert_dir, 'ca.pem'),
        verify=tls_verify,
    )
    return params


def convert_filters(filters):
    """Serialize a filters mapping to the JSON string the API expects.

    Each value is turned into a list of strings: a top-level bool becomes
    ``'true'``/``'false'``, a non-list value is wrapped in a one-item list,
    and non-string list items are passed through ``str()``.

    Returns:
        The JSON-encoded ``{name: [str, ...]}`` mapping.
    """
    encoded = {}
    for name, value in filters.items():
        if isinstance(value, bool):
            value = 'true' if value else 'false'
        values = value if isinstance(value, list) else [value]
        encoded[name] = [
            item if isinstance(item, str) else str(item)
            for item in values
        ]
    return json.dumps(encoded)


def datetime_to_timestamp(dt):
    """Convert a datetime to a whole-second Unix timestamp.

    The value is converted to UTC with ``astimezone`` and measured against
    1970-01-01T00:00:00Z. Microseconds are discarded.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    elapsed = dt.astimezone(timezone.utc) - epoch
    return elapsed.days * 24 * 3600 + elapsed.seconds


def parse_bytes(s):
    """Convert a human-readable size such as ``'512m'`` to a byte count.

    Args:
        s: an int/float (returned unchanged) or a string made of a number
            followed by an optional unit suffix looked up in ``BYTE_UNITS``.
            A trailing ``b``/``B`` after another letter (as in ``'kb'``) is
            ignored. A string with no suffix is taken as bytes.

    Returns:
        The size as an int (0 for an empty string), or ``s`` itself when it
        is already numeric.

    Raises:
        errors.DockerException: if the suffix is not a known unit or the
            numeric part cannot be parsed.
    """
    if isinstance(s, (int, float)):
        return s
    if len(s) == 0:
        return 0

    # "kb", "MB", ... -> drop the redundant trailing byte marker.
    if s[-2:-1].isalpha() and s[-1].isalpha():
        if s[-1] in ('b', 'B'):
            s = s[:-1]
    units = BYTE_UNITS
    suffix = s[-1].lower()

    if suffix.isdigit():
        # Bare number without a unit: interpret it as bytes.
        number_text = s
        suffix = 'b'
    else:
        number_text = s[:-1]

    if suffix not in units:
        raise errors.DockerException(
            f'The specified value for memory ({s}) should specify the units. '
            'The postfix should be one of the `b` `k` `m` `g` characters'
        )

    try:
        number = float(number_text)
    except ValueError as ve:
        raise errors.DockerException(
            'Failed converting the string value for memory '
            f'({number_text}) to an integer.'
        ) from ve

    # Fractional inputs are allowed; the final result is truncated to int.
    return int(number * units[suffix])


def normalize_links(links):
    """Render container links as a sorted list of ``name[:alias]`` strings.

    Args:
        links: a dict ``{name: alias}`` or an iterable of ``(name, alias)``
            pairs. A falsy alias produces just the name.
    """
    pairs = links.items() if isinstance(links, dict) else links

    formatted = []
    for name, alias in sorted(pairs):
        formatted.append(f'{name}:{alias}' if alias else name)
    return formatted


def parse_env_file(env_file):
    """
    Reads a line-separated environment file.
    The format of each line should be "key=value".

    Blank lines are skipped, and so are comment lines, i.e. lines whose
    first non-whitespace character is ``#``. Surrounding whitespace is
    stripped from every line before it is parsed, and the line is split on
    the first ``=`` only, so values may themselves contain ``=``.

    Args:
        env_file: path of the file to read.

    Returns:
        A dict mapping each key to its value (both strings).

    Raises:
        errors.DockerException: if a non-blank, non-comment line contains
            no ``=``.
    """
    environment = {}

    with open(env_file) as f:
        for raw_line in f:
            # Strip *before* testing for a comment so that indented
            # comments ("   # note") are recognised too, instead of being
            # rejected as invalid or stored under a bogus "# ..." key.
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue

            key, separator, value = line.partition('=')
            if not separator:
                raise errors.DockerException(
                    f'Invalid line in environment file {env_file}:\n{line}')
            environment[key] = value

    return environment


def split_command(command):
    """Split a command string into arguments using shell-like rules."""
    argv = shlex.split(command)
    return argv


def format_environment(environment):
    """Render an environment mapping as a list of ``KEY=value`` entries.

    A value of None yields the bare key; bytes values are decoded as UTF-8.
    """
    entries = []
    for name, value in environment.items():
        if value is None:
            # No value: emit the key on its own.
            entries.append(name)
            continue
        if isinstance(value, bytes):
            value = value.decode('utf-8')
        entries.append(f'{name}={value}')
    return entries


def format_extra_hosts(extra_hosts, task=False):
    """Render a ``{hostname: address}`` mapping as sorted host entries.

    Args:
        extra_hosts: mapping of hostname to address.
        task: when true, emit ``"address hostname"`` entries (the format
            used by the Swarm API for tasks); otherwise emit
            ``"hostname:address"``.
    """
    ordered = sorted(extra_hosts.items())

    # Use format dictated by Swarm API if container is part of a task
    if task:
        return [f'{address} {hostname}' for hostname, address in ordered]

    return [f'{hostname}:{address}' for hostname, address in ordered]


def create_host_config(self, *args, **kwargs):
    """Removed helper; kept only so that old callers fail loudly.

    Accepts any arguments and ignores them.

    Raises:
        errors.DeprecatedMethod: always.
    """
    message = (
        'utils.create_host_config has been removed. Please use a '
        'docker.types.HostConfig object instead.'
    )
    raise errors.DeprecatedMethod(message)