HEX
Server: LiteSpeed
System: Linux php-prod-1.spaceapp.ru 5.15.0-157-generic #167-Ubuntu SMP Wed Sep 17 21:35:53 UTC 2025 x86_64
User: sport3497 (1034)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/utils/common.py
import asyncio
import datetime
import functools
import logging
import socket
import time
import re
import os
import sys

# Handy durations, each expressed as a float number of seconds.
# The larger units are derived from MINUTE so the ratios are explicit.
MINUTE = datetime.timedelta(minutes=1).total_seconds()
HOUR = 60 * MINUTE
DAY = 24 * HOUR
WEEK = 7 * DAY

# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)


class ServiceBase:
    """Skeleton of a start/stop-able asyncio service.

    Every public method is delegated to the current state object
    (Stopped -> Running -> Stopping -> Stopped).  Subclasses provide the
    actual work by overriding the :meth:`_run` coroutine.
    """

    def __init__(self, loop):
        self._loop = loop
        self._main_task = None
        self._should_stop = False
        # A freshly created service is idle until start() is called.
        self._state = self.StoppedState(self)

    def start(self):
        """Ask the current state to start the service."""
        return self._state.start()

    def should_stop(self):
        """Ask the current state to stop the service."""
        return self._state.should_stop()

    async def wait(self):
        """Wait for the main task (if there is one) via the current state."""
        return await self._state.wait()

    def is_running(self):
        """Tell whether the current state counts as running."""
        return self._state.is_running()

    async def _run(self):
        """Body of the service; must be overridden by subclasses."""
        raise NotImplementedError

    class State:
        """Default behaviour shared by all states: do nothing, not running."""

        def __init__(self, obj):
            """:type obj: ServiceBase"""
            self._obj = obj

        def start(self):
            """No-op by default."""
            return None

        def should_stop(self):
            """No-op by default."""
            return None

        async def wait(self):
            """Await the owner's main task when one exists."""
            main_task = self._obj._main_task
            if not main_task:
                return
            await main_task

        def is_running(self):
            return False

    class StoppedState(State):
        """Idle state: the only one from which the service can be started."""

        def _on_stop(self, _finished_task):
            # Done-callback of the main task: whatever the outcome,
            # the service goes back to the idle state.
            service = self._obj
            service._state = ServiceBase.StoppedState(service)
            service._should_stop = False

        def start(self):
            """Schedule ``_run()`` on the loop and switch to RunningState."""
            service = self._obj
            service._main_task = task = service._loop.create_task(
                service._run()
            )
            task.add_done_callback(self._on_stop)
            service._state = ServiceBase.RunningState(service)

    class RunningState(State):
        """Active state: the main task has been scheduled."""

        def should_stop(self):
            """Flag the stop, cancel the main task, go to StoppingState."""
            service = self._obj
            service._should_stop = True
            service._main_task.cancel()
            service._state = ServiceBase.StoppingState(service)

        def is_running(self):
            return True

    class StoppingState(State):
        """Transitional state: cancellation requested, task not yet done."""

        def start(self):
            """Refuse to start until the previous run has finished."""
            raise ProgrammingError(
                "Cannot start stopping service. Please wait while it stop."
            )


class ProgrammingError(Exception):
    """Signals incorrect use of an API by the calling code.

    Raised, for example, when a service that is still stopping is asked
    to start again.
    """


