From e8bf8073ca7b05a56a92124628ccbe010ce4d7cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Fra=C3=9F?= Date: Fri, 31 May 2024 11:24:53 +0200 Subject: [PATCH] [mod] revise args and conf --- source/conf.py | 143 +++++++++++++++++------ source/helpers.py | 50 ++++---- source/main.py | 287 ++++++++++++++++++++++++++++++++-------------- 3 files changed, 332 insertions(+), 148 deletions(-) diff --git a/source/conf.py b/source/conf.py index 6ee982a..bc8ea73 100644 --- a/source/conf.py +++ b/source/conf.py @@ -1,26 +1,63 @@ -_conf_data = { - "url": { - "test": { - "scheme": "https", - "host": "api.ote.domrobot.com", - "port": 443, - "path": "jsonrpc/" - }, - "production": { - "scheme": "https", - "host": "api.domrobot.com", - "port": 443, - "path": "jsonrpc/" - } - }, - "environment": "production", - "account": { - "username": None, - "password": None - } -} +_conf_data = None +def conf_schema( +): + return { + "type": "object", + "properties": { + "url": { + "type": "object", + "properties": { + }, + "additionalProperties": { + "type": "object", + "properties": { + "scheme": { + "type": "string", + }, + "host": { + "type": "string", + }, + "port": { + "type": "number", + }, + "path": { + "type": "string", + }, + }, + "additionalProperties": False, + "required": [ + "host", + ] + }, + "required": [ + ] + }, + "environment": { + "type": "string", + }, + "account": { + "type": "object", + "properties": { + "username": { + "type": "string", + }, + "password": { + "type": "string", + }, + }, + "additionalProperties": False, + "required": [ + ] + } + }, + "additionalProperties": False, + "required": [ + ], + } + + def conf_load( path : str ): @@ -28,11 +65,56 @@ def conf_load( if (not _os.path.exists(path)): pass else: - handle = open(path, "r") - content = handle.read() - handle.close() - data = _json.loads(content) - _conf_data = merge(_conf_data, data) + conf_data_raw = _json.loads(file_read(path)) + for pair in conf_data_raw.get("url", {}).items(): + if ("host" in pair[1]): + pass + else: + raise ValueError("flawed conf: missing mandatory value 'host' for url entry '%s'" % pair[0]) + _conf_data = { + "url": convey( + ( + { + "test": { + "scheme": "https", + "host": "api.ote.domrobot.com", + "port": 443, + "path": "jsonrpc/" + }, + "production": { + "scheme": "https", + "host": "api.domrobot.com", + "port": 443, + "path": "jsonrpc/" + } + } + | + conf_data_raw.get("url", {}) + ), + [ + lambda x: x.items(), + lambda pairs: map( + lambda pair: ( + pair[0], + { + "scheme": pair[1].get("scheme", "https"), + "host": pair[1]["host"], + "port": pair[1].get("port", 443), + "path": pair[1].get("path", "jsonrpc/"), + } + ), + pairs + ), + dict, + ] + ), + "environment": conf_data_raw.get("environment", "production"), + "account": { + "username": conf_data_raw.get("account", {}).get("username", None), + "password": conf_data_raw.get("account", {}).get("password", None), + } + } + print(_json.dumps(_conf_data, indent = "\t")) def conf_get( @@ -41,12 +123,3 @@ def conf_get( global _conf_data return path_read(_conf_data, path.split(".")) - -def conf_set( - path : str, - value -): - global _conf_data - path_write(_conf_data, path.split("."), value) - - diff --git a/source/helpers.py b/source/helpers.py index ae420be..f36c4eb 100644 --- a/source/helpers.py +++ b/source/helpers.py @@ -1,3 +1,29 @@ +def convey(x, fs): + y = x + for f in fs: + y = f(y) + return y + + +def string_coin( + template : str, + arguments : dict +): + result = template + for (key, value, ) in arguments.items(): + result = result.replace("{{%s}}" % key, value) + return result + + +def file_read( + path : str +): + handle = open(path, "r") + content = handle.read() + handle.close() + return content + + def log( messsage : str ): @@ -16,30 +42,6 @@ def path_read( return position -def path_write( - thing, - steps : List[str], - value -): - steps_first = steps[:-1] - step_last = steps[-1] - position = thing - for step in steps_first: - if (not (step in position)): - position[step] = {} - position = position[step] - position[step_last] = value - - -def merge( - core, - mantle -): - result = core.copy() - result.update(mantle) - return result - - def http_call( request : dict, ) -> dict: diff --git a/source/main.py b/source/main.py index 4ca10b5..d132237 100644 --- a/source/main.py +++ b/source/main.py @@ -1,12 +1,13 @@ - -def args( +def main( ): + ## args argumentparser = _argparse.ArgumentParser( description = "INWX CLI Frontend" ) argumentparser.add_argument( "-c", "--conf", + type = str, dest = "conf", default = _os.path.join(str(_pathlib.Path.home()), ".inwx-conf.json"), metavar = "", @@ -15,14 +16,16 @@ def args( argumentparser.add_argument( "-e", "--environment", + type = str, dest = "environment", metavar = "", default = None, - help = "environment to use; one of the keys in the 'url' filed of the configuration; overwrites the configuration value", + help = "environment to use; one of the keys in the 'url' file of the configuration; overwrites the configuration value", ) argumentparser.add_argument( "-u", "--username", + type = str, dest = "username", metavar = "", default = None, @@ -31,24 +34,52 @@ def args( argumentparser.add_argument( "-p", "--password", + type = str, dest = "password", metavar = "", default = None, help = "password; overwrites the configuration value", ) - ''' argumentparser.add_argument( - "-d", - "--domain", - dest = "domain", + "-b", + "--domain-base", + type = str, + dest = "domain_base", default = None, - metavar = "", - help = "the domain to work with" + metavar = "", + help = "the domain, which holds the records" + ) + argumentparser.add_argument( + "-n", + "--domain-path", + type = str, + dest = "domain_path", + default = None, + metavar = "", + help = "the record name/sub domain to work with" + ) + argumentparser.add_argument( + "-t", + "--type", + type = str, + dest = "type", + default = None, + metavar = "", + help = "the record type (A, AAAA, TXT, …)" + ) + argumentparser.add_argument( + "-v", + "--value", + type = str, + dest = "value", + default = None, + metavar = "", + help = "value for the record" ) - ''' argumentparser.add_argument( "-x", "--challenge-prefix", + type = str, dest = "challenge_prefix", metavar = "", default = "_acme-challenge", @@ -57,101 +88,179 @@ def args( argumentparser.add_argument( "-w", "--delay", - dest = "delay", type = float, + dest = "delay", default = 60.0, metavar = "", help = "seconds to wait at end of certbot auth hook", ) argumentparser.add_argument( - "action", type = str, - choices = ["info", "list", "save", "delete", "certbot-hook"], + dest = "action", + choices = [ + "conf-schema", + "info", + "list", + "save", + "delete", + "certbot-hook", + ], metavar = "", - help = "action to execute; options: info,list,save,delete,certbot-hook", - ) - argumentparser.add_argument( - "parameter", - nargs = "*", - type = str, - metavar = "", - help = "action specific parameters", + help = string_coin( + "action to execute; options:\n{{options}}", + { + "options": convey( + [ + {"name": "conf-schema", "requirements": []}, + {"name": "info", "requirements": []}, + {"name": "list", "requirements": [""]}, + {"name": "save", "requirements": ["", "", "", ""]}, + {"name": "delete", "requirements": ["", ""]}, + {"name": "certbot-hook", "requirements": []}, + ], + [ + lambda x: map( + lambda entry: string_coin( + "{{name}}{{macro_requirements}}", + { + "name": entry["name"], + "macro_requirements": ( + "" + if (len(entry["requirements"]) <= 0) else + string_coin( + " (requires: {{requirements}})", + { + "requirements": ",".join(entry["requirements"]), + } + ) + ), + } + ), + x + ), + " | ".join, + ] + ) + } + ), ) arguments = argumentparser.parse_args() - return arguments - - -def main( -): - arguments = args() + ## conf conf_load(arguments.conf) - if (not (arguments.environment is None)): conf_set("environment", arguments.environment) - if (not (arguments.username is None)): conf_set("account.username", arguments.username) - if (not (arguments.password is None)): conf_set("account.password", arguments.password) - if (arguments.action == "info"): - result = api_macro_info( - conf_get("environment"), - conf_get("account.username"), - conf_get("account.password") - ) - print(_json.dumps(result, indent = "\t")) + ## vars + environment = (arguments.environment or conf_get("environment")) + account_username = (arguments.username or conf_get("account.username")) + account_password = (arguments.password or conf_get("account.password")) + + ## exec + if (arguments.action == "conf-schema"): + print(_json.dumps(conf_schema(), indent = "\t")) + elif (arguments.action == "info"): + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + result = api_macro_info( + environment, + account_username, + account_password + ) + print(_json.dumps(result, indent = "\t")) elif (arguments.action == "list"): - domain = arguments.parameter[0] - result = api_macro_list( - conf_get("environment"), - conf_get("account.username"), - conf_get("account.password"), - domain - ) - print(_json.dumps(result, indent = "\t")) + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + if (arguments.domain_base is None): + raise ValueError("domain base required") + else: + result = api_macro_list( + environment, + account_username, + account_password, + arguments.domain_base + ) + print(_json.dumps(result, indent = "\t")) elif (arguments.action == "save"): - domain = arguments.parameter[0] - name = arguments.parameter[1] - type_ = arguments.parameter[2] - content = arguments.parameter[3] - api_macro_save( - conf_get("environment"), - conf_get("account.username"), - conf_get("account.password"), - domain, - name, - type_, - content - ) - # print(_json.dumps(result, indent = "\t")) + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + if (arguments.domain_base is None): + raise ValueError("domain base required") + else: + if (arguments.domain_base is None): + raise ValueError("domain path required") + else: + if (arguments.type is None): + raise ValueError("type required") + else: + if (arguments.value is None): + raise ValueError("value required") + else: + api_macro_save( + environment, + account_username, + account_password, + arguments.domain_base, + arguments.domain_path, + arguments.type, + arguments.value + ) elif (arguments.action == "delete"): - domain = arguments.parameter[0] - name = arguments.parameter[1] - type_ = (arguments.parameter[2] if (len(arguments.parameter) >= 2) else None) - api_macro_delete( - conf_get("environment"), - conf_get("account.username"), - conf_get("account.password"), - domain, - name, - type_ - ) + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + if (arguments.domain_base is None): + raise ValueError("domain base required") + else: + if (arguments.domain_base is None): + raise ValueError("domain path required") + else: + api_macro_delete( + environment, + account_username, + account_password, + arguments.domain_base, + arguments.domain_path, + arguments.type + ) elif (arguments.action == "certbot-hook"): - domain_full_parts = _os.environ["CERTBOT_DOMAIN"].split(".") - account = ".".join(domain_full_parts[-2:]) - concern = ".".join(domain_full_parts[:-2]) - domain = account - name = (arguments.challenge_prefix + "." + concern) - type_ = "TXT" - content = _os.environ["CERTBOT_VALIDATION"] - api_macro_save( - conf_get("environment"), - conf_get("account.username"), - conf_get("account.password"), - domain, - name, - type_, - content - ) - _time.sleep(arguments.delay) - # print(_json.dumps(result, indent = "\t")) + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + domain_full_parts = _os.environ["CERTBOT_DOMAIN"].split(".") + account = ".".join(domain_full_parts[-2:]) + concern = ".".join(domain_full_parts[:-2]) + domain = account + name = (arguments.challenge_prefix + "." + concern) + type_ = "TXT" + content = _os.environ["CERTBOT_VALIDATION"] + api_macro_save( + environment, + account_username, + account_password, + domain, + name, + type_, + content + ) + _time.sleep(arguments.delay) + # print(_json.dumps(result, indent = "\t")) else: log("unhandled action '%s'" % (arguments.action, )) @@ -159,5 +268,5 @@ def main( try: main() except ValueError as error: - _sys.stderr.write(str(error) + "\n") + _sys.stderr.write("-- %s\n" % str(error))