HEX
Server: LiteSpeed
System: Linux php-prod-1.spaceapp.ru 5.15.0-157-generic #167-Ubuntu SMP Wed Sep 17 21:35:53 UTC 2025 x86_64
User: sport3497 (1034)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //proc/thread-self/root/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/migrate.py
#!/opt/imunify360/venv/bin/python3
"""This module import peewee_migrate and apply migrations, for Imunify-AV
it's entrypoint for service"""

import contextlib
import os
import sys
import signal
import threading
import time

from collections.abc import Iterable
from logging import getLogger

from peewee_migrate import migrator
from playhouse.sqlite_ext import SqliteExtDatabase

import defence360agent.internals.logger
from defence360agent.application import app
from defence360agent.application.settings import configure
from defence360agent.contracts.config import Core
from defence360agent.contracts.config import Model
from defence360agent.router import Router
from defence360agent.subsys import systemd_notifier
from defence360agent.model.instance import db as db_instance
from defence360agent.model import tls_check
from defence360agent.utils import (
    write_pid_file,
    IM360_RESIDENT_PID_PATH,
    cleanup_pid_file,
)
from defence360agent.utils.check_db import (
    recreate_schema_models,
)

logger = getLogger(__name__)

GO_SERVICE_NAME = "/usr/bin/imunify-resident"


@contextlib.contextmanager
def exc_handler(log_msg: str, reraise: bool):
    """
    Logs error in case of exception.
    Depending on `reraise`:
    - re-raise exception and don't include exception info in the log operation
    - do not re-raise exception and include exception info in the log operation
    """

    try:
        yield
    except Exception:
        logger.error(log_msg, exc_info=not reraise)
        if reraise:
            raise


def apply_migrations(db: SqliteExtDatabase, migrations_dirs: Iterable[str]):
    """Apply migrations: restructure db, config files, etc."""

    router = Router(
        db,
        migrations_dirs=migrations_dirs,
        logger=logger,
    )
    # HACK: Migrator uses global unconfigurable LOGGER,
    # overrride it, to use our logging settings
    migrator.LOGGER = logger
    router.run()


def prepare_databases(
    migrations_dirs: Iterable[str],
    attached_dbs: tuple[tuple[str, str], ...] = tuple(),
):
    """
    Apply migrations and recreate attached databases.

    The workflow:
    1. Apply migrations
    2. Regardless whether the migrations were applied - recreate attached databases
    3. If the recreation of the attached databases was successful - apply migrations again
        - this is done to verify that migrations will successfully apply in future for the recreated databases
        - the recreation + the migrations in this step are within the same transaction,
          so databases will only be recreated if the migrations can applied after the recreation.
    """

    # prepare database to operate in WAL journal_mode and run migrations
    tls_check.reset()
    db_instance.init(Model.PATH)
    attached_schemas = []
    for db_path, schema_name in attached_dbs:
        db_instance.execute_sql("ATTACH ? AS ?", (db_path, schema_name))
        attached_schemas.append(schema_name)

    try:
        logger.info("Applying database migrations...")
        systemd_notifier.notify(systemd_notifier.AgentState.MIGRATING)
        with db_instance.atomic("EXCLUSIVE"), exc_handler(
            "Error applying migrations", reraise=False
        ):
            apply_migrations(db_instance, migrations_dirs)

        logger.info("Recreating attached databases...")
        with db_instance.atomic("EXCLUSIVE"), exc_handler(
            "Error recreating attached databases", reraise=True
        ):
            # Migration history is stored in main db, so to automatically recreate
            # attached dbs it is required to recreate schema for them from models
            recreate_schema_models(db_instance, attached_schemas)

            # verify migrations can be applied after the attached dbs recreation
            with exc_handler(
                "Error applying migrations after recreating attached"
                " databases",
                reraise=True,
            ):
                apply_migrations(db_instance, migrations_dirs)
    finally:
        # close connection immediately since later this process
        # will be replaced by execv
        db_instance.close()


# required in case package manager or user sends signals while migrations are still running
def signal_handler(sig, _):
    logger.warning("Received signal %s in signal_handler", sig)
    logger.warning(
        "waiting %d seconds so that migrations can finish",
        Core.SIGNAL_HANDLER_MIGRATION_TIMEOUT_SECS,
    )
    time.sleep(Core.SIGNAL_HANDLER_MIGRATION_TIMEOUT_SECS)
    logger.info("Exiting")
    sys.exit(0)


def run(*, start_pkg="defence360agent", configure=configure):
    """Entry point for Imunify-AV service. Apply migrations,
    and then replace process with {start_pkg}.run module."""

    for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP):
        signal.signal(sig, signal_handler)
    try:
        if start_pkg == "im360.run_resident":
            write_pid_file(IM360_RESIDENT_PID_PATH)
        os.umask(Core.FILE_UMASK)
        configure()
        defence360agent.internals.logger.reconfigure()
        migration_thread = threading.Thread(
            target=prepare_databases,
            args=(app.MIGRATIONS_DIRS, app.MIGRATIONS_ATTACHED_DBS),
        )
        migration_thread.start()
        migration_thread.join()

        systemd_notifier.notify(systemd_notifier.AgentState.READY)
        logger.info("Starting main process...")
        systemd_notifier.notify(systemd_notifier.AgentState.STARTING)

        if start_pkg == "im360.run_resident":
            Core.GO_FLAG_FILE.touch(exist_ok=True)
            logger.info("Run imunify-resident service")
            os.execv(
                GO_SERVICE_NAME,
                [
                    GO_SERVICE_NAME,
                ]
                + sys.argv[1:],
            )
        else:
            os.execv(
                sys.executable,
                [sys.executable, "-m", "{}".format(start_pkg)] + sys.argv[1:],
            )
    except Exception:
        if start_pkg == "im360.run_resident":
            cleanup_pid_file(IM360_RESIDENT_PID_PATH)


if __name__ == "__main__":
    run()