HEX
Server: LiteSpeed
System: Linux php-prod-1.spaceapp.ru 5.15.0-157-generic #167-Ubuntu SMP Wed Sep 17 21:35:53 UTC 2025 x86_64
User: xnsbb3110 (1041)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //proc/self/root/usr/local/CyberCP/aiScanner/models.py
from django.db import models
from loginSystem.models import Administrator
import json

# Import the status update model
from .status_models import ScanStatusUpdate


class AIScannerSettings(models.Model):
    """Store AI scanner configuration and API keys for administrators"""
    admin = models.OneToOneField(Administrator, on_delete=models.CASCADE, related_name='ai_scanner_settings')
    api_key = models.CharField(max_length=255, blank=True, null=True)
    balance = models.DecimalField(max_digits=10, decimal_places=4, default=0.0000)
    is_payment_configured = models.BooleanField(default=False)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'ai_scanner_settings'

    def __str__(self):
        return f"AI Scanner Settings for {self.admin.userName}"


class ScanHistory(models.Model):
    """Store scan history and results"""
    SCAN_STATUS_CHOICES = [
        ('pending', 'Pending'),
        ('running', 'Running'),
        ('completed', 'Completed'),
        ('failed', 'Failed'),
        ('cancelled', 'Cancelled'),
    ]

    SCAN_TYPE_CHOICES = [
        ('full', 'Full Scan'),
        ('quick', 'Quick Scan'),
        ('custom', 'Custom Scan'),
    ]

    admin = models.ForeignKey(Administrator, on_delete=models.CASCADE, related_name='scan_history')
    scan_id = models.CharField(max_length=100, unique=True)
    domain = models.CharField(max_length=255)
    scan_type = models.CharField(max_length=20, choices=SCAN_TYPE_CHOICES, default='full')
    status = models.CharField(max_length=20, choices=SCAN_STATUS_CHOICES, default='pending')
    cost_usd = models.DecimalField(max_digits=10, decimal_places=6, null=True, blank=True)
    files_scanned = models.IntegerField(default=0)
    issues_found = models.IntegerField(default=0)
    findings_json = models.TextField(blank=True, null=True)  # Store JSON findings
    summary_json = models.TextField(blank=True, null=True)  # Store JSON summary
    error_message = models.TextField(blank=True, null=True)
    started_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        db_table = 'ai_scanner_history'
        ordering = ['-started_at']

    def __str__(self):
        return f"Scan {self.scan_id} - {self.domain} ({self.status})"

    @property
    def findings(self):
        """Parse findings JSON"""
        if self.findings_json:
            try:
                return json.loads(self.findings_json)
            except json.JSONDecodeError:
                return []
        return []

    @property
    def summary(self):
        """Parse summary JSON"""
        if self.summary_json:
            try:
                return json.loads(self.summary_json)
            except json.JSONDecodeError:
                return {}
        return {}

    def set_findings(self, findings_list):
        """Set findings from list/dict"""
        self.findings_json = json.dumps(findings_list)

    def set_summary(self, summary_dict):
        """Set summary from dict"""
        self.summary_json = json.dumps(summary_dict)


class FileAccessToken(models.Model):
    """Temporary tokens for file access during scans"""
    token = models.CharField(max_length=100, unique=True)
    scan_history = models.ForeignKey(ScanHistory, on_delete=models.CASCADE, related_name='access_tokens')
    domain = models.CharField(max_length=255)
    wp_path = models.CharField(max_length=500)
    expires_at = models.DateTimeField()
    created_at = models.DateTimeField(auto_now_add=True)
    is_active = models.BooleanField(default=True)

    class Meta:
        db_table = 'ai_scanner_file_tokens'

    def __str__(self):
        return f"Access token {self.token} for {self.domain}"

    def is_expired(self):
        from django.utils import timezone
        return timezone.now() > self.expires_at


