# File: //proc/thread-self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/contracts/messages.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
from copy import deepcopy
from typing import List, Optional, Set
from defence360agent.contracts.messages import (
    Accumulatable,
    Ack,
    FilesUpdated,
    Lockable,
    Message,
    Noop,
    Received,
    Reportable,
    ReportTarget,
    ShortenReprListMixin,
    Splittable,
)
from imav.malwarelib.config import QueuedScanState
from imav.malwarelib.scan.mds.report import (
    MalwareDatabaseHitInfo,
    MalwareDatabaseScanReport,
)
class MalwareScanTask(Message):
    """
    Creates task to scan file for malware
    """
    # Method identifier associated with this message class.
    DEFAULT_METHOD = "MALWARE_SCAN_TASK"
class MalwareScanComplete(Message):
    """Message identified by the ``MALWARE_SCAN_COMPLETE`` method."""
    # NOTE(review): presumably emitted once a malware scan has finished;
    # the emitter is not visible in this module -- confirm.
    DEFAULT_METHOD = "MALWARE_SCAN_COMPLETE"
class MalwareCleanComplete(Message):
    """Message identified by the ``MALWARE_CLEAN_COMPLETE`` method."""
    # NOTE(review): presumably emitted once a cleanup has finished;
    # the emitter is not visible in this module -- confirm.
    DEFAULT_METHOD = "MALWARE_CLEAN_COMPLETE"
class MalwareRestoreComplete(Message):
    """Message identified by the ``MALWARE_RESTORE_COMPLETE`` method."""
    # NOTE(review): presumably emitted once a restore has finished;
    # the emitter is not visible in this module -- confirm.
    DEFAULT_METHOD = "MALWARE_RESTORE_COMPLETE"
class MalwareScanSummary(ShortenReprListMixin, Message, Reportable):
    """
    Message that keeps only the ``summary`` part of every scan item
    """

    DEFAULT_METHOD = "MALWARE_SCAN_SUMMARY"

    def __init__(self, *args, items, **kwargs):
        """
        :param items: iterable of mappings, each providing a ``summary``
            key; every other key of an item is dropped
        """
        # scan results are intentionally left out of this message
        summaries_only = []
        for entry in items:
            summaries_only.append({"summary": entry["summary"]})
        super().__init__(*args, items=summaries_only, **kwargs)
class MalwareScanResult(ShortenReprListMixin, Message, Reportable, Splittable):
    """
    Message that keeps the scan ``results`` together with the few
    ``summary`` fields needed to store them
    """

    DEFAULT_METHOD = "MALWARE_SCAN_RESULT"
    # NOTE(review): the three settings below are presumably read by
    # ShortenReprListMixin / Splittable, which are defined elsewhere.
    LIST_SIZE = 5
    BATCH_SIZE = 1000
    BATCH_FIELD = "results"

    def __init__(self, *args, items, **kwargs):
        """
        :param items: iterable of mappings, each providing ``summary``
            (a mapping) and ``results`` keys
        """
        # summary fields necessary for storing results
        storage_keys = frozenset(("scanid", "type", "args"))
        trimmed = []
        for entry in items:
            summary = entry["summary"]
            trimmed.append(
                {
                    "summary": {
                        name: value
                        for name, value in summary.items()
                        if name in storage_keys
                    },
                    # results are passed through untouched
                    "results": entry["results"],
                }
            )
        super().__init__(*args, items=trimmed, **kwargs)
class MalwareScan(Accumulatable, Lockable):
    """
    Represents results of single scan
    """

    DEFAULT_METHOD = "MALWARE_SCAN"
    LIST_CLASS = (MalwareScanSummary, MalwareScanResult)

    def do_accumulate(self) -> bool:
        """Return True when the ``results`` entry is present and not None."""
        return self.get("results") is not None
class MalwareMRSUpload(Message):
    """
    Used to isolate a possibly long running uploading to MRS from the
    other MalwareScan handlers.
    """
    # Method identifier associated with this message class.
    DEFAULT_METHOD = "MALWARE_MRS_UPLOAD"
class MalwareScanQueue(Message):
    """Base class of the ``MalwareScanQueue*`` messages in this module."""
    # Subclasses below do not override this value, so they all share it.
    DEFAULT_METHOD = "MALWARE_SCAN_QUEUE"
class MalwareScanQueuePut(MalwareScanQueue):
    """Queue message carrying ``paths`` and ``scan_args``."""
    # NOTE(review): presumably a request to enqueue a scan of ``paths``;
    # the handler is not visible in this module -- confirm.
    def __init__(self, *, paths, scan_args):
        # Both values are keyword-only and forwarded unchanged to the parent.
        super().__init__(paths=paths, scan_args=scan_args)
