[mod] rewrite to python; no third party dependencies

This commit is contained in:
Christian Fraß 2020-05-04 22:06:03 +02:00
parent bd6dd18774
commit 2f54c4cfa4
11 changed files with 473 additions and 171 deletions

2
.gitignore vendored
View file

@ -1 +1 @@
node_modules/ build/

135
inwx.js
View file

@ -1,135 +0,0 @@
/**
* @param {string} username
* @param {string} password
* @param {(exec : (category : string, action : string, input : any)=>Promise<any>)=>Promise<void>} core
* @return {Promise<void>}
*/
async function callwrap(
username,
password,
core
)
{
return (
new Promise(
(resolve, reject) => {
require("inwx")(
{"api": "production", "user": username, "password": password},
async (api) => {
await core(
(category, action, input) => (
new Promise(
(resolve_, reject_) => {
api.call(category, action, input, (output) => {resolve_(output);});
}
)
)
);
resolve(undefined);
}
);
}
)
);
}
/**
* @param {string} syntax
*/
function syntaxerror(
syntax
)
{
console.error(`SYNTAX: inwx ${syntax}`);
process.exit(1);
}
/**
* @param {string} message
*/
function log(
message
)
{
console.info("--", message);
}
/**
* @param {Array<string>} args
*/
async function main(
args
)
{
const command = ((args.length >= 1) ? args.shift() : syntaxerror("<command> [<arg1> [<arg2> […]]]"));
switch (command) {
default: {
console.error("unhandled");
break;
}
case "info": {
const syntax_ = "info <username> <password>";
const username = ((args.length >= 1) ? args.shift() : syntaxerror(syntax));
const password = ((args.length >= 1) ? args.shift() : syntaxerror(syntax));
callwrap(
username,
password,
async (exec) => {
const response = await exec("account", "info", {});
console.info(response);
}
);
break;
}
case "save": {
const syntax_ = "save <username> <password> <domain> <name> <type> <content>";
const username = ((args.length >= 1) ? args.shift() : syntaxerror(syntax));
const password = ((args.length >= 1) ? args.shift() : syntaxerror(syntax));
const domain = ((args.length >= 1) ? args.shift() : syntaxerror(syntax_));
const name = ((args.length >= 1) ? args.shift() : syntaxerror(syntax_));
const type = ((args.length >= 1) ? args.shift() : syntaxerror(syntax_));
const content = args.join(" ");
callwrap(
username,
password,
async (exec) => {
const response1 = await exec("nameserver", "info", {"domain": domain});
const matching = response1["record"].filter(
(record) => (
(record["name"] === (name + "." + domain))
&&
(record["type"] === type)
)
);
switch (matching.length) {
case 0: {
const result = await exec("nameserver", "createRecord", {"domain": domain, "name": name, "type": type, "content": content});
const id = result["id"];
log(`created record #${id}`);
break;
}
case 1: {
const id = matching[0]["id"];
const response2 = await exec("nameserver", "updateRecord", {"id": id, "content": content});
log(`updated record #${id}`);
break;
}
default: {
log(`found multiple records with this name and type`);
break;
}
}
}
);
}
}
}
main(process.argv.slice(2));

23
makefile Normal file
View file

@ -0,0 +1,23 @@
dir_source := source
dir_build := build
cmd_md := mkdir -p
cmd_log := echo "--"
cmd_cat := cat
cmd_chmod := chmod
_default: ${dir_build}/inwx
.PHONY: _default
${dir_build}/inwx: \
${dir_source}/head.py \
${dir_source}/helpers.py \
${dir_source}/conf.py \
${dir_source}/core.py \
${dir_source}/macros.py \
${dir_source}/main.py
@ ${cmd_log} "building …"
@ ${cmd_md} ${dir_build}
@ ${cmd_cat} $^ > $@
@ ${cmd_chmod} +x $@

33
package-lock.json generated
View file

@ -1,33 +0,0 @@
{
"requires": true,
"lockfileVersion": 1,
"dependencies": {
"inwx": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/inwx/-/inwx-1.0.0.tgz",
"integrity": "sha1-raLy07VMrrF65TiooP8SDYrkIGg=",
"requires": {
"xmlrpc": ">=1.1.0"
}
},
"sax": {
"version": "1.2.4",
"resolved": "https://registry.npmjs.org/sax/-/sax-1.2.4.tgz",
"integrity": "sha512-NqVDv9TpANUjFm0N8uM5GxL36UgKi9/atZw+x7YFnQ8ckwFGKrl4xX4yWtrey3UJm5nP1kUbnYgLopqWNSRhWw=="
},
"xmlbuilder": {
"version": "8.2.2",
"resolved": "https://registry.npmjs.org/xmlbuilder/-/xmlbuilder-8.2.2.tgz",
"integrity": "sha1-aSSGc0ELS6QuGmE2VR0pIjNap3M="
},
"xmlrpc": {
"version": "1.3.2",
"resolved": "https://registry.npmjs.org/xmlrpc/-/xmlrpc-1.3.2.tgz",
"integrity": "sha1-JrLqNHhI0Ciqx+dRS1NRl23j6D0=",
"requires": {
"sax": "1.2.x",
"xmlbuilder": "8.2.x"
}
}
}
}

