File: //opt/imunify360/venv/share/imunify360/scripts/setup_cagefs.py
#!/opt/imunify360/venv/bin/python3
# coding: utf-8
"""
WARNING
cagefs --remount-all can cause high load on the customer's server and even
a temporary outage (see DEF-9491).
Please add anything to cagefs.mp only if absolutely necessary.
"""
import functools
import subprocess
import sys

from defence360agent.subsys import clcagefs
# CageFS mount points file; it is snapshotted before and after every
# setup/revert run to decide whether a remount is needed.
CAGEFSMP = "/etc/cagefs/cagefs.mp"

# CageFS control utility, invoked with "--remount-all" after cagefs.mp changed.
CAGEFSCTL_TOOL = "/usr/sbin/cagefsctl"

# Directories registered in CageFS by this script.
# Maps a path (bytes) to the keyword options that are forwarded to
# clcagefs.setup_mount_dir_cagefs() for that path.
ACTUAL_DIRS = {
    b"/var/imunify360/files/sigs": dict(added_by="imunify360", prefix=b"!"),
    b"/etc/imunify360/user_config": dict(
        added_by="imunify360",
        mode=0o755,
        # every user has his own isolated dir in this dir
        prefix=b"%",
    ),
    b"/var/run/imunify360_user": dict(added_by="imunify360", mode=0o1755),
    b"/usr/share/imunify360/wp-plugins": dict(
        added_by="imunify360",
        mode=0o1755,
    ),
}

# Paths that are no longer used; the setup step asks clcagefs to remove
# each of them from cagefs.mp.
OBSOLETES_DIRS = [
    b"/var/imunify360/malware/signatures",
    b"/var/imunify360/scan_report",
    b"/var/imunify360/web_quar",
    b"/var/lib/clamav",
    b"/var/run/defence360agent",
]
def _cagefs_remountall(action_info):
    """Build a decorator that remounts CageFS only if cagefs.mp was changed.

    The decorated function runs between two snapshots of CAGEFSMP. When the
    snapshots differ, ``CAGEFSCTL_TOOL --remount-all`` is executed once after
    the function has finished (also when it raised), so a batch of changes
    costs a single remount.

    :param action_info: text interpolated into the
        "CageFS for Imunify has been %s, remounting..." message
    :return: a decorator; the wrapped function returns whatever the original
        returns and lets its exceptions propagate
    """

    def _read_mp_file():
        # Raw bytes are enough for a before/after comparison and cannot fail
        # on content that is not decodable in the locale encoding.
        try:
            with open(CAGEFSMP, "rb") as f:
                return f.read()
        except FileNotFoundError:
            # print() does not interpolate logging-style arguments, so the
            # message has to be formatted explicitly.
            print("%s file was not found" % (CAGEFSMP,))
            return b""

    def _remount_all():
        # Best effort: a failed remount is reported but never raised, so it
        # cannot mask the result (or the exception) of the wrapped function.
        try:
            exit_code = subprocess.call([CAGEFSCTL_TOOL, "--remount-all"])
        except Exception as e:
            print(
                "Something went wrong while executing"
                " --remount-all command: %s" % (e,)
            )
        else:
            if exit_code != 0:
                print(
                    "--remount-all command exited with code %s" % (exit_code,)
                )

    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            before = _read_mp_file()
            try:
                return fun(*args, **kwargs)
            finally:
                after = _read_mp_file()
                if before != after:
                    print(
                        "CageFS for Imunify has been %s, remounting..."
                        % action_info
                    )
                    _remount_all()
                else:
                    print("CageFS for Imunify: no update is required.")

        return wrapper

    return decorator
def _add_imunify360_dirs():
    """Register every ACTUAL_DIRS entry through clcagefs.

    Each path is passed to ``clcagefs.setup_mount_dir_cagefs`` together with
    its options and with ``remount_cagefs=False``. A ``CagefsMpConflict`` or
    ``EnvironmentError`` for one path is written to stderr and the remaining
    paths are still processed.
    """
    for mount_path, mount_options in ACTUAL_DIRS.items():
        try:
            clcagefs.setup_mount_dir_cagefs(
                mount_path, remount_cagefs=False, **mount_options
            )
        except (clcagefs.CagefsMpConflict, EnvironmentError) as err:
            report = (
                "Failed to setup CageFS with Imunify for path %s: %s\n"
                % (mount_path, err)
            )
            sys.stderr.write(report)
@_cagefs_remountall(action_info="set up")
def _setup_cagefs():
    """Add the current Imunify360 directories, then drop the obsolete ones."""
    # Order matters: registration first, clean-up of old entries second.
    for step in (_add_imunify360_dirs, _remove_obsoleted):
        step()
@_cagefs_remountall(action_info="reset")
def _revert_cagefs():
    """Ask clcagefs to remove every ACTUAL_DIRS path.

    An exception for one path is written to stderr and the remaining paths
    are still processed.
    """
    # OBSOLETES_DIRS are assumed to have been removed already during the
    # installation step, so only the current directories are handled here.
    for mount_path in ACTUAL_DIRS:
        try:
            clcagefs.remove_mount_dir_cagefs(mount_path, remount_cagefs=False)
        except Exception as err:
            report = "Error during removing %s from cagefs.mp: %s\n" % (
                mount_path,
                err,
            )
            sys.stderr.write(report)
def _remove_obsoleted():
    """Ask clcagefs to remove every OBSOLETES_DIRS path.

    An exception for one path is written to stderr and the remaining paths
    are still processed.
    """
    for stale_path in OBSOLETES_DIRS:
        try:
            clcagefs.remove_mount_dir_cagefs(stale_path, remount_cagefs=False)
        except Exception as err:
            report = "Error during removing %s from cagefs.mp: %s\n" % (
                stale_path,
                err,
            )
            sys.stderr.write(report)
def _main():
    """Run the revert when the last argument is ``--revert``, else the setup."""
    revert_requested = sys.argv[-1] == "--revert"
    action = _revert_cagefs if revert_requested else _setup_cagefs
    action()
def _is_removemountdircagefs_supported():
    """Tell whether the imported clcagefs exposes ``remove_mount_dir_cagefs``."""
    required_attribute = "remove_mount_dir_cagefs"
    return hasattr(clcagefs, required_attribute)
if __name__ == "__main__":
    # Do nothing unless CageFS is present and the clcagefs module provides
    # the removal helper that both setup and revert rely on.
    if clcagefs.is_cagefs_present():
        if _is_removemountdircagefs_supported():
            _main()