core/source/logic/main.py
2023-03-03 18:50:21 +01:00

269 lines
6.5 KiB
Python

def state_encode(state):
    """
    Convert a check state record into the form that is written to the state file.

    "timestamp" and "count" are taken over unchanged; "condition" is passed
    through condition_encode().
    """
    encoded = {}
    encoded["timestamp"] = state["timestamp"]
    encoded["condition"] = condition_encode(state["condition"])
    encoded["count"] = state["count"]
    return encoded
def state_decode(state_encoded):
    """
    Convert a check state record as read from the state file into its working form.

    "timestamp" and "count" are taken over unchanged; "condition" is passed
    through condition_decode().
    """
    timestamp = state_encoded["timestamp"]
    condition = condition_decode(state_encoded["condition"])
    count = state_encoded["count"]
    return {
        "timestamp": timestamp,
        "condition": condition,
        "count": count,
    }
def main():
    """
    Entry point of the command line interface.

    Parses the command line arguments and then does exactly one of:

    - print the configuration schema to stdout (--show-schema)
    - print the loaded configuration to stdout and exit (--expose-full-conf)
    - run every active check of the configuration that is due, store its new
      state in the state file and send notifications where required

    Takes no parameters (arguments are read from the process command line) and
    returns nothing.
    """
    ## setup translation for the first time
    ## (needed already here, because the help texts of the arguments are translated)
    translation_initialize("en", env_get_language())

    ## args
    argumentparser = _argparse.ArgumentParser(
        description = translation_get("help.title"),
        formatter_class = _argparse.ArgumentDefaultsHelpFormatter
    )
    argumentparser.add_argument(
        "-c",
        "--conf-path",
        type = str,
        default = "monitoring.hmdl.json",
        dest = "conf_path",
        metavar = "<conf-path>",
        help = translation_get("help.args.conf_path"),
    )
    argumentparser.add_argument(
        "-f",
        "--state-path",
        type = str,
        default = None,
        dest = "state_path",
        metavar = "<state-path>",
        help = translation_get("help.args.state_path"),
    )
    argumentparser.add_argument(
        "-y",
        "--send-ok-notifications",
        action = "store_true",
        default = False,
        dest = "send_ok_notifications",
        help = translation_get("help.args.send_ok_notifications", {"condition_name": translation_get("conditions.ok")}),
    )
    argumentparser.add_argument(
        "-l",
        "--language",
        type = str,
        choices = localization_data.keys(),
        default = None,
        dest = "language",
        metavar = "<language>",
        help = translation_get("help.args.language"),
    )
    argumentparser.add_argument(
        "-x",
        "--erase-state",
        action = "store_true",
        default = False,
        dest = "erase_state",
        help = translation_get("help.args.erase_state"),
    )
    argumentparser.add_argument(
        "-s",
        "--show-schema",
        action = "store_true",
        default = False,
        dest = "show_schema",
        help = translation_get("help.args.show_schema"),
    )
    argumentparser.add_argument(
        "-e",
        "--expose-full-conf",
        action = "store_true",
        default = False,
        dest = "expose_full_conf",
        help = translation_get("help.args.expose_full_conf"),
    )
    args = argumentparser.parse_args()

    ## vars
    ### short id derived from the absolute configuration path; used for the default state file name
    ### (the path is converted with fsencode, so that paths with non-ASCII characters do not raise;
    ### for pure ASCII paths the resulting id is the same as with an ASCII encoding)
    id_ = _hashlib.sha256(_os.fsencode(_os.path.abspath(args.conf_path))).hexdigest()[:8]
    state_path = (
        args.state_path
        if (args.state_path is not None) else
        _os.path.join(
            _tempfile.gettempdir(),
            string_coin("monitoring-state-{{id}}.json", {"id": id_})
        )
    )

    ## exec
    ### setup translation for the second time (now with the explicitly requested language)
    if (args.language is not None):
        translation_initialize("en", args.language)

    ### load check kind implementations
    check_kind_implementations = {
        "script": implementation_check_kind_script(),
        "file_state": implementation_check_kind_file_state(),
        "http_request": implementation_check_kind_http_request(),
    }

    ### load notification channel implementations
    notification_channel_implementations = {
        "console": implementation_notification_channel_console(),
        "email": implementation_notification_channel_email(),
        "libnotify": implementation_notification_channel_libnotify(),
    }

    ### schema output mode: print the schema and do nothing else
    if (args.show_schema):
        _sys.stdout.write(
            _json.dumps(
                conf_schema_root(
                    check_kind_implementations,
                    notification_channel_implementations
                ),
                indent = "\t"
            )
            +
            "\n"
        )
        return

    _sys.stderr.write(
        string_coin(
            "[info] {{label}}: {{path}}\n",
            {
                "label": translation_get("misc.state_file_path"),
                "path": state_path,
            }
        )
    )

    ### get configuration data
    conf = conf_load(
        check_kind_implementations,
        notification_channel_implementations,
        _os.path.abspath(args.conf_path)
    )

    ### configuration output mode: print the loaded configuration and stop
    if (args.expose_full_conf):
        _sys.stdout.write(_json.dumps(conf, indent = "\t") + "\n")
        ### NOTE(review): exits with status 1 although nothing failed -- kept as is; confirm whether this is intended
        _sys.exit(1)

    ### get state data (start with an empty state if there is no state file or erasing was requested)
    if (
        (not _os.path.exists(state_path))
        or
        args.erase_state
    ):
        state_data = {}
        file_write(state_path, _json.dumps(state_data, indent = "\t"))
    else:
        state_data = _json.loads(file_read(state_path))

    ### iterate through checks
    for check_data in conf["checks"]:
        if (not check_data["active"]):
            continue

        ### get old state and examine whether the check shall be executed
        old_item_state = (
            None
            if (check_data["name"] not in state_data) else
            state_decode(state_data[check_data["name"]])
        )
        timestamp = get_current_timestamp()
        ### a check is due if
        ### - it has no stored state yet, or
        ### - its last condition was not "ok", or
        ### - the regular interval has elapsed, or
        ### - its count is still tracked (not None) and the attentive interval has elapsed
        due = (
            (old_item_state is None)
            or
            (old_item_state["condition"] != enum_condition.ok)
            or
            ((timestamp - old_item_state["timestamp"]) >= check_data["schedule"]["regular_interval"])
            or
            (
                (old_item_state["count"] is not None)
                and
                ((timestamp - old_item_state["timestamp"]) >= check_data["schedule"]["attentive_interval"])
            )
        )
        if (not due):
            continue

        _sys.stderr.write(
            string_coin(
                "-- {{check_name}}\n",
                {
                    "check_name": check_data["name"],
                }
            )
        )

        ### execute check and set new state
        ### (a failing check procedure must not abort the whole run; it is reported as condition "unknown")
        try:
            result = check_kind_implementations[check_data["kind"]].run(check_data["parameters"])
        except Exception as error:
            result = {
                "condition": enum_condition.unknown,
                "info": {
                    "error": str(error),
                },
            }

        ### "count" is the number of consecutive results with the same condition;
        ### it restarts at 1 when the condition changes and becomes None once it would exceed the threshold
        if (
            (old_item_state is None)
            or
            (old_item_state["condition"] != result["condition"])
        ):
            count = 1
        elif (
            (old_item_state["count"] is not None)
            and
            ((old_item_state["count"] + 1) <= check_data["threshold"])
        ):
            count = (old_item_state["count"] + 1)
        else:
            count = None
        new_item_state = {
            "timestamp": timestamp,
            "condition": result["condition"],
            "count": count,
        }
        state_data[check_data["name"]] = state_encode(new_item_state)
        ### the state file is rewritten after every executed check
        file_write(state_path, _json.dumps(state_data, indent = "\t"))

        ### send notifications
        ### - exactly when the threshold is reached, or on every run beyond it if "annoy" is set
        ### - "ok" results are only reported if requested via the command line
        if (
            (
                (
                    (new_item_state["count"] is not None)
                    and
                    (new_item_state["count"] == check_data["threshold"])
                )
                or
                (
                    (new_item_state["count"] is None)
                    and
                    check_data["annoy"]
                )
            )
            and
            (
                (new_item_state["condition"] != enum_condition.ok)
                or
                args.send_ok_notifications
            )
        ):
            for notification in check_data["notifications"]:
                notification_channel_implementations[notification["kind"]].notify(
                    notification["parameters"],
                    check_data["name"],
                    check_data,
                    new_item_state,
                    result["info"]
                )
## run the entry point
main()