File: //proc/self/root/usr/local/CyberPanel/lib64/python3.10/site-packages/asyncssh/auth.py
# Copyright (c) 2013-2025 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
#     http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
#    GNU General Public License, Version 2.0, or any later versions of
#    that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
#     Ron Frederick - initial implementation, API, and documentation
"""SSH authentication handlers"""
from typing import TYPE_CHECKING, Awaitable, Dict, List, Optional
from typing import Sequence, Tuple, Type, Union, cast
from .constants import DEFAULT_LANG
from .gss import GSSBase, GSSError
from .logging import SSHLogger
from .misc import ProtocolError, PasswordChangeRequired, get_symbol_names
from .misc import run_in_executor
from .packet import Boolean, String, UInt32, SSHPacket, SSHPacketHandler
from .public_key import SigningKey
from .saslprep import saslprep, SASLPrepError
if TYPE_CHECKING:
    import asyncio
    # pylint: disable=cyclic-import
    from .connection import SSHConnection, SSHClientConnection
    from .connection import SSHServerConnection
# Type aliases used by the keyboard-interactive and password handlers
# A sequence of (prompt text, echo flag) pairs
KbdIntPrompts = Sequence[Tuple[str, bool]]
# A new challenge, unpacked as (name, instruction, lang, prompts)
KbdIntNewChallenge = Tuple[str, str, str, KbdIntPrompts]
# Either a bool (truthy -> success, falsy -> failure) or a new challenge
KbdIntChallenge = Union[bool, KbdIntNewChallenge]
# The responses returned for the prompts of a challenge
KbdIntResponse = Sequence[str]
# Unpacked as (old password, new password)
PasswordChangeResponse = Tuple[str, str]
# Method-specific message numbers. Several methods reuse the same values
# (60 and 61 below), so each handler class defines its own
# _packet_handlers table for the messages it accepts.
# SSH message values for GSS auth
MSG_USERAUTH_GSSAPI_RESPONSE          = 60
MSG_USERAUTH_GSSAPI_TOKEN             = 61
MSG_USERAUTH_GSSAPI_EXCHANGE_COMPLETE = 63
MSG_USERAUTH_GSSAPI_ERROR             = 64
MSG_USERAUTH_GSSAPI_ERRTOK            = 65
MSG_USERAUTH_GSSAPI_MIC               = 66
# SSH message values for public key auth
MSG_USERAUTH_PK_OK                    = 60
# SSH message values for keyboard-interactive auth
MSG_USERAUTH_INFO_REQUEST             = 60
MSG_USERAUTH_INFO_RESPONSE            = 61
# SSH message values for password auth
MSG_USERAUTH_PASSWD_CHANGEREQ         = 60
# Registry filled in by register_auth_method(): method names in
# registration order, plus the client and server handler class per name
_auth_methods: List[bytes] = []
_client_auth_handlers: Dict[bytes, Type['ClientAuth']] = {}
_server_auth_handlers: Dict[bytes, Type['ServerAuth']] = {}
class Auth(SSHPacketHandler):
    """Common base class for SSH authentication handlers

       Holds the connection, its logger, and the task which is
       currently driving the authentication exchange.

    """

    def __init__(self, conn: 'SSHConnection', coro: Awaitable[None]):
        self._conn = conn
        self._logger = conn.logger

        # The initial auth coroutine is scheduled on the connection
        # right away; create_task() may replace it later
        self._coro: Optional['asyncio.Task[None]'] = conn.create_task(coro)

    def send_packet(self, pkttype: int, *args: bytes,
                    trivial: bool = True) -> None:
        """Send a method-specific auth packet

           The packet is sent through the connection with this object
           passed along as the handler.

        """

        self._conn.send_userauth_packet(pkttype, *args, trivial=trivial,
                                        handler=self)

    @property
    def logger(self) -> SSHLogger:
        """The logger taken from the connection at construction time"""

        return self._logger

    def create_task(self, coro: Awaitable[None]) -> None:
        """Cancel any current auth task and start a new one for `coro`"""

        self.cancel()
        self._coro = self._conn.create_task(coro)

    def cancel(self) -> None:
        """Cancel the current auth task, if there is one"""

        task, self._coro = self._coro, None

        if task: # pragma: no branch
            task.cancel()