class MalwareScanQueueRemove(MalwareScanQueue):
    """Queue message carrying ``scan_ids``."""
    # NOTE(review): presumably a request to drop the given scans from the
    # queue; the handler is not visible in this module -- confirm.
    def __init__(self, *, scan_ids):
        # ``scan_ids`` is keyword-only and forwarded unchanged to the parent.
        super().__init__(scan_ids=scan_ids)
class MalwareScanQueueRecheck(MalwareScanQueue):
    """Queue message that is constructed without any arguments."""
    def __init__(self):
        # The explicit signature narrows the constructor to zero arguments.
        super().__init__()
class MalwareScanQueueStopBackground(MalwareScanQueue):
    """Queue message that is constructed without any arguments."""
    # NOTE(review): presumably asks to stop background scans -- confirm
    # against the handler, which is not visible in this module.
    def __init__(self):
        # The explicit signature narrows the constructor to zero arguments.
        super().__init__()
class MalwareScanQueueUpdateStatus(MalwareScanQueue):
    """Queue message carrying ``scan_ids`` and ``status``."""

    # NOTE(review): presumably ``status`` is the state to assign to the
    # scans listed in ``scan_ids`` -- confirm against the handler.
    def __init__(
        self,
        *,
        scan_ids: Optional[List[str]] = None,
        status: Optional[QueuedScanState] = None,
    ):
        """
        :param scan_ids: scan identifiers, or None (the default)
        :param status: queued scan state, or None (the default)
        """
        # Both values are forwarded unchanged, including the None defaults.
        super().__init__(scan_ids=scan_ids, status=status)
class MalwareResponse(Message, Received):
    """Message identified by the ``MALWARE_RESPONSE`` method."""
    # Chained assignment: binds BLACK, WHITE, KNOWN and UNKNOWN to their
    # own names as strings and TYPES to the tuple of all four.
    TYPES = BLACK, WHITE, KNOWN, UNKNOWN = "BLACK", "WHITE", "KNOWN", "UNKNOWN"
    DEFAULT_METHOD = "MALWARE_RESPONSE"
class MalwareSendFiles(Message, Received):
    """Message identified by the ``MALWARE_SEND_FILES`` method."""
    # NOTE(review): the ``Received`` base suggests this is an incoming
    # message; its definition is not visible here -- confirm.
    DEFAULT_METHOD = "MALWARE_SEND_FILES"
class MalwareRescanFiles(Message, Received):
    """Message identified by the ``MALWARE_RESCAN_FILES`` method."""
    # NOTE(review): the ``Received`` base suggests this is an incoming
    # message; its definition is not visible here -- confirm.
    DEFAULT_METHOD = "MALWARE_RESCAN_FILES"
class MalwareCleanupTask(Message):
    """
    Creates task to cleanup files
    """
    DEFAULT_METHOD = "MALWARE_CLEANUP_TASK"
    def __init__(
        self,
        *,
        hits,
        standard_only=None,
        cause=None,
        initiator=None,
        scan_id=None,
        post_action=None,
    ):
        """
        Build the task from keyword-only fields.

        ``hits`` is required; every other field defaults to None.
        """
        # Every field is forwarded unchanged, None defaults included.
        super().__init__(
            hits=hits,
            standard_only=standard_only,
            cause=cause,
            initiator=initiator,
            scan_id=scan_id,
            post_action=post_action,
        )
class MalwareCleanupList(
    ShortenReprListMixin, Message, Reportable, Splittable
):
    """
    Cleanup message whose split parts keep only their own ``result`` entries
    """

    DEFAULT_METHOD = "MALWARE_CLEANUP_LIST"
    # NOTE(review): the three settings below are presumably read by
    # ShortenReprListMixin / Splittable, which are defined elsewhere.
    LIST_SIZE = 5
    BATCH_SIZE = 1000
    BATCH_FIELD = "hits"

    @classmethod
    def _split_items(cls, messages: List[Accumulatable]):
        """
        Yield every message produced by the parent ``_split_items`` with
        its ``result`` mapping narrowed to the paths of its own ``hits``.

        A ``result`` that is missing, empty or not a dict is left as is.
        Each hit is expected to expose an ``orig_file`` attribute that is
        used as the key into ``result``.
        """
        for message in super()._split_items(messages):
            result = message.get("result", {})
            if result and isinstance(result, dict):
                current_paths = {
                    hit.orig_file for hit in message.get("hits", [])
                }
                # after splitting messages, leave only the actual result
                # to avoid duplicates on the correlation server.
                # Filter first and copy afterwards: the source mapping is
                # never mutated and each part copies only its own entries
                # instead of deep-copying the whole mapping per part.
                message["result"] = deepcopy(
                    {
                        path: outcome
                        for path, outcome in result.items()
                        if path in current_paths
                    }
                )
            yield message
