HEX
Server: LiteSpeed
System: Linux php-prod-1.spaceapp.ru 5.15.0-157-generic #167-Ubuntu SMP Wed Sep 17 21:35:53 UTC 2025 x86_64
User: sport3497 (1034)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //proc/self/root/usr/local/CyberPanel/lib/python3.10/site-packages/cli4/cli4.py
#!/usr/bin/env python
"""Cloudflare API via command line"""

import sys
import re
import getopt
import json

import CloudFlare

from .dump import dump_commands, dump_commands_from_web
from . import converters
from . import examples

my_yaml = None
my_jsonlines = None

class CLI4InternalError(Exception):
    """Internal cli4 failure, e.g. a path element that has no name-to-id conversion."""

def load_and_check_yaml():
    """Populate the module-level ``my_yaml`` helper, exiting if it can't be built.

    The sibling ``myyaml`` module is imported lazily so that yaml support is
    only needed when it is actually used.  An ImportError raised while
    creating the helper ends the program with an install hint.
    """
    global my_yaml
    # deferred import - only reached when yaml handling is requested
    from . import myyaml
    try:
        helper = myyaml.myyaml()
    except ImportError:
        sys.exit('cli4: install yaml support via: pip install pyyaml')
    my_yaml = helper

def load_and_check_jsonlines():
    """Populate the module-level ``my_jsonlines`` helper, exiting if it can't be built.

    The sibling ``myjsonlines`` module is imported lazily so that jsonlines
    support is only needed when it is actually used.  An ImportError raised
    while creating the helper ends the program with an install hint.
    """
    global my_jsonlines
    # deferred import - only reached when ndjson output is requested
    from . import myjsonlines
    try:
        helper = myjsonlines.myjsonlines()
    except ImportError:
        sys.exit('cli4: install jsonlines support via: pip install jsonlines')
    my_jsonlines = helper

def strip_multiline(s):
    """Return *s* with leading/trailing whitespace removed from every line.

    Lines are re-joined with a single newline, so a trailing newline in the
    input is not preserved.
    """
    # Needed so yaml.safe_load() can be used on JSON text - tabs are not allowed there
    trimmed_lines = map(str.strip, s.splitlines())
    return '\n'.join(trimmed_lines)

