ansible-base/tools/cfg-man
2025-10-15 08:07:41 +02:00

354 lines
9.1 KiB
Python
Executable file

#!/usr/bin/env python3
import os as _os
import sys as _sys
import json as _json
import argparse as _argparse
def convey(x, fs):
    """
    Pipe a value through a sequence of single-argument callables.

    The first callable in ``fs`` receives ``x``; every following one
    receives the result of its predecessor. The result of the last
    callable is returned. With an empty ``fs`` the input comes back
    unchanged.
    """
    current = x
    for step in fs:
        current = step(current)
    return current
def file_read(path):
    """
    Return the complete content of the text file at ``path``.

    The file is opened in text mode ("r") without an explicit encoding,
    exactly as before. Errors raised by ``open``/``read`` are passed on
    to the caller.
    """
    # the context manager closes the handle even if `read` raises,
    # which the manual open/read/close sequence did not guarantee
    with open(path, "r") as handle:
        return handle.read()
class interface_option:
    """
    Common interface of the "optional value" containers in this file
    (``class_option_empty`` and ``class_option_filled``).

    Every method here only raises ``NotImplementedError``; the concrete
    variants override all of them.
    """

    def is_filled(self):
        """Tell whether a value is present."""
        raise NotImplementedError

    def cull(self):
        """Hand out the contained value."""
        raise NotImplementedError

    def propagate(self, function):
        """Feed the contained value (if any) into ``function``."""
        raise NotImplementedError
class class_option_empty(interface_option):
    """Option variant that carries no value."""

    def __init__(self):
        """Create the empty option; there is nothing to store."""

    def is_filled(self):
        """Always ``False``: no value is present."""
        return False

    def cull(self):
        """There is nothing to hand out, so this always raises ``ValueError``."""
        raise ValueError("cull from empty")

    def propagate(self, function):
        """Ignore ``function`` (it is never called) and return a new empty option."""
        return class_option_empty()
class class_option_filled(interface_option):
    """Option variant that wraps exactly one value (which may itself be ``None``)."""

    def __init__(self, value):
        """Store ``value`` as the content of this option."""
        # the wrapped value; read back through `cull`
        self.value = value

    def is_filled(self):
        """Always ``True``: a value is present."""
        return True

    def cull(self):
        """Return the wrapped value."""
        return self.value

    def propagate(self, function):
        """
        Call ``function`` with the wrapped value and return its result
        as-is; the result is not wrapped into an option here.
        """
        content = self.value
        return function(content)
def generate_defaults(schema_node):
    """
    Collect the default values declared in a schema (sub)tree.

    Returns an option object:
    - node with "anyOf": the result for the FIRST alternative only
    - non-object node: filled with the node's "default" if it has one,
      empty otherwise
    - object node: always filled, with a dict that holds the defaults of
      those properties which yielded one (possibly an empty dict); an
      object node without "properties" is treated as having none

    Raises ``ValueError`` if the node has neither "anyOf" nor "type".
    """
    if "anyOf" in schema_node:
        ## todo: o'rly?
        return generate_defaults(schema_node["anyOf"][0])
    if "type" not in schema_node:
        raise ValueError(":?")
    if schema_node["type"] != "object":
        if "default" in schema_node:
            return class_option_filled(schema_node["default"])
        return class_option_empty()
    result = {}
    # "properties" is optional in a schema; its absence means "no known properties"
    for (key, sub_schema, ) in schema_node.get("properties", {}).items():
        sub_result = generate_defaults(sub_schema)
        if sub_result.is_filled():
            result[key] = sub_result.cull()
    return class_option_filled(result)
def generate_overrides(schema_node):
    """
    Build placeholder values for all settings that declare no default.

    Returns an option object:
    - node with "anyOf": the result for the FIRST alternative only
    - object node: filled with a dict of the placeholders of its
      properties, or empty if no property yielded one; an object node
      without "properties" is treated as having none
    - other node with "default": empty (nothing has to be overridden)
    - other node without "default": filled with, in this order of
      precedence, the first "enum" entry, ``None`` if "nullable" is
      truthy, or a type based placeholder (``False``, ``0``, ``0``,
      ``""``, ``[]`` for boolean, integer, number, string, array)

    Raises ``ValueError`` if the node has neither "anyOf" nor "type", or
    if no placeholder is known for its type.
    """
    if "anyOf" in schema_node:
        ## todo: o'rly?
        return generate_overrides(schema_node["anyOf"][0])
    if "type" not in schema_node:
        raise ValueError(":?")
    node_type = schema_node["type"]
    if node_type == "object":
        result = {}
        # "properties" is optional in a schema; its absence means "no known properties"
        for (key, sub_schema, ) in schema_node.get("properties", {}).items():
            sub_result = generate_overrides(sub_schema)
            if sub_result.is_filled():
                result[key] = sub_result.cull()
        if len(result) <= 0:
            return class_option_empty()
        return class_option_filled(result)
    if "default" in schema_node:
        return class_option_empty()
    if "enum" in schema_node:
        return class_option_filled(schema_node["enum"][0])
    if schema_node.get("nullable", False):
        return class_option_filled(None)
    if node_type == "boolean":
        return class_option_filled(False)
    if node_type == "integer":
        return class_option_filled(0)
    if node_type == "number":
        return class_option_filled(0)
    if node_type == "string":
        return class_option_filled("")
    if node_type == "array":
        # `reduce` in this file accepts array schemas, so they get a placeholder too
        return class_option_filled([])
    raise ValueError("unhandled type: %s" % node_type)