class MalwareCleanup(Accumulatable):
    """
    Represents results of single cleanup
    """
    DEFAULT_METHOD = "MALWARE_CLEANUP"
    # NOTE(review): presumably the list message these results are
    # accumulated into; ``Accumulatable`` is defined elsewhere -- confirm.
    LIST_CLASS = MalwareCleanupList
class MalwareIgnorePathUpdated(Message):
    """Signal through a message bus that MalwareIgnorePath has been updated."""
class MalwareDatabaseScan(Message):
    """
    Represents results of a single MDS scan
    """
    # Declared fields; the values themselves are handed to ``Message`` as
    # keyword arguments in ``__init__``.
    # NOTE(review): ``path`` and ``hits`` are declared non-optional here
    # while ``__init__`` accepts None for both -- confirm which is intended.
    args: Optional[List[str]]
    path: str
    scan_id: str
    type: Optional[str]
    error: Optional[str]
    started: int
    completed: int
    total_resources: int
    total_malicious: int
    hits: Set[MalwareDatabaseHitInfo]
    initiator: Optional[str]
    def __init__(
        self,
        *,
        args: Optional[List[str]],
        path: Optional[str],
        scan_id: str,
        type: Optional[str],
        started: int = 0,
        completed: int = 0,
        total_resources: int = 0,
        total_malicious: int = 0,
        hits: Optional[Set[MalwareDatabaseHitInfo]] = None,
        error: Optional[str] = None,
        initiator: Optional[str] = None,
    ):
        """
        Build the message from keyword-only fields.

        ``args``, ``path``, ``scan_id`` and ``type`` are required.
        ``started``, ``completed``, ``total_resources`` and
        ``total_malicious`` default to 0; ``hits``, ``error`` and
        ``initiator`` default to None.
        """
        # Every field is forwarded unchanged to the parent constructor.
        super().__init__(
            args=args,
            path=path,
            scan_id=scan_id,
            type=type,
            started=started,
            completed=completed,
            total_resources=total_resources,
            total_malicious=total_malicious,
            hits=hits,
            error=error,
            initiator=initiator,
        )
    def update_with_report(self, report: MalwareDatabaseScanReport):
        """
        Copy ``started``, ``completed``, ``total_resources``,
        ``total_malicious`` and ``hits`` from *report* into this message.
        """
        self.update(
            started=report.started,
            completed=report.completed,
            total_resources=report.total_resources,
            total_malicious=report.total_malicious,
            hits=report.hits,
        )
    def update_with_error(self, message):
        """
        Store *message* under ``error`` and return what ``update`` returns.
        """
        return self.update(error=message)
    def __setitem__(self, key, value):
        # Item assignment is rejected unconditionally.
        raise NotImplementedError
    def __delitem__(self, key):
        # Item deletion is rejected unconditionally.
        raise NotImplementedError
class MalwareDatabaseRestore(Message):
    """
    Represents results of a single MDS restore

    No fields or methods are declared here; the class only provides a
    distinct message type.
    """
class MalwareDatabaseRestoreTask(Message):
    """
    Represents a single MDS restore task
    """
    # Declared fields; the values are handed to ``Message`` in ``__init__``.
    path: str
    app_name: str
    signature_id: str | None
    def __init__(self, path: str, app_name: str, signature_id: str | None):
        """
        All three parameters are required; ``signature_id`` may be None.
        """
        # Every field is forwarded unchanged to the parent constructor.
        super().__init__(
            path=path, app_name=app_name, signature_id=signature_id
        )
class MalwareDatabaseCleanup(Message):
    """
    Represents results of a single mds cleanup
    """
    # Declared fields; the values are handed to ``Message`` in ``__init__``.
    succeeded: Set[MalwareDatabaseHitInfo]
    failed: Set[MalwareDatabaseHitInfo]
    def __init__(
        self,
        succeeded: Set[MalwareDatabaseHitInfo],
        failed: Set[MalwareDatabaseHitInfo],
    ):
        """
        :param succeeded: set of hits stored under ``succeeded``
        :param failed: set of hits stored under ``failed``
        """
        # Both sets are forwarded unchanged to the parent constructor.
        super().__init__(
            succeeded=succeeded,
            failed=failed,
        )
class MalwareDatabaseCleanupFailed(Message):
    """
    Signifies an MDS cleanup failure
    """
    # Declared field; the value is handed to ``Message`` in ``__init__``.
    error: str
    def __init__(self, *, error: str):
        # ``error`` is keyword-only and forwarded unchanged to the parent.
        super().__init__(error=error)
