def state_encode(state): return { "timestamp": state["timestamp"], "condition": condition_encode(state["condition"]), "count": state["count"], "last_notification_timestamp": state["last_notification_timestamp"], } def state_decode(state_encoded): return { "timestamp": state_encoded["timestamp"], "condition": condition_decode(state_encoded["condition"]), "count": state_encoded["count"], "last_notification_timestamp": ( state_encoded["last_notification_timestamp"] if ("last_notification_timestamp" in state_encoded) else None ), } def main(): ## setup translation for the first time translation_initialize("en", env_get_language()) ## args argumentparser = _argparse.ArgumentParser( description = translation_get("help.title"), formatter_class = _argparse.ArgumentDefaultsHelpFormatter ) argumentparser.add_argument( "-c", "--conf-path", type = str, default = "monitoring.hmdl.json", dest = "conf_path", metavar = "", help = translation_get("help.args.conf_path"), ) argumentparser.add_argument( "-f", "--state-path", type = str, default = None, dest = "state_path", metavar = "", help = translation_get("help.args.state_path"), ) argumentparser.add_argument( "-m", "--mutex-path", type = str, default = "/tmp/heimdall.mutex", dest = "mutex_path", metavar = "", help = translation_get("help.args.mutex_path"), ) argumentparser.add_argument( "-y", "--send-ok-notifications", action = "store_true", default = False, dest = "send_ok_notifications", help = translation_get("help.args.send_ok_notifications", {"condition_name": translation_get("conditions.ok")}), ) argumentparser.add_argument( "-l", "--language", type = str, choices = localization_data.keys(), default = None, dest = "language", metavar = "", help = translation_get("help.args.language"), ) argumentparser.add_argument( "-x", "--erase-state", action = "store_true", default = False, dest = "erase_state", help = translation_get("help.args.erase_state"), ) argumentparser.add_argument( "-s", "--show-schema", action = "store_true", default = False, dest = "show_schema", help = translation_get("help.args.show_schema"), ) argumentparser.add_argument( "-e", "--expose-full-conf", action = "store_true", default = False, dest = "expose_full_conf", help = translation_get("help.args.expose_full_conf"), ) args = argumentparser.parse_args() ## vars id_ = _hashlib.sha256(_os.path.abspath(args.conf_path).encode("ascii")).hexdigest()[:8] state_path = ( args.state_path if (args.state_path is not None) else _os.path.join( _tempfile.gettempdir(), string_coin("monitoring-state-{{id}}.json", {"id": id_}) ) ) ## exec ### setup translation for the second time if (args.language is not None): translation_initialize("en", args.language) ### load check kind implementations check_kind_implementations = { "script": implementation_check_kind_script(), "file_state": implementation_check_kind_file_state(), "tls_certificate": implementation_check_kind_tls_certificate(), "http_request": implementation_check_kind_http_request(), "generic_remote" : implementation_check_kind_generic_remote(), } ### load notification channel implementations notification_channel_implementations = { "console": implementation_notification_channel_console(), "email": implementation_notification_channel_email(), "libnotify": implementation_notification_channel_libnotify(), } if (args.show_schema): _sys.stdout.write( _json.dumps( conf_schema_root( check_kind_implementations, notification_channel_implementations ), indent = "\t" ) + "\n" ) else: ### get configuration data conf = conf_load( check_kind_implementations, notification_channel_implementations, _os.path.abspath(args.conf_path) ) if (args.expose_full_conf): _sys.stdout.write(_json.dumps(conf, indent = "\t") + "\n") _sys.exit(1) else: _sys.stderr.write( string_coin( "[info] {{label}}: {{path}}\n", { "label": translation_get("misc.state_file_path"), "path": state_path, } ) ) ### mutex check if (_os.path.exists(args.mutex_path)): _sys.stderr.write( string_coin( "[error] {{message}} ({{path}})\n", { "message": translation_get("misc.still_running"), "path": args.mutex_path, } ) ) _sys.exit(2) else: file_write(args.mutex_path, "", {"append": True}) ### get state data if ( (not _os.path.exists(state_path)) or args.erase_state ): state_data = {} file_write(state_path, _json.dumps(state_data, indent = "\t")) else: state_data = _json.loads(file_read(state_path)) ### iterate through checks for check_data in conf["checks"]: if (not check_data["active"]): pass else: ### get old state and examine whether the check shall be executed old_item_state = ( None if (check_data["name"] not in state_data) else state_decode(state_data[check_data["name"]]) ) timestamp = get_current_timestamp() due = ( (old_item_state is None) or (old_item_state["condition"] != enum_condition.ok) or ((timestamp - old_item_state["timestamp"]) >= check_data["schedule"]["regular_interval"]) or ( (old_item_state["count"] is not None) and ((timestamp - old_item_state["timestamp"]) >= check_data["schedule"]["attentive_interval"]) ) ) if (not due): pass else: _sys.stderr.write( string_coin( "-- {{check_name}}\n", { "check_name": check_data["name"], } ) ) ### execute check and set new state try: result = check_kind_implementations[check_data["kind"]].run(check_data["parameters"]) except Exception as error: result = { "condition": enum_condition.unknown, "info": { # "cause": translation_get("misc.check_procedure_failed"), "error": str(error), }, } count = ( 1 if ( (old_item_state is None) or (old_item_state["condition"] != result["condition"]) ) else ( (old_item_state["count"] + 1) if ( (old_item_state["count"] is not None) and ((old_item_state["count"] + 1) <= check_data["threshold"]) ) else None ) ) shall_send_notification = ( ( ( (count is not None) and (count == check_data["threshold"]) ) or ( (count is None) and check_data["annoy"] ) or ( (count is None) and ( (old_item_state is not None) and (old_item_state["last_notification_timestamp"] is not None) and (check_data["schedule"]["reminding_interval"] is not None) and ( (timestamp - old_item_state["last_notification_timestamp"]) >= check_data["schedule"]["reminding_interval"] ) ) ) ) and ( (result["condition"] != enum_condition.ok) or args.send_ok_notifications ) ) new_item_state = { "timestamp": timestamp, "condition": result["condition"], "count": count, "last_notification_timestamp": ( timestamp if shall_send_notification else ( None if (old_item_state is None) else old_item_state["last_notification_timestamp"] ) ), } state_data[check_data["name"]] = state_encode(new_item_state) file_write(state_path, _json.dumps(state_data, indent = "\t")) ### send notifications if shall_send_notification: for notification in check_data["notifications"]: notification_channel_implementations[notification["kind"]].notify( notification["parameters"], check_data["name"], check_data, new_item_state, result["info"] ) _os.remove(args.mutex_path) main()