View file

@ -1,3 +1,46 @@
- `npm install inwx` # Description
- use `node inwx.js save <username> <password> <domain> <name> <type> <content>` for creating/updating a DNS-record
A simple CLI client for the API of [INWX](inwx.de)
# Usage
## Credentials
For most API calls it is necessary to provide login information. There are two ways to do this:
### Via command line arguments
- `--username` for specifying the username of the account
- `--password` for specifying the username of the account
## Via configuration file
- the location of a configuration file can be specified via `--conf`
- the default location is `~/.inwx-conf.json`
- a minial configuration file for specifying the credentials would look as follows:
{
"account": {
"username": "___",
"password": "___"
}
}
## Commands
### `list`
- synopsis: `inwx list <domain>`
- description: for listing the records of a domain
### `save`
- synopsis: `inwx save <domain> <name> <type> <content>`
- description: for creating or updating a records of a domain
- example: `inwx save example.org dat TXT 'foo bar'`

52
source/conf.py Normal file
View file

@ -0,0 +1,52 @@
_conf_data = {
"url": {
"test": {
"protocol": "https",
"host": "api.ote.domrobot.com",
"port": 443,
"path": "jsonrpc/"
},
"production": {
"protocol": "https",
"host": "api.domrobot.com",
"port": 443,
"path": "jsonrpc/"
}
},
"environment": "production",
"account": {
"username": None,
"password": None
}
}
def conf_load(
path : str
):
global _conf_data
if (not _os.path.exists(path)):
pass
else:
handle = open(path, "r")
content = handle.read()
handle.close()
data = _json.loads(content)
_conf_data = merge(_conf_data, data)
def conf_get(
path : str
):
global _conf_data
return path_read(_conf_data, path.split("."))
def conf_set(
path : str,
value
):
global _conf_data
path_write(_conf_data, path.split("."), value)

44
source/core.py Normal file
View file

@ -0,0 +1,44 @@
def api_call(
environment : str,
accesstoken : str,
category : str,
action : str,
data,
):
url = conf_get("url." + environment)
# input_["lang"] = "de"
request_headers = {
"Content-Type": "application/json",
}
if (accesstoken is not None):
request_headers["Cookie"] = ("domrobot=%s" % (accesstoken, ))
else:
pass
request_data_decoded = {
"method": (category + "." + action),
"params": data,
}
request = {
"url": url,
"method": "POST",
"headers": request_headers,
"data": _json.dumps(request_data_decoded),
}
# log("[>>] %s" % _json.dumps(request, indent = "\t"))
response = http_call(request)
# log("[<<] %s" % _json.dumps(response, indent = "\t"))
if (not (response["status"] == 200)):
raise ValueError("API call failed with status %u: %s" % (response["status"], response["data"], ))
else:
output_data_decoded = _json.loads(response["data"])
result = (output_data_decoded["resData"] if ("resData" in output_data_decoded) else {})
if ("Set-Cookie" in response["headers"]):
result["_accesstoken"] = response["headers"]["Set-Cookie"].split("; ")[0].split("=")[1]
else:
pass
if (output_data_decoded["code"] == 2002):
raise ValueError("wrong use: %s" % str(output_data_decoded))
else:
return result

12
source/head.py Normal file
View file

@ -0,0 +1,12 @@
#!/usr/bin/env python3
from typing import List
import os as _os
import sys as _sys
import json as _json
import http.client as _http_client
import argparse as _argparse
import pathlib as _pathlib

66
source/helpers.py Normal file
View file

@ -0,0 +1,66 @@
def log(
messsage : str
):
_sys.stderr.write("-- %s\n" % messsage)
def path_read(
thing,
steps : List[str]
):
position = thing
for step in steps:
if (not (step in position)):
raise ValueError("missing key '%s'" % ".".join(steps))
position = position[step]
return position
def path_write(
thing,
steps : List[str],
value
):
steps_first = steps[:-1]
step_last = steps[-1]
position = thing
for step in steps_first:
if (not (step in position)):
position[step] = {}
position = position[step]
position[step_last] = value
def merge(
core,
mantle
):
result = core.copy()
result.update(mantle)
return result
def http_call(
request : dict,
) -> dict:
connection = (
{
"http": (lambda: _http_client.HTTPConnection(request["url"]["host"], request["url"]["port"])),
"https": (lambda: _http_client.HTTPSConnection(request["url"]["host"], request["url"]["port"])),
}[request["url"]["protocol"]]
)()
connection.request(
request["method"],
("/" + request["url"]["path"]),
request["data"],
request["headers"]
)
response_ = connection.getresponse()
response = {
"status": response_.status,
"headers": dict(response_.getheaders()),
"data": response_.read(),
}
return response

