File: //usr/local/CyberCP/aiScanner/views.py
from django.shortcuts import render, redirect
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from loginSystem.views import loadLoginPage
from .aiScannerManager import AIScannerManager
import json
import os
def aiScannerHome(request):
"""Main AI Scanner page"""
try:
userID = request.session['userID']
sm = AIScannerManager()
return sm.scannerHome(request, userID)
except KeyError:
return redirect(loadLoginPage)
def setupPayment(request):
"""Setup payment method for AI scanner"""
try:
userID = request.session['userID']
sm = AIScannerManager()
return sm.setupPayment(request, userID)
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
def setupComplete(request):
"""Handle return from payment setup"""
try:
userID = request.session['userID']
sm = AIScannerManager()
return sm.setupComplete(request, userID)
except KeyError:
return redirect(loadLoginPage)
def startScan(request):
"""Start a new AI security scan"""
try:
userID = request.session['userID']
sm = AIScannerManager()
return sm.startScan(request, userID)
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
def refreshBalance(request):
"""Refresh account balance from API"""
try:
userID = request.session['userID']
sm = AIScannerManager()
return sm.refreshBalance(request, userID)
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
def addPaymentMethod(request):
"""Add a new payment method"""
try:
userID = request.session['userID']
sm = AIScannerManager()
return sm.addPaymentMethod(request, userID)
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
def paymentMethodComplete(request):
"""Handle return from adding payment method"""
try:
userID = request.session['userID']
sm = AIScannerManager()
return sm.paymentMethodComplete(request, userID)
except KeyError:
return redirect(loadLoginPage)
@csrf_exempt
def scanCallback(request):
"""Handle scan results callback from AI Scanner API"""
sm = AIScannerManager()
return sm.scanCallback(request)
def getScanHistory(request):
"""Get scan history for user"""
try:
userID = request.session['userID']
from loginSystem.models import Administrator
from .models import ScanHistory
from plogical.acl import ACLManager
admin = Administrator.objects.get(pk=userID)
currentACL = ACLManager.loadedACL(userID)
# Get scan history with ACL respect
if currentACL['admin'] == 1:
# Admin can see all scans
scans = ScanHistory.objects.all().order_by('-started_at')[:20]
else:
# Users can only see their own scans and their sub-users' scans
user_admins = ACLManager.loadUserObjects(userID)
scans = ScanHistory.objects.filter(admin__in=user_admins).order_by('-started_at')[:20]
scan_data = []
for scan in scans:
scan_data.append({
'scan_id': scan.scan_id,
'domain': scan.domain,
'status': scan.status,
'scan_type': scan.scan_type,
'started_at': scan.started_at.strftime('%Y-%m-%d %H:%M:%S'),
'completed_at': scan.completed_at.strftime('%Y-%m-%d %H:%M:%S') if scan.completed_at else None,
'cost_usd': float(scan.cost_usd) if scan.cost_usd else 0,
'files_scanned': scan.files_scanned,
'issues_found': scan.issues_found,
'findings': scan.findings[:5] if scan.findings else [], # First 5 findings
'summary': scan.summary
})
return JsonResponse({'success': True, 'scans': scan_data})
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
except Exception as e:
return JsonResponse({'success': False, 'error': str(e)})
@require_http_methods(['GET'])
def getScanDetails(request, scan_id):
"""Get detailed scan results"""
try:
userID = request.session['userID']
from loginSystem.models import Administrator
from .models import ScanHistory
from .status_models import ScanStatusUpdate
from plogical.acl import ACLManager
admin = Administrator.objects.get(pk=userID)
currentACL = ACLManager.loadedACL(userID)
# Get scan with ACL respect
try:
scan = ScanHistory.objects.get(scan_id=scan_id)
# Check if user has access to this scan
if currentACL['admin'] != 1:
# Non-admin users can only see their own scans and their sub-users' scans
user_admins = ACLManager.loadUserObjects(userID)
if scan.admin not in user_admins:
return JsonResponse({'success': False, 'error': 'Access denied to this scan'})
except ScanHistory.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Scan not found'})
# Get the status update for more detailed information
try:
status_update = ScanStatusUpdate.objects.get(scan_id=scan_id)
# Use detailed information from status update if available
files_scanned = status_update.files_scanned if status_update.files_scanned > 0 else scan.files_scanned
files_discovered = status_update.files_discovered
threats_found = status_update.threats_found
critical_threats = status_update.critical_threats
high_threats = status_update.high_threats
except ScanStatusUpdate.DoesNotExist:
# Fall back to basic information from scan history
files_scanned = scan.files_scanned
files_discovered = scan.files_scanned # Approximate
threats_found = scan.issues_found
critical_threats = 0
high_threats = 0
scan_data = {
'scan_id': scan.scan_id,
'domain': scan.domain,
'status': scan.status,
'scan_type': scan.scan_type,
'started_at': scan.started_at.strftime('%Y-%m-%d %H:%M:%S'),
'completed_at': scan.completed_at.strftime('%Y-%m-%d %H:%M:%S') if scan.completed_at else None,
'cost_usd': float(scan.cost_usd) if scan.cost_usd else 0,
'files_scanned': files_scanned,
'files_discovered': files_discovered,
'issues_found': scan.issues_found,
'threats_found': threats_found,
'critical_threats': critical_threats,
'high_threats': high_threats,
'findings': scan.findings,
'summary': scan.summary,
'error_message': scan.error_message
}
return JsonResponse({'success': True, 'scan': scan_data})
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
except ScanHistory.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Scan not found'})
except Exception as e:
return JsonResponse({'success': False, 'error': str(e)})
@require_http_methods(['GET'])
def getPlatformMonitorUrl(request, scan_id):
"""Get the platform monitor URL for a scan"""
try:
userID = request.session['userID']
from loginSystem.models import Administrator
from .models import ScanHistory, AIScannerSettings
from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging
import requests
# Get scan to verify ownership
try:
scan = ScanHistory.objects.get(scan_id=scan_id)
admin = Administrator.objects.get(pk=userID)
# Verify access
from plogical.acl import ACLManager
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] != 1:
user_admins = ACLManager.loadUserObjects(userID)
if scan.admin not in user_admins:
return JsonResponse({'success': False, 'error': 'Access denied'})
except ScanHistory.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Scan not found'})
# Get API key - first try current user, then scan owner
api_key = None
# Try current user's API key first
try:
current_user_settings = admin.ai_scanner_settings
if current_user_settings and current_user_settings.api_key:
api_key = current_user_settings.api_key
except AIScannerSettings.DoesNotExist:
pass
except Exception as e:
pass
# If current user doesn't have API key, try scan owner's
if not api_key:
try:
scanner_settings = scan.admin.ai_scanner_settings
if scanner_settings and scanner_settings.api_key:
api_key = scanner_settings.api_key
except AIScannerSettings.DoesNotExist:
pass
except Exception as e:
pass
# If still no API key, check if this might be a VPS free scan
if not api_key:
try:
from plogical.acl import ACLManager
from .aiScannerManager import AIScannerManager
server_ip = ACLManager.fetchIP()
sm = AIScannerManager()
vps_info = sm.check_vps_free_scans(server_ip)
if (vps_info.get('success') and
vps_info.get('is_vps') and
vps_info.get('free_scans_available', 0) > 0):
vps_key_data = sm.get_or_create_vps_api_key(server_ip)
if vps_key_data and vps_key_data.get('api_key'):
api_key = vps_key_data.get('api_key')
except Exception as e:
pass
# If still no API key, return error
if not api_key:
logging.writeToFile(f"[AI Scanner] No API key found for scan {scan_id}")
return JsonResponse({'success': False, 'error': 'API key not configured. Please configure your AI Scanner API key.'})
# Call platform API to get monitor URL
try:
url = f"https://platform.cyberpersons.com/ai-scanner/api/scan/{scan_id}/monitor-url/"
headers = {
'X-API-Key': api_key,
'Content-Type': 'application/json'
}
logging.writeToFile(f"[AI Scanner] Fetching platform monitor URL for scan {scan_id}")
response = requests.get(url, headers=headers, timeout=10)
try:
data = response.json()
except json.JSONDecodeError as e:
logging.writeToFile(f"[AI Scanner] JSON decode error: {str(e)}")
return JsonResponse({'success': False, 'error': 'Invalid response from platform'})
if response.status_code == 200 and data.get('success'):
logging.writeToFile(f"[AI Scanner] Got monitor URL: {data.get('monitor_url')}")
return JsonResponse({
'success': True,
'monitor_url': data.get('monitor_url'),
'platform_scan_id': data.get('platform_scan_id')
})
else:
error_msg = data.get('error', 'Failed to get monitor URL')
logging.writeToFile(f"[AI Scanner] Failed to get monitor URL: {error_msg}")
return JsonResponse({
'success': False,
'error': error_msg,
'scan_exists': data.get('scan_exists', False)
})
except requests.exceptions.Timeout:
logging.writeToFile(f"[AI Scanner] Platform request timeout for scan {scan_id}")
return JsonResponse({'success': False, 'error': 'Platform request timeout'})
except requests.exceptions.RequestException as e:
logging.writeToFile(f"[AI Scanner] Platform request error: {str(e)}")
return JsonResponse({'success': False, 'error': f'Platform error: {str(e)}'})
except Exception as e:
logging.writeToFile(f"[AI Scanner] Unexpected error: {str(e)}")
return JsonResponse({'success': False, 'error': f'Error: {str(e)}'})
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
except Exception as e:
logging.writeToFile(f"[AI Scanner] getPlatformMonitorUrl error: {str(e)}")
return JsonResponse({'success': False, 'error': str(e)})
def getPlatformScanStatus(request, scan_id):
"""Get real-time scan status from AI Scanner platform"""
try:
userID = request.session['userID']
from loginSystem.models import Administrator
from .models import ScanHistory
from plogical.acl import ACLManager
admin = Administrator.objects.get(pk=userID)
currentACL = ACLManager.loadedACL(userID)
# Get scan with ACL respect
try:
scan = ScanHistory.objects.get(scan_id=scan_id)
# Check if user has access to this scan
if currentACL['admin'] != 1:
# Non-admin users can only see their own scans and their sub-users' scans
user_admins = ACLManager.loadUserObjects(userID)
if scan.admin not in user_admins:
return JsonResponse({'success': False, 'error': 'Access denied to this scan'})
except ScanHistory.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Scan not found'})
scanner_settings = admin.ai_scanner_settings
if not scanner_settings.api_key:
return JsonResponse({'success': False, 'error': 'API key not configured'})
# Get real-time status from platform
sm = AIScannerManager()
platform_status = sm.get_scan_status(scanner_settings.api_key, scan_id)
if platform_status and platform_status.get('success'):
status_data = platform_status.get('data', {})
# Return formatted status data for frontend
return JsonResponse({
'success': True,
'scan_id': scan_id,
'phase': status_data.get('status', 'unknown'),
'progress': status_data.get('progress', 0),
'current_file': status_data.get('current_file', ''),
'files_discovered': status_data.get('files_discovered', 0),
'files_scanned': status_data.get('files_scanned', 0),
'files_remaining': status_data.get('files_remaining', 0),
'threats_found': status_data.get('findings_count', 0),
'critical_threats': status_data.get('critical_threats', 0),
'high_threats': status_data.get('high_threats', 0),
'activity_description': status_data.get('activity_description', ''),
'last_updated': status_data.get('last_updated', ''),
'is_active': status_data.get('status') in ['scanning', 'discovering_files', 'starting'],
'cost': status_data.get('cost', '$0.00')
})
else:
# No live status available, return scan database status
return JsonResponse({
'success': True,
'scan_id': scan_id,
'phase': scan.status,
'progress': 100 if scan.status == 'completed' else 0,
'current_file': '',
'files_discovered': scan.files_scanned,
'files_scanned': scan.files_scanned,
'files_remaining': 0,
'threats_found': scan.issues_found,
'critical_threats': 0,
'high_threats': 0,
'activity_description': scan.error_message if scan.status == 'failed' else 'Scan completed',
'last_updated': scan.completed_at.isoformat() if scan.completed_at else scan.started_at.isoformat(),
'is_active': False,
'cost': f'${scan.cost_usd:.4f}' if scan.cost_usd else '$0.00'
})
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
except ScanHistory.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Scan not found'})
except Exception as e:
return JsonResponse({'success': False, 'error': str(e)})
# File Access API for AI Scanner
@csrf_exempt
@require_http_methods(['POST'])
def aiScannerAuthenticate(request):
"""Authenticate AI scanner access"""
try:
data = json.loads(request.body)
access_token = data.get('access_token')
scan_id = data.get('scan_id')
if not access_token or not scan_id:
return JsonResponse({'success': False, 'error': 'Missing parameters'})
from .models import FileAccessToken, ScanHistory
# Validate token
try:
file_token = FileAccessToken.objects.get(
token=access_token,
scan_history__scan_id=scan_id,
is_active=True
)
if file_token.is_expired():
return JsonResponse({'success': False, 'error': 'Token expired'})
# Get WordPress info
from websiteFunctions.models import Websites
try:
website = Websites.objects.get(domain=file_token.domain)
# Detect WordPress path and version
wp_path = file_token.wp_path
wp_version = 'Unknown'
php_version = 'Unknown'
# Try to get WP version from wp-includes/version.php
version_file = os.path.join(wp_path, 'wp-includes', 'version.php')
if os.path.exists(version_file):
try:
with open(version_file, 'r') as f:
content = f.read()
import re
match = re.search(r'\$wp_version\s*=\s*[\'"]([^\'"]+)[\'"]', content)
if match:
wp_version = match.group(1)
except:
pass
return JsonResponse({
'success': True,
'site_info': {
'domain': file_token.domain,
'wp_path': wp_path,
'php_version': php_version,
'wp_version': wp_version,
'scan_id': scan_id
}
})
except Websites.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Website not found'})
except FileAccessToken.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Invalid token'})
except Exception as e:
return JsonResponse({'success': False, 'error': str(e)})
@csrf_exempt
@require_http_methods(['GET'])
def aiScannerListFiles(request):
"""List directory contents for AI scanner"""
try:
path = request.GET.get('path', '')
access_token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not access_token:
return JsonResponse({'success': False, 'error': 'No authorization token'})
from .models import FileAccessToken
# Validate token
try:
file_token = FileAccessToken.objects.get(token=access_token, is_active=True)
if file_token.is_expired():
return JsonResponse({'success': False, 'error': 'Token expired'})
# Construct full path
full_path = os.path.join(file_token.wp_path, path)
# Security check - ensure path is within WordPress directory
if not os.path.abspath(full_path).startswith(os.path.abspath(file_token.wp_path)):
return JsonResponse({'success': False, 'error': 'Path not allowed'})
if not os.path.exists(full_path):
return JsonResponse({'success': False, 'error': 'Path not found'})
if not os.path.isdir(full_path):
return JsonResponse({'success': False, 'error': 'Path is not a directory'})
# List directory contents
items = []
try:
for item in os.listdir(full_path):
item_path = os.path.join(full_path, item)
# Skip hidden files and certain directories
if item.startswith('.') or item in ['__pycache__', 'node_modules']:
continue
if os.path.isdir(item_path):
items.append({
'name': item,
'type': 'directory',
'path': os.path.join(path, item).replace('\\', '/') if path else item
})
else:
# Only include certain file types
if item.endswith(('.php', '.js', '.html', '.htm', '.css', '.txt', '.md', '.json', '.xml')):
items.append({
'name': item,
'type': 'file',
'path': os.path.join(path, item).replace('\\', '/') if path else item,
'size': os.path.getsize(item_path)
})
return JsonResponse({
'success': True,
'path': path,
'items': items
})
except PermissionError:
return JsonResponse({'success': False, 'error': 'Permission denied'})
except FileAccessToken.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Invalid token'})
except Exception as e:
return JsonResponse({'success': False, 'error': str(e)})
@csrf_exempt
@require_http_methods(['GET'])
def aiScannerGetFile(request):
"""Get file content for AI scanner"""
try:
file_path = request.GET.get('path')
access_token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not access_token or not file_path:
return JsonResponse({'success': False, 'error': 'Missing parameters'})
from .models import FileAccessToken
# Validate token
try:
file_token = FileAccessToken.objects.get(token=access_token, is_active=True)
if file_token.is_expired():
return JsonResponse({'success': False, 'error': 'Token expired'})
# Construct full path
full_path = os.path.join(file_token.wp_path, file_path)
# Security check - ensure path is within WordPress directory
if not os.path.abspath(full_path).startswith(os.path.abspath(file_token.wp_path)):
return JsonResponse({'success': False, 'error': 'Path not allowed'})
if not os.path.exists(full_path):
return JsonResponse({'success': False, 'error': 'File not found'})
if not os.path.isfile(full_path):
return JsonResponse({'success': False, 'error': 'Path is not a file'})
# Check file size (max 10MB as per API limits)
file_size = os.path.getsize(full_path)
if file_size > 10 * 1024 * 1024: # 10MB
return JsonResponse({'success': False, 'error': 'File too large'})
# Read file content
try:
with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
return JsonResponse({
'success': True,
'path': file_path,
'content': content,
'size': file_size
})
except UnicodeDecodeError:
# Try binary mode for non-text files
with open(full_path, 'rb') as f:
content = f.read()
# Return base64 encoded for binary files
import base64
return JsonResponse({
'success': True,
'path': file_path,
'content': base64.b64encode(content).decode('utf-8'),
'encoding': 'base64',
'size': file_size
})
except FileAccessToken.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Invalid token'})
except Exception as e:
return JsonResponse({'success': False, 'error': str(e)})