class RateLimit:
    """Decorator to limit function calls to one per *period* seconds.

    If less than *period* seconds have passed since the last accepted
    call, then the request to call the function is replaced with an
    *on_drop* call with the same arguments.

    If *on_drop* is None [default] then the call is just dropped and
    the wrapper returns None.

    Both plain functions and coroutine functions can be decorated.
    When a coroutine function is decorated and *on_drop* returns a
    coroutine or future, that result is awaited, so an ``async def``
    *on_drop* actually runs.

    The limiter state is kept on the decorator instance, so functions
    decorated with the same instance share one call budget.  The time
    slot is taken *before* the function runs, i.e. a call that raises
    still counts.

    :param period: minimal number of seconds between accepted calls
    :param timer: zero-argument callable returning the current time
        in seconds
    :param on_drop: optional callable invoked instead of the function
        when a call is rejected
    """

    def __init__(self, period, timer=time.monotonic, *, on_drop=None):
        self._next_call_time = None  # None until the first accepted call
        self._period = period
        self._timer = timer
        self._on_drop = on_drop

    @property
    def should_be_called(self):
        """True if a call made right now would be accepted."""
        return (
            self._next_call_time is None
            or self._next_call_time <= self._timer()
        )

    def __call__(self, func):
        """Wrap *func*; returns an async wrapper for coroutine functions."""

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if self.should_be_called:
                self._next_call_time = self._timer() + self._period
                return func(*args, **kwargs)
            elif self._on_drop is not None:
                return self._on_drop(*args, **kwargs)

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            if self.should_be_called:
                self._next_call_time = self._timer() + self._period
                return await func(*args, **kwargs)
            elif self._on_drop is not None:
                dropped = self._on_drop(*args, **kwargs)
                # An async on_drop hands back a coroutine/future; without
                # awaiting it here it would never run.
                if asyncio.iscoroutine(dropped) or asyncio.isfuture(dropped):
                    return await dropped
                return dropped

        return async_wrapper if asyncio.iscoroutinefunction(func) else wrapper


# Lower-case alias so the class reads naturally as ``@rate_limit(...)``.
rate_limit = RateLimit