135
source/macros.py Normal file
View file

@ -0,0 +1,135 @@
def api_macro_login(
environment : str,
username : str,
password : str
):
if ((username is None) or (password is None)):
raise ValueError("username or password not given")
else:
response = (
api_call(
environment,
None,
"account",
"login",
{
"user": username,
"pass": password,
}
)
)
return response["_accesstoken"]
def api_macro_logout(
environment : str,
accesstoken : str
):
response = api_call(
environment,
accesstoken,
"account",
"logout",
{
}
)
return None
def api_macro_info(
environment : str,
username : str,
password : str
):
accesstoken = api_macro_login(environment, username, password)
info = api_call(
environment,
accesstoken,
"account",
"info",
{
}
)
api_macro_logout(environment, accesstoken)
return info
def api_macro_list(
environment : str,
username : str,
password : str,
domain : str
):
accesstoken = api_macro_login(environment, username, password)
info = api_call(
environment,
accesstoken,
"nameserver",
"info",
{
"domain": domain,
}
)
api_macro_logout(environment, accesstoken)
return info
def api_macro_save(
environment : str,
username : str,
password : str,
domain : str,
name : str,
type_ : str,
content : str
):
accesstoken = api_macro_login(environment, username, password)
info = api_call(
environment,
accesstoken,
"nameserver",
"info",
{
"domain": domain,
}
)
matching = list(
filter(
lambda record: ((record["name"] == (name + "." + domain)) and (record["type"] == type_)),
info["record"]
)
)
count = len(matching)
if (count == 0):
result = api_call(
environment,
accesstoken,
"nameserver",
"createRecord",
{
"domain": domain,
"name": name,
"type": type_,
"content": content,
}
)
id_ = result["id"]
log("created record %u" % id_)
elif (count == 1):
id_ = matching[0]["id"]
result = api_call(
environment,
accesstoken,
"nameserver",
"updateRecord",
{
"id": id_,
"content": content,
}
)
log("updated record %u" % id_)
else:
log("found multiple records with this name and type")
api_macro_logout(environment, accesstoken)

95
source/main.py Normal file
View file

@ -0,0 +1,95 @@
def args(
):
argumentparser = _argparse.ArgumentParser(
description = "INWX CLI Frontend"
)
argumentparser.add_argument(
'--conf',
dest = "conf",
default = _os.path.join(str(_pathlib.Path.home()), ".inwx-conf.json")
)
argumentparser.add_argument(
'--environment',
dest = "environment",
default = None
)
argumentparser.add_argument(
'--username',
dest = "username",
default = None
)
argumentparser.add_argument(
'--password',
dest = "password",
default = None
)
'''
argumentparser.add_argument(
'--domain',
dest = "domain",
default = None
)
'''
argumentparser.add_argument(
"command",
type = str
)
argumentparser.add_argument(
"parameter",
nargs = "*",
type = str
)
arguments = argumentparser.parse_args()
return arguments
def main(
):
arguments = args()
conf_load(arguments.conf)
if (not (arguments.environment is None)): conf_set("environment", arguments.environment)
if (not (arguments.username is None)): conf_set("account.username", arguments.username)
if (not (arguments.password is None)): conf_set("account.password", arguments.password)
if (arguments.command == "info"):
result = api_macro_info(
conf_get("environment"),
conf_get("account.username"),
conf_get("account.password")
)
print(_json.dumps(result, indent = "\t"))
elif (arguments.command == "list"):
domain = arguments.parameter[0]
result = api_macro_list(
conf_get("environment"),
conf_get("account.username"),
conf_get("account.password"),
domain
)
print(_json.dumps(result, indent = "\t"))
elif (arguments.command == "save"):
domain = arguments.parameter[0]
name = arguments.parameter[1]
type_ = arguments.parameter[2]
content = arguments.parameter[3]
api_macro_save(
conf_get("environment"),
conf_get("account.username"),
conf_get("account.password"),
domain,
name,
type_,
content
)
# print(_json.dumps(result, indent = "\t"))
else:
log("unhandled command '%s'" % (arguments.command, ))
try:
main()
except ValueError as error:
_sys.stderr.write(str(error) + "\n")