'''
todo: allow_self_signed
todo: allow_bad_domain
todo:
'''


class implementation_check_kind_tls_certificate(interface_check_kind):
    '''
    Check kind which inspects the TLS certificate presented by a host and
    reports how many days remain until it expires.
    '''

    def parameters_schema(self):
        '''
        [implementation]

        Returns the JSON schema describing the parameters accepted by this
        check kind. The fields "expiry_threshold" and "strict" are marked as
        deprecated; they are translated to the two "expiry_threshold_*"
        fields by `normalize_order_node`.
        '''
        return {
            "type": "object",
            "additionalProperties": False,
            "properties": {
                "host": {
                    "type": "string"
                },
                "port": {
                    "type": "integer",
                    "default": 443
                },
                "expiry_threshold_concerning": {
                    "description": "in days; allowed amount of valid days before the certificate expires; threshold for condition 'concerning'; 'null' means 'report at no value'",
                    "type": ["null", "integer"],
                    "default": 7,
                    "minimum": 0
                },
                "expiry_threshold_critical": {
                    "description": "in days; allowed amount of valid days before the certificate expires; threshold for condition 'critical'; 'null' means 'report at no value'",
                    "type": ["null", "integer"],
                    "default": 1,
                    "minimum": 0
                },
                "expiry_threshold": {
                    "deprecated": True,
                    "description": "",
                    "type": ["null", "integer"],
                    "minimum": 0,
                    "default": None,
                },
                "strict": {
                    "deprecated": True,
                    "description": "",
                    "type": ["null", "boolean"],
                    "default": None,
                },
            },
            "required": [
                "host",
            ]
        }

    def normalize_order_node(self, node):
        '''
        [implementation]

        Brings an order node into the current parameter layout
        ("host", "port", "expiry_threshold_concerning",
        "expiry_threshold_critical").

        A node carrying neither of the two "expiry_threshold_*" fields is
        treated as the legacy layout ("expiry_threshold" + "strict") and is
        converted; any other node only gets missing defaults filled in.

        Raises ValueError if the mandatory field "host" is missing.
        '''
        if ("host" not in node):
            raise ValueError("missing mandatory field 'host'")

        is_legacy = (
            ("expiry_threshold_concerning" not in node)
            and
            ("expiry_threshold_critical" not in node)
        )
        # NOTE(review): a node holding only "host" also counts as legacy and
        # therefore ends up with critical=7/concerning=None rather than the
        # schema defaults 7/1 -- kept as is for backward compatibility.

        if is_legacy:
            # dict_merge is defined elsewhere; presumably entries of the
            # second argument override those of the first -- TODO confirm
            node_ = dict_merge(
                {
                    "port": 443,
                    "expiry_threshold": 7,
                    "strict": True,
                    # "allow_self_signed": False,
                    # "allow_bad_domain": False,
                },
                node
            )
            strict = node_["strict"]
            threshold = node_["expiry_threshold"]
            return {
                # "host" has to be carried over: run() reads
                # parameters["host"] and would fail with a KeyError otherwise
                "host": node_["host"],
                "port": node_["port"],
                # legacy semantics: a single threshold which is either
                # critical (strict) or merely concerning (not strict)
                "expiry_threshold_concerning": (None if strict else threshold),
                "expiry_threshold_critical": (threshold if strict else None),
            }

        return dict_merge(
            {
                "port": 443,
                "expiry_threshold_concerning": 7,
                "expiry_threshold_critical": 1,
                # "allow_self_signed": False,
                # "allow_bad_domain": False,
            },
            node
        )

    def run(self, parameters):
        '''
        [implementation]

        Connects to parameters["host"]:parameters["port"], performs a TLS
        handshake using the default SSL context and evaluates the expiry
        date of the peer certificate.

        Returns a dict with the resulting "condition" and an "info" dict
        holding "host", "port", "faults" and "data".

        - certificate verification failed or no certificate obtained:
          condition critical
        - remaining days <= "expiry_threshold_critical": condition critical
        - remaining days <= "expiry_threshold_concerning": condition
          concerning
        - otherwise: condition ok

        A threshold of None disables the respective comparison. Only
        ssl.SSLCertVerificationError is handled here; any other error
        raised while connecting propagates to the caller.
        '''
        host = parameters["host"]
        port = parameters["port"]
        faults = []
        data = {}
        condition = enum_condition.ok
        context = _ssl.create_default_context()

        try:
            # both sockets are closed on leaving the "with" blocks, also in
            # case the handshake fails
            with _socket.create_connection((host, port, )) as connection:
                with context.wrap_socket(connection, server_hostname = host) as connection_tls:
                    certificate = connection_tls.getpeercert(False)
        except _ssl.SSLCertVerificationError:
            certificate = None

        if (certificate is None):
            faults.append(translation_get("checks.tls_certificate.not_obtainable"))
            condition = enum_condition.critical
        else:
            expiry_timestamp = _ssl.cert_time_to_seconds(certificate["notAfter"])
            current_timestamp = get_current_timestamp()
            # remaining validity, rounded up to whole days
            days = _math.ceil((expiry_timestamp - current_timestamp) / (60 * 60 * 24))
            data = {
                "expiry_timestamp": expiry_timestamp,
                "days": days,
            }
            threshold_critical = parameters["expiry_threshold_critical"]
            threshold_concerning = parameters["expiry_threshold_concerning"]
            # the critical threshold takes precedence over the concerning one
            if ((threshold_critical is not None) and (days <= threshold_critical)):
                faults.append(translation_get("checks.tls_certificate.expires_soon"))
                condition = enum_condition.critical
            elif ((threshold_concerning is not None) and (days <= threshold_concerning)):
                faults.append(translation_get("checks.tls_certificate.expires_soon"))
                condition = enum_condition.concerning

        return {
            "condition": condition,
            "info": {
                "host": host,
                "port": port,
                "faults": faults,
                "data": data,
            }
        }