HEX
Server: LiteSpeed
System: Linux php-prod-1.spaceapp.ru 5.15.0-157-generic #167-Ubuntu SMP Wed Sep 17 21:35:53 UTC 2025 x86_64
User: sport3497 (1034)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //opt/imunify360/venv/lib/python3.11/site-packages/imav/malwarelib/scan/ai_bolit/report.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import base64
import binascii
import csv
import os
from collections import namedtuple
from contextlib import suppress
from pathlib import Path
from time import time

from . import AIBOLIT

(
    SUSPICIOUS,
    VULNERS,
    EXTENDED_SUSPICIOUS,
    IGNORED_SUSPICIOUS,
) = (
    "suspicious",
    "vulners",
    "extended-suspicious",
    "ignored-suspicious",
)
SECTIONS = {
    "p": "php_malware",
    "j": "js_malware",
    "c": "cloudhash",
    "s": SUSPICIOUS,
    "v": VULNERS,
    "es": EXTENDED_SUSPICIOUS,
    "is": IGNORED_SUSPICIOUS,
}

SUSPICIOUS_SECTIONS = {
    SUSPICIOUS,
    VULNERS,
    EXTENDED_SUSPICIOUS,
    IGNORED_SUSPICIOUS,
}

AiBolitCSVReport = namedtuple(
    "AiBolitCSVReport",
    [
        "section",
        "path",
        "signature",
        "ctime",
        "mtime",
        "size",
        "etime",
        "signature_id",
        "hash",
        "signature_name",
        "sha256",
    ],
)


def parse_report_csv(report_path: Path):
    with report_path.open(newline="") as report_stream:
        for raw_row in csv.reader(report_stream, delimiter=","):
            row = AiBolitCSVReport(*raw_row)
            try:
                section = SECTIONS[row.section]
            except KeyError:
                continue
            sig = row.signature_name or "{}.{}".format(
                section, row.signature_id
            )
            timestamp = (
                int(float(row.etime)) if row.section != "v" else int(time())
            )
            file_name = row.path
            with suppress(binascii.Error):
                file_name = base64.b64decode(file_name, validate=True)
            file_name = os.fsdecode(file_name)
            yield {
                "name": AIBOLIT,
                "file_name": file_name,
                "signature": sig,
                "ctime": int(row.ctime),
                "modification_time": int(row.mtime),
                "suspicious": section in SUSPICIOUS_SECTIONS,
                "size": int(row.size or 0),
                "hash": row.sha256 or row.hash or None,
                "timestamp": timestamp,
                "extended_suspicious": section == EXTENDED_SUSPICIOUS,
            }


def parse_report_json(report, base64_path=True):
    for section in SECTIONS.values():
        for hit in report.get(section, []):
            sig = hit.get("sn") or ".".join([section, str(hit["sigid"])])
            # vulners section does not provide timestamp ('et' field)
            #  so current time is used instead.
            #  'et' - time when the file was scanned
            timestamp = (
                int(float(hit["et"])) if section != "vulners" else int(time())
            )
            file_name = hit["fn"]
            if base64_path:
                with suppress(binascii.Error):
                    file_name = base64.b64decode(file_name, validate=True)
            file_name = os.fsdecode(file_name)
            yield {
                "name": AIBOLIT,
                "file_name": file_name,
                "signature": sig,
                "suspicious": section in SUSPICIOUS_SECTIONS,
                "size": hit["sz"],
                "ctime": hit["ct"],
                "modification_time": hit["mt"],
                # 'hash' field is still used in 'cloudhash' section
                "hash": hit.get("sha256", hit.get("hash")),
                "timestamp": timestamp,
                "extended_suspicious": section == EXTENDED_SUSPICIOUS,
            }