#!/usr/bin/env python3
"""Derive default/override skeletons from a role's JSON-schema-like
configuration schema and reduce a configuration to its non-default values.

The schema is read from ``roles/<role>/cfg.schema.json``; see ``main`` for
the supported actions.
"""

import os as _os
import sys as _sys
import json as _json
import argparse as _argparse


def convey(x, fs):
    """Pipe ``x`` through the functions in ``fs`` from left to right.

    Returns the result of the last function, or ``x`` itself when ``fs``
    is empty.
    """
    y = x
    for f in fs:
        y = f(y)
    return y


def file_read(path):
    """Return the whole content of the text file at ``path``."""
    # the context manager closes the handle even when read() raises
    with open(path, "r") as handle:
        return handle.read()


class interface_option(object):
    """Interface of an optional value (either empty or filled)."""

    def is_filled(self):
        """Tell whether a value is present."""
        raise NotImplementedError()

    def cull(self):
        """Return the contained value."""
        raise NotImplementedError()

    def propagate(self, function):
        """Apply ``function`` to the contained value, if there is one."""
        raise NotImplementedError()


class class_option_empty(interface_option):
    """Option which holds no value."""

    def __init__(self):
        pass

    def is_filled(self):
        return False

    def cull(self):
        """Always raises ``ValueError`` since there is nothing to return."""
        raise ValueError("cull from empty")

    def propagate(self, function):
        """Ignore ``function`` and return a new empty option."""
        return class_option_empty()


class class_option_filled(interface_option):
    """Option which holds ``value``."""

    def __init__(self, value):
        self.value = value

    def is_filled(self):
        return True

    def cull(self):
        return self.value

    def propagate(self, function):
        """Return whatever ``function`` returns for the held value."""
        return function(self.value)


def generate_defaults(schema_node):
    """Collect the ``default`` values declared in ``schema_node``.

    For an object schema the result is a filled option holding a dict with
    one entry per property that yields a default (possibly an empty dict).
    For any other type the result is filled with the node's ``default`` or
    empty when there is none. For ``anyOf`` only the first alternative is
    considered.

    Raises ``ValueError`` if a node has neither ``anyOf`` nor ``type``.
    """
    if "anyOf" in schema_node:
        ## todo: is picking the first alternative really right?
        return generate_defaults(schema_node["anyOf"][0])
    if "type" not in schema_node:
        raise ValueError(":?")
    if schema_node["type"] != "object":
        if "default" not in schema_node:
            return class_option_empty()
        return class_option_filled(schema_node["default"])
    result = {}
    # an object schema without "properties" is treated as having none
    for (key, value) in schema_node.get("properties", {}).items():
        sub_result = generate_defaults(value)
        if sub_result.is_filled():
            result[key] = sub_result.cull()
    return class_option_filled(result)


def generate_overrides(schema_node):
    """Build placeholder values for everything that has *no* default.

    Nodes with a ``default`` yield an empty option. Otherwise the
    placeholder is, in this order of precedence: the first ``enum`` entry,
    ``None`` for nullable nodes, or a type specific zero value (``False``,
    ``0``, ``0``, ``""``). Object schemas yield a dict of the placeholders
    of their properties, or an empty option if no property needs one. For
    ``anyOf`` only the first alternative is considered.

    Raises ``ValueError`` if a node has neither ``anyOf`` nor ``type`` or if
    no placeholder is known for its type.
    """
    if "anyOf" in schema_node:
        ## todo: is picking the first alternative really right?
        return generate_overrides(schema_node["anyOf"][0])
    if "type" not in schema_node:
        raise ValueError(":?")
    if schema_node["type"] == "object":
        result = {}
        # an object schema without "properties" is treated as having none
        for (key, value) in schema_node.get("properties", {}).items():
            sub_result = generate_overrides(value)
            if sub_result.is_filled():
                result[key] = sub_result.cull()
        if not result:
            return class_option_empty()
        return class_option_filled(result)
    if "default" in schema_node:
        return class_option_empty()
    if "enum" in schema_node:
        return class_option_filled(schema_node["enum"][0])
    if schema_node.get("nullable", False):
        return class_option_filled(None)
    if schema_node["type"] == "boolean":
        return class_option_filled(False)
    elif schema_node["type"] == "integer":
        return class_option_filled(0)
    elif schema_node["type"] == "number":
        return class_option_filled(0)
    elif schema_node["type"] == "string":
        return class_option_filled("")
    else:
        raise ValueError("unhandled type: %s" % schema_node["type"])


