# Extend the existing /mnt/data/ucls-dns-packs repo to include IP address ontology, packs, bindings, gates,
# and adapter manifests for RPKI/BGP/IRR, plus update Grafana with an IP posture panel.
import os, json, yaml, textwrap, zipfile
base = "/mnt/data/ucls-dns-packs"
assert os.path.isdir(base), "Base repo not found. Run the previous step first."
def write(path, content):
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(content.strip() + "\n")
def dump_yaml(path, obj):
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
yaml.safe_dump(obj, f, sort_keys=False)
# 1) Extend JSON-LD context with IP concepts
ctx_path = os.path.join(base, "schemas", "contexts", "ucls-dns-context.jsonld")
with open(ctx_path, "r", encoding="utf-8") as f:
ctx = json.load(f)
ctx["@context"].update({
"IPAddress":"ucls:IPAddress",
"IPPrefix":"ucls:IPPrefix",
"AF":"ucls:AddressFamily",
"roa":"ucls:ROA",
"aspa":"ucls:ASPA",
"originAS":"ucls:originAS",
"maxLength":"ucls:maxLength",
"asn":"ucls:asn",
"afi":"ucls:afi",
"ptrDomain":"ucls:ptrDomain",
"rir":"ucls:rir",
"status":"ucls:status",
"bgpState":"ucls:bgpState",
"bogon":"ucls:bogon",
"scope":"ucls:scope",
"category":"ucls:category"
})
write(ctx_path, json.dumps(ctx, indent=2))
# 2) SHACL shapes for IPAddress and IPPrefix, ROA, ASPA
shapes_dir = os.path.join(base, "schemas", "shacl")
ip_shapes = {
"IPAddressShape.ttl": """
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix ucls: <https://ucls.org/terms#> .
ucls:IPAddressShape a sh:NodeShape ;
sh:targetClass ucls:IPAddress ;
sh:property [ sh:path ucls:afi ; sh:in ( "IPv4" "IPv6" ) ; sh:minCount 1 ] ;
sh:property [ sh:path ucls:value ; sh:datatype xsd:string ; sh:minCount 1 ] ;
sh:property [ sh:path ucls:rir ; sh:minCount 0 ] ;
sh:property [ sh:path ucls:ptrDomain ; sh:minCount 0 ] .
""",
"IPPrefixShape.ttl": """
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix ucls: <https://ucls.org/terms#> .
ucls:IPPrefixShape a sh:NodeShape ;
sh:targetClass ucls:IPPrefix ;
sh:property [ sh:path ucls:afi ; sh:in ( "IPv4" "IPv6" ) ; sh:minCount 1 ] ;
sh:property [ sh:path ucls:cidr ; sh:datatype xsd:string ; sh:minCount 1 ] ;
sh:property [ sh:path ucls:rir ; sh:minCount 1 ] ;
sh:property [ sh:path ucls:status ; sh:minCount 1 ] .
""",
"ROAShape.ttl": """
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix ucls: <https://ucls.org/terms#> .
ucls:ROAShape a sh:NodeShape ;
sh:targetClass ucls:ROA ;
sh:property [ sh:path ucls:asn ; sh:datatype xsd:integer ; sh:minCount 1 ] ;
sh:property [ sh:path ucls:prefix ; sh:minCount 1 ] ;
sh:property [ sh:path ucls:maxLength ; sh:datatype xsd:integer ; sh:minCount 1 ] .
""",
"ASPAShape.ttl": """
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix ucls: <https://ucls.org/terms#> .
ucls:ASPAShape a sh:NodeShape ;
sh:targetClass ucls:ASPA ;
sh:property [ sh:path ucls:asn ; sh:datatype xsd:integer ; sh:minCount 1 ] ;
sh:property [ sh:path ucls:providers ; sh:minCount 1 ] .
"""
}
for fname, content in ip_shapes.items():
write(os.path.join(shapes_dir, fname), content)
# 3) IP policy packs
packs_dir = os.path.join(base, "packs")
ip_packs = {
"IP-Core-1.0.0.yaml": {
"packId":"IP-Core",
"policyVersion":"1.0.0",
"appliesTo":[{"targetKind":"IPAddress"},{"targetKind":"IPPrefix"}],
"enforces":{
"classification":[
"Categorize address/prefix into (legacy A/B/C, CGNAT, private, loopback, link-local, multicast, documentation, reserved, 6to4/teredo, ULA, GUA)"
],
"rdns":[ "PTR domain consistency for allocated ranges" ],
"rir": [ "Allocation must map to RIR (ARIN, RIPE, APNIC, AFRINIC, LACNIC) or legacy noted" ]
},
"controls":[
{"name":"BogonBlock","rule":"Block unallocated/bogon space from edge ingress"},
{"name":"GeoScope","rule":"Tag prefixes with intended scope (anycast, regional, internal)"}
],
"metrics":[
{"key":"bogon_drop_rate","target":">=99.99%"},
{"key":"rdns_coverage","target":">=95%"}
]
},
"IP-Security-Baseline-1.0.0.yaml": {
"packId":"IP-Security-Baseline",
"policyVersion":"1.0.0",
"appliesTo":[{"targetKind":"IPPrefix"}],
"enforces":{
"rpki":[ "ROA must exist for routed GUA prefixes", "ASPA provider set required for edge ASes" ],
"bgp":[ "Reject RPKI invalid; prefer valid over unknown", "Max-prefix limits per neighbor" ],
"telemetry":[ "BMP stream from edge; hijack detection SLO <= 60s" ]
},
"metrics":[
{"key":"rpki_valid_rate","target":">=99.5%"},
{"key":"hijack_detection_ttd_seconds","target":"<=60"}
]
},
"IPv6-FutureProof-1.0.0.yaml": {
"packId":"IPv6-FutureProof",
"policyVersion":"1.0.0",
"appliesTo":[{"targetKind":"IPPrefix","afi":"IPv6"}],
"enforces":{
"addressing":[ "Use /48 site, /64 LANs; no SLAAC+RA conflicts", "Consistent ULA for internal mesh" ],
"dns":[ "Reverse zones ip6.arpa delegated with DNSSEC" ],
"sr6":[ "Segment Routing headers permitted by policy only" ]
},
"metrics":[{"key":"ipv6_reverse_dnssec","target":">=99.9%"}]
}
}
for fname, obj in ip_packs.items():
dump_yaml(os.path.join(packs_dir, fname), obj)
# 4) Bindings for IP resources
bindings_dir = os.path.join(base, "bindings")
ip_bindings = {
"bind-ip-core.json": {
"@type":"PolicyBinding",
"bindingId":"bind:ip-core",
"selector":{"kind":"IPAddress"},
"packs":["IP-Core@1.0.0"],
"precedence":10
},
"bind-ipprefix-core.json": {
"@type":"PolicyBinding",
"bindingId":"bind:ipprefix-core",
"selector":{"kind":"IPPrefix"},
"packs":["IP-Core@1.0.0","IP-Security-Baseline@1.0.0"],
"precedence":20
},
"bind-ipv6-future.json": {
"@type":"PolicyBinding",
"bindingId":"bind:ipv6-future",
"selector":{"kind":"IPPrefix","afi":"IPv6"},
"packs":["IPv6-FutureProof@1.0.0"],
"precedence":30
}
}
for fname, obj in ip_bindings.items():
write(os.path.join(bindings_dir, fname), json.dumps(obj, indent=2))
# 5) Gates for IP entities
gates_dir = os.path.join(base, "gates")
ip_gates = {
"gate-ipaddress-core.json": {
"gateId":"gate:ipaddress-core",
"targetKind":"IPAddress",
"shapes":["ucls:IPAddressShape"],
"packs":["IP-Core@1.0.0"],
"failureMode":"fail-closed"
},
"gate-ipprefix-core.json": {
"gateId":"gate:ipprefix-core",
"targetKind":"IPPrefix",
"shapes":["ucls:IPPrefixShape","ucls:ROAShape","ucls:ASPAShape"],
"packs":["IP-Core@1.0.0","IP-Security-Baseline@1.0.0"],
"failureMode":"fail-closed",
"onFail":[{"action":"block-release"},{"action":"open-praas-case","severity":"high"}]
}
}
for fname, obj in ip_gates.items():
write(os.path.join(gates_dir, fname), json.dumps(obj, indent=2))
# 6) Adapters for RPKI/BGP/IRR/Bogon
adapters_dir = os.path.join(base, "adapters")
configs_dir = os.path.join(adapters_dir, "configs")
# Additional configs
bogon_cfg = """
sources:
- name: team-cymru-bogon
url: https://www.team-cymru.org/Services/Bogons/fullbogons-ipv4.txt
- name: team-cymru-bogon6
url: https://www.team-cymru.org/Services/Bogons/fullbogons-ipv6.txt
refresh_seconds: 3600
""".strip()
write(os.path.join(configs_dir, "bogon-sources.yaml"), bogon_cfg)
irr_cfg = """
servers:
- host: rr.ntt.net
- host: rr.level3.net
- host: rr.arin.net
refresh_seconds: 7200
""".strip()
write(os.path.join(configs_dir, "irr-servers.yaml"), irr_cfg)
# Extend docker-compose with IP adapters
compose_path = os.path.join(adapters_dir, "docker-compose.yml")
with open(compose_path, "r", encoding="utf-8") as f:
compose_txt = f.read()
compose_extra = """
rpki-monitor:
image: ghcr.io/ucls/rpki-monitor:latest
environment:
- UCLS_API_BASE=${UCLS_API_BASE:-http://ucls:8080}
- BUS_BROKER=bus:9092
# Expects RTR to public validators or your own
bgp-bmp-collector:
image: ghcr.io/ucls/bmp-collector:latest
environment:
- UCLS_API_BASE=${UCLS_API_BASE:-http://ucls:8080}
- BUS_BROKER=bus:9092
ports: ["5000:5000"] # BMP listen
irr-puller:
image: ghcr.io/ucls/irr-puller:latest
environment:
- UCLS_API_BASE=${UCLS_API_BASE:-http://ucls:8080}
- BUS_BROKER=bus:9092
volumes: ["./configs/irr-servers.yaml:/app/irr-servers.yaml:ro"]
bogon-feed:
image: ghcr.io/ucls/bogon-feed:latest
environment:
- UCLS_API_BASE=${UCLS_API_BASE:-http://ucls:8080}
- BUS_BROKER=bus:9092
volumes: ["./configs/bogon-sources.yaml:/app/bogon-sources.yaml:ro"]
"""
# Naively append before 'networks:'
compose_txt = compose_txt.replace("networks:\n default: {}", compose_extra + "\nnetworks:\n default: {}")
write(compose_path, compose_txt)
# K8s manifests for IP adapters
k8s_dir = os.path.join(adapters_dir, "k8s")
def k8s_deploy(name, image, env=None, port=None, cm=None, cm_data=None):
spec = {
"apiVersion":"apps/v1",
"kind":"Deployment",
"metadata":{"name":name},
"spec":{
"replicas":1,
"selector":{"matchLabels":{"app":name}},
"template":{
"metadata":{"labels":{"app":name}},
"spec":{
"containers":[{
"name":name,"image":image,
"env":[
{"name":"UCLS_API_BASE","value":"http://ucls:8080"},
{"name":"BUS_BROKER","value":"bus-kafka:9092"}
]
}]
}
}
}
}
if env:
spec["spec"]["template"]["spec"]["containers"][0]["env"] += env
if port:
spec["spec"]["template"]["spec"]["containers"][0]["ports"] = [{"containerPort":port}]
if cm and cm_data:
cm_spec = {"apiVersion":"v1","kind":"ConfigMap","metadata":{"name":cm},"data":cm_data}
write(os.path.join(k8s_dir, f"{cm}.yaml"), yaml.safe_dump(cm_spec, sort_keys=False))
spec["spec"]["template"]["spec"]["volumes"] = [{"name":"config","configMap":{"name":cm}}]
spec["spec"]["template"]["spec"]["containers"][0]["volumeMounts"] = [{"name":"config","mountPath":"/app","readOnly":True}]
write(os.path.join(k8s_dir, f"{name}-deployment.yaml"), yaml.safe_dump(spec, sort_keys=False))
k8s_deploy("rpki-monitor","ghcr.io/ucls/rpki-monitor:latest")
k8s_deploy("bgp-bmp-collector","ghcr.io/ucls/bmp-collector:latest", port=5000)
k8s_deploy("irr-puller","ghcr.io/ucls/irr-puller:latest", cm="irr-servers-cm", cm_data={"irr-servers.yaml": irr_cfg})
k8s_deploy("bogon-feed","ghcr.io/ucls/bogon-feed:latest", cm="bogon-sources-cm", cm_data={"bogon-sources.yaml": bogon_cfg})
# 7) Update README with IP section
readme_path = os.path.join(base, "README.md")
with open(readme_path, "a", encoding="utf-8") as f:
f.write(textwrap.dedent("""
## IP Address Coverage (IPv4/IPv6, legacy & future-proof)
- **Packs**: `IP-Core`, `IP-Security-Baseline`, `IPv6-FutureProof`
- **Bindings**: apply to `IPAddress` and `IPPrefix` (with IPv6 specialization)
- **Gates**: `gate:ipaddress-core`, `gate:ipprefix-core` (ROA/ASPA aware)
- **Adapters**: `rpki-monitor`, `bgp-bmp-collector`, `irr-puller`, `bogon-feed`
- **Shapes**: `IPAddressShape`, `IPPrefixShape`, `ROAShape`, `ASPAShape`
Data sources assumed: RPKI RTR validators, BMP from edge routers, IRR servers, bogon feeds.
"""))
# 8) Update Grafana with an IP posture panel
dash_path = os.path.join(base, "dashboards", "ucls-dns-posture.json")
with open(dash_path, "r", encoding="utf-8") as f:
dash = json.load(f)
ip_panel = {
"type":"table",
"title":"IP Posture (RPKI/BGP)",
"id": 4,
"datasource":{"type":"postgres","uid":"POSTGRES_DS"},
"targets":[{
"format":"table",
"rawSql": textwrap.dedent("""
SELECT prefix, afi, origin_as, rpki_state, aspa_state, bgp_uptime_minutes
FROM ip_posture
WHERE $__timeFilter(ts)
ORDER BY rpki_state DESC, bgp_uptime_minutes DESC
LIMIT 200
""").strip(),
"refId":"A"
}],
"gridPos":{"h":9,"w":24,"x":0,"y":28}
}
dash["panels"].append(ip_panel)
dash["version"] = (dash.get("version") or 1) + 1
write(dash_path, json.dumps(dash, indent=2))
# 9) Re-zip the updated repo
zip_path = "/mnt/data/ucls-dns-packs-ip.zip"
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
for folder, _, files in os.walk(base):
for f in files:
ap = os.path.join(folder, f)
zf.write(ap, os.path.relpath(ap, os.path.dirname(base)))
print("Updated repository tree with IP coverage.\nZip:", zip_path)