# Point of the exercise is to generate examples, so put them first.

def main():
    """Build every example PDU and hand each one to prettyprint().

    Each prettyprint() call names the example and says whether it is a
    "query" or a "reply"; any extra string arguments become XML comments
    placed ahead of the PDU inside the enclosing <msg/>.
    """
    d = {name: PubObject(name)
         for name in ("Alice", "Bob", "Carol", "Dave", "Eve",
                      "Fee", "Fie", "Foe", "Fum")}
    alice = d["Alice"]

    e = Comment(" Zero or more PDUs ")
    prettyprint(e, "common", "query")
    prettyprint(e, "common", "reply")

    #e = TextElement("publish", alice.base64, uri = alice.uri, tag = "foo")
    e = TextElement("publish", alice.base64, uri=alice.uri, tag="")
    prettyprint(e, "publish-new", "query", " body is base64(new-object) ")

    e = TextElement("publish", alice.base64, uri=alice.uri, tag="foo",
                    hash=alice.hash)
    prettyprint(e, "publish-existing", "query",
                " hash is hex(SHA-256(old-object)) ",
                " body is base64(new-object) ")
    # Kept so it can be embedded as the failed PDU of an error reply below.
    pub_with_hash = e

    e = Element("success")
    prettyprint(e, "success", "reply")

    e = Element("withdraw", hash=alice.hash, uri=alice.uri, tag="foo")
    prettyprint(e, "withdraw", "query", " hash is hex(SHA-256(old-object)) ")

    e = Element("report_error", error_code="no_object_matching_hash",
                tag="foo")
    SubElement(e, "error_text").text = "Can't delete an object I don't have"
    SubElement(e, "failed_pdu").append(pub_with_hash)
    prettyprint(e, "error-with-optional", "reply")

    e = Element("report_error", error_code="object_already_present",
                tag="foo")
    prettyprint(e, "error-without-optional", "reply")

    # A multi-PDU query, keyed (and tagged) by object name.
    q = dict(
        Alice=TextElement("publish", d["Alice"].base64,
                          uri=d["Alice"].uri, tag="Alice"),
        Bob=Element("withdraw", hash=d["Bob"].hash,
                    uri=d["Bob"].uri, tag="Bob"),
        Carol=TextElement("publish", d["Carol"].base64,
                          uri=d["Carol"].uri, tag="Carol"),
        Dave=Element("withdraw", hash=d["Dave"].hash,
                     uri=d["Dave"].uri, tag="Dave"),
        Eve=TextElement("publish", d["Eve"].base64,
                        uri=d["Eve"].uri, tag="Eve"))

    e1 = Element("report_error", error_code="no_object_matching_hash",
                 tag="Dave")
    e2 = Element("report_error", error_code="object_already_present",
                 tag="Eve")
    SubElement(e1, "failed_pdu").append(q["Dave"])
    SubElement(e2, "failed_pdu").append(q["Eve"])

    # prettyprint() deep-copies what it is given, so the PDUs that were just
    # attached to e1/e2 can still be listed in the query.
    prettyprint([q[i] for i in sorted(q)], "multi", "query")
    prettyprint(e1, "multi-error-first", "reply")
    prettyprint([e1, e2], "multi-error-all", "reply")

    prettyprint(Element("list"), "list", "query")
    prettyprint([Element("list", hash=d["Fee"].hash, uri=d["Fee"].uri),
                 Element("list", hash=d["Fie"].hash, uri=d["Fie"].uri),
                 Element("list", hash=d["Foe"].hash, uri=d["Foe"].uri),
                 Element("list", hash=d["Fum"].hash, uri=d["Fum"].uri)],
                "list", "reply")


# Rest of file is the machinery to generate the examples.

from lxml.etree import (ElementTree, Element, SubElement, Comment, XML,
                        tostring as ElementToString, RelaxNG, _Comment)
import copy
import sys
import re

# Splits one line of serialized XML into three groups: leading indentation
# plus "<name", the attribute text up to the first ">", and whatever follows
# that ">" on the same line.  Comments and closing tags do not match.
matcher = re.compile(r"( *<[^!/ >]+)([^>]*)>(.*)$")

# The protocol version and namespace are read out of the RelaxNG schema
# itself; "schema" is then rebound to the compiled validator.
schema = ElementTree(file="rpki-publication.rng")
version = schema.xpath(
    "*[local-name() = 'define' and @name = 'version']")[0][0].text
xmlns = schema.getroot().get("ns")
schema = RelaxNG(schema)