# schema type -> (exactly accepted python types, type reported as "shall")
_PRIMITIVE_TYPES = {
    "boolean": ((bool,), bool),
    "integer": ((int,), int),
    "number": ((int, float), float),
    "string": ((str,), str),
}


def investigate(schema, value):
    """Check ``value`` against ``schema`` and return the list of flaws.

    Every flaw is a dict with the keys ``incident`` and ``details``; an
    empty list means that the value is valid.

    Raises ``ValueError`` for schema nodes which can not be handled (no
    ``type`` or an unknown one).
    """
    flaws = []
    if "anyOf" in schema:
        entries = []
        for sub_schema in schema["anyOf"]:
            sub_flaws = investigate(sub_schema, value)
            if not sub_flaws:
                # one matching alternative suffices
                return flaws
            entries.append({"schema": sub_schema, "flaws": sub_flaws})
        flaws.append({
            "incident": "not_valid_against_any_sub_schema",
            "details": {"value": value, "results": entries},
        })
        return flaws
    if value is None:
        if not schema.get("nullable", False):
            flaws.append({
                "incident": "not_nullable",
                "details": {"value": str(type(value))},
            })
        return flaws
    if "type" not in schema:
        raise ValueError("unhandled: %s" % _json.dumps(schema))
    schema_type = schema["type"]
    primitive = (
        _PRIMITIVE_TYPES.get(schema_type)
        if isinstance(schema_type, str)
        else None
    )
    if primitive is not None:
        (accepted, nominal) = primitive
        # deliberately an exact type comparison instead of isinstance():
        # bool is a subclass of int but must not pass as integer/number
        if type(value) not in accepted:
            flaws.append({
                "incident": "wrong_type",
                "details": {"shall": str(nominal), "is": str(type(value))},
            })
        elif ("enum" in schema) and (value not in schema["enum"]):
            flaws.append({
                "incident": "not_in_enum",
                "details": {"enum": schema["enum"], "value": value},
            })
        # todo: min,max,step,pattern,etc.
    elif schema_type == "array":
        if type(value) is not list:
            flaws.append({
                "incident": "wrong_type",
                "details": {"shall": str(list), "is": str(type(value))},
            })
        else:
            item_schema = schema.get("items")
            if isinstance(item_schema, dict):
                entries = []
                for (index, item) in enumerate(value):
                    sub_flaws = investigate(item_schema, item)
                    if sub_flaws:
                        entries.append({"index": index, "result": sub_flaws})
                if entries:
                    flaws.append({"incident": "item_flaws", "details": entries})
    elif schema_type == "object":
        if type(value) is not dict:
            flaws.append({
                "incident": "wrong_type",
                "details": {"shall": str(dict), "is": str(type(value))},
            })
        else:
            required = schema.get("required", [])
            entries = []
            for (property_key, property_value) in schema.get("properties", {}).items():
                if property_key not in value:
                    if property_key in required:
                        flaws.append({
                            "incident": "mandatory_field_missing",
                            "details": {"key": property_key},
                        })
                    continue
                sub_flaws = investigate(property_value, value[property_key])
                if sub_flaws:
                    entries.append({"key": property_key, "result": sub_flaws})
            if entries:
                flaws.append({"incident": "field_flaws", "details": entries})
            # todo: additionalProperties
    else:
        raise ValueError("unhandled: %s" % _json.dumps(schema))
    return flaws