def investigate(schema, value):
    """
    Check ``value`` against ``schema`` and report everything that is wrong.

    Returns a list of flaw records, each a dict with the keys "incident"
    and "details"; an empty list means the value is valid.

    Handling, in this order:
    - "anyOf": valid as soon as one alternative yields no flaws;
      otherwise a single "not_valid_against_any_sub_schema" record that
      carries the flaws of every alternative
    - ``None``: accepted only if the schema has a truthy "nullable"
    - boolean / integer / number / string: exact type check, then "enum"
    - object: every entry of "properties" (if any) is checked
      recursively; a missing key is only a flaw if listed in "required"
    - array: type check; if "items" is a schema dict, every element is
      checked against it

    Raises ``ValueError`` for a schema without "type" (and without
    "anyOf") or with a type not listed above.
    """
    flaws = []
    if "anyOf" in schema:
        entries = []
        for sub_schema in schema["anyOf"]:
            sub_flaws = investigate(sub_schema, value)
            if not sub_flaws:
                # one matching alternative is enough
                return flaws
            entries.append({"schema": sub_schema, "flaws": sub_flaws})
        flaws.append({"incident": "not_valid_against_any_sub_schema", "details": {"value": value, "results": entries}})
        return flaws
    if value is None:
        if not schema.get("nullable", False):
            flaws.append({"incident": "not_nullable", "details": {"value": str(type(value))}})
        return flaws
    if "type" not in schema:
        raise ValueError("unhandled: %s" % _json.dumps(schema))
    schema_type = schema["type"]
    # schema type -> (exactly accepted python types, type named in the "shall" detail)
    scalar_types = {
        "boolean": ((bool, ), bool),
        "integer": ((int, ), int),
        "number": ((int, float, ), float),
        "string": ((str, ), str),
    }
    # the isinstance guard keeps unhashable type declarations (e.g. a list)
    # on the "unhandled" path below instead of failing in the dict lookup
    if isinstance(schema_type, str) and (schema_type in scalar_types):
        (accepted, nominal, ) = scalar_types[schema_type]
        # exact type comparison on purpose: `bool` is a subclass of `int`,
        # so isinstance() would let True/False pass as integer or number
        if type(value) not in accepted:
            flaws.append({"incident": "wrong_type", "details": {"shall": str(nominal), "is": str(type(value))}})
        elif ("enum" in schema) and (value not in schema["enum"]):
            flaws.append({"incident": "not_in_enum", "details": {"enum": schema["enum"], "value": value}})
        # todo: min,max,step,pattern,etc.
        return flaws
    if schema_type == "object":
        if type(value) != dict:
            flaws.append({"incident": "wrong_type", "details": {"shall": str(dict), "is": str(type(value))}})
            return flaws
        required = schema.get("required", [])
        entries = []
        # "properties" is optional in a schema; its absence means "no known properties"
        for (property_key, property_schema, ) in schema.get("properties", {}).items():
            if property_key not in value:
                if property_key in required:
                    flaws.append({"incident": "mandatory_field_missing", "details": {"key": property_key}})
                continue
            sub_flaws = investigate(property_schema, value[property_key])
            if sub_flaws:
                entries.append({"key": property_key, "result": sub_flaws})
        if entries:
            flaws.append({"incident": "field_flaws", "details": entries})
        # todo: additionalProperties
        # todo: required (keys that are not listed in "properties")
        return flaws
    if schema_type == "array":
        if type(value) != list:
            flaws.append({"incident": "wrong_type", "details": {"shall": str(list), "is": str(type(value))}})
            return flaws
        item_schema = schema.get("items")
        if isinstance(item_schema, dict):
            entries = []
            for (index, item, ) in enumerate(value):
                sub_flaws = investigate(item_schema, item)
                if sub_flaws:
                    entries.append({"index": index, "result": sub_flaws})
            if entries:
                flaws.append({"incident": "element_flaws", "details": entries})
        # todo: minItems,maxItems,etc.
        return flaws
    raise ValueError("unhandled: %s" % _json.dumps(schema))