def process_params_content_files(method, binary_file, args):
    """Consume the leading parameter arguments from *args*.

    Arguments are popped from the front of *args* (the list is modified in
    place) for as long as they contain ``=`` or start with ``@``.  Whatever is
    left in *args* afterwards is for the caller to interpret.

    Recognised forms:
        tag=value       string; one pair of matching surrounding quotes is removed.
                        ``true``/``false``/``none`` (any case) and the empty
                        string become True/False/None
        tag==number     int or float
        tag=[..] / tag={..}  structure, parsed with the yaml loader
        tag=@filename   file upload, PUT/POST only ('-' is stdin)
        =value          unnamed value; becomes the whole of params
        @filename       raw request body, PUT/POST only ('-' is stdin)

    Args:
        method: HTTP method name, e.g. 'GET' or 'POST'.
        binary_file: when true, a raw ``@filename`` body is read as bytes.
        args: list of command line arguments; consumed from the front.

    Returns:
        A ``(params, content, files)`` tuple.  For any method other than GET
        the collected params are returned as content instead.  When file
        uploads are present the named params are folded into *files* as
        ``(None, value)`` entries.

    Exits (via ``sys.exit``) with a message on any malformed argument.
    """

    digits_only = re.compile('^-?[0-9]+$')
    floats_only = re.compile('^-?[0-9.]+$')

    params = None
    content = None
    files = None
    # next grab the params. These are in the form of tag=value or =value or @filename
    # startswith() rather than args[0][0]: an empty-string argument must simply
    # end the parameter list, not raise IndexError
    while args and ('=' in args[0] or args[0].startswith('@')):
        arg = args.pop(0)
        if arg.startswith('@'):
            # a file to be uploaded - used in workers/script etc - only via PUT or POST
            filename = arg[1:]
            if method not in ['PUT', 'POST']:
                sys.exit('cli4: %s - raw file upload only with PUT or POST' % (filename))
            try:
                if filename == '-':
                    content = sys.stdin.buffer.read() if binary_file else sys.stdin.read()
                elif binary_file:
                    with open(filename, 'rb') as f:
                        content = f.read()
                else:
                    with open(filename, 'r', encoding="utf-8") as f:
                        content = f.read()
            except IOError:
                sys.exit('cli4: %s - file open failure' % (filename))
            continue

        tag_string, value_string = arg.split('=', 1)
        lowered = value_string.lower()
        if lowered == 'true':
            value = True
        elif lowered == 'false':
            value = False
        elif value_string == '' or lowered == 'none':
            value = None
        elif value_string[0] == '=':
            # tag==number
            number_string = value_string[1:]
            if number_string == '':
                sys.exit('cli4: %s== - no number value passed' % (tag_string))
            try:
                if digits_only.match(number_string):
                    value = int(number_string)
                elif floats_only.match(number_string):
                    # the pattern is loose ('1.2.3' and '.' match), so float()
                    # is the real validator here
                    value = float(number_string)
                else:
                    raise ValueError(number_string)
            except ValueError:
                sys.exit('cli4: %s== - invalid number value passed' % (tag_string))
        elif value_string[0] in '[{' and value_string[-1] in '}]':
            # a json structure - used in pagerules
            try:
                # parsed with the yaml loader rather than json.loads()
                load_and_check_yaml()
                # cleanup string before parsing so that yaml.safe.load does not complain about whitespace
                # >>> found character '\t' that cannot start any token <<<
                value_string = strip_multiline(value_string)
                try:
                    value = my_yaml.safe_load(value_string)
                except my_yaml.parser.ParserError:
                    raise ValueError from None
            except ValueError:
                sys.exit('cli4: %s="%s" - can\'t parse json value' % (tag_string, value_string))
        elif value_string[0] == '@':
            # a file to be uploaded - used in dns_records/import etc - only via PUT or POST
            filename = value_string[1:]
            if method not in ['PUT', 'POST']:
                sys.exit('cli4: %s=%s - file upload only with PUT or POST' % (tag_string, filename))
            if files is None:
                files = {}
            if tag_string in files:
                sys.exit('cli4: %s=%s - duplicate name' % (tag_string, filename))
            try:
                if filename == '-':
                    files[tag_string] = sys.stdin
                else:
                    # deliberately left open: the handle is returned to the caller
                    files[tag_string] = open(filename, 'rb')
            except IOError:
                sys.exit('cli4: %s=%s - file open failure' % (tag_string, filename))
            # no need for param code below
            continue
        elif (len(value_string) >= 2
              and value_string[0] == value_string[-1]
              and value_string[0] in '"\''):
            # remove quotes (needs two characters - a lone quote is kept as-is)
            value = value_string[1:-1]
        else:
            value = value_string

        if tag_string == '':
            # There's no tag; it's just an unnamed list
            if params is None:
                params = value
            else:
                sys.exit('cli4: %s=%s - param error. Can\'t mix unnamed and named list' %
                         (tag_string, value_string))
        else:
            if params is None:
                params = {}
            try:
                params[tag_string] = value
            except TypeError:
                # params already holds an unnamed (non-dict) value
                sys.exit('cli4: %s=%s - param error. Can\'t mix unnamed and named list' %
                         (tag_string, value_string))

    if content and params:
        sys.exit('cli4: content and params not allowed together')

    if params and files:
        if not isinstance(params, dict):
            # an unnamed value has no field name to send alongside the uploads
            sys.exit('cli4: unnamed params and files not allowed together')
        # named params travel as extra form fields next to the uploaded files
        for k, v in params.items():
            files[k] = (None, v)
        params = None

    if method != 'GET':
        # only GET sends params; every other method sends them as content
        if params:
            content = params
            params = None

    return (params, content, files)