class ClientAuth(Auth):
    """Base class for client-side authentication methods"""

    _conn: 'SSHClientConnection'

    def __init__(self, conn: 'SSHClientConnection', method: bytes):
        self._method = method

        # The subclass's _start() coroutine becomes the initial auth task
        start_coro = self._start()
        super().__init__(conn, start_coro)

    async def _start(self) -> None:
        """Begin client authentication (must be overridden)"""

        # Subclasses supply the real implementation
        raise NotImplementedError

    def auth_succeeded(self) -> None:
        """Hook called when authentication succeeds (no-op by default)"""

    def auth_failed(self) -> None:
        """Hook called when authentication fails (no-op by default)"""

    async def send_request(self, *args: bytes,
                           key: Optional[SigningKey] = None,
                           trivial: bool = True) -> None:
        """Send a user authentication request for this handler's method"""

        conn = self._conn
        await conn.send_userauth_request(self._method, *args,
                                         key=key, trivial=trivial)
class _ClientNullAuth(ClientAuth):
    """Null ('none') authentication, client side"""

    async def _start(self) -> None:
        """Send an auth request carrying no method-specific data"""

        await self.send_request()
class _ClientGSSKexAuth(ClientAuth):
    """GSS key exchange authentication, client side"""

    async def _start(self) -> None:
        """Send a GSS key exchange auth request, or move on

           When the connection reports that GSS key exchange auth was
           not requested, the next auth method is tried instead.

        """

        if not self._conn.gss_kex_auth_requested():
            self._conn.try_next_auth(next_method=True)
            return

        self.logger.debug1('Trying GSS key exchange auth')

        # The connection's GSS context is passed as the signing key
        gss_context = self._conn.get_gss_context()
        await self.send_request(key=gss_context, trivial=False)
class _ClientGSSMICAuth(ClientAuth):
    """Client side implementation of GSS MIC auth

       Offers the mechanisms of the connection's GSS context to the
       server, exchanges tokens with it, and once the context reports
       completion finishes with either a MIC over the auth request
       data or an exchange-complete message.

    """
    _handler_names = get_symbol_names(globals(), 'MSG_USERAUTH_GSSAPI_')
    def __init__(self, conn: 'SSHClientConnection', method: bytes):
        super().__init__(conn, method)
        # GSS context, assigned in _start() when GSS MIC auth is attempted
        self._gss: Optional[GSSBase] = None
        # Set once a MSG_USERAUTH_GSSAPI_ERROR has been received
        self._got_error = False
    async def _start(self) -> None:
        """Start client GSS MIC authentication

           If the connection reports that GSS MIC auth was requested,
           reset the GSS context and send a request listing its
           mechanisms. Otherwise, move on to the next auth method.

        """
        if self._conn.gss_mic_auth_requested():
            self.logger.debug1('Trying GSS MIC auth')
            self._gss = self._conn.get_gss_context()
            self._gss.reset()
            # Request payload: mechanism count followed by each mechanism
            mechs = b''.join(String(mech) for mech in self._gss.mechs)
            await self.send_request(UInt32(len(self._gss.mechs)), mechs)
        else:
            self._conn.try_next_auth(next_method=True)
    def _finish(self) -> None:
        """Finish client GSS MIC authentication

           Sends a MIC computed over the auth request data when the GSS
           context provides integrity, or an exchange-complete message
           when it does not.

        """
        assert self._gss is not None
        if self._gss.provides_integrity:
            data = self._conn.get_userauth_request_data(self._method)
            self.send_packet(MSG_USERAUTH_GSSAPI_MIC,
                             String(self._gss.sign(data)),
                             trivial=False)
        else:
            self.send_packet(MSG_USERAUTH_GSSAPI_EXCHANGE_COMPLETE)
    async def _process_response(self, _pkttype: int, _pktid: int,
                                packet: SSHPacket) -> None:
        """Process a GSS response from the server

           Raises ProtocolError if the mechanism named by the server is
           not one of the GSS context's mechanisms. Otherwise performs
           the first GSS step and sends the resulting token.

        """
        mech = packet.get_string()
        packet.check_end()
        assert self._gss is not None
        if mech not in self._gss.mechs:
            raise ProtocolError('Mechanism mismatch')
        try:
            # The GSS step is run via run_in_executor() and awaited
            token = await run_in_executor(self._gss.step)
            assert token is not None
            self.send_packet(MSG_USERAUTH_GSSAPI_TOKEN, String(token))
            if self._gss.complete:
                self._finish()
        except GSSError as exc:
            # Forward any error token, then give up on this method
            if exc.token:
                self.send_packet(MSG_USERAUTH_GSSAPI_ERRTOK, String(exc.token))
            self._conn.try_next_auth(next_method=True)
    async def _process_token(self, _pkttype: int, _pktid: int,
                             packet: SSHPacket) -> None:
        """Process a GSS token from the server

           Feeds the token to the GSS context, sends back any token it
           produces, and finishes once the context is complete.

        """
        token: Optional[bytes] = packet.get_string()
        packet.check_end()
        assert self._gss is not None
        try:
            token = await run_in_executor(self._gss.step, token)
            # A step may produce no token to send back
            if token:
                self.send_packet(MSG_USERAUTH_GSSAPI_TOKEN, String(token))
            if self._gss.complete:
                self._finish()
        except GSSError as exc:
            # Forward any error token, then give up on this method
            if exc.token:
                self.send_packet(MSG_USERAUTH_GSSAPI_ERRTOK, String(exc.token))
            self._conn.try_next_auth(next_method=True)
    def _process_error(self, _pkttype: int, _pktid: int,
                       packet: SSHPacket) -> None:
        """Process a GSS error from the server

           Only the message text is used (it is logged); the status
           codes and language tag are read and discarded.

        """
        _ = packet.get_uint32()         # major_status
        _ = packet.get_uint32()         # minor_status
        msg = packet.get_string()
        _ = packet.get_string()         # lang
        packet.check_end()
        self.logger.debug1('GSS error from server: %s', msg)
        # Remembered so that _process_error_token() doesn't log again
        self._got_error = True
    async def _process_error_token(self, _pkttype: int, _pktid: int,
                                   packet: SSHPacket) -> None:
        """Process a GSS error token from the server

           The token is passed to the GSS context and the result is
           discarded. A resulting GSSError is logged only when no
           error message from the server was logged already.

        """
        token = packet.get_string()
        packet.check_end()
        assert self._gss is not None
        try:
            await run_in_executor(self._gss.step, token)
        except GSSError as exc:
            if not self._got_error: # pragma: no cover
                self.logger.debug1('GSS error from server: %s', str(exc))
    # Message types accepted from the server while this method is active
    _packet_handlers = {
        MSG_USERAUTH_GSSAPI_RESPONSE: _process_response,
        MSG_USERAUTH_GSSAPI_TOKEN:    _process_token,
        MSG_USERAUTH_GSSAPI_ERROR:    _process_error,
        MSG_USERAUTH_GSSAPI_ERRTOK:   _process_error_token
    }
