core/source/logic/checks/file_state.py

169 lines
3.7 KiB
Python
Raw Normal View History

class implementation_check_kind_file_state(interface_check_kind):
    '''
    check kind which inspects a file: whether it exists (or is absent, if
    configured that way) and, optionally, whether its age and size stay
    within configured thresholds
    '''

    def parameters_schema(self):
        '''
        [implementation]

        returns the JSON schema describing the parameters of this check kind;
        only "path" is mandatory
        '''
        return {
            "type": "object",
            "additionalProperties": False,
            "properties": {
                "path": {
                    "type": "string"
                },
                "strict": {
                    "description": "whether a violation of this check shall be leveled as critical instead of concerning",
                    "type": "boolean",
                    "default": True
                },
                "exist": {
                    "description": "whether the file is supposed to exist or not",
                    "type": "boolean",
                    "default": True
                },
                "age_threshold": {
                    "description": "in seconds; ignored if 'exist' is set to false",
                    "type": ["null", "integer"],
                    "exclusiveMinimum": 0,
                    "default": None,
                },
                "size_threshold": {
                    "description": "in bytes; ignored if 'exist' is set to false",
                    # "null" has to be allowed, since the default is None
                    "type": ["null", "integer"],
                    "exclusiveMinimum": 0,
                    "default": None,
                },
            },
            "required": [
                "path"
            ]
        }

    def normalize_conf_node(self, node):
        '''
        [implementation]

        fills in the defaults for all optional fields of the given
        configuration node; values present in the node are passed to
        dict_merge as the second argument (presumably taking precedence over
        the defaults; confirm against dict_merge)

        raises ValueError if the mandatory field "path" is missing
        '''
        if "path" not in node:
            raise ValueError("missing mandatory field 'path'")
        return dict_merge(
            {
                "strict": True,
                "exist": True,
                "age_threshold": None,
                "size_threshold": None,
            },
            node
        )

    def run(self, parameters):
        '''
        [implementation]

        executes the check and returns a dict of the shape

            {
                "condition": <enum_condition member>,
                "info": {"path": ..., "faults": [...], "data": {...}},
            }

        "condition" is "ok" if no fault was found; otherwise it is
        "critical" if parameters["strict"] is set and "warning" if not;
        "faults" holds the translated fault messages, "data" the measured
        values together with the thresholds they were compared against
        '''
        path = parameters["path"]
        exists = _os.path.exists(path)
        faults = []
        data = {}
        if parameters["exist"]:
            if not exists:
                faults.append(translation_get("checks.file_state.missing"))
            else:
                stat = _os.stat(path)
                ## age
                age_threshold = parameters["age_threshold"]
                if age_threshold is not None:
                    timestamp_this = get_current_timestamp()
                    # NOTE(review): st_atime is the time of the last access,
                    # not of the last modification; confirm that this is the
                    # intended notion of "age" (st_mtime might be meant)
                    timestamp_that = int(stat.st_atime)
                    age = (timestamp_this - timestamp_that)
                    if age < 0:
                        # the file claims to be from the future
                        faults.append(translation_get("checks.file_state.timestamp_implausible"))
                    if age > age_threshold:
                        faults.append(translation_get("checks.file_state.too_old"))
                    data = dict_merge(
                        data,
                        {
                            "timestamp_of_checking_instance": timestamp_this,
                            "timestamp_of_file": timestamp_that,
                            "age_value_in_seconds": age,
                            "age_threshold_in_seconds": age_threshold,
                        }
                    )
                ## size
                size_threshold = parameters["size_threshold"]
                if size_threshold is not None:
                    size = stat.st_size
                    if size > size_threshold:
                        faults.append(translation_get("checks.file_state.too_big"))
                    data = dict_merge(
                        data,
                        {
                            "size_value_in_bytes": size,
                            "size_threshold_in_bytes": size_threshold,
                        }
                    )
        else:
            # the file is supposed to be absent, so its presence is the fault
            if exists:
                faults.append(translation_get("checks.file_state.exists"))
        if not faults:
            condition = enum_condition.ok
        elif parameters["strict"]:
            condition = enum_condition.critical
        else:
            condition = enum_condition.warning
        return {
            "condition": condition,
            "info": {
                "path": path,
                "faults": faults,
                "data": data,
            }
        }