def run_command(cf, method, command, params=None, content=None, files=None):
    """Resolve *command* against *cf* and issue the API call(s).

    *command* is a slash separated path such as
    ``/zones/:example.com/dns_records``.  Plain elements are looked up as
    nested attributes of *cf*.  Elements starting with ``:`` are identifiers
    and fill identifier1, identifier2 and identifier3 in order of appearance:

      * 32/40/48 character hex strings and uuids are used as-is
      * ``::text`` passes ``text`` through untouched
      * anything else is a name, converted to an id via the ``converters``
        module where a conversion exists for the path walked so far

    Args:
        cf: the Cloudflare API object the path is resolved against.
        method: one of 'GET', 'PATCH', 'POST', 'PUT', 'DELETE'.
        command: the API path.
        params: query parameters (sent with GET only).
        content: request data (sent with every method except GET).
        files: file uploads (sent with POST and PUT only).

    Returns:
        A list of API results, one per second-level identifier (a name in
        that position may convert to several ids).

    Raises:
        CLI4InternalError: malformed path, unsupported method, or a name with
            no available conversion.
        AttributeError: a path element is not a known API verb.
        Exception: anything raised by the API call is re-raised.
        Failures detected here are also reported on stderr before raising.
    """
    # remove leading and trailing /'s
    # (startswith/endswith so that '' and '/' don't raise IndexError)
    if command.startswith('/'):
        command = command[1:]
    if command.endswith('/'):
        command = command[:-1]

    # break down command into its separate pieces
    # these are then checked against the Cloudflare class
    # to confirm there is a method that matches
    parts = command.split('/')

    cmd = []
    identifier1 = None
    identifier2 = None
    identifier3 = None

    hex_only = re.compile('^[0-9a-fA-F]+$')
    waf_rules = re.compile('^[0-9]+[A-Z]*$')
    uuid_value = re.compile('^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$') # 8-4-4-4-12

    def literal_identifier(text, allow_waf_rule=False):
        """Return *text* as an identifier needing no lookup, else None."""
        if len(text) in (32, 40, 48) and hex_only.match(text):
            # raw identifier - lets just use it as-is
            return text
        if len(text) == 36 and uuid_value.match(text):
            # uuid identifier - lets just use it as-is
            return text
        if allow_waf_rule and waf_rules.match(text):
            return text
        if text.startswith(':'):
            # raw string - used for workers script_name - use ::script_name
            return text[1:]
        return None

    def fail(detail):
        """Report *detail* on stderr and abort with CLI4InternalError."""
        sys.stderr.write('cli4: /%s - %s\n' % (command, detail))
        raise CLI4InternalError(detail)

    m = cf
    for element in parts:
        if element == '':
            # empty command, or a doubled slash inside it
            fail('empty path element')

        if not element.startswith(':'):
            # a verb - walk one level down the API object
            try:
                m = getattr(m, CloudFlare.CloudFlare.sanitize_verb(element))
                cmd.append(element)
            except AttributeError:
                # the verb/element was not found
                sys.stderr.write('cli4: /%s - not found\n' % (command))
                raise
            continue

        name = element[1:]
        if name == '':
            fail('empty identifier')

        # The path tests below use slices (cmd[:2], cmd[2:3], ...) because a
        # slice of a too-short list is just shorter, where an index would
        # raise IndexError.
        if identifier1 is None:
            identifier1 = literal_identifier(name)
            if identifier1 is None:
                try:
                    if cmd[:1] in (['certificates'], ['zones']):
                        # certificates are looked up by zone name as well
                        identifier1 = converters.convert_zones_to_identifier(cf, name)
                    elif cmd[:1] == ['accounts']:
                        identifier1 = converters.convert_accounts_to_identifier(cf, name)
                    elif cmd[:1] == ['organizations'] or cmd[:2] == ['user', 'organizations']:
                        identifier1 = converters.convert_organizations_to_identifier(cf, name)
                    elif cmd[:2] == ['user', 'invites']:
                        identifier1 = converters.convert_invites_to_identifier(cf, name)
                    elif cmd[:2] == ['user', 'virtual_dns']:
                        identifier1 = converters.convert_virtual_dns_to_identifier(cf, name)
                    elif cmd[:3] == ['user', 'load_balancers', 'pools']:
                        identifier1 = converters.convert_load_balancers_pool_to_identifier(cf, name)
                    else:
                        raise CLI4InternalError("/%s/%s :NOT CODED YET" % ('/'.join(cmd), name))
                except CLI4InternalError as e:
                    sys.stderr.write('cli4: /%s - %s\n' % (command, e))
                    raise
            cmd.append(':' + identifier1)
        elif identifier2 is None:
            identifier2 = literal_identifier(name)
            if identifier2 is None:
                try:
                    if cmd[:1] == ['zones'] and cmd[2:3] == ['dns_records']:
                        identifier2 = converters.convert_dns_record_to_identifier(cf, identifier1, name)
                    elif cmd[:1] == ['zones'] and cmd[2:3] == ['custom_hostnames']:
                        identifier2 = converters.convert_custom_hostnames_to_identifier(cf, identifier1, name)
                    else:
                        raise CLI4InternalError("/%s/:%s :NOT CODED YET" % ('/'.join(cmd), name))
                except CLI4InternalError as e:
                    sys.stderr.write('cli4: /%s - %s\n' % (command, e))
                    raise
            # identifier2 may be an array - one API call is made per entry below
            if isinstance(identifier2, list):
                cmd.append(':' + '[' + ','.join(identifier2) + ']')
            else:
                cmd.append(':' + identifier2)
                identifier2 = [identifier2]
        else:
            identifier3 = literal_identifier(name, allow_waf_rule=True)
            if identifier3 is None:
                #  /accounts/:id/storage/kv/namespaces/:id/values/:key_name - it's a strange one!
                # 'values' is cmd[6], so at least seven entries are required
                if (len(cmd) >= 7
                        and cmd[0] == 'accounts'
                        and cmd[2:5] == ['storage', 'kv', 'namespaces']
                        and cmd[6] == 'values'):
                    identifier3 = name
                else:
                    detail = '/%s/:%s :NOT CODED YET' % ('/'.join(cmd), name)
                    sys.stderr.write(detail + '\n')
                    raise CLI4InternalError(detail)

    if method not in ('GET', 'PATCH', 'POST', 'PUT', 'DELETE'):
        # refuse early rather than fall through the dispatch below with no result
        fail('%s - unsupported method' % (method))

    results = []
    if identifier2 is None:
        identifier2 = [None]
    for i2 in identifier2:
        try:
            if method == 'GET':
                # no content with a GET call
                r = m.get(identifier1=identifier1,
                          identifier2=i2,
                          identifier3=identifier3,
                          params=params)
            elif method == 'PATCH':
                r = m.patch(identifier1=identifier1,
                            identifier2=i2,
                            identifier3=identifier3,
                            data=content)
            elif method == 'POST':
                r = m.post(identifier1=identifier1,
                           identifier2=i2,
                           identifier3=identifier3,
                           data=content, files=files)
            elif method == 'PUT':
                r = m.put(identifier1=identifier1,
                          identifier2=i2,
                          identifier3=identifier3,
                          data=content, files=files)
            else:
                # DELETE - the only method left after the check above
                r = m.delete(identifier1=identifier1,
                             identifier2=i2,
                             identifier3=identifier3,
                             data=content)
        except CloudFlare.exceptions.CloudFlareAPIError as e:
            if len(e) > 0:
                # more than one error returned by the API
                for x in e:
                    sys.stderr.write('cli4: /%s - %d %s\n' % (command, int(x), str(x)))
            sys.stderr.write('cli4: /%s - %d %s\n' % (command, int(e), str(e)))
            raise
        except CloudFlare.exceptions.CloudFlareInternalError as e:
            sys.stderr.write('cli4: InternalError: /%s - %d %s\n' % (command, int(e), str(e)))
            raise
        except Exception as e:
            sys.stderr.write('cli4: /%s - %s - api error\n' % (command, str(e)))
            raise

        results.append(r)
    return results

