core/source/logic/checks/http_request.py
2023-03-04 16:00:35 +01:00

210 lines
4.6 KiB
Python

class implementation_check_kind_http_request(interface_check_kind):
'''
[implementation]
'''
def parameters_schema(self):
return {
"type": "object",
"additionalProperties": False,
"properties": {
"request": {
"type": "object",
"additionalProperties": False,
"properties": {
"target": {
"description": "URL",
"type": "string"
},
"method": {
"type": "string",
"enum": [
"GET",
"POST"
],
"default": "GET"
}
},
"required": [
"target"
]
},
"timeout": {
"description": "maximum allowed execution time in seconds",
"type": "float",
"default": 5.0
},
"response": {
"type": "object",
"additionalProperties": False,
"properties": {
"status_code": {
"description": "checks whether the response status code is this",
"type": ["null", "integer"],
"default": 200
},
"headers": {
"description": "conjunctively checks header key-value pairs",
"type": "object",
"additionalProperties": {
"description": "header value",
"type": "string"
},
"properties": {
},
"required": [
],
"default": {}
},
"body_part": {
"description": "checks whether the response body contains this string",
"type": "string"
}
},
"required": [
]
},
"strict": {
"description": "whether a violation of this check shall be leveled as critical instead of concerning",
"type": "boolean",
"default": True
},
},
"required": [
"request"
]
}
'''
[implementation]
'''
def normalize_conf_node(self, node):
node_ = dict_merge(
{
"request": {
"method": "GET"
},
"timeout": 5.0,
"response": {
"status_code": 200
},
"strict": True,
},
node,
True
)
allowed_methods = set(["GET", "POST"])
if (node_["request"]["method"] not in allowed_methods):
raise ValueError("invalid HTTP request method: %s" % node_["request"]["method"])
else:
return node_
'''
[implementation]
'''
def run(self, parameters):
if (parameters["request"]["method"] == "GET"):
try:
response = _requests.get(
parameters["request"]["target"],
timeout = parameters["timeout"]
)
error = None
except Exception as error_:
error = error_
response = None
elif (parameters["request"]["method"] == "POST"):
try:
response = _requests.post(
parameters["request"]["target"],
timeout = parameters["timeout"]
)
error = None
except Exception as error_:
error = error_
response = None
else:
raise ValueError("impossible")
if (response is None):
return {
"condition": (
enum_condition.critical
if parameters["strict"] else
enum_condition.concerning
),
"info": {
"request": parameters["request"],
"faults": [
translation_get("checks.http_request.request_failed"),
],
}
}
else:
faults = []
for (key, value, ) in parameters["response"].items():
if (key == "status_code"):
if ((value is None) or (response.status_code == value)):
pass
else:
faults.append(
translation_get(
"checks.http_request.status_code_mismatch",
{
"status_code_actual": ("%u" % response.status_code),
"status_code_expected": ("%u" % value),
}
)
)
elif (key == "headers"):
for (header_key, header_value, ) in value.items():
if (response.headers[header_key] == header_value):
pass
else:
faults.append(
translation_get(
"checks.http_request.header_value_mismatch",
{
"key": header_key,
"value_actual": response.headers[header_key],
"value_expected": header_value,
}
)
)
elif (key == "body_part"):
if (response.text.find(value) >= 0):
pass
else:
faults.append(
translation_get(
"checks.http_request.body_misses_part",
{
"part": value,
}
)
)
else:
raise ValueError("unhandled")
return {
"condition": (
enum_condition.ok
if (len(faults) <= 0) else
(
enum_condition.critical
if parameters["strict"] else
enum_condition.concerning
)
),
"info": {
"request": parameters["request"],
"response": {
"status_code": response.status_code,
# "headers": dict(map(lambda pair: pair, response.headers.items())),
# "body": response.text,
},
"faults": faults,
}
}