Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dnscry.pt update script #945

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions utils/format.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def process(md_path, signatures_to_update):
for name in sorted(entries.keys()):
entry = entries[name]
out = out + "\n" + entry.format() + "\n"
if not name in INCOMPATIBLE_WITH_LEGACY_VERSIONS:
if name not in INCOMPATIBLE_WITH_LEGACY_VERSIONS:
out_legacy = out_legacy + "\n" + entry.format_legacy() + "\n"

if os.path.basename(md_path) == "public-resolvers.md":
Expand All @@ -197,6 +197,7 @@ def process(md_path, signatures_to_update):
else:
with open(md_path + ".tmp", "wt") as f:
f.write(out)
os.unlink(md_path)
os.rename(md_path + ".tmp", md_path)

# Legacy
Expand All @@ -214,7 +215,8 @@ def process(md_path, signatures_to_update):
else:
with open(md_legacy_path + ".tmp", "wt") as f:
f.write(out_legacy)
os.rename(md_legacy_path + ".tmp", md_legacy_path)
os.unlink(md_legacy_path)
os.rename(md_legacy_path + ".tmp", md_legacy_path)

# Historic

Expand Down Expand Up @@ -244,15 +246,17 @@ def process(md_path, signatures_to_update):

# Signatures

for path in [md_path, md_legacy_path, csv_historic_path]:
try:
subprocess.run(
["minisign", "-V", "-P", MINISIGN_PK, "-m", path], check=True
)
except subprocess.CalledProcessError:
signatures_to_update.append(path)
if signatures_to_update is not None:
for path in [md_path, md_legacy_path, csv_historic_path]:
try:
subprocess.run(
["minisign", "-V", "-P", MINISIGN_PK, "-m", path], check=True
)
except subprocess.CalledProcessError:
signatures_to_update.append(path)


# Change to None to skip signature updates, e.g. during development.
signatures_to_update = []

for md_path in glob(CURRENT_DIR + "/*.md"):
Expand Down
5 changes: 1 addition & 4 deletions utils/subset.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#! /usr/bin/env python3

from copyreg import constructor
import sys
from glob import glob


class Entry:
Expand Down Expand Up @@ -72,8 +70,7 @@ def process(names_path, md_path):
for i in range(0, len(raw_entries)):
entry = Entry.parse(raw_entries[i])
if not entry:
print(
"Invalid entry: [" + raw_entries[i] + "]", file=sys.stderr)
print("Invalid entry: [" + raw_entries[i] + "]", file=sys.stderr)
continue
if entry.name in entries:
print("Duplicate entry: [" + entry.name + "]", file=sys.stderr)
Expand Down
189 changes: 189 additions & 0 deletions utils/update-dnscry.pt-entries.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
#! /usr/bin/env python3

import json
import os
import sys
import unicodedata
import urllib.request

CURRENT_DIR = "v3"


class Entry:
name = None
description = None
stamps = None

def __init__(self, name, description, stamps):
self.name = name
self.description = description
self.stamps = stamps

@staticmethod
def parse(raw_entry):
description = ""
stamps = []
lines = raw_entry.strip().splitlines()
if len(lines) < 2:
return None
name = lines[0].strip()
previous_was_blank = False
for line in lines[1:]:
line = line.strip()
if previous_was_blank is True and line == "":
continue
previous_was_blank = False
if line.startswith("sdns://"):
stamps.append(line)
else:
description = description + line + "\n"

description = description.strip()
if len(name) < 2 or len(description) < 10 or len(stamps) < 1:
return None

return Entry(name, description, stamps)

def format(self):
out = "## " + self.name + "\n\n"
out = out + self.description + "\n\n"
for stamp in self.stamps:
out = out + stamp + "\n"

return out


class DNSCryDotPTEntry:
raw = None
name = None
description = None

def __init__(self, raw, name, description):
self.raw = raw
self.name = name
self.description = description