class _ClientHostBasedAuth(ClientAuth):
    """Host based authentication, client side"""

    async def _start(self) -> None:
        """Request host based auth details and send a signed request

           Moves on to the next auth method when the connection
           provides no key pair. A ValueError raised while sending the
           request is logged and followed by another auth attempt.

        """

        requested = await self._conn.host_based_auth_requested()
        keypair, client_host, client_username = requested

        if keypair is None:
            self._conn.try_next_auth(next_method=True)
            return

        self.logger.debug1('Trying host based auth of user %s on host %s '
                           'with %s host key', client_username, client_host,
                           keypair.algorithm)

        try:
            await self.send_request(
                String(keypair.algorithm), String(keypair.public_data),
                String(client_host), String(client_username),
                key=keypair)
        except ValueError as exc:
            self.logger.debug1('Host based auth failed: %s', str(exc))
            self._conn.try_next_auth()
class _ClientPublicKeyAuth(ClientAuth):
    """Public key authentication, client side"""

    _handler_names = get_symbol_names(globals(), 'MSG_USERAUTH_PK_')

    async def _start(self) -> None:
        """Obtain a key pair and send an unsigned query for it

           Moves on to the next auth method when the connection has no
           key pair to offer.

        """

        self._keypair = await self._conn.public_key_auth_requested()
        keypair = self._keypair

        if keypair is None:
            self._conn.try_next_auth(next_method=True)
            return

        self.logger.debug1('Trying public key auth with %s key',
                           keypair.algorithm)

        # Boolean(False): no signature is included in this request
        await self.send_request(Boolean(False), String(keypair.algorithm),
                                String(keypair.public_data))

    async def _send_signed_request(self) -> None:
        """Repeat the request for the current key pair with a signature

           A ValueError raised while sending is logged and followed by
           another auth attempt.

        """

        keypair = self._keypair
        assert keypair is not None

        self.logger.debug1('Signing request with %s key', keypair.algorithm)

        try:
            await self.send_request(Boolean(True), String(keypair.algorithm),
                                    String(keypair.public_data),
                                    key=keypair, trivial=False)
        except ValueError as exc:
            self.logger.debug1('Public key auth failed: %s', str(exc))
            self._conn.try_next_auth()

    def _process_public_key_ok(self, _pkttype: int, _pktid: int,
                               packet: SSHPacket) -> None:
        """Handle the server accepting the offered public key

           Raises ProtocolError unless the algorithm and key data in
           the packet match the key pair which was offered, and
           otherwise starts a task to send the signed request.

        """

        algorithm = packet.get_string()
        key_data = packet.get_string()
        packet.check_end()

        keypair = self._keypair
        assert keypair is not None

        same_key = (algorithm == keypair.algorithm and
                    key_data == keypair.public_data)

        if not same_key:
            raise ProtocolError('Key mismatch')

        self.create_task(self._send_signed_request())

    # Message types accepted from the server while this method is active
    _packet_handlers = {
        MSG_USERAUTH_PK_OK: _process_public_key_ok
    }
