def state_encode(state): return { "timestamp": state["timestamp"], "condition": condition_encode(state["condition"]), "count": state["count"], } def state_decode(state_encoded): return { "timestamp": state_encoded["timestamp"], "condition": condition_decode(state_encoded["condition"]), "count": state_encoded["count"], } def schema_active(): return { "type": "boolean", "default": True } def schema_threshold(): return { "description": "how often a condition has to occur in order to be reported", "type": "integer", "minimum": 1, "default": 3 } def schema_annoy(): return { "description": "whether notifications shall be kept sending after the threshold has been surpassed", "type": "boolean", "default": False } def schema_interval(default): return { "description": "in seconds or as text", "anyOf": [ { "type": "integer", "exclusiveMinimum": 0 }, { "type": "string", "enum": [ "minute", "hour", "day", "week", ] }, ], "default": default, } def schema_schedule(): return { "type": "object", "additionalProperties": False, "properties": { "regular_interval": schema_interval(60 * 60), "attentive_interval": schema_interval(60 * 2), }, "required": [ ] } def schema_notifications(notification_channel_implementations): return { "type": "array", "items": { "anyOf": list( map( lambda pair: { "title": ("check kind '%s'" % pair[0]), "type": "object", "unevaluatedProperties": False, "properties": { "kind": { "type": "string", "enum": [pair[0]] }, "parameters": pair[1].parameters_schema(), }, "required": [ "kind", "parameters" ] }, notification_channel_implementations.items() ) ) }, "default": [ { "kind": "console", "parameters": { } } ] } def schema_root(check_kind_implementations, notification_channel_implementations): return { "type": "object", "additionalProperties": False, "properties": { "defaults": { "description": "default values for checks", "type": "object", "additionalProperties": False, "properties": { "active": schema_active(), "threshold": schema_threshold(), "annoy": schema_annoy(), "schedule": schema_schedule(), "notifications": schema_notifications(notification_channel_implementations), }, "required": [ ] }, "checks": { "type": "array", "items": { "allOf": [ { "description": "should represent a specific check", "type": "object", "unevaluatedProperties": False, "properties": { "name": { "type": "string" }, "title": { "type": "string" }, "active": schema_active(), "threshold": schema_threshold(), "annoy": schema_annoy(), "schedule": schema_schedule(), "notifications": schema_notifications(notification_channel_implementations), }, "required": [ "name", ] }, { "anyOf": list( map( lambda pair: { "title": ("notification channel '%s'" % pair[0]), "type": "object", "unevaluatedProperties": False, "properties": { "kind": { "type": "string", "enum": [pair[0]] }, "parameters": pair[1].parameters_schema(), }, "required": [ "kind", "parameters" ] }, check_kind_implementations.items() ) ), }, ] } } }, "required": [ "defaults", "checks", ] } def conf_normalize_interval(interval_raw): if (type(interval_raw) == int): return interval_raw elif (type(interval_raw) == str): if (interval_raw == "minute"): return (60) elif (interval_raw == "hour"): return (60 * 60) elif (interval_raw == "day"): return (60 * 60 * 24) elif (interval_raw == "week"): return (60 * 60 * 24 * 7) else: raise ValueError("invalid string interval value: %s" % interval_raw) else: raise ValueError("invalid type for interval value") def conf_normalize_schedule(node): node_ = dict_merge( { "regular_interval": (60 * 60), "attentive_interval": (60 * 2), }, node ) return { "regular_interval": conf_normalize_interval(node["regular_interval"]), "attentive_interval": conf_normalize_interval(node["attentive_interval"]), } def conf_normalize_notification(notification_channel_implementations, node): return { "kind": node["kind"], "parameters": notification_channel_implementations[node["kind"]].normalize_conf_node(node["parameters"]), } def conf_normalize_defaults(notification_channel_implementations, node): node_ = dict_merge( { "active": True, "threshold": 3, "annoy": False, "schedule": { "regular_interval": (60 * 60), "attentive_interval": (60 * 2), }, "notifications": [ ], }, node ) return { "active": node_["active"], "threshold": node_["threshold"], "annoy": node_["annoy"], "schedule": node_["schedule"], "notifications": list( map( lambda x: conf_normalize_notification(notification_channel_implementations, x), node_["notifications"] ) ), } def conf_normalize_check(check_kind_implementations, notification_channel_implementations, defaults, node): if ("name" not in node): raise ValueError("missing mandatory 'check' field 'name'") else: if ("kind" not in node): raise ValueError("missing mandatory 'check' field 'kind'") else: if (node["kind"] not in check_kind_implementations): raise ValueError("unhandled kind: %s" % node["kind"]) else: node_ = dict_merge( { "title": node["name"], "active": defaults["active"], "threshold": defaults["threshold"], "annoy": defaults["annoy"], "schedule": defaults["schedule"], "notifications": defaults["notifications"], "parameters": {}, }, node ) return { "name": node_["name"], "title": node_["title"], "active": node_["active"], "threshold": node_["threshold"], "annoy": node_["annoy"], "schedule": conf_normalize_schedule(node_["schedule"]), "notifications": list( map( lambda x: conf_normalize_notification(notification_channel_implementations, x), node_["notifications"] ) ), "kind": node_["kind"], "parameters": check_kind_implementations[node_["kind"]].normalize_conf_node(node_["parameters"]), } def conf_normalize_root(check_kind_implementations, notification_channel_implementations, node): counts = {} for node_ in node["checks"]: if (node_["name"] not in counts): counts[node_["name"]] = 0 counts[node_["name"]] += 1 fails = list(filter(lambda pair: (pair[1] > 1), counts.items())) if (len(fails) > 0): raise ValueError( string_coin( "ambiguous check names: {{names}}", { "names": ",".join(counts.keys()), } ) ) else: return list( map( lambda node_: conf_normalize_check( check_kind_implementations, notification_channel_implementations, conf_normalize_defaults(notification_channel_implementations, node["defaults"]), node_ ), node["checks"] ) ) def main(): ## args argumentparser = _argparse.ArgumentParser( description = "Heimdall-Monitoring-Tool", formatter_class = _argparse.ArgumentDefaultsHelpFormatter ) argumentparser.add_argument( "-c", "--conf-path", type = str, default = "monitoring.hmdl.json", dest = "conf_path", metavar = "", help = "path to the configuration file" ) argumentparser.add_argument( "-f", "--state-path", type = str, default = None, dest = "state_path", metavar = "", help = "path to the state file, which contains information about the recent checks; default: file in temporary directory, unique for the conf-path input" ) argumentparser.add_argument( "-x", "--erase-state", action = "store_true", default = False, dest = "erase_state", help = "whether the state shall be deleted on start; this will cause that all checks are executed" ) argumentparser.add_argument( "-s", "--show-schema", action = "store_true", default = False, dest = "show_schema", help = "print the hmdl JSON schema to stdout and exit" ) argumentparser.add_argument( "-e", "--expose-full-conf", action = "store_true", default = False, dest = "expose_full_conf", help = "only print the extended configuration to stdout and exit (useful for debug purposes)" ) args = argumentparser.parse_args() ## vars id_ = _hashlib.sha256(_os.path.abspath(args.conf_path).encode("ascii")).hexdigest()[:8] state_path = ( args.state_path if (args.state_path is not None) else _os.path.join( _tempfile.gettempdir(), string_coin("monitoring-state-{{id}}.json", {"id": id_}) ) ) ## exec ### load check kind implementations check_kind_implementations = { "script": implementation_check_kind_script(), "file_timestamp": implementation_check_kind_file_timestamp(), "http_request": implementation_check_kind_http_request(), } ### load notification channel implementations notification_channel_implementations = { "console": implementation_notification_channel_console(), "file_touch": implementation_notification_channel_file_touch(), "email": implementation_notification_channel_email(), "libnotify": implementation_notification_channel_libnotify(), } if (args.show_schema): _sys.stdout.write( _json.dumps( schema_root( check_kind_implementations, notification_channel_implementations ), indent = "\t" ) + "\n" ) else: _sys.stderr.write(">> state file path: %s\n" % state_path) ### get configuration data checks = conf_normalize_root( check_kind_implementations, notification_channel_implementations, _json.loads(file_read(args.conf_path)) ) if (args.expose_full_conf): _sys.stdout.write(_json.dumps(checks, indent = "\t") + "\n") _sys.exit(1) else: ### get state data if ( (not _os.path.exists(state_path)) or args.erase_state ): state_data = {} file_write(state_path, _json.dumps(state_data, indent = "\t")) else: state_data = _json.loads(file_read(state_path)) ### iterate through checks for check_data in checks: if (not check_data["active"]): pass else: ### get old state and examine whether the check shall be executed old_item_state = ( None if (check_data["name"] not in state_data) else state_decode(state_data[check_data["name"]]) ) timestamp = get_current_timestamp() due = ( (old_item_state is None) or (old_item_state["condition"] != enum_condition.ok) or ((timestamp - old_item_state["timestamp"]) >= check_data["schedule"]["regular_interval"]) or ( (old_item_state["count"] is not None) and ((timestamp - old_item_state["timestamp"]) >= check_data["schedule"]["attentive_interval"]) ) ) if (not due): pass else: _sys.stderr.write( string_coin( "-- {{check_name}}\n", { "check_name": check_data["name"], } ) ) ### execute check and set new state result = check_kind_implementations[check_data["kind"]].run(check_data["parameters"]) new_item_state = { "timestamp": timestamp, "condition": result["condition"], "count": ( 1 if ( (old_item_state is None) or (old_item_state["condition"] != result["condition"]) ) else ( (old_item_state["count"] + 1) if ( (old_item_state["count"] is not None) and ((old_item_state["count"] + 1) <= check_data["threshold"]) ) else None ) ), } state_data[check_data["name"]] = state_encode(new_item_state) file_write(state_path, _json.dumps(state_data, indent = "\t")) ### send notifications if ( ( (new_item_state["count"] is not None) and (new_item_state["count"] == check_data["threshold"]) ) or ( (new_item_state["count"] is None) and check_data["annoy"] ) ): for notification in check_data["notifications"]: if (notification["kind"] in notification_channel_implementations): notification_channel_implementations[notification["kind"]].notify( notification["parameters"], check_data["name"], check_data, new_item_state, result["output"] ) else: raise ValueError("invalid notification kind: %s" % notification["kind"]) main()