def write_results(results, output):
    """Write *results* to stdout in the requested *output* format.

    Args:
        results: list of results; a single-element list is unwrapped first.
        output: ``None`` (write nothing), ``'json'``, ``'yaml'``, ``'ndjson'``,
            or any other value (results are written as ``str(results)``).

    str/bytes/bytearray results are written unchanged whatever the format -
    this covers calls such as /zones/:id/dns_records/export and workers, as
    well as image/audio/video style payloads.  Text output is terminated
    with a newline if it does not already end with one.  A broken pipe or
    other I/O error while writing is ignored.
    """

    if output is None:
        return

    if len(results) == 1:
        results = results[0]

    if not isinstance(results, (str, bytes, bytearray)):
        # anything more complex (dict, list, etc) is serialised first
        if output == 'json':
            # json.dumps() has no 'encoding' argument on Python 3, so it is
            # called once, without it
            results = json.dumps(results,
                                 indent=4,
                                 sort_keys=True,
                                 ensure_ascii=False)
        elif output == 'yaml':
            results = my_yaml.safe_dump(results)
        elif output == 'ndjson':
            # NDJSON support seems like a hack. There has to be a better way
            try:
                writer = my_jsonlines.Writer(sys.stdout)
                writer.write_all(results)
                writer.close()
            except (BrokenPipeError, IOError):
                pass
            return
        else:
            # None of the above, so pass thru the string form
            results = str(results)

    if not results:
        return

    try:
        if isinstance(results, (bytes, bytearray)):
            sys.stdout.buffer.write(results)
        else:
            sys.stdout.write(results)
            if not results.endswith('\n'):
                sys.stdout.write('\n')
    except (BrokenPipeError, IOError):
        # reader went away (e.g. piped into head) - nothing useful left to do
        pass