class _ClientKbdIntAuth(ClientAuth):
    """Keyboard-interactive authentication, client side"""

    _handler_names = get_symbol_names(globals(), 'MSG_USERAUTH_INFO_')

    async def _start(self) -> None:
        """Ask the connection for submethods and send the initial request

           Moves on to the next auth method when the connection
           returns no submethods.

        """

        submethods = await self._conn.kbdint_auth_requested()

        if submethods is not None:
            self.logger.debug1('Trying keyboard-interactive auth')

            # The first field of the request is sent as an empty string
            await self.send_request(String(''), String(submethods))
        else:
            self._conn.try_next_auth(next_method=True)

    async def _receive_challenge(self, name: str, instruction: str, lang: str,
                                 prompts: KbdIntPrompts) -> None:
        """Pass a challenge to the connection and send back its responses

           Moves on to the next auth method when the connection
           returns no responses.

        """

        answers = await self._conn.kbdint_challenge_received(
            name, instruction, lang, prompts)

        if answers is None:
            self._conn.try_next_auth(next_method=True)
            return

        # The packet is flagged as trivial only for an empty response list
        self.send_packet(MSG_USERAUTH_INFO_RESPONSE, UInt32(len(answers)),
                         b''.join(String(answer) for answer in answers),
                         trivial=not answers)

    def _process_info_request(self, _pkttype: int, _pktid: int,
                              packet: SSHPacket) -> None:
        """Decode an info request and start a task to answer it

           Raises ProtocolError when the name, instruction, or a prompt
           is not valid UTF-8 or the language tag is not ASCII.

        """

        raw_name = packet.get_string()
        raw_instruction = packet.get_string()
        raw_lang = packet.get_string()

        try:
            name = raw_name.decode('utf-8')
            instruction = raw_instruction.decode('utf-8')
            lang = raw_lang.decode('ascii')
        except UnicodeDecodeError:
            raise ProtocolError('Invalid keyboard interactive '
                                'info request') from None

        prompt_count = packet.get_uint32()
        prompts = []

        for _ in range(prompt_count):
            raw_prompt = packet.get_string()
            echo = packet.get_boolean()

            try:
                prompts.append((raw_prompt.decode('utf-8'), echo))
            except UnicodeDecodeError:
                raise ProtocolError('Invalid keyboard interactive '
                                    'info request') from None

        challenge_coro = self._receive_challenge(name, instruction,
                                                 lang, prompts)
        self.create_task(challenge_coro)

    # Message types accepted from the server while this method is active
    _packet_handlers = {
        MSG_USERAUTH_INFO_REQUEST: _process_info_request
    }
class _ClientPasswordAuth(ClientAuth):
    """Client side implementation of password auth

       Sends the password obtained from the connection and, when the
       server asks for it, performs a password change.

    """

    _handler_names = get_symbol_names(globals(), 'MSG_USERAUTH_PASSWD_')

    def __init__(self, conn: 'SSHClientConnection', method: bytes):
        super().__init__(conn, method)

        # True while a password change request is awaiting its result
        self._password_change = False

    async def _start(self) -> None:
        """Start client password authentication

           Moves on to the next auth method when the connection has no
           password to offer.

        """

        password = await self._conn.password_auth_requested()

        if password is None:
            self._conn.try_next_auth(next_method=True)
            return

        self.logger.debug1('Trying password auth')

        await self.send_request(Boolean(False), String(password),
                                trivial=False)

    async def _change_password(self, prompt: str, lang: str) -> None:
        """Start password change

           Asks the connection for the old and new passwords. A result
           of `NotImplemented` means password change isn't supported,
           in which case the next auth method is tried.

        """

        result = await self._conn.password_change_requested(prompt, lang)

        # NotImplemented is a singleton, so compare by identity
        if result is NotImplemented:
            # Password change not supported - move on to the next auth method
            self._conn.try_next_auth(next_method=True)
            return

        self.logger.debug1('Trying to change password')

        old_password, new_password = cast(PasswordChangeResponse, result)

        self._password_change = True

        await self.send_request(Boolean(True),
                                String(old_password.encode('utf-8')),
                                String(new_password.encode('utf-8')),
                                trivial=False)

    def auth_succeeded(self) -> None:
        """Report a successful password change, if one was pending"""

        if self._password_change:
            self._password_change = False
            self._conn.password_changed()

    def auth_failed(self) -> None:
        """Report a failed password change, if one was pending"""

        if self._password_change:
            self._password_change = False
            self._conn.password_change_failed()

    def _process_password_change(self, _pkttype: int, _pktid: int,
                                 packet: SSHPacket) -> None:
        """Process a password change request

           Raises ProtocolError when the prompt is not valid UTF-8 or
           the language tag is not ASCII.

        """

        prompt_bytes = packet.get_string()
        lang_bytes = packet.get_string()

        try:
            prompt = prompt_bytes.decode('utf-8')
            lang = lang_bytes.decode('ascii')
        except UnicodeDecodeError:
            raise ProtocolError('Invalid password change request') from None

        # A change request arriving while a previous change is pending
        # is reported as a failure of that previous change
        self.auth_failed()
        self.create_task(self._change_password(prompt, lang))

    # Message types accepted from the server while this method is active
    _packet_handlers = {
        MSG_USERAUTH_PASSWD_CHANGEREQ: _process_password_change
    }