class MalwareDatabaseRestoreFailed(Message):
    """
    Signifies an MDS restore failure
    """
    # Declared field; the value is handed to ``Message`` in ``__init__``.
    error: str
    def __init__(self, *, error: str):
        # ``error`` is keyword-only and forwarded unchanged to the parent.
        super().__init__(error=error)
class RefreshImunifyPatchSubscription(Message):
    """
    Refresh Imunify Patch user subscription info.

    No fields or methods are declared here; the class only provides a
    distinct message type.
    """
class MalwareCleanupRevertList(ShortenReprListMixin, Message, Reportable):
    """Message identified by the ``MALWARE_CLEANUP_REVERT_LIST`` method."""
    # Method identifier associated with this message class.
    DEFAULT_METHOD = "MALWARE_CLEANUP_REVERT_LIST"
class MalwareCleanupRevert(Accumulatable):
    """Accumulatable message paired with ``MalwareCleanupRevertList``."""
    # NOTE(review): no DEFAULT_METHOD is set here, so whatever the base
    # class provides applies -- confirm this is intended.
    LIST_CLASS = MalwareCleanupRevertList
class CheckDetachedScans(Message):
    """Message identified by the ``MALWARE_CHECK_DETACHED_SCANS`` method."""
    # Method identifier associated with this message class.
    DEFAULT_METHOD = "MALWARE_CHECK_DETACHED_SCANS"
class VulnerabilityPatchTask(Message):
    """Message identified by the ``VULNERABILITY_PATCH_TASK`` method."""
    # Method identifier associated with this message class.
    DEFAULT_METHOD = "VULNERABILITY_PATCH_TASK"
class VulnerabilityPatchList(ShortenReprListMixin, Message, Reportable):
    """Message identified by the ``MALWARE_CLEANUP_LIST`` method."""
    # NOTE(review): same method string as MalwareCleanupList; presumably
    # patches are reported through the cleanup channel on purpose -- confirm.
    DEFAULT_METHOD = "MALWARE_CLEANUP_LIST"
class VulnerabilityPatch(Accumulatable):
    """Accumulatable message paired with ``VulnerabilityPatchList``."""
    # NOTE(review): same method string as MalwareCleanup; presumably
    # intentional -- confirm.
    DEFAULT_METHOD = "MALWARE_CLEANUP"
    LIST_CLASS = VulnerabilityPatchList
class VulnerabilityPatchRevertList(ShortenReprListMixin, Message, Reportable):
    """Message identified by the ``MALWARE_CLEANUP_REVERT_LIST`` method."""
    # NOTE(review): same method string as MalwareCleanupRevertList;
    # presumably intentional -- confirm.
    DEFAULT_METHOD = "MALWARE_CLEANUP_REVERT_LIST"
class VulnerabilityPatchRevert(Accumulatable):
    """Accumulatable message paired with ``VulnerabilityPatchRevertList``."""
    # NOTE(review): no DEFAULT_METHOD is set here, so whatever the base
    # class provides applies -- confirm this is intended.
    LIST_CLASS = VulnerabilityPatchRevertList
class VulnerabilityPatchFailed(Message, Reportable):
    """Message identified by the ``CLEANUP_FAILED`` method."""
    # Method identifier associated with this message class.
    DEFAULT_METHOD = "CLEANUP_FAILED"
class WordpressPluginAction(Message):
    """Message identified by the ``WP_SECURITY_PLUGIN_ACTION`` method."""
    # Method identifier associated with this message class.
    DEFAULT_METHOD = "WP_SECURITY_PLUGIN_ACTION"
class WordpressPluginTelemetry(Message, Reportable):
    """
    Telemetry event related to the Imunify Security WordPress plugin
    """

    DEFAULT_METHOD = "WP_SECURITY_PLUGIN_EVENT"
    TARGET = ReportTarget.API

    def __repr__(self):
        # only the class name is rendered, never the message content
        class_name = self.__class__.__qualname__
        return f"{class_name}()"
# Default method names of the message classes listed below, in this order.
# NOTE(review): the name suggests these messages carry no IP address;
# the consumer of this list is not visible in this module -- confirm.
MSGS_WITHOUT_IP = [
    message_class.DEFAULT_METHOD
    for message_class in (
        Ack,
        CheckDetachedScans,
        MalwareScan,
        MalwareScanSummary,
        MalwareScanResult,
        MalwareScanComplete,
        MalwareCleanComplete,
        MalwareRestoreComplete,
        MalwareSendFiles,
        Noop,
        FilesUpdated,
    )
]