def validate(schema, value):
    """Return ``True`` if ``investigate`` finds no flaw in ``value``."""
    return not investigate(schema, value)


def reduce(schema, value):
    """Strip everything from ``value`` which equals the schema defaults.

    Returns an empty option if ``value`` equals the default of ``schema``
    and a filled option holding the (reduced) value otherwise. Nodes
    without a ``default`` are passed through unchanged. For ``anyOf`` the
    first alternative which the value is valid against is used.

    Raises ``ValueError`` if the value matches no ``anyOf`` alternative, if
    an object schema meets a non-dict value or if the schema type is not
    handled.
    """
    if "anyOf" in schema:
        for sub_schema in schema["anyOf"]:
            if validate(sub_schema, value):
                return reduce(sub_schema, value)
        raise ValueError("not valid against any sub schema")
    if "default" not in schema:
        return class_option_filled(value)
    schema_type = schema.get("type")
    if schema_type in ("boolean", "integer", "number", "string", "array"):
        if value == schema["default"]:
            return class_option_empty()
        return class_option_filled(value)
    elif schema_type == "object":
        if type(value) is not dict:
            raise ValueError("dict expected")
        value_out = {}
        for (property_key, property_value) in schema.get("properties", {}).items():
            if property_key not in value:
                # an absent (optional) property has nothing to reduce
                continue
            sub_result = reduce(property_value, value[property_key])
            if sub_result.is_filled():
                value_out[property_key] = sub_result.cull()
        # todo: additionalProperties
        # todo: required
        return class_option_filled(value_out)
    else:
        raise ValueError("unhandled: %s" % _json.dumps(schema))


def role_name_derive(role_name):
    """Return ``role_name`` with every ``-`` replaced by ``_``."""
    return role_name.replace("-", "_")


def main():
    """Command line entry point.

    Reads ``roles/<role>/cfg.schema.json`` (relative to the working
    directory) and, depending on the action, writes to stdout:

    - ``defaults``: ``{"cfg_<role>_defaults": ...}``
    - ``overrides``: ``{"cfg_<role>_overrides": ...}``
    - ``reduce``: the configuration read as JSON from stdin, reduced to its
      non-default values (or ``?`` if nothing is left)
    """
    ## args
    argument_parser = _argparse.ArgumentParser()
    argument_parser.add_argument(
        "action",
        type = str,
        choices = [
            "defaults",
            "overrides",
            "reduce",
        ],
        metavar = "",
    )
    argument_parser.add_argument(
        "role",
        type = str,
        metavar = "",
    )
    args = argument_parser.parse_args()

    ## exec
    cfg_schema = convey(
        args.role,
        [
            lambda x: _os.path.join("roles", x, "cfg.schema.json"),
            file_read,
            _json.loads,
        ]
    )
    if args.action == "defaults":
        raw = generate_defaults(cfg_schema)
        key = ("cfg_%s_defaults" % (role_name_derive(args.role)))
        result = {key: (raw.cull() if raw.is_filled() else {})}
        _sys.stdout.write(_json.dumps(result, indent = "\t") + "\n")
    elif args.action == "overrides":
        raw = generate_overrides(cfg_schema)
        key = ("cfg_%s_overrides" % (role_name_derive(args.role)))
        result = {key: (raw.cull() if raw.is_filled() else {})}
        _sys.stdout.write(_json.dumps(result, indent = "\t") + "\n")
    elif args.action == 'reduce':
        cfg = _json.loads(_sys.stdin.read())
        result = reduce(cfg_schema, cfg)
        if (not result.is_filled()):
            print("?")
        else:
            _sys.stdout.write(_json.dumps(result.cull(), indent = "\t") + "\n")
    else:
        raise ValueError("invalid action: %s" % args.action)


# only run as a script; importing the module must not parse sys.argv
if __name__ == "__main__":
    main()