bifroyst/source/helpers/ssh.py
2026-04-29 23:46:44 +02:00

108 lines
3.4 KiB
Python

import helpers.misc as __misc
def sshconf_decode_bool(value):
    """
    Translate an sshconf flag value into a Python boolean.

    Only the exact string "yes" is treated as true; every other value
    (including "no", differently-cased spellings and the empty string)
    yields False.
    """
    is_enabled = value == "yes"
    return is_enabled
def sshconf_decode(
    sshconf
):
    """
    Parse the text of an SSH client configuration into a dict of host entries.

    The input is read line by line. Every "Host <name>" line opens a new
    entry; the option lines that follow are stored in that entry under these
    keys:

        HostName             -> "host" (str)
        Port                 -> "port" (int)
        User                 -> "username" (str)
        IdentityFile         -> "key_name" (str)
        ProxyJump            -> "proxy_jump" (str)
        ProxyCommand         -> "proxy_command" (str)
        PubKeyAuthentication -> "pub_key_authentication" (bool)
        IdentitiesOnly       -> "identities_only" (bool)

    Keywords are matched case-insensitively and may be separated from their
    value by any run of spaces or tabs. Comment lines (starting with "#")
    inside a host block are collected, without the "#", in the entry's
    "extra" list; comments that appear before the first "Host" line are
    dropped because there is no entry to attach them to. Blank lines are
    ignored.

    Arguments:
        sshconf: the configuration text

    Returns:
        dict mapping each host name to its entry dict; empty when the input
        declares no host. If a name is declared more than once, the last
        block wins.

    Raises:
        ValueError: on an unknown keyword, on an option that appears before
            any "Host" line, or when a Port value is not an integer.
    """
    # options whose value is stored verbatim: lower-cased keyword -> entry key
    string_options = {
        "hostname": "host",
        "user": "username",
        "identityfile": "key_name",
        "proxyjump": "proxy_jump",
        "proxycommand": "proxy_command",
    }
    # yes/no options: lower-cased keyword -> entry key
    flag_options = {
        "pubkeyauthentication": "pub_key_authentication",
        "identitiesonly": "identities_only",
    }
    entries = {}
    current_name = None
    current_entry = None
    for line in sshconf.split("\n"):
        line_stripped = line.strip()
        if not line_stripped:
            continue
        if line_stripped.startswith("#"):
            # a comment can only be attached to an already opened host block
            if current_entry is not None:
                current_entry.setdefault("extra", []).append(
                    line_stripped[1:].strip()
                )
            continue
        # split on the first run of whitespace, so that tabs and repeated
        # spaces between keyword and value are handled
        parts = line_stripped.split(None, 1)
        head = parts[0]
        body = parts[1] if (len(parts) > 1) else ""
        keyword = head.lower()
        if keyword == "host":
            # store the block that just ended before opening the next one
            if current_name is not None:
                entries[current_name] = current_entry
            current_name = body
            current_entry = {}
            continue
        if current_entry is None:
            raise ValueError("sshconf option outside of a Host block: " + head)
        if keyword in string_options:
            current_entry[string_options[keyword]] = body
        elif keyword == "port":
            current_entry["port"] = int(body)
        elif keyword in flag_options:
            current_entry[flag_options[keyword]] = sshconf_decode_bool(body)
        else:
            raise ValueError("unhandled sshconf thing: " + head)
    # store the last block; skipped when the input never declared a host
    if current_name is not None:
        entries[current_name] = current_entry
    return entries
def sshconf_encode_bool(value):
    """
    Render a flag for an sshconf file: "yes" for a truthy value, "no"
    otherwise.
    """
    if value:
        return "yes"
    return "no"
def sshconf_encode_entry(name, entry, prefix):
    """
    Render a single host entry as an sshconf "Host" block.

    The block starts with a "Host" line built from `prefix` and `name`,
    followed by one tab-indented line per option key present in `entry`,
    always in the same fixed order, and ends with an empty line.

    Arguments:
        name: name of the entry
        entry: dict with any of the keys "host", "port", "username",
            "key_name", "proxy_jump", "proxy_command",
            "pub_key_authentication", "identities_only"
        prefix: string placed in front of the host name and of the key
            file name

    Returns:
        the rendered block as one string with lines joined by "\n"
    """
    # one row per supported option, in output order:
    # (entry key, line template, placeholder name, value renderer)
    option_specs = (
        ("host", "\tHostName {{host}}", "host", lambda raw: raw),
        ("port", "\tPort {{port}}", "port", lambda raw: ("%u" % raw)),
        ("username", "\tUser {{username}}", "username", lambda raw: raw),
        (
            "key_name",
            "\tIdentityFile {{key_path}}",
            "key_path",
            # the key file is looked up in the keypairs directory under its prefixed name
            lambda raw: ("~/.ssh/keypairs/%s%s" % (prefix, raw)),
        ),
        ("proxy_jump", "\tProxyJump {{proxy_jump}}", "proxy_jump", lambda raw: raw),
        ("proxy_command", "\tProxyCommand {{proxy_command}}", "proxy_command", lambda raw: raw),
        (
            "pub_key_authentication",
            "\tPubKeyAuthentication {{pub_key_authentication}}",
            "pub_key_authentication",
            lambda raw: sshconf_encode_bool(raw),
        ),
        (
            "identities_only",
            "\tIdentitiesOnly {{identities_only}}",
            "identities_only",
            lambda raw: sshconf_encode_bool(raw),
        ),
    )
    rendered = [
        __misc.string_coin("Host {{prefix}}{{name}}", {"prefix": prefix, "name": name}),
    ]
    for key, template, placeholder, render in option_specs:
        if key in entry:
            rendered.append(
                __misc.string_coin(template, {placeholder: render(entry[key])})
            )
    # the empty last element makes the block end with a line break once joined
    rendered.append("")
    return "\n".join(rendered)
def sshconf_encode(conf):
    """
    Render a whole configuration as sshconf text.

    Every item of conf["entries"] is rendered with sshconf_encode_entry,
    using conf["settings"]["prefix"] as prefix; the resulting blocks are
    ordered by entry name and joined with "\n".

    Arguments:
        conf: dict with an "entries" dict (name -> entry) and a
            "settings" dict holding the "prefix"

    Returns:
        the rendered text; an empty string when there are no entries
    """
    entries_by_name = sorted(
        conf["entries"].items(),
        key = lambda item: item[0]
    )
    # the prefix is looked up per entry, so it is only needed when there is
    # at least one entry
    return "\n".join(
        sshconf_encode_entry(entry_name, entry, conf["settings"]["prefix"])
        for (entry_name, entry) in entries_by_name
    )