def do_it(args):
    """Parse the command line in *args* and run every command on it.

    Options are handled first, then the leading ``tag=value`` / ``@filename``
    parameters, and what remains is treated as one or more API commands, each
    run with the same method and parameters.

    Exits via ``sys.exit`` for usage errors, for --version/--help/--examples/
    --dump/--openapi, on keyboard interrupt, and with status 1 if any command
    failed.  Returns normally only when every command succeeded.
    """

    verbose = False
    output = 'json'
    example = False
    raw = False
    do_dump = False
    do_openapi = None
    openapi_url = None
    binary_file = False
    profile = None
    http_headers = None
    warnings = True
    method = 'GET'

    # -h is help, so the short form of --header is -H. Listing 'h' twice in
    # the getopt string made the header variant unreachable.
    usage = ('usage: cli4 '
             + '[-V|--version] [-h|--help] [-v|--verbose] '
             + '[-e|--examples] '
             + '[-q|--quiet] '
             + '[-j|--json] [-y|--yaml] [-n|--ndjson] [-i|--image] '
             + '[-r|--raw] '
             + '[-d|--dump] '
             + '[-A|--openapi url] '
             + '[-b|--binary] '
             + '[-p|--profile profile-name] '
             + '[-H|--header additional-header] '
             + '[-w|--warnings [True|False]] '
             + '[--get|--patch|--post|--put|--delete] '
             + '[item=value|item=@filename|@filename ...] '
             + '/command ...')

    try:
        opts, args = getopt.getopt(args,
                                   'VhveqjynirdA:bp:H:w:GPOUD',
                                   [
                                       'version', 'help', 'verbose',
                                       'examples',
                                       'quiet',
                                       'json', 'yaml', 'ndjson', 'image',
                                       'raw',
                                       'dump',
                                       'openapi=',
                                       'binary',
                                       'profile=',
                                       'header=',
                                       'warnings=',
                                       'get', 'patch', 'post', 'put', 'delete'
                                   ])
    except getopt.GetoptError:
        sys.exit(usage)
    for opt, arg in opts:
        if opt in ('-V', '--version'):
            sys.exit('Cloudflare library version: %s' % (CloudFlare.__version__))
        elif opt in ('-h', '--help'):
            sys.exit(usage)
        elif opt in ('-v', '--verbose'):
            verbose = True
        elif opt in ('-q', '--quiet'):
            output = None
        elif opt in ('-e', '--examples'):
            example = True
        elif opt in ('-j', '--json'):
            output = 'json'
        elif opt in ('-y', '--yaml'):
            load_and_check_yaml()
            output = 'yaml'
        elif opt in ('-n', '--ndjson'):
            load_and_check_jsonlines()
            output = 'ndjson'
        elif opt in ('-i', '--image'):
            output = 'image'
        elif opt in ('-r', '--raw'):
            raw = True
        elif opt in ('-p', '--profile'):
            profile = arg
        elif opt in ('-H', '--header'):
            # may be given several times; headers accumulate in order
            if http_headers is None:
                http_headers = []
            http_headers.append(arg)
        elif opt in ('-w', '--warnings'):
            if arg is None or arg == '':
                warnings = None
            elif arg.lower() in ('yes', 'true', '1'):
                warnings = True
            elif arg.lower() in ('no', 'false', '0'):
                warnings = False
            else:
                sys.exit('cli4: --warnings takes boolean True/False argument')
        elif opt in ('-d', '--dump'):
            do_dump = True
        elif opt in ('-A', '--openapi'):
            do_openapi = True
            openapi_url = arg if arg != '' else None
        elif opt in ('-b', '--binary'):
            binary_file = True
        elif opt in ('-G', '--get'):
            method = 'GET'
        elif opt in ('-P', '--patch'):
            method = 'PATCH'
        elif opt in ('-O', '--post'):
            method = 'POST'
        elif opt in ('-U', '--put'):
            method = 'PUT'
        elif opt in ('-D', '--delete'):
            method = 'DELETE'

    if example:
        try:
            examples.display()
        except ModuleNotFoundError as e:
            sys.exit(e)
        sys.exit(0)

    try:
        cf = CloudFlare.CloudFlare(debug=verbose, raw=raw, profile=profile, http_headers=http_headers, warnings=warnings)
    except Exception as e:
        sys.exit(e)

    if do_dump:
        a = dump_commands(cf)
        # success - just dump results and exit
        sys.stdout.write(a)
        sys.exit(0)

    if do_openapi:
        try:
            a = dump_commands_from_web(cf, openapi_url)
        except CloudFlare.exceptions.CloudFlareAPIError as e:
            sys.exit('cli4: %s - Failed' % (e))
        # success - just dump results and exit
        sys.stdout.write(a)
        sys.exit(0)

    # next grab the params. These are in the form of tag=value or =value or @filename
    (params, content, files) = process_params_content_files(method, binary_file, args)

    # what's left is the command itself
    if not args:
        sys.exit(usage)
    commands = args

    exit_with_error = False
    for command in commands:
        try:
            try:
                results = run_command(cf, method, command, params, content, files)
            except Exception:
                # run_command writes the failures it detects to stderr before
                # raising; remember the failure and move on to the next command
                exit_with_error = True
                continue
            try:
                write_results(results, output)
            except Exception as e:
                # nothing else reports output failures, so do it here
                sys.stderr.write('cli4: /%s - %s - output error\n' % (command, e))
                exit_with_error = True
        except KeyboardInterrupt:
            sys.exit('cli4: %s - Interrupted\n' % (command))

    if exit_with_error:
        sys.exit(1)

def cli4(args):
    """Command line entry point: process *args*, then exit with status 0."""

    do_it(args)
    # do_it() exits by itself on failure, so reaching this line means success
    sys.exit(0)