class ServerAuth(Auth):
    """Base class for server-side authentication methods"""

    _conn: 'SSHServerConnection'

    def __init__(self, conn: 'SSHServerConnection', username: str,
                 method: bytes, packet: SSHPacket):
        self._username = username
        self._method = method

        # The subclass's _start() coroutine, given the request packet,
        # becomes the initial auth task
        start_coro = self._start(packet)
        super().__init__(conn, start_coro)

    @classmethod
    def supported(cls, conn: 'SSHServerConnection') -> bool:
        """Report whether this method is supported (must be overridden)"""

        raise NotImplementedError

    async def _start(self, packet: SSHPacket) -> None:
        """Begin server authentication (must be overridden)"""

        # Subclasses supply the real implementation
        raise NotImplementedError

    def send_failure(self, partial_success: bool = False) -> None:
        """Have the connection send a user authentication failure"""

        conn = self._conn
        conn.send_userauth_failure(partial_success)

    async def send_success(self) -> None:
        """Have the connection send a user authentication success"""

        conn = self._conn
        await conn.send_userauth_success()
class _ServerNullAuth(ServerAuth):
    """Null ('none') authentication, server side"""

    @classmethod
    def supported(cls, conn: 'SSHServerConnection') -> bool:
        """Null authentication is never reported as supported"""

        return False

    async def _start(self, packet: SSHPacket) -> None:
        """Do nothing, since supported() always returns False"""
class _ServerGSSKexAuth(ServerAuth):
    """GSS key exchange authentication, server side"""

    def __init__(self, conn: 'SSHServerConnection', username: str,
                 method: bytes, packet: SSHPacket):
        super().__init__(conn, username, method, packet)

        self._gss = conn.get_gss_context()

    @classmethod
    def supported(cls, conn: 'SSHServerConnection') -> bool:
        """Ask the connection whether GSS key exchange auth is supported"""

        return conn.gss_kex_auth_supported()

    async def _start(self, packet: SSHPacket) -> None:
        """Verify the client's MIC and validate the GSS principal

           Success is sent only if the GSS context is complete, the MIC
           verifies against the auth request data, and the connection
           accepts the GSS principal. Otherwise, a failure is sent.

        """

        mic = packet.get_string()
        packet.check_end()

        self.logger.debug1('Trying GSS key exchange auth')

        gss = self._gss
        data = self._conn.get_userauth_request_data(self._method)

        # The principal is only validated once the MIC has checked out
        mic_ok = gss.complete and gss.verify(data, mic)

        if mic_ok and (await self._conn.validate_gss_principal(
                self._username, gss.user, gss.host)):
            await self.send_success()
        else:
            self.send_failure()