@staticmethod
def parse(raw_entry):
name = (
raw_entry["location"]
.lower()
.replace(" ", "")
.replace("-", "")
.replace("'", "")
)
# The hand-written entries were ASCII-only, so this is done to reduce churn.
name = unicodedata.normalize("NFKD", name).encode("ASCII", "ignore").decode()
# NOTE: Just trusting this is true for all dnscry.pt resolvers/relays, rather than trying
# to process the stamp to confirm this.
description = f"DNSCry.pt {raw_entry['location']} - DNSCrypt, no filter, no logs, DNSSEC support"
return DNSCryDotPTEntry(raw_entry, name, description)

def get_ipv4_resolver_entry(self):
return Entry(
f"dnscry.pt-{self.name}-ipv4",
f"{self.description} (IPv4 server)\n\nhttps://www.dnscry.pt",
[self.raw["ipv4_stamp"]],
)

def get_ipv6_resolver_entry(self):
return Entry(
f"dnscry.pt-{self.name}-ipv6",
f"{self.description} (IPv6 server)\n\nhttps://www.dnscry.pt",
[self.raw["ipv6_stamp"]],
)

def get_ipv4_relay_entry(self):
return Entry(
f"dnscry.pt-anon-{self.name}-ipv4",
f"{self.description} (IPv4 server)\n\nhttps://www.dnscry.pt",
[self.raw["anon_ipv4_stamp"]],
)

def get_ipv6_relay_entry(self):
return Entry(
f"dnscry.pt-anon-{self.name}-ipv6",
f"{self.description} (IPv6 server)\n\nhttps://www.dnscry.pt",
[self.raw["anon_ipv6_stamp"]],
)


def process(md_path, resolvers, is_resolvers):
print("\n[" + md_path + "]")
entries = {}
previous_content = ""
out = ""

with open(md_path, encoding="utf8") as f:
previous_content = f.read()
c = previous_content.split("\n## ")
out = out + c[0].strip() + "\n\n"
raw_entries = c[1:]
for i in range(0, len(raw_entries)):
entry = Entry.parse(raw_entries[i])
if not entry:
print("Invalid entry: [" + raw_entries[i] + "]", file=sys.stderr)
continue
if entry.name in entries:
print("Duplicate entry: [" + entry.name + "]", file=sys.stderr)
if entry.name.startswith("dnscry.pt"):
continue
entries[entry.name] = entry

for resolver in resolvers:
if is_resolvers:
v4 = resolver.get_ipv4_resolver_entry()
v6 = resolver.get_ipv6_resolver_entry()
else:
v4 = resolver.get_ipv4_relay_entry()
v6 = resolver.get_ipv6_relay_entry()

# Some entries in the dnscry.pt database used to have repeated location names.
# Right now, all entries are unique, so this code shouldn't be needed, but
# it'll catch any future oversights rather than silently losing entries.
if v4.name in entries:
v4name = v4.name
v4suffix = resolver.raw["host"].split(".")[0][3:]
v4name = v4name.replace(resolver.name, f"{resolver.name}{v4suffix}")
print(
"Duplicate entry: [" + v4.name + "] => [" + v4name + "]",
file=sys.stderr,
)
v4.name = v4name
entries[v4.name] = v4

if v6.name in entries:
v6name = v6.name
v6suffix = resolver.raw["host"].split(".")[0][3:]
v6name = v6name.replace(resolver.name, f"{resolver.name}{v6suffix}")
print(
"Duplicate entry: [" + v6.name + "] => [" + v6name + "]",
file=sys.stderr,
)
v6.name = v6name
entries[v6.name] = v6

for name in sorted(entries.keys()):
entry = entries[name]
out = out + "\n" + entry.format() + "\n"

if out == previous_content:
print("No changes")
else:
with open(md_path + ".tmp", "wt", encoding="utf8") as f:
f.write(out)
os.unlink(md_path)
os.rename(md_path + ".tmp", md_path)


with urllib.request.urlopen("https://www.dnscry.pt/resolvers.json") as response:
resolverdata = json.load(response)

resolvers = []
for data in resolverdata:
resolvers.append(DNSCryDotPTEntry.parse(data))

resolvers.sort(key=lambda r: r.name)

process(f"{CURRENT_DIR}/public-resolvers.md", resolvers, is_resolvers=True)
process(f"{CURRENT_DIR}/relays.md", resolvers, is_resolvers=False)