HEX
Server: LiteSpeed
System: Linux php-prod-1.spaceapp.ru 5.15.0-157-generic #167-Ubuntu SMP Wed Sep 17 21:35:53 UTC 2025 x86_64
User: xnsbb3110 (1041)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //usr/local/CyberCP/lib64/python3.10/site-packages/anyio/__pycache__/pytest_plugin.cpython-310.pyc
o

�h�$�@s�UddlmZddlZddlZddlmZmZmZddlm	Z	m
Z
ddlmZm
Z
mZddlmZmZddlZddlZddlmZddlmZd	d
lmZmZd	dlmZd	dlmZejd
krfddl m!Z!da"de#d<da$de#d<da%dLdd�Z&e
dMdd��Z'dNd#d$�Z(ej)d%d&�dOd*d+��Z*ej)d%d,�dPd0d1��Z+ej)d%d,�dQd4d5��Z,ej-d6e�d7�dRd8d9��Z.ej-dSd:d;��Z/ej-dTd<d=��Z0Gd>d?�d?�Z1ej-d@dA�dUdBdC��Z2ej-d@dA�dUdDdE��Z3ej-dVdHdI��Z4ej-dWdJdK��Z5dS)X�)�annotationsN)�Callable�	Generator�Iterator)�	ExitStack�contextmanager)�isasyncgenfunction�iscoroutinefunction�ismethod)�Any�cast)�
SubRequest)�Exit�)�get_all_backends�get_async_backend)�iterate_exceptions)�
TestRunner)��)�ExceptionGroupzTestRunner | None�_current_runnerzExitStack | None�
_runner_stack�backend�object�return�tuple[str, dict[str, Any]]cCsft|t�r	|ifSt|t�r/t|�dkr/t|dt�r/t|dt�r/ttttttff|�Std��)N�rrz@anyio_backend must be either a string or tuple of (string, dict))�
isinstance�str�tuple�len�dictrr�	TypeError)r�r$�F/usr/local/CyberCP/lib/python3.10/site-packages/anyio/pytest_plugin.py�extract_backend_and_optionss
r&�backend_namer�backend_options�dict[str, Any]�Iterator[TestRunner]ccs��tdur.t|�}t�atj�d�dur"tj�|�}t�tjj	|�|p%i}t�
|�|��atd7aztVWtd8atsMtdusCJ�t�
�daadSdStd8atsctdus[J�t�
�daaw�Nr)rrrr�sniffio�current_async_library_cvar�get�set�callback�reset�
enter_context�create_test_runner�_runner_leases�close)r'r(�asynclib�tokenr$r$r%�
get_runner%s2����
r8�configr�NonecCs|�dd�dS)N�markerszManyio: mark the (coroutine function) test to be run asynchronously via anyio.)�addinivalue_line)r9r$r$r%�pytest_configureCs�r=T)�hookwrapper�
fixturedef�request�Generator[Any]c#s��d���fdd	�}|j�t��st��rNd|jvrN||_|j}d|jv�s.|jd
7_d|jv�s<|jd7_z
dVW�|_||_S�|_||_wdVS)
N�argsr�
anyio_backendr@r
�kwargsrc?s��|jrt��rt�j�t|j�ur�j�|j�}n�}t|�\}}�r(||d<�r.||d<t||��&}t|�rB|�	||�EdHn|�
||�VWd�dSWd�dS1s\wYdS)NrCr@)�instancer
�type�__self__�__func__�__get__r&r8r�run_asyncgen_fixture�run_fixture)rCr@rBrD�
local_funcr'r(�runner��func�has_backend_arg�has_request_argr$r%�wrapperLs(����"�z%pytest_fixture_setup.<locals>.wrapper�rC�r@)
rBrrCrr@r
rDrrr)rOrr	�fixturenames�argnames)r?r@rR�original_argnamer$rNr%�pytest_fixture_setupJs$�
�rX)�tryfirst�	collector�name�objcCsv|�||�r5t|d�r|jjn|}t|�r7|�d�}t|dd�}|s+tdd�|D��r9tj	�
d�|�dSdSdSdS)N�
hypothesis�anyio�
pytestmarkr$css�|]}|jdkVqdS)r^N)r[)�.0�markerr$r$r%�	<genexpr>�s�z,pytest_pycollect_makeitem.<locals>.<genexpr>rC)�istestfunction�hasattrr]�
inner_testr	�get_closest_marker�getattr�any�pytest�mark�usefixtures)rZr[r\�
inner_funcra�own_markersr$r$r%�pytest_pycollect_makeitem}s
�rn�
pyfuncitem�bool | Nonecsd���fdd�}|j�d�}|r�t|�\��t|jd�r3|jjj��j|jkr1t��r1||jj_dSt|j�r�|j��fd	d
�|j	j
D�}t����2}z	|�|j|�Wnt
yt}zt|�D]
}t|tttf�rn||�qa�d}~wwWd�dS1s�wYdSdS)
NrDrrr:cs<t����}|��|�Wd�dS1swYdS�N)r8�run_test)rDrM)r'r(�
original_funcr$r%�run_with_hypothesis�s"�z/pytest_pyfunc_call.<locals>.run_with_hypothesisrCr]csi|]}|�|�qSr$r$)r`�arg)�funcargsr$r%�
<dictcomp>�sz&pytest_pyfunc_call.<locals>.<dictcomp>T)rDrrr:)rvr.r&rdr\r]re�__qualname__r	�_fixtureinforVr8rrrrrr�KeyboardInterrupt�
SystemExit)rortr�testargsrM�excgrp�excr$)r'r(rvrsr%�pytest_pyfunc_call�s<