class _ServerGSSMICAuth(ServerAuth):
    """Server side implementation of GSS MIC auth

       Picks a mechanism offered by the client, exchanges tokens with
       it, and then accepts either a MIC or an exchange-complete
       message before validating the GSS principal.

    """
    _handler_names = get_symbol_names(globals(), 'MSG_USERAUTH_GSSAPI_')
    def __init__(self, conn: 'SSHServerConnection', username: str,
                 method: bytes, packet: SSHPacket) -> None:
        super().__init__(conn, username, method, packet)
        # GSS context obtained from the connection
        self._gss = conn.get_gss_context()
    @classmethod
    def supported(cls, conn: 'SSHServerConnection') -> bool:
        """Return whether GSS MIC authentication is supported"""
        return conn.gss_mic_auth_supported()
    async def _start(self, packet: SSHPacket) -> None:
        """Start server GSS MIC authentication

           Reads the mechanisms offered by the client and responds with
           the first mechanism of the GSS context which the client also
           offered. Sends a failure when there is no such mechanism.

        """
        mechs = set()
        n = packet.get_uint32()
        for _ in range(n):
            mechs.add(packet.get_string())
        packet.check_end()
        # The iteration order of self._gss.mechs decides which match wins
        match = None
        for mech in self._gss.mechs:
            if mech in mechs:
                match = mech
                break
        if not match:
            self.send_failure()
            return
        self.logger.debug1('Trying GSS MIC auth')
        self._gss.reset()
        self.send_packet(MSG_USERAUTH_GSSAPI_RESPONSE, String(match))
    async def _finish(self) -> None:
        """Finish server GSS MIC authentication

           Sends success if the connection accepts the GSS principal
           for the requested username, and failure otherwise.

        """
        if (await self._conn.validate_gss_principal(self._username,
                                                    self._gss.user,
                                                    self._gss.host)):
            await self.send_success()
        else:
            self.send_failure()
    async def _process_token(self, _pkttype: int, _pktid: int,
                             packet: SSHPacket) -> None:
        """Process a GSS token from the client

           Feeds the token to the GSS context and sends back any token
           it produces. On a GSSError, sends an error message, then any
           error token, and then a failure.

        """
        token: Optional[bytes] = packet.get_string()
        packet.check_end()
        try:
            # The GSS step is run via run_in_executor() and awaited
            token = await run_in_executor(self._gss.step, token)
            if token:
                self.send_packet(MSG_USERAUTH_GSSAPI_TOKEN, String(token))
        except GSSError as exc:
            self.send_packet(MSG_USERAUTH_GSSAPI_ERROR, UInt32(exc.maj_code),
                             UInt32(exc.min_code), String(str(exc)),
                             String(DEFAULT_LANG))
            if exc.token:
                self.send_packet(MSG_USERAUTH_GSSAPI_ERRTOK, String(exc.token))
            self.send_failure()
    def _process_exchange_complete(self, _pkttype: int, _pktid: int,
                                   packet: SSHPacket) -> None:
        """Process a GSS exchange complete message from the client

           Only accepted when the GSS context is complete and does not
           provide integrity; otherwise a failure is sent.

        """
        packet.check_end()
        if self._gss.complete and not self._gss.provides_integrity:
            self.create_task(self._finish())
        else:
            self.send_failure()
    async def _process_error_token(self, _pkttype: int, _pktid: int,
                                   packet: SSHPacket) -> None:
        """Process a GSS error token from the client

           The token is passed to the GSS context and the result is
           discarded; a resulting GSSError is logged.

        """
        token = packet.get_string()
        packet.check_end()
        try:
            await run_in_executor(self._gss.step, token)
        except GSSError as exc:
            self.logger.debug1('GSS error from client: %s', str(exc))
    def _process_mic(self, _pkttype: int, _pktid: int,
                     packet: SSHPacket) -> None:
        """Process a GSS MIC from the client

           Only accepted when the GSS context is complete, provides
           integrity, and verifies the MIC against the auth request
           data; otherwise a failure is sent.

        """
        mic = packet.get_string()
        packet.check_end()
        data = self._conn.get_userauth_request_data(self._method)
        if (self._gss.complete and self._gss.provides_integrity and
                self._gss.verify(data, mic)):
            self.create_task(self._finish())
        else:
            self.send_failure()
    # Message types accepted from the client while this method is active
    _packet_handlers = {
        MSG_USERAUTH_GSSAPI_TOKEN:             _process_token,
        MSG_USERAUTH_GSSAPI_EXCHANGE_COMPLETE: _process_exchange_complete,
        MSG_USERAUTH_GSSAPI_ERRTOK:            _process_error_token,
        MSG_USERAUTH_GSSAPI_MIC:               _process_mic
    }
class _ServerHostBasedAuth(ServerAuth):
    """Host based authentication, server side"""

    @classmethod
    def supported(cls, conn: 'SSHServerConnection') -> bool:
        """Ask the connection whether host based auth is supported"""

        return conn.host_based_auth_supported()

    async def _start(self, packet: SSHPacket) -> None:
        """Parse a host based auth request and have it validated

           Raises ProtocolError when the client host or client username
           cannot be decoded as UTF-8 or the username is rejected by
           saslprep. Sends success or failure according to the
           connection's validation result.

        """

        algorithm = packet.get_string()
        key_data = packet.get_string()
        raw_host = packet.get_string()
        raw_username = packet.get_string()

        # Captured before the signature itself is read from the packet
        msg = packet.get_consumed_payload()
        signature = packet.get_string()

        packet.check_end()

        try:
            client_host = raw_host.decode('utf-8')
            client_username = saslprep(raw_username.decode('utf-8'))
        except (UnicodeDecodeError, SASLPrepError):
            raise ProtocolError('Invalid host-based auth request') from None

        self.logger.debug1('Verifying host based auth of user %s '
                           'on host %s with %s host key', client_username,
                           client_host, algorithm)

        accepted = await self._conn.validate_host_based_auth(
            self._username, key_data, client_host, client_username,
            msg, signature)

        if accepted:
            await self.send_success()
        else:
            self.send_failure()