class CoalesceCalls:
    """Holder of the shared state used by :meth:`coalesce_calls`.

    The time of the last call and the pending scheduled call live on the
    instance, so every coroutine decorated through the same instance
    shares a single "one call per *period*" budget.
    """

    def __init__(self):
        # Event-loop time of the last actual call.  -inf guarantees that
        # the very first request satisfies ``now > call_time + period``.
        self.call_time = float("-inf")
        # Handle returned by loop.call_soon()/loop.call_at() for a call
        # that is scheduled but has not started yet; None if nothing
        # is pending.
        self.delayed_call = None

    def coalesce_calls(self, period, *, done_callback=None):
        """
        Decorator to coalesce coroutine calls to one per *period* seconds.

        Requests for a coroutine call in a given time period are coalesced:
        If t is the time of the last call, then N call requests in the [t,
        t+period) time interval result in a single call at the
        t+period time iff N>0 i.e.,

        if less than *period* seconds have passed since the last call,
        then the calls are coalesced: (N-1) requests are dropped, the Nth
        request is performed in *period* seconds.

        It is unspecified which exact call is made if arguments differ.

        If the call is not dropped then *done_callback* is attached
        to the task when the coroutine is scheduled with the event loop.
        When *done_callback* is None, a default callback that reports the
        task's exception to the loop's exception handler is attached
        instead.

        Awaiting the decorated coroutine only schedules (or drops) the
        request and always returns None; the result of the wrapped
        coroutine is not returned to the caller.

        Given `c` is the time of the last [actual] call (`loop.create_task()`)
        And `T` is the coalesce time period
        When a call request arrives at `t` time
        Then
        | call pending?  | t>c+T                          | c<=t<=c+T  | t<c  |
        |----------------+--------------------------------+------------+------|
        | no p. call     | call soon                      | call at c+T| warn |
        | p. call at c+T | cancel the call/warn, call soon| drop call  | warn |
        """

        def decorator(coro):
            @functools.wraps(coro)
            async def wrapper(*args, **kwargs):
                # An explicit ``loop=`` keyword wins; it is only read here,
                # not removed, so it is still forwarded to *coro*.
                loop = kwargs.get("loop")
                if loop is None:
                    loop = asyncio.get_event_loop()

                # Human-readable form of this request, used in log messages.
                if args or kwargs:
                    args_repr = "*%r, **%r" % (args, kwargs)
                else:  # special case no args case
                    args_repr = ""
                call_repr = "%s(%s)" % (coro.__name__, args_repr)

                def log_exception(task):
                    """Report the task's exception, if any, to the event
                    loop's exception handler.

                    CancelledError is not logged.
                    """
                    if not task.cancelled() and task.exception() is not None:
                        loop.call_exception_handler(
                            {
                                "message": "Unhandled exception during "
                                + call_repr,
                                "exception": task.exception(),
                                "task": task,
                            }
                        )

                def call_delayed(coro, args, kwargs):
                    """Call & schedule the delayed coroutine now."""
                    logger.info("Schedule call %s", call_repr)
                    # Record the actual call time and clear the pending
                    # handle before the task is created.
                    self.call_time = loop.time()
                    self.delayed_call = None
                    task = loop.create_task(coro(*args, **kwargs))
                    # A user-supplied done_callback replaces (does not
                    # supplement) the default exception logging.
                    task.add_done_callback(
                        log_exception
                        if done_callback is None
                        else done_callback
                    )

                now = loop.time()
                if now > (self.call_time + period):  # call immediately
                    if self.delayed_call is not None:
                        # get string representation for logs
                        #   before cancelling the call
                        old_delayed_call_repr = str(self.delayed_call)
                        self.delayed_call.cancel()
                        self.delayed_call = None
                        logger.warning(
                            "There was a scheduled call (%s)"
                            " but more than period (%r) seconds passed"
                            " since the last call (%r, now=%r)",
                            old_delayed_call_repr,
                            period,
                            self.call_time,
                            now,
                        )
                    logger.info(
                        "Satisfy the call request soon: %s. No calls in"
                        " more than %r seconds since the start",
                        call_repr,
                        period,
                    )
                    # Keep the handle so that further requests arriving
                    # before call_delayed() runs can see the pending call.
                    self.delayed_call = loop.call_soon(
                        call_delayed, coro, args, kwargs
                    )
                elif self.call_time <= now <= (self.call_time + period):
                    # Still inside the period that started at the last call.
                    delay = (self.call_time + period) - now
                    if self.delayed_call is not None:  # drop call request
                        logger.info(
                            "Drop call request for %s"
                            ", enforcing one call per %r seconds limit"
                            ". Next call is in ~%.2f seconds",
                            call_repr,
                            period,
                            delay,
                        )
                    else:  # schedule call request
                        assert self.delayed_call is None
                        logger.info(
                            "Delay call request: %s for ~%.2f seconds"
                            ". Enforcing one call per %r seconds limit",
                            call_repr,
                            delay,
                            period,
                        )
                        # Run exactly when the current period ends.
                        self.delayed_call = loop.call_at(
                            self.call_time + period,
                            call_delayed,
                            coro,
                            args,
                            kwargs,
                        )
                else:  # now < call_time
                    logger.warning(
                        "Drop call request for %s, reason: last call time"
                        " (%r, now=%r) is in the future",
                        call_repr,
                        self.call_time,
                        now,
                    )

            return wrapper

        return decorator


# Module-level CoalesceCalls instance shared by whoever decorates with it.
# NOTE(review): judging by the name it is presumably used to coalesce
# web server graceful restarts -- the users are outside this file, confirm.
webserver_gracefull_restart = CoalesceCalls()


def get_hostname():
    """Return a human-readable name for this server.

    The name is sent to CLN and lets the user tell their servers apart.
    The fully qualified domain name is preferred; when it is missing or
    starts with "localhost" (case-insensitive), the plain host name is
    returned instead.
    """
    fqdn = socket.getfqdn()
    is_usable = fqdn is not None and not fqdn.lower().startswith("localhost")
    return fqdn if is_usable else socket.gethostname()


# Everything from here on is copied from the setuptools package