����
�
�
r�module)�scope�paramscC�|jSrq)�paramrTr$r$r%rC��rCcCst|t�r|S|dS)Nr�rrrSr$r$r%�anyio_backend_name��
r�cCst|t�riS|dSr+r�rSr$r$r%�anyio_backend_options�r�r�c@s4eZdZdZddd�Zeddd	��Zdddd�Zd
S)�FreePortFactoryaO
    Manages port generation based on specified socket kind, ensuring no duplicate
    ports are generated.

    This class provides functionality for generating available free ports on the
    system. It is initialized with a specific socket kind and can generate ports
    for given address families while avoiding reuse of previously generated ports.

    Users should not instantiate this class directly, but use the
    ``free_tcp_port_factory`` and ``free_udp_port_factory`` fixtures instead. For simple
    uses cases, ``free_tcp_port`` and ``free_udp_port`` can be used instead.
    �kind�socket.SocketKindrr:cCs||_tt�|_dSrq)�_kindr/�int�
_generated)�selfr�r$r$r%�__init__�szFreePortFactory.__init__cCr�)z�
        The type of socket connection (e.g., :data:`~socket.SOCK_STREAM` or
        :data:`~socket.SOCK_DGRAM`) used to bind for checking port availability

        )r�)r�r$r$r%r��szFreePortFactory.kindN�family�socket.AddressFamily | Noner�c
Cs�|dur|g}n
tjg}tjr|�tj�	d}t��P}|D]1}|�t�||j��}|tjkr1dnd}z	|�||f�Wn
t	yFYnw|sO|�
�d}q||jvrd|j�|�|Wd�SWd�n1snwYq)z�
        Return an unbound port for the given address family.

        :param family: if omitted, both IPv4 and IPv6 addresses will be tried
        :return: a port number

        NTrz::1z	127.0.0.1r)
�socket�AF_INET�has_ipv6�append�AF_INET6rr2r��bind�OSError�getsocknamer��add)r�r��families�port�stack�sock�addrr$r$r%�__call__�s4��
���zFreePortFactory.__call__)r�r�rr:)rr�rq)r�r�rr�)�__name__�
__module__rx�__doc__r��propertyr�r�r$r$r$r%r��s

r��session)r�cC�
ttj�Srq)r�r��SOCK_STREAMr$r$r$r%�free_tcp_port_factory��
r�cCr�rq)r�r��
SOCK_DGRAMr$r$r$r%�free_udp_port_factoryr�r��Callable[[], int]r�cC�|�Srqr$)r�r$r$r%�
free_tcp_port	r�r�cCr�rqr$)r�r$r$r%�
free_udp_portr�r�)rrrr)r'rr(r)rr*)r9rrr:)r?rr@rrrA)rZrr[rr\rrr:)rorrrp)r@rrr)rCrrr)rCrrr))rr�)r�r�rr�)r�r�rr�)6�
__future__rr��sys�collections.abcrrr�
contextlibrr�inspectrr	r
�typingrrrir,�_pytest.fixturesr
�_pytest.outcomesr�_core._eventlooprr�_core._exceptionsr�abcr�version_info�exceptiongrouprr�__annotations__rr4r&r8r=�hookimplrXrnr�fixturerCr�r�r�r�r�r�r�r$r$r$r%�<module>sX





2

$
=