core/source/check_kinds/http_request.py
2022-11-30 23:14:38 +01:00

193 lines
4.3 KiB
Python

class implementation_check_kind_http_request(interface_check_kind):
    """
    Check kind that sends an HTTP request and compares the response
    (status code, headers, body content) with the configured expectations.
    """

    def parameters_schema(self):
        """
        [implementation]

        Return the JSON schema describing the parameters accepted by this
        check kind: a mandatory "request" object (target URL and method),
        an optional "response" object holding the expectations, and the
        "as_warning" flag.
        """
        return {
            "type": "object",
            "additionalProperties": False,
            "properties": {
                "request": {
                    "type": "object",
                    "additionalProperties": False,
                    "properties": {
                        "target": {
                            "description": "URL",
                            "type": "string"
                        },
                        "method": {
                            "type": "string",
                            "enum": [
                                "GET",
                                "POST"
                            ],
                            "default": "GET"
                        }
                    },
                    "required": [
                        "target"
                    ]
                },
                "response": {
                    "type": "object",
                    "additionalProperties": False,
                    "properties": {
                        "status_code": {
                            "description": "checks whether the response status code is this",
                            "type": ["null", "integer"],
                            "default": 200
                        },
                        "headers": {
                            "description": "conjunctively checks header key-value pairs",
                            "type": "object",
                            "additionalProperties": {
                                "description": "header value",
                                "type": "string"
                            },
                            "properties": {
                            },
                            "required": [
                            ],
                            "default": {}
                        },
                        "body_part": {
                            "description": "checks whether the response body contains this string",
                            "type": "string"
                        }
                    },
                    "required": [
                    ]
                },
                "as_warning": {
                    "description": "whether a violation of this check shall be exposed as warning instead of critical; default: false",
                    "type": "boolean",
                    "default": False
                }
            },
            "required": [
                "request"
            ]
        }

    def normalize_conf_node(self, node):
        """
        [implementation]

        Return the configuration node combined with the default values
        (method "GET", expected status code 200, as_warning False).

        The combination is delegated to dict_merge with the defaults as
        first and the node as second argument.
        NOTE(review): the third argument (True) presumably requests a
        recursive merge in which the node's values win -- confirm against
        dict_merge.
        """
        return dict_merge(
            {
                "request": {
                    "method": "GET"
                },
                "response": {
                    "status_code": 200
                },
                "as_warning": False,
            },
            node,
            True
        )

    def _condition_on_violation(self, parameters):
        """
        Return the condition to report when the check is violated:
        warning if "as_warning" is set, critical otherwise.
        """
        if parameters["as_warning"]:
            return enum_condition.warning
        return enum_condition.critical

    def run(self, parameters):
        """
        [implementation]

        Perform the configured HTTP request and evaluate the response.

        Returns a dict with the keys "condition" and "info":
        - unknown, if the request method is neither "GET" nor "POST"
        - warning/critical (depending on "as_warning"), if the request
          raised an exception or at least one expectation is violated;
          "info" then holds one line per violation
        - ok, if no expectation is violated

        Raises ValueError if parameters["response"] contains a key other
        than "status_code", "headers" or "body_part".
        """
        method = parameters["request"]["method"]
        senders = {
            "GET": _requests.get,
            "POST": _requests.post,
        }
        if method not in senders:
            return {
                "condition": enum_condition.unknown,
                "info": ("invalid HTTP request method: %s" % method)
            }

        # NOTE(review): no timeout is passed, so a server that never answers
        # may block this check indefinitely -- consider making it configurable.
        try:
            response = senders[method](
                parameters["request"]["target"]
            )
        except Exception:
            # any failure while sending counts as a violated check rather
            # than crashing the caller
            response = None
        if response is None:
            return {
                "condition": self._condition_on_violation(parameters),
                "info": "HTTP request failed",
            }

        lines = []
        for (key, value) in parameters["response"].items():
            if key == "status_code":
                # an expected value of None disables the status code check
                if (value is not None) and (response.status_code != value):
                    lines.append(
                        string_coin(
                            "actual status code {{status_code_actual}} does not match expected value {{status_code_expected}}",
                            {
                                "status_code_actual": ("%u" % response.status_code),
                                "status_code_expected": ("%u" % value),
                            }
                        )
                    )
            elif key == "headers":
                for (header_key, header_value) in value.items():
                    # .get() instead of [] so that an absent header is
                    # reported as a violation instead of raising KeyError
                    header_actual = response.headers.get(header_key)
                    if header_actual is None:
                        lines.append(
                            string_coin(
                                "header {{key}} is missing in the response; expected value {{value_expected}}",
                                {
                                    "key": header_key,
                                    "value_expected": header_value,
                                }
                            )
                        )
                    elif header_actual != header_value:
                        lines.append(
                            string_coin(
                                "actual header value for key {{key}} is {{value_actual}} and does not match the expected value {{value_expected}}",
                                {
                                    "key": header_key,
                                    "value_actual": header_actual,
                                    "value_expected": header_value,
                                }
                            )
                        )
            elif key == "body_part":
                if value not in response.text:
                    lines.append(
                        string_coin(
                            "body does not contain the expected part '{{part}}'",
                            {
                                "part": value,
                            }
                        )
                    )
            else:
                raise ValueError("unhandled response check: %s" % key)

        return {
            "condition": (
                self._condition_on_violation(parameters)
                if lines else
                enum_condition.ok
            ),
            "info": "\n".join(lines),
        }