diff --git a/roles/tlscert_acme_inwx/defaults/main.json b/roles/tlscert_acme_inwx/defaults/main.json index 0151289..e660b19 100644 --- a/roles/tlscert_acme_inwx/defaults/main.json +++ b/roles/tlscert_acme_inwx/defaults/main.json @@ -3,7 +3,6 @@ "var_tlscert_acme_inwx_acme_account_key_path": "/etc/letsencrypt/key", "var_tlscert_acme_inwx_inwx_account_username": "REPLACE_ME", "var_tlscert_acme_inwx_inwx_account_password": "REPLACE_ME", - "var_tlscert_acme_inwx_domain_base": "example.org", - "var_tlscert_acme_inwx_domain_path": "foo", + "var_tlscert_acme_inwx_domain": "foo.example.org", "var_tlscert_acme_inwx_ssl_directory": "/etc/ssl" } diff --git a/roles/tlscert_acme_inwx/files/inwx b/roles/tlscert_acme_inwx/files/inwx index 2fdeb41..5f8ac82 100755 --- a/roles/tlscert_acme_inwx/files/inwx +++ b/roles/tlscert_acme_inwx/files/inwx @@ -10,6 +10,32 @@ import argparse as _argparse import pathlib as _pathlib import time as _time +def convey(x, fs): + y = x + for f in fs: + y = f(y) + return y + + +def string_coin( + template : str, + arguments : dict +): + result = template + for (key, value, ) in arguments.items(): + result = result.replace("{{%s}}" % key, value) + return result + + +def file_read( + path : str +): + handle = open(path, "r") + content = handle.read() + handle.close() + return content + + def log( messsage : str ): @@ -28,30 +54,6 @@ def path_read( return position -def path_write( - thing, - steps : List[str], - value -): - steps_first = steps[:-1] - step_last = steps[-1] - position = thing - for step in steps_first: - if (not (step in position)): - position[step] = {} - position = position[step] - position[step_last] = value - - -def merge( - core, - mantle -): - result = core.copy() - result.update(mantle) - return result - - def http_call( request : dict, ) -> dict: @@ -76,29 +78,66 @@ def http_call( return response -_conf_data = { - "url": { - "test": { - "scheme": "https", - "host": "api.ote.domrobot.com", - "port": 443, - "path": "jsonrpc/" - }, - "production": { - "scheme": "https", - "host": "api.domrobot.com", - "port": 443, - "path": "jsonrpc/" - } - }, - "environment": "production", - "account": { - "username": None, - "password": None - } -} +_conf_data = None +def conf_schema( +): + return { + "type": "object", + "properties": { + "url": { + "type": "object", + "properties": { + }, + "additionalProperties": { + "type": "object", + "properties": { + "scheme": { + "type": "string", + }, + "host": { + "type": "string", + }, + "port": { + "type": "number", + }, + "path": { + "type": "string", + }, + }, + "additionalProperties": False, + "required": [ + "host", + ] + }, + "required": [ + ] + }, + "environment": { + "type": "string", + }, + "account": { + "type": "object", + "properties": { + "username": { + "type": "string", + }, + "password": { + "type": "string", + }, + }, + "additionalProperties": False, + "required": [ + ] + } + }, + "additionalProperties": False, + "required": [ + ], + } + + def conf_load( path : str ): @@ -106,11 +145,56 @@ def conf_load( if (not _os.path.exists(path)): pass else: - handle = open(path, "r") - content = handle.read() - handle.close() - data = _json.loads(content) - _conf_data = merge(_conf_data, data) + conf_data_raw = _json.loads(file_read(path)) + for pair in conf_data_raw.get("url", {}).items(): + if ("host" in pair[1]): + pass + else: + raise ValueError("flawed conf: missing mandatory value 'host' for url entry '%s'" % pair[0]) + _conf_data = { + "url": convey( + ( + { + "test": { + "scheme": "https", + "host": "api.ote.domrobot.com", + "port": 443, + "path": "jsonrpc/" + }, + "production": { + "scheme": "https", + "host": "api.domrobot.com", + "port": 443, + "path": "jsonrpc/" + } + } + | + conf_data_raw.get("url", {}) + ), + [ + lambda x: x.items(), + lambda pairs: map( + lambda pair: ( + pair[0], + { + "scheme": pair[1].get("scheme", "https"), + "host": pair[1]["host"], + "port": pair[1].get("port", 443), + "path": pair[1].get("path", "jsonrpc/"), + } + ), + pairs + ), + dict, + ] + ), + "environment": conf_data_raw.get("environment", "production"), + "account": { + "username": conf_data_raw.get("account", {}).get("username", None), + "password": conf_data_raw.get("account", {}).get("password", None), + } + } + print(_json.dumps(_conf_data, indent = "\t")) def conf_get( @@ -119,15 +203,6 @@ def conf_get( global _conf_data return path_read(_conf_data, path.split(".")) - -def conf_set( - path : str, - value -): - global _conf_data - path_write(_conf_data, path.split("."), value) - - def api_call( environment : str, accesstoken : str, @@ -172,6 +247,9 @@ def api_call( return result +''' +@see https://www.inwx.de/de/help/apidoc/f/ch02.html#account.login +''' def api_macro_login( environment : str, username : str, @@ -195,6 +273,9 @@ def api_macro_login( return response["_accesstoken"] +''' +@see https://www.inwx.de/de/help/apidoc/f/ch02.html#account.logout +''' def api_macro_logout( environment : str, accesstoken : str @@ -210,6 +291,9 @@ def api_macro_logout( return None +''' +@see https://www.inwx.de/de/help/apidoc/f/ch02.html#account.info +''' def api_macro_info( environment : str, username : str, @@ -228,6 +312,9 @@ def api_macro_info( return info +''' +@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.info +''' def api_macro_list( environment : str, username : str, @@ -248,12 +335,17 @@ def api_macro_list( return info +''' +@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.info +@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.createRecord +@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.updateRecord +''' def api_macro_save( environment : str, username : str, password : str, - domain : str, - name : str, + domain_base : str, + domain_path, type_ : str, content : str ): @@ -264,12 +356,28 @@ def api_macro_save( "nameserver", "info", { - "domain": domain, + "domain": domain_base, } ) matching = list( filter( - lambda record: ((record["name"] == (name + "." + domain)) and (record["type"] == type_)), + lambda record: ( + ( + ( + (domain_path is None) + and + (record["name"] == domain) + ) + or + ( + (domain_path is not None) + and + (record["name"] == (domain_path + "." + domain_base)) + ) + ) + and + (record["type"] == type_) + ), info["record"] ) ) @@ -281,8 +389,8 @@ def api_macro_save( "nameserver", "createRecord", { - "domain": domain, - "name": name, + "domain": domain_base, + "name": domain_path, "type": type_, "content": content, } @@ -308,153 +416,324 @@ def api_macro_save( -def args( +''' +@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.info +@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.deleteRecord +''' +def api_macro_delete( + environment : str, + username : str, + password : str, + domain_base : str, + domain_path, + type_ ): - argumentparser = _argparse.ArgumentParser( + accesstoken = api_macro_login(environment, username, password) + info = api_call( + environment, + accesstoken, + "nameserver", + "info", + { + "domain": domain_base, + } + ) + matching = list( + filter( + lambda record: ( + ( + ( + (domain_path is None) + and + (record["name"] == domain_base) + ) + or + ( + (domain_path is not None) + and + (record["name"] == (domain_path + "." + domain_base)) + ) + ) + and + ( + (type_ is None) + or + (record["type"] == type_) + ) + ), + info["record"] + ) + ) + for entry in matching: + id_ = entry["id"] + result = api_call( + environment, + accesstoken, + "nameserver", + "deleteRecord", + { + "id": id_, + } + ) + api_macro_logout(environment, accesstoken) + + +def main( +): + ## args + argument_parser = _argparse.ArgumentParser( description = "INWX CLI Frontend" ) - argumentparser.add_argument( + argument_parser.add_argument( "-c", "--conf", + type = str, dest = "conf", default = _os.path.join(str(_pathlib.Path.home()), ".inwx-conf.json"), metavar = "", help = "path to configuration file", ) - argumentparser.add_argument( + argument_parser.add_argument( "-e", "--environment", + type = str, dest = "environment", metavar = "", default = None, - help = "environment to use; one of the keys in the 'url' filed of the configuration; overwrites the configuration value", + help = "environment to use; one of the keys in the 'url' file of the configuration; overwrites the configuration value", ) - argumentparser.add_argument( + argument_parser.add_argument( "-u", "--username", + type = str, dest = "username", metavar = "", default = None, help = "username; overwrites the configuration value", ) - argumentparser.add_argument( + argument_parser.add_argument( "-p", "--password", + type = str, dest = "password", metavar = "", default = None, help = "password; overwrites the configuration value", ) - ''' - argumentparser.add_argument( + argument_parser.add_argument( "-d", "--domain", + type = str, dest = "domain", default = None, metavar = "", help = "the domain to work with" ) - ''' - argumentparser.add_argument( + argument_parser.add_argument( + "-t", + "--type", + type = str, + dest = "type", + default = None, + metavar = "", + help = "the record type (A, AAAA, TXT, …)" + ) + argument_parser.add_argument( + "-v", + "--value", + type = str, + dest = "value", + default = None, + metavar = "", + help = "value for the record" + ) + argument_parser.add_argument( "-x", "--challenge-prefix", + type = str, dest = "challenge_prefix", metavar = "", default = "_acme-challenge", help = "which subdomain to use for ACME challanges", ) - argumentparser.add_argument( + argument_parser.add_argument( "-w", "--delay", - dest = "delay", type = float, + dest = "delay", default = 60.0, metavar = "", help = "seconds to wait at end of certbot auth hook", ) - argumentparser.add_argument( - "action", + argument_parser.add_argument( type = str, - choices = ["info", "list", "save", "certbot-hook"], + dest = "action", + choices = [ + "conf-schema", + "info", + "list", + "save", + "delete", + "certbot-hook", + ], metavar = "", - help = "action to execute", + help = string_coin( + "action to execute; options:\n{{options}}", + { + "options": convey( + [ + {"name": "conf-schema", "requirements": []}, + {"name": "info", "requirements": []}, + {"name": "list", "requirements": [""]}, + {"name": "save", "requirements": ["", "", ""]}, + {"name": "delete", "requirements": [""]}, + {"name": "certbot-hook", "requirements": []}, + ], + [ + lambda x: map( + lambda entry: string_coin( + "{{name}}{{macro_requirements}}", + { + "name": entry["name"], + "macro_requirements": ( + "" + if (len(entry["requirements"]) <= 0) else + string_coin( + " (requires: {{requirements}})", + { + "requirements": ",".join(entry["requirements"]), + } + ) + ), + } + ), + x + ), + " | ".join, + ] + ) + } + ), ) - argumentparser.add_argument( - "parameter", - nargs = "*", - type = str, - metavar = "", - help = "action specific parameters", - ) - arguments = argumentparser.parse_args() - return arguments + args = argument_parser.parse_args() - -def main( -): - arguments = args() + ## conf + conf_load(args.conf) - conf_load(arguments.conf) - if (not (arguments.environment is None)): conf_set("environment", arguments.environment) - if (not (arguments.username is None)): conf_set("account.username", arguments.username) - if (not (arguments.password is None)): conf_set("account.password", arguments.password) + ## vars + environment = (args.environment or conf_get("environment")) + account_username = (args.username or conf_get("account.username")) + account_password = (args.password or conf_get("account.password")) + domain_parts = (None if (args.domain is None) else args.domain.split(".")) + domain_base = (None if (domain_parts is None) else ".".join(domain_parts[-2:])) + domain_path = (None if ((domain_parts is None) or (len(domain_parts[:-2]) <= 0)) else ".".join(domain_parts[:-2])) - if (arguments.action == "info"): - result = api_macro_info( - conf_get("environment"), - conf_get("account.username"), - conf_get("account.password") - ) - print(_json.dumps(result, indent = "\t")) - elif (arguments.action == "list"): - domain = arguments.parameter[0] - result = api_macro_list( - conf_get("environment"), - conf_get("account.username"), - conf_get("account.password"), - domain - ) - print(_json.dumps(result, indent = "\t")) - elif (arguments.action == "save"): - domain = arguments.parameter[0] - name = arguments.parameter[1] - type_ = arguments.parameter[2] - content = arguments.parameter[3] - api_macro_save( - conf_get("environment"), - conf_get("account.username"), - conf_get("account.password"), - domain, - name, - type_, - content - ) - # print(_json.dumps(result, indent = "\t")) - elif (arguments.action == "certbot-hook"): - domain_full_parts = _os.environ["CERTBOT_DOMAIN"].split(".") - account = ".".join(domain_full_parts[-2:]) - concern = ".".join(domain_full_parts[:-2]) - domain = account - name = (arguments.challenge_prefix + "." + concern) - type_ = "TXT" - content = _os.environ["CERTBOT_VALIDATION"] - api_macro_save( - conf_get("environment"), - conf_get("account.username"), - conf_get("account.password"), - domain, - name, - type_, - content - ) - _time.sleep(arguments.delay) - # print(_json.dumps(result, indent = "\t")) + ## exec + if (args.action == "conf-schema"): + print(_json.dumps(conf_schema(), indent = "\t")) + elif (args.action == "info"): + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + result = api_macro_info( + environment, + account_username, + account_password + ) + print(_json.dumps(result, indent = "\t")) + elif (args.action == "list"): + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + if (args.domain_base is None): + raise ValueError("domain base required") + else: + result = api_macro_list( + environment, + account_username, + account_password, + domain_base + ) + print(_json.dumps(result, indent = "\t")) + elif (args.action == "save"): + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + if (args.domain is None): + raise ValueError("domain required") + else: + if (args.type is None): + raise ValueError("type required") + else: + if (args.value is None): + raise ValueError("value required") + else: + api_macro_save( + environment, + account_username, + account_password, + domain_base, + domain_path, + args.type, + args.value + ) + elif (args.action == "delete"): + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + if (args.domain is None): + raise ValueError("domain required") + else: + api_macro_delete( + environment, + account_username, + account_password, + domain_base, + domain_path, + args.type + ) + elif (args.action == "certbot-hook"): + if (account_username is None): + raise ValueError("account username required") + else: + if (account_password is None): + raise ValueError("account password required") + else: + domain_full_parts = _os.environ["CERTBOT_DOMAIN"].split(".") + domain_base = ".".join(domain_full_parts[-2:]) + domain_path_stripped = ".".join(domain_full_parts[:-2]) + domain_path = (args.challenge_prefix + "." + domain_path_stripped) + type_ = "TXT" + content = _os.environ["CERTBOT_VALIDATION"] + api_macro_save( + environment, + account_username, + account_password, + domain_base, + domain_path, + type_, + content + ) + _time.sleep(args.delay) + # print(_json.dumps(result, indent = "\t")) else: - log("unhandled action '%s'" % (arguments.action, )) + log("unhandled action '%s'" % (args.action, )) try: main() except ValueError as error: - _sys.stderr.write(str(error) + "\n") + _sys.stderr.write("-- %s\n" % str(error)) diff --git a/roles/tlscert_acme_inwx/files/tls-get b/roles/tlscert_acme_inwx/files/tls-get index 4632c14..4d91b16 100755 --- a/roles/tlscert_acme_inwx/files/tls-get +++ b/roles/tlscert_acme_inwx/files/tls-get @@ -3,6 +3,7 @@ import sys as _sys import os as _os import json as _json +import pathlib as _pathlib import argparse as _argparse @@ -22,12 +23,12 @@ def main(): type = str, dest = "conf_path", metavar = "", - default = _os.path.join(_os.environ["HOME"], ".tls-get-conf.json"), + default = _os.path.join(str(_pathlib.Path.home()), ".tls-get-conf.json"), ) argument_parser.add_argument( - type = str, dest = "domain", metavar = "", + help = "the domain for which the TLS certificate shall be generated" ) argument_parser.add_argument( "-t", @@ -65,38 +66,49 @@ def main(): ) args = argument_parser.parse_args() - ## vars conf = _json.loads(file_read(args.conf_path)) le_dir = "/etc/letsencrypt/live" ## exec - command_hook_parts = [ - ("/usr/local/bin/inwx"), - ("--username=\"%s\"" % conf["inwx_account"]["username"]), - ("--password=\"%s\"" % conf["inwx_account"]["password"]), - ("certbot-hook") - ] - command_hook = " ".join(command_hook_parts) - - command_certbot_parts = [ - ("certbot"), - ("certonly"), - ("--email='%s'" % conf["acme_account"]["email"]), - # ("--work-dir='%s'" % conf["misc"]["working_directory"]), - ("--preferred-challenges='dns'"), - ("--non-interactive"), - ("--agree-tos"), - ("--domain='%s'" % args.domain), - ("--manual"), - ("--manual-auth-hook='%s'" % command_hook), - # ("--key-path='%s'" % _os.path.join(args.target_directory, "private", "%s.pem" % args.domain)), - # ("--cert-path='%s'" % _os.path.join(args.target_directory, "certs", "%s.pem" % args.domain)), - # ("--chain-path='%s'" % _os.path.join(args.target_directory, "chains", "%s.pem" % args.domain)), - # ("--fullchain-path='%s'" % _os.path.join(args.target_directory, "fullchains", "%s.pem" % args.domain)), - ] - command_certbot = " ".join(command_certbot_parts) - + command_certbot = " ".join( + [ + "certbot", + "certonly", + ("--email='%s'" % conf["acme_account"]["email"]), + # ("--work-dir='%s'" % conf["misc"]["working_directory"]), + "--preferred-challenges='dns'", + "--non-interactive", + "--agree-tos", + ("--domain='%s'" % args.domain), + "--manual", + ( + "--manual-auth-hook='%s'" + % " ".join( + [ + "/usr/local/bin/inwx", + ("--username=\"%s\"" % conf["inwx_account"]["username"]), + ("--password=\"%s\"" % conf["inwx_account"]["password"]), + "certbot-hook", + ("--delay=%.4f" % args.delay), + ] + ) + ), + ( + "--post-hook='%s'" + % " ".join( + [ + "/usr/local/bin/inwx", + ("--username=\"%s\"" % conf["inwx_account"]["username"]), + ("--password=\"%s\"" % conf["inwx_account"]["password"]), + "delete", + ("--domain=\"%s\"" % (args.challenge_prefix + "." + args.domain)), + ("--type=\"TXT\""), + ] + ) + ), + ] + ) if (args.dry_run): _sys.stdout.write(command_certbot + "\n") else: diff --git a/roles/tlscert_acme_inwx/tasks/main.json b/roles/tlscert_acme_inwx/tasks/main.json index 873aa12..0273e99 100644 --- a/roles/tlscert_acme_inwx/tasks/main.json +++ b/roles/tlscert_acme_inwx/tasks/main.json @@ -76,20 +76,20 @@ "ansible.builtin.cron": { "state": "present", "disabled": false, - "name": "TLS certificate for {{var_tlscert_acme_inwx_domain_path}}.{{var_tlscert_acme_inwx_domain_base}}", + "name": "TLS certificate for {{var_tlscert_acme_inwx_domain}}", "minute": "0", "hour": "2", "day": "1", "month": "*", "weekday": "*", - "job": "echo '/usr/local/bin/tls-get --conf-path=/root/.tls-get-conf.json {{var_tlscert_acme_inwx_domain_path}}.{{var_tlscert_acme_inwx_domain_base}} --target-directory={{var_tlscert_acme_inwx_ssl_directory}}' > /var/pseudoqueue" + "job": "echo '/usr/local/bin/tls-get {{var_tlscert_acme_inwx_domain}} --conf-path=/root/.tls-get-conf.json --target-directory={{var_tlscert_acme_inwx_ssl_directory}}' > /var/pseudoqueue" } }, { "name": "run", "become": true, "ansible.builtin.shell": { - "cmd": "echo '/usr/local/bin/tls-get --conf-path=/root/.tls-get-conf.json {{var_tlscert_acme_inwx_domain_path}}.{{var_tlscert_acme_inwx_domain_base}} --target-directory={{var_tlscert_acme_inwx_ssl_directory}}' > /var/pseudoqueue" + "cmd": "echo '/usr/local/bin/tls-get {{var_tlscert_acme_inwx_domain}} --conf-path=/root/.tls-get-conf.json --target-directory={{var_tlscert_acme_inwx_ssl_directory}}' > /var/pseudoqueue" } } ]