'''
todo: allow_self_signed
todo: allow_bad_domain
todo:
'''
class implementation_check_kind_tls_certificate(interface_check_kind):
    '''
    Check kind which connects to a host via TLS, fetches the peer certificate
    and reports a fault if the certificate can not be obtained or if it
    expires within a configurable number of days.
    '''

    '''
    [implementation]
    '''
    def parameters_schema(self):
        '''
        Return the JSON schema describing the parameters accepted by this
        check kind; only "host" is mandatory.
        '''
        return {
            "type": "object",
            "additionalProperties": False,
            "properties": {
                "host": {
                    "type": "string"
                },
                "port": {
                    "type": "integer",
                    "default": 443
                },
                "strict": {
                    "description": "whether a violation of this check shall be leveled as critical instead of concerning",
                    "type": "boolean",
                    "default": True
                },
                "expiry_threshold": {
                    "description": "in days; allowed amount of valid days before the certificate expires",
                    "type": ["null", "integer"],
                    "default": 7,
                    "minimum": 0
                }
            },
            "required": [
                "host"
            ]
        }

    '''
    [implementation]
    '''
    def normalize_order_node(self, node):
        '''
        Fill in the default values for all optional parameters.

        Values present in `node` are passed to `dict_merge` after the
        defaults (presumably taking precedence -- verify against
        `dict_merge`).

        Raises:
            ValueError: if the mandatory field "host" is missing
        '''
        if ("host" not in node):
            raise ValueError("missing mandatory field 'host'")
        return dict_merge(
            {
                "strict": True,
                "port": 443,
                "expiry_threshold": 7,
                # planned options (see todo list at the top of the file):
                # "allow_self_signed": False,
                # "allow_bad_domain": False,
            },
            node
        )

    '''
    [implementation]
    '''
    def run(self, parameters):
        '''
        Execute the check.

        Opens a TLS connection to parameters["host"]:parameters["port"] using
        the default SSL context and reads the peer certificate. A fault is
        recorded if certificate verification fails (or no certificate is
        returned), or if the certificate expires within
        parameters["expiry_threshold"] days. An expiry threshold of None
        disables the expiry check.

        Returns a dict with
        - "condition": `enum_condition.ok` if there are no faults, otherwise
          `critical` (strict) or `concerning` (not strict)
        - "info": host, port, the list of faults and the collected data

        Note: only certificate verification errors are handled here; other
        connection errors raised by the socket/ssl calls propagate to the
        caller.
        '''
        seconds_per_day = (60 * 60 * 24)
        faults = []
        data = {}
        context = _ssl.create_default_context()
        try:
            # both sockets are context managers; this guarantees that the
            # connection is closed again, also when the handshake fails
            with _socket.create_connection((parameters["host"], parameters["port"], )) as connection:
                with context.wrap_socket(connection, server_hostname = parameters["host"]) as connection_tls:
                    certificate = connection_tls.getpeercert(False)
        except _ssl.SSLCertVerificationError:
            certificate = None
        if (certificate is None):
            faults.append(translation_get("checks.tls_certificate.not_obtainable"))
        else:
            expiry_timestamp = _ssl.cert_time_to_seconds(certificate["notAfter"])
            current_timestamp = get_current_timestamp()
            # remaining validity, rounded up to whole days
            days = _math.ceil((expiry_timestamp - current_timestamp) / seconds_per_day)
            data = dict_merge(
                data,
                {
                    "expiry_timestamp": expiry_timestamp,
                    "days": days,
                },
            )
            threshold = parameters["expiry_threshold"]
            # the schema allows null for the threshold; comparing an integer
            # against None would raise a TypeError, so skip the check then
            if ((threshold is not None) and (days <= threshold)):
                faults.append(translation_get("checks.tls_certificate.expires_soon"))
        if (not faults):
            condition = enum_condition.ok
        elif parameters["strict"]:
            condition = enum_condition.critical
        else:
            condition = enum_condition.concerning
        return {
            "condition": condition,
            "info": {
                "host": parameters["host"],
                "port": parameters["port"],
                "faults": faults,
                "data": data,
            }
        }