def validate(schema, value):
    """
    Tell whether ``value`` conforms to ``schema``.

    Runs ``investigate`` and returns ``True`` exactly if it reported no
    flaws. Exceptions raised by ``investigate`` are passed on.
    """
    flaws = investigate(schema, value)
    # manual debugging aid, switched off: dumps the found flaws to stderr
    debug_dump = False
    if debug_dump:
        _sys.stderr.write(_json.dumps(flaws, indent = "\t") + "\n")
    return (not flaws)
def reduce(schema, value):
    """
    Strip everything from ``value`` that merely repeats a schema default.

    Returns an option object:
    - "anyOf": the reduction against the first alternative that
      ``validate`` accepts
    - schema without "default": filled with ``value``, untouched
    - boolean / integer / number / string / array with "default": empty
      if ``value`` equals the default, otherwise filled with ``value``
    - object with "default": filled with a dict of the reduced
      properties; properties that are absent from ``value`` or reduce to
      empty are left out, keys not listed in "properties" are dropped

    Raises ``ValueError`` if no "anyOf" alternative matches, if an object
    schema meets a non-dict value, or if the schema type is not handled.
    """
    # manual debugging aid, switched off: dumps the arguments to stderr
    debug_dump = False
    if debug_dump:
        _sys.stderr.write(_json.dumps({"schema": schema, "value": value}, indent = "\t") + "\n")
    if "anyOf" in schema:
        for sub_schema in schema["anyOf"]:
            if validate(sub_schema, value):
                return reduce(sub_schema, value)
        raise ValueError("not valid against any sub schema")
    if "default" not in schema:
        # NOTE(review): this also returns objects WITHOUT a "default" key
        # unreduced, i.e. their properties are never compared to their
        # defaults -- confirm whether that is intended
        return class_option_filled(value)
    # .get: a schema with "default" but without "type" ends up in the
    # "unhandled" error at the bottom instead of a bare KeyError
    schema_type = schema.get("type")
    if schema_type in ("boolean", "integer", "number", "string", "array", ):
        if value == schema["default"]:
            return class_option_empty()
        return class_option_filled(value)
    if schema_type == "object":
        if type(value) != dict:
            raise ValueError("dict expected")
        value_out = {}
        # "properties" is optional in a schema; its absence means "no known properties"
        for (property_key, property_schema, ) in schema.get("properties", {}).items():
            if property_key not in value:
                # `investigate` accepts missing non-required properties, so
                # they have to be tolerated here as well: nothing to reduce
                continue
            sub_result = reduce(property_schema, value[property_key])
            if sub_result.is_filled():
                value_out[property_key] = sub_result.cull()
        return class_option_filled(value_out)
        # todo: additionalProperties
        # todo: required
    raise ValueError("unhandled: %s" % _json.dumps(schema))
def role_name_derive(role_name):
    """
    Return ``role_name`` with every "-" replaced by "_".

    ``main`` embeds the result into the top-level key of its output
    ("cfg_<name>_defaults" / "cfg_<name>_overrides").
    """
    pieces = role_name.split("-")
    return "_".join(pieces)
def main():
    """
    Command line entry point: ``<action> <role>``.

    Loads "roles/<role>/cfg.schema.json" (relative to the current working
    directory) and then, depending on the action:
    - "defaults": prints {"cfg_<role>_defaults": ...} as JSON
    - "overrides": prints {"cfg_<role>_overrides": ...} as JSON
    - "reduce": reads a JSON value from stdin and prints its reduced form
      as JSON, or "?" if the reduction is empty
    """
    ## args
    argument_parser = _argparse.ArgumentParser()
    argument_parser.add_argument(
        "action",
        type = str,
        choices = ["defaults", "overrides", "reduce"],
        metavar = "<action>",
    )
    argument_parser.add_argument("role", type = str, metavar = "<role>")
    args = argument_parser.parse_args()

    ## exec
    schema_path = _os.path.join("roles", args.role, "cfg.schema.json")
    cfg_schema = _json.loads(file_read(schema_path))

    # action -> (generator function, template of the top-level output key)
    generators = {
        "defaults": (generate_defaults, "cfg_%s_defaults"),
        "overrides": (generate_overrides, "cfg_%s_overrides"),
    }
    if args.action in generators:
        (generate, key_template, ) = generators[args.action]
        raw = generate(cfg_schema)
        key = (key_template % (role_name_derive(args.role), ))
        # an empty option is printed as an empty object
        result = {key: (raw.cull() if raw.is_filled() else {})}
        _sys.stdout.write(_json.dumps(result, indent = "\t") + "\n")
    elif args.action == "reduce":
        cfg = _json.loads(_sys.stdin.read())
        reduced = reduce(cfg_schema, cfg)
        if reduced.is_filled():
            _sys.stdout.write(_json.dumps(reduced.cull(), indent = "\t") + "\n")
        else:
            print("?")
    else:
        # not reachable through argparse (see `choices`); kept as a safety net
        raise ValueError("invalid action: %s" % args.action)
# run the command line interface only when executed as a script, so that
# importing this file (e.g. for testing) neither parses argv nor reads files
if __name__ == "__main__":
    main()