'''
todo: allow_self_signed
todo: allow_bad_domain
todo:
'''
class implementation_check_kind_tls_certificate(interface_check_kind):
    '''
    Check kind which connects to a host via TLS, fetches the peer
    certificate and reports a violation if the certificate can not be
    obtained/verified or if it expires within a configurable number of days.
    '''

    def parameters_schema(self):
        '''
        [implementation]

        Returns the JSON schema describing the parameters accepted by this
        check kind; only "host" is mandatory.
        '''
        return {
            "type": "object",
            "additionalProperties": False,
            "properties": {
                "host": {
                    "type": "string"
                },
                "port": {
                    "type": "integer",
                    "default": 443
                },
                "strict": {
                    "description": "whether a violation of this check shall be leveled as critical instead of concerning",
                    "type": "boolean",
                    "default": True
                },
                "expiry_threshold": {
                    "description": "in days; allowed amount of valid days before the certificate expires",
                    "type": ["null", "integer"],
                    "default": 7,
                    "minimum": 0
                }
            },
            "required": [
                "host"
            ]
        }

    def normalize_conf_node(self, node):
        '''
        [implementation]

        Fills in the default values for all optional fields of a
        configuration node; values present in `node` are merged over the
        defaults via `dict_merge`.

        Raises ValueError if the mandatory field "host" is missing.
        '''
        if "host" not in node:
            raise ValueError("missing mandatory field 'host'")
        return dict_merge(
            {
                "strict": True,
                "port": 443,
                "expiry_threshold": 7,
                # "allow_self_signed": False,
                # "allow_bad_domain": False,
            },
            node
        )

    def run(self, parameters):
        '''
        [implementation]

        Opens a TLS connection to parameters["host"]:parameters["port"],
        reads the peer certificate and evaluates its remaining validity.

        Returns a dict with the keys "condition" and "info":
        - certificate verification failed or no certificate was delivered:
          violation with fault "checks.tls_certificate.not_obtainable"
        - certificate expires within parameters["expiry_threshold"] days:
          violation with fault "checks.tls_certificate.expires_soon"
        - otherwise: enum_condition.ok with empty info

        A violation is leveled as critical if parameters["strict"] is set,
        else as concerning. Errors other than a failed certificate
        verification (e.g. connection refused) are not caught here.
        '''
        context = _ssl.create_default_context()
        try:
            # context managers make sure that both the plain and the wrapped
            # socket are closed again, also if the handshake fails
            with _socket.create_connection((parameters["host"], parameters["port"])) as connection:
                with context.wrap_socket(connection, server_hostname=parameters["host"]) as connection_wrapped:
                    # note: connection_wrapped.version() (e.g. "TLSv1.3")
                    # would be available here, but is not evaluated yet
                    data = connection_wrapped.getpeercert(False)
        except _ssl.SSLCertVerificationError:
            data = None

        if data is None:
            fault = translation_get("checks.tls_certificate.not_obtainable")
            details = {}
        else:
            expiry_timestamp = _ssl.cert_time_to_seconds(data["notAfter"])
            current_timestamp = get_current_timestamp()
            days = _math.ceil((expiry_timestamp - current_timestamp) / (60 * 60 * 24))
            threshold = parameters["expiry_threshold"]
            # the schema allows null for the threshold; comparing against
            # None would raise a TypeError, so null is treated as
            # "expiry check disabled" (assumed intent, to be confirmed)
            if (threshold is None) or (days > threshold):
                return {
                    "condition": enum_condition.ok,
                    "info": {}
                }
            fault = translation_get("checks.tls_certificate.expires_soon")
            details = {
                "expiry_timestamp": expiry_timestamp,
                "days": days,
            }

        return {
            "condition": (
                enum_condition.critical
                if parameters["strict"] else
                enum_condition.concerning
            ),
            "info": {
                "host": parameters["host"],
                "port": parameters["port"],
                "faults": [
                    fault,
                ],
                "data": details,
            }
        }