HEX
Server: LiteSpeed
System: Linux php-prod-1.spaceapp.ru 5.15.0-157-generic #167-Ubuntu SMP Wed Sep 17 21:35:53 UTC 2025 x86_64
User: xnsbb3110 (1041)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //proc/self/root/usr/local/CyberCP/lib64/python3.10/site-packages/docker/credentials/store.py
import errno
import json
import shutil
import subprocess
import warnings

from . import constants, errors
from .utils import create_environment_dict


class Store:
    """ Interface to a credential helper executable.

        Every operation runs the helper as a subprocess, passing the
        operation name as its only argument and the payload on stdin.
    """

    def __init__(self, program, environment=None):
        """ Create a store object that acts as an interface to
            perform the basic operations for storing, retrieving
            and erasing credentials using `program`.

            `program` is appended to `constants.PROGRAM_PREFIX` to form
            the helper's executable name, which is then looked up in PATH.
            `environment` is handed to `create_environment_dict` each time
            the helper is run.

            A missing helper only triggers a warning here; the actual
            operations raise a `StoreError` later on.
        """
        self.program = constants.PROGRAM_PREFIX + program
        self.exe = shutil.which(self.program)
        self.environment = environment
        if self.exe is None:
            warnings.warn(
                f'{self.program} not installed or not available in PATH',
                stacklevel=1,
            )

    def get(self, server):
        """ Retrieve credentials for `server`. If no credentials are found,
            a `StoreError` will be raised.

            `server` may be given as `str` or as UTF-8 encoded `bytes`.
            Returns the helper's JSON output decoded into a Python object.
        """
        if not isinstance(server, bytes):
            server = server.encode('utf-8')
        data = self._execute('get', server)
        result = json.loads(data.decode('utf-8'))

        # docker-credential-pass will return an object for nonexistent
        # servers whereas other helpers will exit with returncode != 0.
        # For consistency, if no significant data is returned,
        # raise CredentialsNotFound
        if result['Username'] == '' and result['Secret'] == '':
            raise errors.CredentialsNotFound(
                f'No matching credentials in {self.program}'
            )

        return result

    def store(self, server, username, secret):
        """ Store credentials for `server`. Raises a `StoreError` if an error
            occurs.

            `server` may be given as `str` or as UTF-8 encoded `bytes`,
            matching what `get` and `erase` accept.
            Returns the helper's raw output.
        """
        # json.dumps cannot serialise bytes, so normalise to text first.
        if isinstance(server, bytes):
            server = server.decode('utf-8')
        data_input = json.dumps({
            'ServerURL': server,
            'Username': username,
            'Secret': secret
        }).encode('utf-8')
        return self._execute('store', data_input)

    def erase(self, server):
        """ Erase credentials for `server`. Raises a `StoreError` if an error
            occurs.

            `server` may be given as `str` or as UTF-8 encoded `bytes`.
        """
        if not isinstance(server, bytes):
            server = server.encode('utf-8')
        self._execute('erase', server)

    def list(self):
        """ List stored credentials. Requires v0.4.0+ of the helper.

            Returns the helper's JSON output decoded into a Python object.
        """
        data = self._execute('list', None)
        return json.loads(data.decode('utf-8'))

    def _execute(self, subcmd, data_input):
        """ Run the helper with `subcmd` as its argument and `data_input`
            (bytes, or None for no input) on stdin, and return its output.

            Raises a `StoreError` if the helper is not installed or cannot
            be started. If the helper exits with a non-zero status, the
            exception built by `errors.process_store_error` is raised.
        """
        if self.exe is None:
            raise errors.StoreError(
                f'{self.program} not installed or not available in PATH'
            )
        env = create_environment_dict(self.environment)
        try:
            return subprocess.check_output(
                [self.exe, subcmd], input=data_input, env=env,
            )
        except subprocess.CalledProcessError as e:
            raise errors.process_store_error(e, self.program) from e
        except OSError as e:
            # The executable may have vanished since shutil.which found it.
            if e.errno == errno.ENOENT:
                raise errors.StoreError(
                    f'{self.program} not installed or not available in PATH'
                ) from e
            raise errors.StoreError(
                f'Unexpected OS error "{e.strerror}", errno={e.errno}'
            ) from e