HEX
Server: LiteSpeed
System: Linux php-prod-1.spaceapp.ru 5.15.0-157-generic #167-Ubuntu SMP Wed Sep 17 21:35:53 UTC 2025 x86_64
User: sport3497 (1034)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //opt/imunify360/venv/lib/python3.11/site-packages/imav/malwarelib/scan/mds/scanner.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import base64
import json
import os
import time

from defence360agent.contracts.config import Malware, MalwareSignatures
from defence360agent.contracts.hook_events import HookEvent
from defence360agent.internals.global_scope import g
from defence360agent.utils import resource_limits
from imav.malwarelib.config import (
    AIBOLIT_SCAN_INTENSITY_KEY,
    MalwareScanResourceType,
)
from imav.malwarelib.model import MalwareIgnorePath
from imav.malwarelib.scan.mds import MDS_PATH
from imav.malwarelib.scan.mds.detached import MDSDetachedScanDir
from imav.malwarelib.utils import get_memory


class MalwareDatabaseScanner:
    def __init__(
        self,
        initial_path,
        paths,
        intensity_cpu,
        intensity_io,
        intensity_ram,
        scan_type,
        scan_id,
        **_,
    ):
        self.scan_id = scan_id
        self.initial_path = initial_path
        self.paths = paths
        self.intensity_cpu = intensity_cpu
        self.intensity_io = intensity_io
        self.intensity_ram = intensity_ram
        self.scan_type = scan_type

    def _cmd(self, work_dir):
        return [
            "/opt/ai-bolit/wrapper",
            MDS_PATH,
            "--scan",
            "--path",
            self.paths[0],  # TODO: use whole list of files
            "--avdb",
            MalwareSignatures.MDS_AI_BOLIT_HOSTER,
            "--report-file",
            str(work_dir.report_file),
            "--detached",
            self.scan_id,
            "--progress",
            str(work_dir.progress_file),
            "--ignore-list",
            str(work_dir.ignore_file),
            "--procudb",
            MalwareSignatures.MDS_PROCU_DB,
            "--memory",
            get_memory(self.intensity_ram),
            "--db-timeout",
            str(Malware.MDS_DB_TIMEOUT),
        ]

    async def scan(self, started=None):
        with MDSDetachedScanDir(self.scan_id) as work_dir:
            work_dir.ignore_file.write_bytes(
                b"\n".join(
                    base64.b64encode(os.fsencode(path))
                    for path in MalwareIgnorePath.path_list(
                        resource_type=MalwareScanResourceType.DB.value
                    )
                )
            )
            cmd = self._cmd(work_dir)
            scan_info = {
                "cmd": cmd,
                "initial_path": self.initial_path,
                "scan_type": self.scan_type,
            }
            with work_dir.scan_info_file.open(mode="w") as fp:
                json.dump(scan_info, fp)
            with work_dir.log_file.open(
                mode="w"
            ) as l_f, work_dir.err_file.open(mode="w") as e_f:
                scan_started_event = HookEvent.MalwareScanningStarted(
                    scan_id=self.scan_id,
                    scan_type=self.scan_type,
                    path=self.initial_path,
                    started=started or time.time(),
                )
                await g.sink.process_message(scan_started_event)

                await resource_limits.create_subprocess(
                    cmd,
                    intensity_cpu=self.intensity_cpu,
                    intensity_io=self.intensity_io,
                    start_new_session=True,
                    stdout=l_f,
                    stderr=e_f,
                    cwd=str(work_dir),
                    key=AIBOLIT_SCAN_INTENSITY_KEY[self.scan_type],
                )