[mod] role:tlscert_acme_inwx:revise inwx

This commit is contained in:
Christian Fraß 2024-06-01 08:53:52 +02:00
parent 9806adb9ab
commit d18600bf91
4 changed files with 475 additions and 185 deletions

View file

@ -3,7 +3,6 @@
"var_tlscert_acme_inwx_acme_account_key_path": "/etc/letsencrypt/key",
"var_tlscert_acme_inwx_inwx_account_username": "REPLACE_ME",
"var_tlscert_acme_inwx_inwx_account_password": "REPLACE_ME",
"var_tlscert_acme_inwx_domain_base": "example.org",
"var_tlscert_acme_inwx_domain_path": "foo",
"var_tlscert_acme_inwx_domain": "foo.example.org",
"var_tlscert_acme_inwx_ssl_directory": "/etc/ssl"
}

View file

@ -10,6 +10,32 @@ import argparse as _argparse
import pathlib as _pathlib
import time as _time
def convey(x, fs):
y = x
for f in fs:
y = f(y)
return y
def string_coin(
template : str,
arguments : dict
):
result = template
for (key, value, ) in arguments.items():
result = result.replace("{{%s}}" % key, value)
return result
def file_read(
path : str
):
handle = open(path, "r")
content = handle.read()
handle.close()
return content
def log(
messsage : str
):
@ -28,30 +54,6 @@ def path_read(
return position
def path_write(
thing,
steps : List[str],
value
):
steps_first = steps[:-1]
step_last = steps[-1]
position = thing
for step in steps_first:
if (not (step in position)):
position[step] = {}
position = position[step]
position[step_last] = value
def merge(
core,
mantle
):
result = core.copy()
result.update(mantle)
return result
def http_call(
request : dict,
) -> dict:
@ -76,8 +78,83 @@ def http_call(
return response
_conf_data = {
_conf_data = None
def conf_schema(
):
return {
"type": "object",
"properties": {
"url": {
"type": "object",
"properties": {
},
"additionalProperties": {
"type": "object",
"properties": {
"scheme": {
"type": "string",
},
"host": {
"type": "string",
},
"port": {
"type": "number",
},
"path": {
"type": "string",
},
},
"additionalProperties": False,
"required": [
"host",
]
},
"required": [
]
},
"environment": {
"type": "string",
},
"account": {
"type": "object",
"properties": {
"username": {
"type": "string",
},
"password": {
"type": "string",
},
},
"additionalProperties": False,
"required": [
]
}
},
"additionalProperties": False,
"required": [
],
}
def conf_load(
path : str
):
global _conf_data
if (not _os.path.exists(path)):
pass
else:
conf_data_raw = _json.loads(file_read(path))
for pair in conf_data_raw.get("url", {}).items():
if ("host" in pair[1]):
pass
else:
raise ValueError("flawed conf: missing mandatory value 'host' for url entry '%s'" % pair[0])
_conf_data = {
"url": convey(
(
{
"test": {
"scheme": "https",
"host": "api.ote.domrobot.com",
@ -90,27 +167,34 @@ _conf_data = {
"port": 443,
"path": "jsonrpc/"
}
},
"environment": "production",
"account": {
"username": None,
"password": None
}
}
def conf_load(
path : str
):
global _conf_data
if (not _os.path.exists(path)):
pass
else:
handle = open(path, "r")
content = handle.read()
handle.close()
data = _json.loads(content)
_conf_data = merge(_conf_data, data)
|
conf_data_raw.get("url", {})
),
[
lambda x: x.items(),
lambda pairs: map(
lambda pair: (
pair[0],
{
"scheme": pair[1].get("scheme", "https"),
"host": pair[1]["host"],
"port": pair[1].get("port", 443),
"path": pair[1].get("path", "jsonrpc/"),
}
),
pairs
),
dict,
]
),
"environment": conf_data_raw.get("environment", "production"),
"account": {
"username": conf_data_raw.get("account", {}).get("username", None),
"password": conf_data_raw.get("account", {}).get("password", None),
}
}
print(_json.dumps(_conf_data, indent = "\t"))
def conf_get(
@ -119,15 +203,6 @@ def conf_get(
global _conf_data
return path_read(_conf_data, path.split("."))
def conf_set(
path : str,
value
):
global _conf_data
path_write(_conf_data, path.split("."), value)
def api_call(
environment : str,
accesstoken : str,
@ -172,6 +247,9 @@ def api_call(
return result
'''
@see https://www.inwx.de/de/help/apidoc/f/ch02.html#account.login
'''
def api_macro_login(
environment : str,
username : str,
@ -195,6 +273,9 @@ def api_macro_login(
return response["_accesstoken"]
'''
@see https://www.inwx.de/de/help/apidoc/f/ch02.html#account.logout
'''
def api_macro_logout(
environment : str,
accesstoken : str
@ -210,6 +291,9 @@ def api_macro_logout(
return None
'''
@see https://www.inwx.de/de/help/apidoc/f/ch02.html#account.info
'''
def api_macro_info(
environment : str,
username : str,
@ -228,6 +312,9 @@ def api_macro_info(
return info
'''
@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.info
'''
def api_macro_list(
environment : str,
username : str,
@ -248,12 +335,17 @@ def api_macro_list(
return info
'''
@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.info
@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.createRecord
@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.updateRecord
'''
def api_macro_save(
environment : str,
username : str,
password : str,
domain : str,
name : str,
domain_base : str,
domain_path,
type_ : str,
content : str
):
@ -264,12 +356,28 @@ def api_macro_save(
"nameserver",
"info",
{
"domain": domain,
"domain": domain_base,
}
)
matching = list(
filter(
lambda record: ((record["name"] == (name + "." + domain)) and (record["type"] == type_)),
lambda record: (
(
(
(domain_path is None)
and
(record["name"] == domain)
)
or
(
(domain_path is not None)
and
(record["name"] == (domain_path + "." + domain_base))
)
)
and
(record["type"] == type_)
),
info["record"]
)
)
@ -281,8 +389,8 @@ def api_macro_save(
"nameserver",
"createRecord",
{
"domain": domain,
"name": name,
"domain": domain_base,
"name": domain_path,
"type": type_,
"content": content,
}
@ -308,153 +416,324 @@ def api_macro_save(
def args(
'''
@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.info
@see https://www.inwx.de/de/help/apidoc/f/ch02s15.html#nameserver.deleteRecord
'''
def api_macro_delete(
environment : str,
username : str,
password : str,
domain_base : str,
domain_path,
type_
):
argumentparser = _argparse.ArgumentParser(
accesstoken = api_macro_login(environment, username, password)
info = api_call(
environment,
accesstoken,
"nameserver",
"info",
{
"domain": domain_base,
}
)
matching = list(
filter(
lambda record: (
(
(
(domain_path is None)
and
(record["name"] == domain_base)
)
or
(
(domain_path is not None)
and
(record["name"] == (domain_path + "." + domain_base))
)
)
and
(
(type_ is None)
or
(record["type"] == type_)
)
),
info["record"]
)
)
for entry in matching:
id_ = entry["id"]
result = api_call(
environment,
accesstoken,
"nameserver",
"deleteRecord",
{
"id": id_,
}
)
api_macro_logout(environment, accesstoken)
def main(
):
## args
argument_parser = _argparse.ArgumentParser(
description = "INWX CLI Frontend"
)
argumentparser.add_argument(
argument_parser.add_argument(
"-c",
"--conf",
type = str,
dest = "conf",
default = _os.path.join(str(_pathlib.Path.home()), ".inwx-conf.json"),
metavar = "<conf>",
help = "path to configuration file",
)
argumentparser.add_argument(
argument_parser.add_argument(
"-e",
"--environment",
type = str,
dest = "environment",
metavar = "<environment>",
default = None,
help = "environment to use; one of the keys in the 'url' filed of the configuration; overwrites the configuration value",
help = "environment to use; one of the keys in the 'url' file of the configuration; overwrites the configuration value",
)
argumentparser.add_argument(
argument_parser.add_argument(
"-u",
"--username",
type = str,
dest = "username",
metavar = "<username>",
default = None,
help = "username; overwrites the configuration value",
)
argumentparser.add_argument(
argument_parser.add_argument(
"-p",
"--password",
type = str,
dest = "password",
metavar = "<password>",
default = None,
help = "password; overwrites the configuration value",
)
'''
argumentparser.add_argument(
argument_parser.add_argument(
"-d",
"--domain",
type = str,
dest = "domain",
default = None,
metavar = "<domain>",
help = "the domain to work with"
)
'''
argumentparser.add_argument(
argument_parser.add_argument(
"-t",
"--type",
type = str,
dest = "type",
default = None,
metavar = "<type>",
help = "the record type (A, AAAA, TXT, …)"
)
argument_parser.add_argument(
"-v",
"--value",
type = str,
dest = "value",
default = None,
metavar = "<value>",
help = "value for the record"
)
argument_parser.add_argument(
"-x",
"--challenge-prefix",
type = str,
dest = "challenge_prefix",
metavar = "<challenge-prefix>",
default = "_acme-challenge",
help = "which subdomain to use for ACME challanges",
)
argumentparser.add_argument(
argument_parser.add_argument(
"-w",
"--delay",
dest = "delay",
type = float,
dest = "delay",
default = 60.0,
metavar = "<delay>",
help = "seconds to wait at end of certbot auth hook",
)
argumentparser.add_argument(
"action",
argument_parser.add_argument(
type = str,
choices = ["info", "list", "save", "certbot-hook"],
dest = "action",
choices = [
"conf-schema",
"info",
"list",
"save",
"delete",
"certbot-hook",
],
metavar = "<action>",
help = "action to execute",
help = string_coin(
"action to execute; options:\n{{options}}",
{
"options": convey(
[
{"name": "conf-schema", "requirements": []},
{"name": "info", "requirements": []},
{"name": "list", "requirements": ["<domain>"]},
{"name": "save", "requirements": ["<domain>", "<type>", "<value>"]},
{"name": "delete", "requirements": ["<domain>"]},
{"name": "certbot-hook", "requirements": []},
],
[
lambda x: map(
lambda entry: string_coin(
"{{name}}{{macro_requirements}}",
{
"name": entry["name"],
"macro_requirements": (
""
if (len(entry["requirements"]) <= 0) else
string_coin(
" (requires: {{requirements}})",
{
"requirements": ",".join(entry["requirements"]),
}
)
argumentparser.add_argument(
"parameter",
nargs = "*",
type = str,
metavar = "<parameters>",
help = "action specific parameters",
),
}
),
x
),
" | ".join,
]
)
arguments = argumentparser.parse_args()
return arguments
}
),
)
args = argument_parser.parse_args()
## conf
conf_load(args.conf)
def main(
):
arguments = args()
## vars
environment = (args.environment or conf_get("environment"))
account_username = (args.username or conf_get("account.username"))
account_password = (args.password or conf_get("account.password"))
domain_parts = (None if (args.domain is None) else args.domain.split("."))
domain_base = (None if (domain_parts is None) else ".".join(domain_parts[-2:]))
domain_path = (None if ((domain_parts is None) or (len(domain_parts[:-2]) <= 0)) else ".".join(domain_parts[:-2]))
conf_load(arguments.conf)
if (not (arguments.environment is None)): conf_set("environment", arguments.environment)
if (not (arguments.username is None)): conf_set("account.username", arguments.username)
if (not (arguments.password is None)): conf_set("account.password", arguments.password)
if (arguments.action == "info"):
## exec
if (args.action == "conf-schema"):
print(_json.dumps(conf_schema(), indent = "\t"))
elif (args.action == "info"):
if (account_username is None):
raise ValueError("account username required")
else:
if (account_password is None):
raise ValueError("account password required")
else:
result = api_macro_info(
conf_get("environment"),
conf_get("account.username"),
conf_get("account.password")
environment,
account_username,
account_password
)
print(_json.dumps(result, indent = "\t"))
elif (arguments.action == "list"):
domain = arguments.parameter[0]
elif (args.action == "list"):
if (account_username is None):
raise ValueError("account username required")
else:
if (account_password is None):
raise ValueError("account password required")
else:
if (args.domain_base is None):
raise ValueError("domain base required")
else:
result = api_macro_list(
conf_get("environment"),
conf_get("account.username"),
conf_get("account.password"),
domain
environment,
account_username,
account_password,
domain_base
)
print(_json.dumps(result, indent = "\t"))
elif (arguments.action == "save"):
domain = arguments.parameter[0]
name = arguments.parameter[1]
type_ = arguments.parameter[2]
content = arguments.parameter[3]
elif (args.action == "save"):
if (account_username is None):
raise ValueError("account username required")
else:
if (account_password is None):
raise ValueError("account password required")
else:
if (args.domain is None):
raise ValueError("domain required")
else:
if (args.type is None):
raise ValueError("type required")
else:
if (args.value is None):
raise ValueError("value required")
else:
api_macro_save(
conf_get("environment"),
conf_get("account.username"),
conf_get("account.password"),
domain,
name,
type_,
content
environment,
account_username,
account_password,
domain_base,
domain_path,
args.type,
args.value
)
# print(_json.dumps(result, indent = "\t"))
elif (arguments.action == "certbot-hook"):
elif (args.action == "delete"):
if (account_username is None):
raise ValueError("account username required")
else:
if (account_password is None):
raise ValueError("account password required")
else:
if (args.domain is None):
raise ValueError("domain required")
else:
api_macro_delete(
environment,
account_username,
account_password,
domain_base,
domain_path,
args.type
)
elif (args.action == "certbot-hook"):
if (account_username is None):
raise ValueError("account username required")
else:
if (account_password is None):
raise ValueError("account password required")
else:
domain_full_parts = _os.environ["CERTBOT_DOMAIN"].split(".")
account = ".".join(domain_full_parts[-2:])
concern = ".".join(domain_full_parts[:-2])
domain = account
name = (arguments.challenge_prefix + "." + concern)
domain_base = ".".join(domain_full_parts[-2:])
domain_path_stripped = ".".join(domain_full_parts[:-2])
domain_path = (args.challenge_prefix + "." + domain_path_stripped)
type_ = "TXT"
content = _os.environ["CERTBOT_VALIDATION"]
api_macro_save(
conf_get("environment"),
conf_get("account.username"),
conf_get("account.password"),
domain,
name,
environment,
account_username,
account_password,
domain_base,
domain_path,
type_,
content
)
_time.sleep(arguments.delay)
_time.sleep(args.delay)
# print(_json.dumps(result, indent = "\t"))
else:
log("unhandled action '%s'" % (arguments.action, ))
log("unhandled action '%s'" % (args.action, ))
try:
main()
except ValueError as error:
_sys.stderr.write(str(error) + "\n")
_sys.stderr.write("-- %s\n" % str(error))

View file

@ -3,6 +3,7 @@
import sys as _sys
import os as _os
import json as _json
import pathlib as _pathlib
import argparse as _argparse
@ -22,12 +23,12 @@ def main():
type = str,
dest = "conf_path",
metavar = "<conf-path>",
default = _os.path.join(_os.environ["HOME"], ".tls-get-conf.json"),
default = _os.path.join(str(_pathlib.Path.home()), ".tls-get-conf.json"),
)
argument_parser.add_argument(
type = str,
dest = "domain",
metavar = "<domain>",
help = "the domain for which the TLS certificate shall be generated"
)
argument_parser.add_argument(
"-t",
@ -65,38 +66,49 @@ def main():
)
args = argument_parser.parse_args()
## vars
conf = _json.loads(file_read(args.conf_path))
le_dir = "/etc/letsencrypt/live"
## exec
command_hook_parts = [
("/usr/local/bin/inwx"),
("--username=\"%s\"" % conf["inwx_account"]["username"]),
("--password=\"%s\"" % conf["inwx_account"]["password"]),
("certbot-hook")
]
command_hook = " ".join(command_hook_parts)
command_certbot_parts = [
("certbot"),
("certonly"),
command_certbot = " ".join(
[
"certbot",
"certonly",
("--email='%s'" % conf["acme_account"]["email"]),
# ("--work-dir='%s'" % conf["misc"]["working_directory"]),
("--preferred-challenges='dns'"),
("--non-interactive"),
("--agree-tos"),
"--preferred-challenges='dns'",
"--non-interactive",
"--agree-tos",
("--domain='%s'" % args.domain),
("--manual"),
("--manual-auth-hook='%s'" % command_hook),
# ("--key-path='%s'" % _os.path.join(args.target_directory, "private", "%s.pem" % args.domain)),
# ("--cert-path='%s'" % _os.path.join(args.target_directory, "certs", "%s.pem" % args.domain)),
# ("--chain-path='%s'" % _os.path.join(args.target_directory, "chains", "%s.pem" % args.domain)),
# ("--fullchain-path='%s'" % _os.path.join(args.target_directory, "fullchains", "%s.pem" % args.domain)),
"--manual",
(
"--manual-auth-hook='%s'"
% " ".join(
[
"/usr/local/bin/inwx",
("--username=\"%s\"" % conf["inwx_account"]["username"]),
("--password=\"%s\"" % conf["inwx_account"]["password"]),
"certbot-hook",
("--delay=%.4f" % args.delay),
]
command_certbot = " ".join(command_certbot_parts)
)
),
(
"--post-hook='%s'"
% " ".join(
[
"/usr/local/bin/inwx",
("--username=\"%s\"" % conf["inwx_account"]["username"]),
("--password=\"%s\"" % conf["inwx_account"]["password"]),
"delete",
("--domain=\"%s\"" % (args.challenge_prefix + "." + args.domain)),
("--type=\"TXT\""),
]
)
),
]
)
if (args.dry_run):
_sys.stdout.write(command_certbot + "\n")
else:

View file

@ -76,20 +76,20 @@
"ansible.builtin.cron": {
"state": "present",
"disabled": false,
"name": "TLS certificate for {{var_tlscert_acme_inwx_domain_path}}.{{var_tlscert_acme_inwx_domain_base}}",
"name": "TLS certificate for {{var_tlscert_acme_inwx_domain}}",
"minute": "0",
"hour": "2",
"day": "1",
"month": "*",
"weekday": "*",
"job": "echo '/usr/local/bin/tls-get --conf-path=/root/.tls-get-conf.json {{var_tlscert_acme_inwx_domain_path}}.{{var_tlscert_acme_inwx_domain_base}} --target-directory={{var_tlscert_acme_inwx_ssl_directory}}' > /var/pseudoqueue"
"job": "echo '/usr/local/bin/tls-get {{var_tlscert_acme_inwx_domain}} --conf-path=/root/.tls-get-conf.json --target-directory={{var_tlscert_acme_inwx_ssl_directory}}' > /var/pseudoqueue"
}
},
{
"name": "run",
"become": true,
"ansible.builtin.shell": {
"cmd": "echo '/usr/local/bin/tls-get --conf-path=/root/.tls-get-conf.json {{var_tlscert_acme_inwx_domain_path}}.{{var_tlscert_acme_inwx_domain_base}} --target-directory={{var_tlscert_acme_inwx_ssl_directory}}' > /var/pseudoqueue"
"cmd": "echo '/usr/local/bin/tls-get {{var_tlscert_acme_inwx_domain}} --conf-path=/root/.tls-get-conf.json --target-directory={{var_tlscert_acme_inwx_ssl_directory}}' > /var/pseudoqueue"
}
}
]