File: //usr/local/CyberCP/firewall/firewallManager.py
#!/usr/local/CyberCP/bin/python
import os
import os.path
import sys
import django
from loginSystem.models import Administrator
from plogical.httpProc import httpProc
sys.path.append('/usr/local/CyberCP')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings")
django.setup()
import json
from plogical.acl import ACLManager
import plogical.CyberCPLogFileWriter as logging
from plogical.virtualHostUtilities import virtualHostUtilities
import subprocess
from django.shortcuts import HttpResponse, render, redirect
from random import randint
import time
from plogical.firewallUtilities import FirewallUtilities
from firewall.models import FirewallRules
from plogical.modSec import modSec
from plogical.csf import CSF
from plogical.processUtilities import ProcessUtilities
from serverStatus.serverStatusUtil import ServerStatusUtil
class FirewallManager:
imunifyPath = '/usr/bin/imunify360-agent'
CLPath = '/etc/sysconfig/cloudlinux'
imunifyAVPath = '/etc/sysconfig/imunify360/integration.conf'
def __init__(self, request = None):
self.request = request
def securityHome(self, request = None, userID = None):
proc = httpProc(request, 'firewall/index.html',
None, 'admin')
return proc.render()
def firewallHome(self, request = None, userID = None):
csfPath = '/etc/csf'
if os.path.exists(csfPath):
return redirect('/configservercsf/')
else:
proc = httpProc(request, 'firewall/firewall.html',
None, 'admin')
return proc.render()
def getCurrentRules(self, userID = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('fetchStatus', 0)
rules = FirewallRules.objects.all()
json_data = "["
checker = 0
for items in rules:
dic = {
'id': items.id,
'name': items.name,
'proto': items.proto,
'port': items.port,
'ipAddress': items.ipAddress,
}
if checker == 0:
json_data = json_data + json.dumps(dic)
checker = 1
else:
json_data = json_data + ',' + json.dumps(dic)
json_data = json_data + ']'
final_json = json.dumps({'status': 1, 'fetchStatus': 1, 'error_message': "None", "data": json_data})
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'fetchStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def addRule(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('add_status', 0)
ruleName = data['ruleName']
ruleProtocol = data['ruleProtocol']
rulePort = data['rulePort']
ruleIP = data['ruleIP']
FirewallUtilities.addRule(ruleProtocol, rulePort, ruleIP)
newFWRule = FirewallRules(name=ruleName, proto=ruleProtocol, port=rulePort, ipAddress=ruleIP)
newFWRule.save()
final_dic = {'status': 1, 'add_status': 1, 'error_message': "None"}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'add_status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def deleteRule(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('delete_status', 0)
ruleID = data['id']
ruleProtocol = data['proto']
rulePort = data['port']
ruleIP = data['ruleIP']
FirewallUtilities.deleteRule(ruleProtocol, rulePort, ruleIP)
delRule = FirewallRules.objects.get(id=ruleID)
delRule.delete()
final_dic = {'status': 1, 'delete_status': 1, 'error_message': "None"}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'delete_status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def reloadFirewall(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('reload_status', 0)
command = 'sudo firewall-cmd --reload'
res = ProcessUtilities.executioner(command)
if res == 1:
final_dic = {'reload_status': 1, 'error_message': "None"}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'reload_status': 0,
'error_message': "Can not reload firewall, see CyberCP main log file."}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'reload_status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def startFirewall(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('start_status', 0)
command = 'sudo systemctl start firewalld'
res = ProcessUtilities.executioner(command)
if res == 1:
final_dic = {'start_status': 1, 'error_message': "None"}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'start_status': 0,
'error_message': "Can not start firewall, see CyberCP main log file."}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'start_status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def stopFirewall(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('stop_status', 0)
command = 'sudo systemctl stop firewalld'
res = ProcessUtilities.executioner(command)
if res == 1:
final_dic = {'stop_status': 1, 'error_message': "None"}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'stop_status': 0,
'error_message': "Can not stop firewall, see CyberCP main log file."}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'stop_status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def firewallStatus(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson()
command = 'systemctl status firewalld'
status = ProcessUtilities.outputExecutioner(command)
if status.find("dead") > -1:
final_dic = {'status': 1, 'error_message': "none", 'firewallStatus': 0}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'status': 1, 'error_message': "none", 'firewallStatus': 1}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def secureSSH(self, request = None, userID = None):
proc = httpProc(request, 'firewall/secureSSH.html',
None, 'admin')
return proc.render()
def getSSHConfigs(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson()
type = data['type']
if type == "1":
## temporarily changing permission for sshd files
pathToSSH = "/etc/ssh/sshd_config"
cat = "sudo cat " + pathToSSH
data = ProcessUtilities.outputExecutioner(cat).split('\n')
permitRootLogin = 0
sshPort = "22"
for items in data:
if items.find("PermitRootLogin") > -1:
if items.find("Yes") > -1 or items.find("yes") > -1:
permitRootLogin = 1
continue
if items.find("Port") > -1 and not items.find("GatewayPorts") > -1:
sshPort = items.split(" ")[1].strip("\n")
final_dic = {'status': 1, 'permitRootLogin': permitRootLogin, 'sshPort': sshPort}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
pathToKeyFile = "/root/.ssh/authorized_keys"
cat = "sudo cat " + pathToKeyFile
data = ProcessUtilities.outputExecutioner(cat).split('\n')
json_data = "["
checker = 0
for items in data:
if items.find("ssh-rsa") > -1:
keydata = items.split(" ")
try:
key = "ssh-rsa " + keydata[1][:50] + " .. " + keydata[2]
try:
userName = keydata[2][:keydata[2].index("@")]
except:
userName = keydata[2]
except:
key = "ssh-rsa " + keydata[1][:50]
userName = ''
dic = {'userName': userName,
'key': key,
}
if checker == 0:
json_data = json_data + json.dumps(dic)
checker = 1
else:
json_data = json_data + ',' + json.dumps(dic)
json_data = json_data + ']'
final_json = json.dumps({'status': 1, 'error_message': "None", "data": json_data})
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def saveSSHConfigs(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('saveStatus', 0)
type = data['type']
sshPort = data['sshPort']
rootLogin = data['rootLogin']
if rootLogin == True:
rootLogin = "1"
else:
rootLogin = "0"
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/firewallUtilities.py"
execPath = execPath + " saveSSHConfigs --type " + str(type) + " --sshPort " + sshPort + " --rootLogin " + rootLogin
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
csfPath = '/etc/csf'
if os.path.exists(csfPath):
dataIn = {'protocol': 'TCP_IN', 'ports': sshPort}
self.modifyPorts(dataIn)
dataIn = {'protocol': 'TCP_OUT', 'ports': sshPort}
self.modifyPorts(dataIn)
else:
try:
updateFW = FirewallRules.objects.get(name="SSHCustom")
FirewallUtilities.deleteRule("tcp", updateFW.port, "0.0.0.0/0")
updateFW.port = sshPort
updateFW.save()
FirewallUtilities.addRule('tcp', sshPort, "0.0.0.0/0")
except:
try:
newFireWallRule = FirewallRules(name="SSHCustom", port=sshPort, proto="tcp")
newFireWallRule.save()
FirewallUtilities.addRule('tcp', sshPort, "0.0.0.0/0")
command = 'firewall-cmd --permanent --remove-service=ssh'
ProcessUtilities.executioner(command)
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile(str(msg))
final_dic = {'status': 1, 'saveStatus': 1}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'status': 0, 'saveStatus': 0, "error_message": output}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0 ,'saveStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def deleteSSHKey(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('delete_status', 0)
key = data['key']
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/firewallUtilities.py"
execPath = execPath + " deleteSSHKey --key '" + key + "'"
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
final_dic = {'status': 1, 'delete_status': 1}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'status': 1, 'delete_status': 1, "error_mssage": output}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'delete_status': 0, 'error_mssage': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def addSSHKey(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('add_status', 0)
key = data['key']
tempPath = "/home/cyberpanel/" + str(randint(1000, 9999))
writeToFile = open(tempPath, "w")
writeToFile.write(key)
writeToFile.close()
execPath = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/firewallUtilities.py"
execPath = execPath + " addSSHKey --tempPath " + tempPath
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
final_dic = {'status': 1, 'add_status': 1}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'status': 0, 'add_status': 0, "error_mssage": output}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'add_status': 0, 'error_mssage': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def loadModSecurityHome(self, request = None, userID = None):
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
OLS = 1
confPath = os.path.join(virtualHostUtilities.Server_root, "conf/httpd_config.conf")
command = "sudo cat " + confPath
httpdConfig = ProcessUtilities.outputExecutioner(command).splitlines()
modSecInstalled = 0
for items in httpdConfig:
if items.find('module mod_security') > -1:
modSecInstalled = 1
break
else:
OLS = 0
modSecInstalled = 1
proc = httpProc(request, 'firewall/modSecurity.html',
{'modSecInstalled': modSecInstalled, 'OLS': OLS}, 'admin')
return proc.render()
def installModSec(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('installModSec', 0)
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " installModSec"
ProcessUtilities.popenExecutioner(execPath)
time.sleep(3)
final_json = json.dumps({'installModSec': 1, 'error_message': "None"})
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'installModSec': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def installStatusModSec(self, userID = None, data = None):
try:
command = "sudo cat " + modSec.installLogPath
installStatus = ProcessUtilities.outputExecutioner(command)
if installStatus.find("[200]") > -1:
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " installModSecConfigs"
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
pass
else:
final_json = json.dumps({
'error_message': "Failed to install ModSecurity configurations.",
'requestStatus': installStatus,
'abort': 1,
'installed': 0,
})
return HttpResponse(final_json)
final_json = json.dumps({
'error_message': "None",
'requestStatus': installStatus,
'abort': 1,
'installed': 1,
})
return HttpResponse(final_json)
elif installStatus.find("[404]") > -1:
final_json = json.dumps({
'abort': 1,
'installed': 0,
'error_message': "None",
'requestStatus': installStatus,
})
return HttpResponse(final_json)
else:
final_json = json.dumps({
'abort': 0,
'error_message': "None",
'requestStatus': installStatus,
})
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'abort': 1, 'installed': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def fetchModSecSettings(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('fetchStatus', 0)
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
modsecurity = 0
SecAuditEngine = 0
SecRuleEngine = 0
SecDebugLogLevel = "9"
SecAuditLogRelevantStatus = '^(?:5|4(?!04))'
SecAuditLogParts = 'ABIJDEFHZ'
SecAuditLogType = 'Serial'
confPath = os.path.join(virtualHostUtilities.Server_root, 'conf/httpd_config.conf')
modSecPath = os.path.join(virtualHostUtilities.Server_root, 'modules', 'mod_security.so')
if os.path.exists(modSecPath):
command = "sudo cat " + confPath
data = ProcessUtilities.outputExecutioner(command).split('\n')
for items in data:
if items.find('modsecurity ') > -1:
if items.find('on') > -1 or items.find('On') > -1:
modsecurity = 1
continue
if items.find('SecAuditEngine ') > -1:
if items.find('on') > -1 or items.find('On') > -1:
SecAuditEngine = 1
continue
if items.find('SecRuleEngine ') > -1:
if items.find('on') > -1 or items.find('On') > -1:
SecRuleEngine = 1
continue
if items.find('SecDebugLogLevel') > -1:
result = items.split(' ')
if result[0] == 'SecDebugLogLevel':
SecDebugLogLevel = result[1]
continue
if items.find('SecAuditLogRelevantStatus') > -1:
result = items.split(' ')
if result[0] == 'SecAuditLogRelevantStatus':
SecAuditLogRelevantStatus = result[1]
continue
if items.find('SecAuditLogParts') > -1:
result = items.split(' ')
if result[0] == 'SecAuditLogParts':
SecAuditLogParts = result[1]
continue
if items.find('SecAuditLogType') > -1:
result = items.split(' ')
if result[0] == 'SecAuditLogType':
SecAuditLogType = result[1]
continue
final_dic = {'fetchStatus': 1,
'installed': 1,
'SecRuleEngine': SecRuleEngine,
'modsecurity': modsecurity,
'SecAuditEngine': SecAuditEngine,
'SecDebugLogLevel': SecDebugLogLevel,
'SecAuditLogParts': SecAuditLogParts,
'SecAuditLogRelevantStatus': SecAuditLogRelevantStatus,
'SecAuditLogType': SecAuditLogType,
}
else:
final_dic = {'fetchStatus': 1,
'installed': 0}
else:
SecAuditEngine = 0
SecRuleEngine = 0
SecDebugLogLevel = "9"
SecAuditLogRelevantStatus = '^(?:5|4(?!04))'
SecAuditLogParts = 'ABIJDEFHZ'
SecAuditLogType = 'Serial'
confPath = os.path.join(virtualHostUtilities.Server_root, 'conf/modsec.conf')
command = "sudo cat " + confPath
data = ProcessUtilities.outputExecutioner(command).split('\n')
for items in data:
if items.find('SecAuditEngine ') > -1:
if items.find('on') > -1 or items.find('On') > -1:
SecAuditEngine = 1
continue
if items.find('SecRuleEngine ') > -1:
if items.find('on') > -1 or items.find('On') > -1:
SecRuleEngine = 1
continue
if items.find('SecDebugLogLevel') > -1:
result = items.split(' ')
if result[0] == 'SecDebugLogLevel':
SecDebugLogLevel = result[1]
continue
if items.find('SecAuditLogRelevantStatus') > -1:
result = items.split(' ')
if result[0] == 'SecAuditLogRelevantStatus':
SecAuditLogRelevantStatus = result[1]
continue
if items.find('SecAuditLogParts') > -1:
result = items.split(' ')
if result[0] == 'SecAuditLogParts':
SecAuditLogParts = result[1]
continue
if items.find('SecAuditLogType') > -1:
result = items.split(' ')
if result[0] == 'SecAuditLogType':
SecAuditLogType = result[1]
continue
final_dic = {'fetchStatus': 1,
'installed': 1,
'SecRuleEngine': SecRuleEngine,
'SecAuditEngine': SecAuditEngine,
'SecDebugLogLevel': SecDebugLogLevel,
'SecAuditLogParts': SecAuditLogParts,
'SecAuditLogRelevantStatus': SecAuditLogRelevantStatus,
'SecAuditLogType': SecAuditLogType,
}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'fetchStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def saveModSecConfigurations(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('saveStatus', 0)
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
modsecurity = data['modsecurity_status']
SecAuditEngine = data['SecAuditEngine']
SecRuleEngine = data['SecRuleEngine']
SecDebugLogLevel = data['SecDebugLogLevel']
SecAuditLogParts = data['SecAuditLogParts']
SecAuditLogRelevantStatus = data['SecAuditLogRelevantStatus']
SecAuditLogType = data['SecAuditLogType']
if modsecurity == True:
modsecurity = "modsecurity on"
else:
modsecurity = "modsecurity off"
if SecAuditEngine == True:
SecAuditEngine = "SecAuditEngine on"
else:
SecAuditEngine = "SecAuditEngine off"
if SecRuleEngine == True:
SecRuleEngine = "SecRuleEngine On"
else:
SecRuleEngine = "SecRuleEngine off"
SecDebugLogLevel = "SecDebugLogLevel " + str(SecDebugLogLevel)
SecAuditLogParts = "SecAuditLogParts " + str(SecAuditLogParts)
SecAuditLogRelevantStatus = "SecAuditLogRelevantStatus " + SecAuditLogRelevantStatus
SecAuditLogType = "SecAuditLogType " + SecAuditLogType
## writing data temporary to file
tempConfigPath = "/home/cyberpanel/" + str(randint(1000, 9999))
confPath = open(tempConfigPath, "w")
confPath.writelines(modsecurity + "\n")
confPath.writelines(SecAuditEngine + "\n")
confPath.writelines(SecRuleEngine + "\n")
confPath.writelines(SecDebugLogLevel + "\n")
confPath.writelines(SecAuditLogParts + "\n")
confPath.writelines(SecAuditLogRelevantStatus + "\n")
confPath.writelines(SecAuditLogType + "\n")
confPath.close()
## save configuration data
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " saveModSecConfigs --tempConfigPath " + tempConfigPath
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {'saveStatus': 1, 'error_message': "None"}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'saveStatus': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
SecAuditEngine = data['SecAuditEngine']
SecRuleEngine = data['SecRuleEngine']
SecDebugLogLevel = data['SecDebugLogLevel']
SecAuditLogParts = data['SecAuditLogParts']
SecAuditLogRelevantStatus = data['SecAuditLogRelevantStatus']
SecAuditLogType = data['SecAuditLogType']
if SecAuditEngine == True:
SecAuditEngine = "SecAuditEngine on"
else:
SecAuditEngine = "SecAuditEngine off"
if SecRuleEngine == True:
SecRuleEngine = "SecRuleEngine On"
else:
SecRuleEngine = "SecRuleEngine off"
SecDebugLogLevel = "SecDebugLogLevel " + str(SecDebugLogLevel)
SecAuditLogParts = "SecAuditLogParts " + str(SecAuditLogParts)
SecAuditLogRelevantStatus = "SecAuditLogRelevantStatus " + SecAuditLogRelevantStatus
SecAuditLogType = "SecAuditLogType " + SecAuditLogType
## writing data temporary to file
tempConfigPath = "/home/cyberpanel/" + str(randint(1000, 9999))
confPath = open(tempConfigPath, "w")
confPath.writelines(SecAuditEngine + "\n")
confPath.writelines(SecRuleEngine + "\n")
confPath.writelines(SecDebugLogLevel + "\n")
confPath.writelines(SecAuditLogParts + "\n")
confPath.writelines(SecAuditLogRelevantStatus + "\n")
confPath.writelines(SecAuditLogType + "\n")
confPath.close()
## save configuration data
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " saveModSecConfigs --tempConfigPath " + tempConfigPath
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {'saveStatus': 1, 'error_message': "None"}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'saveStatus': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
data_ret = {'saveStatus': 0, 'error_message': str(msg)}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
def modSecRules(self, request = None, userID = None):
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
confPath = os.path.join(virtualHostUtilities.Server_root, "conf/httpd_config.conf")
command = "sudo cat " + confPath
httpdConfig = ProcessUtilities.outputExecutioner(command).split('\n')
modSecInstalled = 0
for items in httpdConfig:
if items.find('module mod_security') > -1:
modSecInstalled = 1
break
else:
modSecInstalled = 1
proc = httpProc(request, 'firewall/modSecurityRules.html',
{'modSecInstalled': modSecInstalled}, 'admin')
return proc.render()
def fetchModSecRules(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('modSecInstalled', 0)
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
confPath = os.path.join(virtualHostUtilities.Server_root, "conf/httpd_config.conf")
command = "sudo cat " + confPath
httpdConfig = ProcessUtilities.outputExecutioner(command).split('\n')
modSecInstalled = 0
for items in httpdConfig:
if items.find('module mod_security') > -1:
modSecInstalled = 1
break
rulesPath = os.path.join(virtualHostUtilities.Server_root + "/conf/modsec/rules.conf")
if modSecInstalled:
command = "sudo cat " + rulesPath
currentModSecRules = ProcessUtilities.outputExecutioner(command)
final_dic = {'modSecInstalled': 1,
'currentModSecRules': currentModSecRules}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'modSecInstalled': 0}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
rulesPath = os.path.join(virtualHostUtilities.Server_root + "/conf/rules.conf")
command = "sudo cat " + rulesPath
currentModSecRules = ProcessUtilities.outputExecutioner(command)
final_dic = {'modSecInstalled': 1,
'currentModSecRules': currentModSecRules}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'modSecInstalled': 0,
'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def saveModSecRules(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('saveStatus', 0)
newModSecRules = data['modSecRules']
## writing data temporary to file
rulesPath = open(modSec.tempRulesFile, "w")
rulesPath.write(newModSecRules)
rulesPath.close()
## save configuration data
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " saveModSecRules"
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {'saveStatus': 1, 'error_message': "None"}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'saveStatus': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
data_ret = {'saveStatus': 0, 'error_message': str(msg)}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
def modSecRulesPacks(self, request = None, userID = None):
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
confPath = os.path.join(virtualHostUtilities.Server_root, "conf/httpd_config.conf")
command = "sudo cat " + confPath
httpdConfig = ProcessUtilities.outputExecutioner(command).split('\n')
modSecInstalled = 0
for items in httpdConfig:
if items.find('module mod_security') > -1:
modSecInstalled = 1
break
else:
modSecInstalled = 1
proc = httpProc(request, 'firewall/modSecurityRulesPacks.html',
{'modSecInstalled': modSecInstalled}, 'admin')
return proc.render()
def getOWASPAndComodoStatus(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('modSecInstalled', 0)
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
confPath = os.path.join(virtualHostUtilities.Server_root, "conf/httpd_config.conf")
command = "sudo cat " + confPath
httpdConfig = ProcessUtilities.outputExecutioner(command).splitlines()
modSecInstalled = 0
for items in httpdConfig:
if items.find('module mod_security') > -1:
modSecInstalled = 1
break
comodoInstalled = 0
owaspInstalled = 0
if modSecInstalled:
command = "sudo cat " + confPath
httpdConfig = ProcessUtilities.outputExecutioner(command).splitlines()
for items in httpdConfig:
if items.find('modsec/comodo') > -1:
comodoInstalled = 1
elif items.find('modsec/owasp') > -1:
owaspInstalled = 1
if owaspInstalled == 1 and comodoInstalled == 1:
break
final_dic = {
'modSecInstalled': 1,
'owaspInstalled': owaspInstalled,
'comodoInstalled': comodoInstalled
}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'modSecInstalled': 0}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
comodoInstalled = 0
owaspInstalled = 0
try:
command = 'sudo ls /usr/local/lsws/conf/comodo_litespeed/'
output = ProcessUtilities.outputExecutioner(command)
if output.find('No such') > -1:
comodoInstalled = 0
else:
comodoInstalled = 1
except subprocess.CalledProcessError:
pass
try:
command = 'cat /usr/local/lsws/conf/modsec.conf'
output = ProcessUtilities.outputExecutioner(command)
if output.find('modsec/owasp') > -1:
owaspInstalled = 1
except:
pass
final_dic = {
'modSecInstalled': 1,
'owaspInstalled': owaspInstalled,
'comodoInstalled': comodoInstalled
}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'modSecInstalled': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def installModSecRulesPack(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('installStatus', 0)
packName = data['packName']
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " " + packName
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {'installStatus': 1, 'error_message': "None"}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'installStatus': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
# if packName == 'disableOWASP' or packName == 'installOWASP':
# final_json = json.dumps({'installStatus': 0, 'error_message': "OWASP will be available later.", })
# return HttpResponse(final_json)
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " " + packName
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {'installStatus': 1, 'error_message': "None"}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'installStatus': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
data_ret = {'installStatus': 0, 'error_message': str(msg)}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
def getRulesFiles(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('fetchStatus', 0)
packName = data['packName']
confPath = os.path.join('/usr/local/lsws/conf/modsec/owasp-modsecurity-crs-3.0-master/owasp-master.conf')
command = "sudo cat " + confPath
httpdConfig = ProcessUtilities.outputExecutioner(command).splitlines()
json_data = "["
checker = 0
counter = 0
for items in httpdConfig:
if items.find('modsec/' + packName) > -1:
counter = counter + 1
if items[0] == '#':
status = False
else:
status = True
fileName = items.lstrip('#')
fileName = fileName.split('/')[-1]
dic = {
'id': counter,
'fileName': fileName,
'packName': packName,
'status': status,
}
if checker == 0:
json_data = json_data + json.dumps(dic)
checker = 1
else:
json_data = json_data + ',' + json.dumps(dic)
json_data = json_data + ']'
final_json = json.dumps({'fetchStatus': 1, 'error_message': "None", "data": json_data})
return HttpResponse(final_json)
# if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
# confPath = os.path.join('/usr/local/lsws/conf/modsec/owasp-modsecurity-crs-3.0-master/owasp-master.conf')
#
# command = "sudo cat " + confPath
# httpdConfig = ProcessUtilities.outputExecutioner(command).splitlines()
#
# json_data = "["
# checker = 0
# counter = 0
#
# for items in httpdConfig:
#
# if items.find('modsec/' + packName) > -1:
# counter = counter + 1
# if items[0] == '#':
# status = False
# else:
# status = True
#
# fileName = items.lstrip('#')
# fileName = fileName.split('/')[-1]
#
# dic = {
# 'id': counter,
# 'fileName': fileName,
# 'packName': packName,
# 'status': status,
#
# }
#
# if checker == 0:
# json_data = json_data + json.dumps(dic)
# checker = 1
# else:
# json_data = json_data + ',' + json.dumps(dic)
#
# json_data = json_data + ']'
# final_json = json.dumps({'fetchStatus': 1, 'error_message': "None", "data": json_data})
# return HttpResponse(final_json)
# else:
#
# command = 'cat /usr/local/lsws/conf/modsec/owasp-modsecurity-crs-3.0-master/owasp-master.conf'
# files = ProcessUtilities.outputExecutioner(command).splitlines()
#
# json_data = "["
#
# counter = 0
# checker = 0
# for fileName in files:
#
# if fileName == 'categories.conf':
# continue
#
# if fileName.endswith('bak'):
# status = 0
# fileName = fileName.rstrip('.bak')
# elif fileName.endswith('conf'):
# status = 1
# else:
# continue
#
# dic = {
# 'id': counter,
# 'fileName': fileName,
# 'packName': packName,
# 'status': status,
#
# }
#
# counter = counter + 1
#
# if checker == 0:
# json_data = json_data + json.dumps(dic)
# checker = 1
# else:
# json_data = json_data + ',' + json.dumps(dic)
#
# json_data = json_data + ']'
# final_json = json.dumps({'fetchStatus': 1, 'error_message': "None", "data": json_data})
# return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'fetchStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def enableDisableRuleFile(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('saveStatus', 0)
packName = data['packName']
fileName = data['fileName']
currentStatus = data['status']
if currentStatus == True:
functionName = 'disableRuleFile'
else:
functionName = 'enableRuleFile'
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " " + functionName + ' --packName ' + packName + ' --fileName "%s"' % (fileName)
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {'saveStatus': 1, 'error_message': "None"}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'saveStatus': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
data_ret = {'saveStatus': 0, 'error_message': str(msg)}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
def csf(self):
csfInstalled = 1
try:
command = 'csf -h'
output = ProcessUtilities.outputExecutioner(command)
if output.find("command not found") > -1:
csfInstalled = 0
except subprocess.CalledProcessError:
csfInstalled = 0
proc = httpProc(self.request, 'firewall/csf.html',
{'csfInstalled': csfInstalled}, 'admin')
return proc.render()
def installCSF(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('installStatus', 0)
execPath = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/csf.py"
execPath = execPath + " installCSF"
ProcessUtilities.popenExecutioner(execPath)
time.sleep(2)
data_ret = {"installStatus": 1}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
final_dic = {'installStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def installStatusCSF(self):
try:
userID = self.request.session['userID']
installStatus = ProcessUtilities.outputExecutioner("sudo cat " + CSF.installLogPath)
if installStatus.find("[200]")>-1:
command = 'sudo rm -f ' + CSF.installLogPath
ProcessUtilities.executioner(command)
final_json = json.dumps({
'error_message': "None",
'requestStatus': installStatus,
'abort':1,
'installed': 1,
})
return HttpResponse(final_json)
elif installStatus.find("[404]") > -1:
command = 'sudo rm -f ' + CSF.installLogPath
ProcessUtilities.executioner(command)
final_json = json.dumps({
'abort':1,
'installed':0,
'error_message': "None",
'requestStatus': installStatus,
})
return HttpResponse(final_json)
else:
final_json = json.dumps({
'abort':0,
'error_message': "None",
'requestStatus': installStatus,
})
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'abort':1, 'installed':0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def removeCSF(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('installStatus', 0)
execPath = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/csf.py"
execPath = execPath + " removeCSF"
ProcessUtilities.popenExecutioner(execPath)
time.sleep(2)
data_ret = {"installStatus": 1}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
final_dic = {'installStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def fetchCSFSettings(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('fetchStatus', 0)
currentSettings = CSF.fetchCSFSettings()
data_ret = {"fetchStatus": 1, 'testingMode' : currentSettings['TESTING'],
'tcpIN' : currentSettings['tcpIN'],
'tcpOUT': currentSettings['tcpOUT'],
'udpIN': currentSettings['udpIN'],
'udpOUT': currentSettings['udpOUT'],
'firewallStatus': currentSettings['firewallStatus']
}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
final_dic = {'fetchStatus': 0, 'error_message': 'CSF is not installed.'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def changeStatus(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson()
data = json.loads(self.request.body)
controller = data['controller']
status = data['status']
execPath = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/csf.py"
execPath = execPath + " changeStatus --controller " + controller + " --status " + status
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {"status": 1}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'status': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def modifyPorts(self, data = None):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson()
protocol = data['protocol']
ports = data['ports']
portsPath = '/home/cyberpanel/' + str(randint(1000, 9999))
if os.path.exists(portsPath):
os.remove(portsPath)
writeToFile = open(portsPath, 'w')
writeToFile.write(ports)
writeToFile.close()
command = 'chmod 600 %s' % (portsPath)
ProcessUtilities.executioner(command)
execPath = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/csf.py"
execPath = execPath + " modifyPorts --protocol " + protocol + " --ports " + portsPath
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {"status": 1}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'status': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def modifyIPs(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson()
data = json.loads(self.request.body)
mode = data['mode']
ipAddress = data['ipAddress']
if mode == 'allowIP':
CSF.allowIP(ipAddress)
elif mode == 'blockIP':
CSF.blockIP(ipAddress)
data_ret = {"status": 1}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def imunify(self):
ipFile = "/etc/cyberpanel/machineIP"
f = open(ipFile)
ipData = f.read()
ipAddress = ipData.split('\n', 1)[0]
fullAddress = '%s:%s' % (ipAddress, ProcessUtilities.fetchCurrentPort())
data = {}
data['ipAddress'] = fullAddress
data['CL'] = 1
if os.path.exists(FirewallManager.imunifyPath):
data['imunify'] = 1
else:
data['imunify'] = 0
if data['CL'] == 0:
proc = httpProc(self.request, 'firewall/notAvailable.html',
data, 'admin')
return proc.render()
elif data['imunify'] == 0:
proc = httpProc(self.request, 'firewall/notAvailable.html',
data, 'admin')
return proc.render()
else:
proc = httpProc(self.request, 'firewall/imunify.html',
data, 'admin')
return proc.render()
def submitinstallImunify(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath,
'Not authorized to install container packages. [404].',
1)
return 0
data = json.loads(self.request.body)
execPath = "/usr/local/CyberCP/bin/python /usr/local/CyberCP/CLManager/CageFS.py"
execPath = execPath + " --function submitinstallImunify --key %s" % (data['key'])
ProcessUtilities.popenExecutioner(execPath)
data_ret = {'status': 1, 'error_message': 'None'}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath, str(msg) + ' [404].', 1)
def imunifyAV(self):
ipFile = "/etc/cyberpanel/machineIP"
f = open(ipFile)
ipData = f.read()
ipAddress = ipData.split('\n', 1)[0]
fullAddress = '%s:%s' % (ipAddress, ProcessUtilities.fetchCurrentPort())
data = {}
data['ipAddress'] = fullAddress
if os.path.exists(FirewallManager.imunifyAVPath):
data['imunify'] = 1
else:
data['imunify'] = 0
if data['imunify'] == 0:
proc = httpProc(self.request, 'firewall/notAvailableAV.html',
data, 'admin')
return proc.render()
else:
proc = httpProc(self.request, 'firewall/imunifyAV.html',
data, 'admin')
return proc.render()
def submitinstallImunifyAV(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath,
'Not authorized to install container packages. [404].',
1)
return 0
execPath = "/usr/local/CyberCP/bin/python /usr/local/CyberCP/CLManager/CageFS.py"
execPath = execPath + " --function submitinstallImunifyAV"
ProcessUtilities.popenExecutioner(execPath)
data_ret = {'status': 1, 'error_message': 'None'}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath, str(msg) + ' [404].', 1)
def litespeed_ent_conf(self, request = None, userID = None):
proc = httpProc(request, 'firewall/litespeed_ent_conf.html',
None, 'admin')
return proc.render()
def fetchlitespeed_Conf(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('modSecInstalled', 0)
file_path = "/usr/local/lsws/conf/pre_main_global.conf"
if not os.path.exists(file_path):
command = "touch /usr/local/lsws/conf/pre_main_global.conf"
ProcessUtilities.executioner(command)
command = f'cat {file_path}'
currentModSecRules = ProcessUtilities.outputExecutioner(command)
final_dic = {'status': 1,
'currentLitespeed_conf': currentModSecRules}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
command = f'cat {file_path}'
currentModSecRules = ProcessUtilities.outputExecutioner(command)
final_dic = {'status': 1,
'currentLitespeed_conf': currentModSecRules}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def saveLitespeed_conf(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('modSecInstalled', 0)
file_path = "/usr/local/lsws/conf/pre_main_global.conf"
command = f'rm -f {file_path}'
ProcessUtilities.executioner(command)
currentLitespeed_conf = data['modSecRules']
tempRulesPath = '/home/cyberpanel/pre_main_global.conf'
WriteToFile = open(tempRulesPath, 'w')
WriteToFile.write(currentLitespeed_conf)
WriteToFile.close()
command = f'mv {tempRulesPath} {file_path}'
ProcessUtilities.executioner(command)
command = f'chmod 644 {file_path} && chown lsadm:lsadm {file_path}'
ProcessUtilities.executioner(command, None, True)
command = f'cat {file_path}'
currentModSecRules = ProcessUtilities.outputExecutioner(command)
final_dic = {'status': 1,
'currentLitespeed_conf': currentModSecRules}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)