# Copied from setuptools/_distutils/version.py
class Version:
    """Common base for version-number classes.

    Supplies the constructor, ``__repr__`` and the rich comparison
    operators.  Subclasses are expected to provide ``parse``, ``__str__``
    and ``_cmp``; ``_cmp`` returns a negative, zero or positive number,
    or ``NotImplemented``, which every operator passes through untouched.
    """

    def __init__(self, vstring=None):
        # A missing or empty version string leaves the instance unparsed.
        if not vstring:
            return
        self.parse(vstring)

    def __repr__(self):
        class_name = self.__class__.__name__
        return "{} ('{}')".format(class_name, str(self))

    def __eq__(self, other):
        outcome = self._cmp(other)
        return outcome if outcome is NotImplemented else outcome == 0

    def __lt__(self, other):
        outcome = self._cmp(other)
        return outcome if outcome is NotImplemented else outcome < 0

    def __le__(self, other):
        outcome = self._cmp(other)
        return outcome if outcome is NotImplemented else outcome <= 0

    def __gt__(self, other):
        outcome = self._cmp(other)
        return outcome if outcome is NotImplemented else outcome > 0

    def __ge__(self, other):
        outcome = self._cmp(other)
        return outcome if outcome is NotImplemented else outcome >= 0


# Copied from setuptools/_distutils/version.py
class LooseVersion(Version):
    """Version numbering for anarchists and software realists.

    A version string is split into a sequence of components: runs of
    digits, runs of lower-case letters, and whatever is left in between,
    with periods acting purely as separators.  Digit runs are compared
    as integers, everything else as strings.  All of the following are
    valid version numbers, in no particular order:

        1.5.1
        1.5.2b2
        161
        3.10a
        8.02
        3.4j
        1996.07.12
        3.2.pl0
        3.1.1.6
        2g6
        11g
        0.960923
        2.2beta29
        1.13++
        5.5.kw
        2.0b1pl0

    There is no such thing as an invalid version number under this
    scheme; the comparison rules are simple and predictable, but may
    not always give the results you want (for some definition of
    "want").
    """

    component_re = re.compile(r"(\d+ | [a-z]+ | \.)", re.VERBOSE)

    def parse(self, vstring):
        """Store *vstring* and split it into comparable components."""
        # The original string cannot be rebuilt from the parsed
        # components, so it is kept verbatim for __str__.
        self.vstring = vstring

        def as_component(token):
            # Digit runs become ints; everything else stays a string.
            try:
                return int(token)
            except ValueError:
                return token

        self.version = [
            as_component(token)
            for token in self.component_re.split(vstring)
            if token and token != "."
        ]

    def __str__(self):
        return self.vstring

    def __repr__(self):
        return "LooseVersion ('%s')" % str(self)

    def _cmp(self, other):
        """Three-way compare with another LooseVersion or a version string.

        Returns 0, -1 or 1, or NotImplemented for unsupported operands.
        """
        if isinstance(other, str):
            other = LooseVersion(other)
        if not isinstance(other, LooseVersion):
            return NotImplemented

        mine, theirs = self.version, other.version
        if mine == theirs:
            return 0
        if mine < theirs:
            return -1
        if mine > theirs:
            return 1
        return None


# Copied from setuptools/_distutils/spawn.py
def find_executable(executable, path=None):
    """Look for *executable* in the directories listed in *path*.

    *path* is a string of directories separated by ``os.pathsep``; when
    omitted it defaults to ``os.environ['PATH']``.  Returns the complete
    filename, or None if nothing was found.
    """
    _, suffix = os.path.splitext(executable)
    if sys.platform == "win32" and suffix != ".exe":
        executable += ".exe"

    # A name that already points at an existing file is returned as is.
    if os.path.isfile(executable):
        return executable

    if path is None:
        path = os.environ.get("PATH", None)
        # bpo-35755: the fallbacks below apply only when PATH is unset,
        # not when it is set to an empty string.
        if path is None:
            try:
                path = os.confstr("CS_PATH")
            except (AttributeError, ValueError):
                # os.confstr() or CS_PATH is not available
                path = os.defpath

    # PATH='' doesn't match, whereas PATH=':' looks in the current directory
    if not path:
        return None

    candidates = (
        os.path.join(directory, executable)
        for directory in path.split(os.pathsep)
    )
    # The first existing file wins; None when there is no match.
    return next(
        (candidate for candidate in candidates if os.path.isfile(candidate)),
        None,
    )