class ScheduledScan(models.Model):
    """Store scheduled scan configurations"""
    FREQUENCY_CHOICES = [
        ('daily', 'Daily'),
        ('weekly', 'Weekly'),
        ('monthly', 'Monthly'),
        ('quarterly', 'Quarterly'),
    ]
    
    SCAN_TYPE_CHOICES = [
        ('full', 'Full Scan'),
        ('quick', 'Quick Scan'),
        ('custom', 'Custom Scan'),
    ]
    
    STATUS_CHOICES = [
        ('active', 'Active'),
        ('paused', 'Paused'),
        ('disabled', 'Disabled'),
    ]
    
    admin = models.ForeignKey(Administrator, on_delete=models.CASCADE, related_name='scheduled_scans')
    name = models.CharField(max_length=200, help_text="Name for this scheduled scan")
    domains = models.TextField(help_text="JSON array of domains to scan")
    frequency = models.CharField(max_length=20, choices=FREQUENCY_CHOICES, default='weekly')
    scan_type = models.CharField(max_length=20, choices=SCAN_TYPE_CHOICES, default='full')
    time_of_day = models.TimeField(help_text="Time of day to run the scan (UTC)")
    day_of_week = models.IntegerField(null=True, blank=True, help_text="Day of week for weekly scans (0=Monday, 6=Sunday)")
    day_of_month = models.IntegerField(null=True, blank=True, help_text="Day of month for monthly scans (1-31)")
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='active')
    last_run = models.DateTimeField(null=True, blank=True)
    next_run = models.DateTimeField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    
    # Notification settings
    email_notifications = models.BooleanField(default=True)
    notification_emails = models.TextField(blank=True, help_text="JSON array of email addresses")
    notify_on_threats = models.BooleanField(default=True)
    notify_on_completion = models.BooleanField(default=False)
    notify_on_failure = models.BooleanField(default=True)
    
    class Meta:
        db_table = 'ai_scanner_scheduled_scans'
        ordering = ['-created_at']
    
    def __str__(self):
        return f"Scheduled Scan: {self.name} ({self.frequency})"
    
    @property
    def domain_list(self):
        """Parse domains JSON"""
        if self.domains:
            try:
                return json.loads(self.domains)
            except json.JSONDecodeError:
                return []
        return []
    
    @property
    def notification_email_list(self):
        """Parse notification emails JSON"""
        if self.notification_emails:
            try:
                return json.loads(self.notification_emails)
            except json.JSONDecodeError:
                return []
        return []
    
    def set_domains(self, domain_list):
        """Set domains from list"""
        self.domains = json.dumps(domain_list)
    
    def set_notification_emails(self, email_list):
        """Set notification emails from list"""
        self.notification_emails = json.dumps(email_list)
    
    def calculate_next_run(self):
        """Calculate next run time based on frequency"""
        from django.utils import timezone
        from datetime import datetime, timedelta
        import calendar
        
        now = timezone.now()
        
        if self.frequency == 'daily':
            # Daily: next run is tomorrow at specified time
            next_run = now.replace(hour=self.time_of_day.hour, minute=self.time_of_day.minute, second=0, microsecond=0)
            if next_run <= now:
                next_run += timedelta(days=1)
        
        elif self.frequency == 'weekly':
            # Weekly: next run is on specified day of week at specified time
            days_ahead = self.day_of_week - now.weekday()
            if days_ahead <= 0:  # Target day already happened this week
                days_ahead += 7
            next_run = now + timedelta(days=days_ahead)
            next_run = next_run.replace(hour=self.time_of_day.hour, minute=self.time_of_day.minute, second=0, microsecond=0)
        
        elif self.frequency == 'monthly':
            # Monthly: next run is on specified day of month at specified time
            year = now.year
            month = now.month
            day = min(self.day_of_month, calendar.monthrange(year, month)[1])
            
            next_run = now.replace(day=day, hour=self.time_of_day.hour, minute=self.time_of_day.minute, second=0, microsecond=0)
            
            if next_run <= now:
                # Move to next month
                if month == 12:
                    year += 1
                    month = 1
                else:
                    month += 1
                day = min(self.day_of_month, calendar.monthrange(year, month)[1])
                next_run = next_run.replace(year=year, month=month, day=day)
        
        elif self.frequency == 'quarterly':
            # Quarterly: next run is 3 months from now
            next_run = now.replace(hour=self.time_of_day.hour, minute=self.time_of_day.minute, second=0, microsecond=0)
            month = now.month
            year = now.year
            
            # Add 3 months
            month += 3
            if month > 12:
                year += 1
                month -= 12
            
            day = min(self.day_of_month or 1, calendar.monthrange(year, month)[1])
            next_run = next_run.replace(year=year, month=month, day=day)
            
            if next_run <= now:
                # Add another 3 months
                month += 3
                if month > 12:
                    year += 1
                    month -= 12
                day = min(self.day_of_month or 1, calendar.monthrange(year, month)[1])
                next_run = next_run.replace(year=year, month=month, day=day)
        
        else:
            # Default to weekly
            next_run = now + timedelta(weeks=1)
        
        return next_run
    
    def save(self, *args, **kwargs):
        """Override save to calculate next_run"""
        if not self.next_run or self.status == 'active':
            self.next_run = self.calculate_next_run()
        super().save(*args, **kwargs)


class ScheduledScanExecution(models.Model):
    """Track individual executions of scheduled scans"""
    STATUS_CHOICES = [
        ('pending', 'Pending'),
        ('running', 'Running'),
        ('completed', 'Completed'),
        ('failed', 'Failed'),
        ('cancelled', 'Cancelled'),
    ]
    
    scheduled_scan = models.ForeignKey(ScheduledScan, on_delete=models.CASCADE, related_name='executions')
    execution_time = models.DateTimeField(auto_now_add=True)
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
    domains_scanned = models.TextField(blank=True, help_text="JSON array of domains that were scanned")
    total_scans = models.IntegerField(default=0)
    successful_scans = models.IntegerField(default=0)
    failed_scans = models.IntegerField(default=0)
    total_cost = models.DecimalField(max_digits=10, decimal_places=6, default=0.0)
    scan_ids = models.TextField(blank=True, help_text="JSON array of scan IDs created")
    error_message = models.TextField(blank=True, null=True)
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)
    
    class Meta:
        db_table = 'ai_scanner_scheduled_executions'
        ordering = ['-execution_time']
    
    def __str__(self):
        return f"Execution of {self.scheduled_scan.name} at {self.execution_time}"
    
    @property
    def scanned_domains(self):
        """Parse domains scanned JSON"""
        if self.domains_scanned:
            try:
                return json.loads(self.domains_scanned)
            except json.JSONDecodeError:
                return []
        return []
    
    @property
    def scan_id_list(self):
        """Parse scan IDs JSON"""
        if self.scan_ids:
            try:
                return json.loads(self.scan_ids)
            except json.JSONDecodeError:
                return []
        return []
    
    def set_scanned_domains(self, domain_list):
        """Set scanned domains from list"""
        self.domains_scanned = json.dumps(domain_list)
    
    def set_scan_ids(self, scan_id_list):
        """Set scan IDs from list"""
        self.scan_ids = json.dumps(scan_id_list)