# Hash truncation is a kludge to let examples fit in the 72-column RFC format
truncate_hash = True


class PubObject(object):
    """A made-up published object used to fill in the examples.

    Attributes set by the constructor:
      name   -- the name passed in
      text   -- the object's content, "Hello, my name is <name>"
      base64 -- base64 encoding of text
      hash   -- hex SHA-256 of text, cut to 16 characters when the
                module-level truncate_hash flag is set
      uri    -- rsync URI built from name and hash
    """

    def __init__(self, name):
        from hashlib import sha256
        self.name = name
        self.text = "Hello, my name is {}".format(name)
        # NOTE(review): str.encode("base64") and hashing a str directly only
        # work on Python 2; this script would need porting to run on Python 3.
        self.base64 = self.text.encode("base64")
        self.hash = sha256(self.text).hexdigest()
        if truncate_hash:
            self.hash = self.hash[:16]
        self.uri = "rsync://wombat.example/" + name + "/" + self.hash + ".cer"


def TextElement(elt, text, **kwargs):
    """Return a new element named elt whose body is text on indented lines.

    Keyword arguments become attributes of the element.  Leading whitespace
    is stripped from text and every line of it is indented by six spaces.
    """
    t = Element(elt, **kwargs)
    br = "\n" + (" " * 6)
    # The slice drops the last two characters of the re-indented text.  When
    # text ends with a newline this leaves four spaces after the final line
    # break -- presumably so the closing tag sits two columns to the left of
    # the body (TODO confirm against the generated output).
    t.text = br + text.lstrip().replace("\n", br)[:-2]
    return t


def iscomment(e):
    """Return True if e is an lxml comment node."""
    return isinstance(e, _Comment)


def validate(e):
    """Validate e against the RelaxNG schema; exit the process on failure.

    On failure sys.exit() is called with a message containing the serialized
    element and the last entry of the schema's error log.
    """
    if e.get("xmlns"):
        # prettyprint() sets xmlns as an ordinary attribute, so work on a
        # copy: drop that attribute and put every non-comment element into
        # the real namespace before validating.
        assert e.get("xmlns") == xmlns
        e = copy.deepcopy(e)
        del e.attrib["xmlns"]
        for ee in e.iter():
            if not iscomment(ee):
                ee.tag = "{" + xmlns + "}" + ee.tag
    if not schema.validate(e):
        sys.exit("Failure validating\n%s%s" % (
            ElementToString(e, pretty_print=True, encoding="us-ascii",
                            xml_declaration=False),
            schema.error_log.last_error))


def prettyprint(e, name, qr, *comments):
    """Wrap e in a <msg/>, validate it, and re-flow the serialized XML.

    e         -- a single element/comment, or a list of them
    name      -- name of the example (not referenced in the part of this
                 function that is visible; see the note at the end)
    qr        -- "query" or "reply"; used as the <msg/> type attribute
    comments  -- strings added as XML comments ahead of the PDU(s)

    Each start tag is put on a line of its own, followed by one line per
    attribute indented four columns past the tag's "<".
    """
    assert qr in ("query", "reply")
    m = Element("msg", version=version, type=qr, xmlns=xmlns)
    for comment in comments:
        m.append(Comment(comment))
    # Deep copies keep the caller's elements free to be reused elsewhere.
    if isinstance(e, list):
        m.extend(copy.deepcopy(s) for s in e)
    else:
        m.append(copy.deepcopy(e))
    validate(m)
    s = ElementToString(m, pretty_print=True, encoding="us-ascii",
                        xml_declaration=False)
    lines = []
    for line in s.splitlines():
        line = line.expandtabs()
        match = matcher.match(line)
        if not match:
            # Not a start tag: pass the line through untouched.
            lines.append(line)
        else:
            elt, attrs, rest = match.groups()
            attrs = attrs.split()
            if attrs == ["/"] and not rest:
                # Attribute-less empty element: keep "<name/>" on one line.
                elt += "/"
                attrs = []
            lines.append(elt)
            if attrs:
                indent = " " * (elt.find("<") + 4)
                lines.extend(indent + a for a in attrs)
            lines[-1] += ">"
            # NOTE(review): the source text available for review breaks off
            # at this point, part-way through the statement below, and only
            # resumes at the "__main__" guard:
            #
            #     if rest:
            #         i = rest.find("\n" + "
            #
            # The handling of a non-empty "rest" and whatever this function
            # finally does with "lines" (and with "name") are therefore
            # missing and must be restored from the upstream copy of this
            # script before it is used.


if __name__ == "__main__":
    main()