class _ServerPublicKeyAuth(ServerAuth):
    """Public key authentication, server side"""

    @classmethod
    def supported(cls, conn: 'SSHServerConnection') -> bool:
        """Ask the connection whether public key auth is supported"""

        return conn.public_key_auth_supported()

    async def _start(self, packet: SSHPacket) -> None:
        """Parse a public key auth request and have the key validated

           A request without a signature is answered with a public key
           ok message when the key validates. A request with a
           signature is answered with success. A failure is sent when
           validation fails in either case.

        """

        sig_present = packet.get_boolean()
        algorithm = packet.get_string()
        key_data = packet.get_string()

        # Without a signature, empty values are passed to validation
        msg, signature = b'', b''

        if sig_present:
            msg = packet.get_consumed_payload()
            signature = packet.get_string()

        packet.check_end()

        log_format = ('Verifying request with %s key' if sig_present else
                      'Trying public key auth with %s key')
        self.logger.debug1(log_format, algorithm)

        key_valid = await self._conn.validate_public_key(
            self._username, key_data, msg, signature)

        if not key_valid:
            self.send_failure()
        elif sig_present:
            await self.send_success()
        else:
            self.send_packet(MSG_USERAUTH_PK_OK, String(algorithm),
                             String(key_data))
class _ServerKbdIntAuth(ServerAuth):
    """Keyboard-interactive authentication, server side"""

    _handler_names = get_symbol_names(globals(), 'MSG_USERAUTH_INFO_')

    @classmethod
    def supported(cls, conn: 'SSHServerConnection') -> bool:
        """Ask the connection whether keyboard-interactive is supported"""

        return conn.kbdint_auth_supported()

    async def _start(self, packet: SSHPacket) -> None:
        """Parse the initial request and send the first challenge

           Raises ProtocolError when the language tag is not ASCII or
           the submethods are not valid UTF-8.

        """

        raw_lang = packet.get_string()
        raw_submethods = packet.get_string()
        packet.check_end()

        try:
            lang = raw_lang.decode('ascii')
            submethods = raw_submethods.decode('utf-8')
        except UnicodeDecodeError:
            raise ProtocolError('Invalid keyboard interactive '
                                'auth request') from None

        self.logger.debug1('Trying keyboard-interactive auth')

        first_challenge = await self._conn.get_kbdint_challenge(
            self._username, lang, submethods)

        await self._send_challenge(first_challenge)

    async def _send_challenge(self, challenge: KbdIntChallenge) -> None:
        """Send a challenge, or the final result of the exchange

           A tuple or list is sent to the client as an info request.
           Any other value ends the exchange: success if it is truthy,
           failure if not.

        """

        if not isinstance(challenge, (tuple, list)):
            if challenge:
                await self.send_success()
            else:
                self.send_failure()

            return

        name, instruction, lang, prompts = challenge

        # Each prompt is sent as its text followed by its echo flag
        self.send_packet(MSG_USERAUTH_INFO_REQUEST, String(name),
                         String(instruction), String(lang),
                         UInt32(len(prompts)),
                         *(String(text) + Boolean(echo)
                           for text, echo in prompts))

    async def _validate_response(self, responses: KbdIntResponse) -> None:
        """Have the connection validate responses and act on the result"""

        outcome = await self._conn.validate_kbdint_response(self._username,
                                                            responses)

        await self._send_challenge(outcome)

    def _process_info_response(self, _pkttype: int, _pktid: int,
                               packet: SSHPacket) -> None:
        """Decode an info response and start a task to validate it

           Raises ProtocolError when a response is not valid UTF-8.

        """

        response_count = packet.get_uint32()
        responses = []

        for _ in range(response_count):
            raw_response = packet.get_string()

            try:
                responses.append(raw_response.decode('utf-8'))
            except UnicodeDecodeError:
                raise ProtocolError('Invalid keyboard interactive '
                                    'info response') from None

        packet.check_end()

        validation = self._validate_response(responses)
        self.create_task(validation)

    # Message types accepted from the client while this method is active
    _packet_handlers = {
        MSG_USERAUTH_INFO_RESPONSE: _process_info_response
    }
class _ServerPasswordAuth(ServerAuth):
    """Server side implementation of password auth"""

    @classmethod
    def supported(cls, conn: 'SSHServerConnection') -> bool:
        """Return whether password authentication is supported"""

        return conn.password_auth_supported()

    async def _start(self, packet: SSHPacket) -> None:
        """Start server password authentication

           Handles both plain password validation and password change
           requests, sending success or failure based on the result
           returned by the connection. If the connection raises
           PasswordChangeRequired, a password change request is sent
           to the client instead.

           Raises ProtocolError when a password is not valid UTF-8 or
           is rejected by saslprep.

        """

        password_change = packet.get_boolean()
        password_bytes = packet.get_string()

        # A new password is only present in password change requests
        new_password_bytes = packet.get_string() if password_change else b''

        packet.check_end()

        try:
            password = saslprep(password_bytes.decode('utf-8'))
            new_password = saslprep(new_password_bytes.decode('utf-8'))
        except (UnicodeDecodeError, SASLPrepError):
            raise ProtocolError('Invalid password auth request') from None

        try:
            if password_change:
                self.logger.debug1('Trying to change password')

                result = await self._conn.change_password(self._username,
                                                          password,
                                                          new_password)
            else:
                self.logger.debug1('Trying password auth')

                result = \
                    await self._conn.validate_password(self._username, password)

            if result:
                await self.send_success()
            else:
                self.send_failure()
        except PasswordChangeRequired as exc:
            self.send_packet(MSG_USERAUTH_PASSWD_CHANGEREQ,
                             String(exc.prompt), String(exc.lang))
def register_auth_method(alg: bytes, client_handler: Type['ClientAuth'],
                         server_handler: Type['ServerAuth']) -> None:
    """Register an authentication method

       Records the client and server handler classes for `alg`. The
       method name is added to the ordered list of known methods only
       the first time it is registered, so registering the same name
       again replaces its handlers without adding a duplicate entry
       to that list.

    """

    # Keep the method list free of duplicates; its order is the order
    # in which methods were first registered
    if alg not in _auth_methods:
        _auth_methods.append(alg)

    _client_auth_handlers[alg] = client_handler
    _server_auth_handlers[alg] = server_handler
def get_supported_client_auth_methods() -> Sequence[bytes]:
    """List the registered client auth methods, leaving out 'none'"""

    null_method = b'none'

    return [name for name in _client_auth_handlers if name != null_method]
def lookup_client_auth(conn: 'SSHClientConnection',
                       method: bytes) -> Optional[ClientAuth]:
    """Create the client auth handler for a method, if it is known

       Returns `None` for a method name which has not been registered.

    """

    if method not in _auth_methods:
        return None

    handler_class = _client_auth_handlers[method]
    return handler_class(conn, method)
def get_supported_server_auth_methods(conn: 'SSHServerConnection') -> \
        Sequence[bytes]:
    """List the registered methods whose server handler supports `conn`

       Methods appear in the order in which they were registered.

    """

    return [name for name in _auth_methods
            if _server_auth_handlers[name].supported(conn)]
def lookup_server_auth(conn: 'SSHServerConnection', username: str,
                       method: bytes, packet: SSHPacket) -> \
        Optional[ServerAuth]:
    """Create the server auth handler for a method, if it is usable

       When the method is not registered or its handler reports that
       it is not supported on this connection, an auth failure is sent
       and `None` is returned.

    """

    handler_class = _server_auth_handlers.get(method)

    if not handler_class or not handler_class.supported(conn):
        conn.send_userauth_failure(False)
        return None

    return handler_class(conn, username, method, packet)
# Built-in authentication methods as (name, client handler, server handler).
# They are registered in the order listed here, which is therefore the
# order of _auth_methods and of the list built by
# get_supported_server_auth_methods().
_auth_method_list = (
    (b'none',                 _ClientNullAuth,      _ServerNullAuth),
    (b'gssapi-keyex',         _ClientGSSKexAuth,    _ServerGSSKexAuth),
    (b'gssapi-with-mic',      _ClientGSSMICAuth,    _ServerGSSMICAuth),
    (b'hostbased',            _ClientHostBasedAuth, _ServerHostBasedAuth),
    (b'publickey',            _ClientPublicKeyAuth, _ServerPublicKeyAuth),
    (b'keyboard-interactive', _ClientKbdIntAuth,    _ServerKbdIntAuth),
    (b'password',             _ClientPasswordAuth,  _ServerPasswordAuth)
)
# Register every built-in method when the module is imported
for _args in _auth_method_list:
    register_auth_method(*_args)