#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ucls_infra_gateway.py
----------------------------------------------------------------------
A single, *safe-by-default* gateway that can communicate with:
• Electrical grid systems (READ-ONLY):
- IEC 61850 MMS browser (substation data attributes) [stub with optional lib]
- DNP3 outstation reader [stub with optional lib]
- OpenADR 2.0b/IEEE 2030.5 (SEP 2.0) event polling [HTTPS, read-only]
• Telecommunication networks (READ-ONLY):
- SNMPv2c/v3 polling (interfaces, uptime, health) [pysnmp if present]
- NETCONF get/get-config [ncclient if present]
- gNMI telemetry subscribe/get [stub with optional lib]
The script implements strict guardrails:
- LIVE mode is OFF by default (simulation only).
- Read-only operations only; no control commands are implemented.
- Host allowlist is required for live connections (UCLS_WHITELIST).
- TLS verification is on, with optional CA/cert pinning.
- Signed audit records for every operation.
- Simple policy check (role-based) before any live network calls.
USAGE (CLI)
# Simulation (default):
python ucls_infra_gateway.py --demo
# Live READ-ONLY calls (you must set LIVE=1 and whitelist targets):
export UCLS_LIVE=1
export UCLS_WHITELIST="10.0.0.10,router.example.net,substation01.utility"
export UCLS_ROLE="viewer" # or "operator" (still read-only in this tool)
# SNMP GET if pysnmp is installed:
python ucls_infra_gateway.py snmp-get --host router.example.net --oid 1.3.6.1.2.1.1.3.0 --v3-user netops --v3-auth SHA --v3-priv AES --timeout 5
# NETCONF get-config (if ncclient is installed):
python ucls_infra_gateway.py netconf-get --host router.example.net --port 830 --user admin --password '***' --filter-subtree '<interfaces/>'
# IEC 61850 browse (stub if no lib):
python ucls_infra_gateway.py iec61850-browse --host substation01.utility --ap LD0 --ln "MMXU1" --da "TotW"
# OpenADR poll events (read-only):
python ucls_infra_gateway.py openadr-events --vtn https://vtn.example/oadr --ven VEN_123 --token "***"
DISCLAIMER
This is a high-integrity READ-ONLY integration scaffold for authorized environments.
Do not use on systems you do not own or explicitly control. No control/operate APIs
are provided here. Extending to controls requires rigorous safety, testing, and formal approval.
"""
from __future__ import annotations
import os, sys, json, time, hmac, hashlib, argparse, socket, ssl, re
from dataclasses import dataclass, asdict
from typing import Optional, Any, Dict, List, Tuple
# Optional dependencies (graceful degradation)
try:
from pysnmp.hlapi import (
SnmpEngine, UdpTransportTarget, CommunityData,
ContextData, ObjectType, ObjectIdentity, getCmd,
UsmUserData, usmHMACSHAAuthProtocol, usmHMACMD5AuthProtocol,
usmAesCfb128Protocol, usmDESPrivProtocol
)
HAS_PYSNMP = True
except Exception:
HAS_PYSNMP = False
try:
from ncclient import manager as nc_manager
HAS_NCCLIENT = True
except Exception:
HAS_NCCLIENT = False
# gNMI client libs vary; we present a safe stub
HAS_GNMI = False
# --------------------------- Safety & Policy ---------------------------
def is_live() -> bool:
return os.environ.get("UCLS_LIVE","0") in ("1","true","TRUE","yes","YES")
def whitelist() -> List[str]:
raw = os.environ.get("UCLS_WHITELIST","")
return [h.strip() for h in raw.split(",") if h.strip()]
def role() -> str:
return os.environ.get("UCLS_ROLE","viewer")
def is_host_allowed(host: str) -> bool:
allowed = whitelist()
if not allowed:
return False
try:
# Allow either raw match or resolved FQDN/IP match
host_ip = socket.gethostbyname(host)
return host in allowed or host_ip in allowed
except Exception:
# fall back to string check
return host in allowed
def require_live_and_allowed(host: str):
if not is_live():
raise PermissionError("LIVE mode is disabled. Set UCLS_LIVE=1 to enable live network calls.")
if not is_host_allowed(host):
raise PermissionError(f"Host '{host}' not in UCLS_WHITELIST. Set UCLS_WHITELIST to a comma-separated list of allowed hosts/IPs.")
def require_role_at_least(min_role: str = "viewer"):
# simple role lattice: viewer < operator < admin
lattice = {"viewer":0, "operator":1, "admin":2}
if lattice.get(role(),0) < lattice.get(min_role,0):
raise PermissionError(f"Insufficient role: need '{min_role}', have '{role()}'.")
# --------------------------- Audit Trail ---------------------------
AUDIT_KEY = os.environ.get("UCLS_AUDIT_KEY","ucls-dev-key").encode()
@dataclass
class AuditRecord:
ts: float
actor: str
action: str
target: str
payload: Dict[str, Any]
result_preview: str
signature: str = ""
def sign_record(rec: AuditRecord) -> AuditRecord:
body = json.dumps({
"ts": rec.ts,
"actor": rec.actor,
"action": rec.action,
"target": rec.target,
"payload": rec.payload,
"result_preview": rec.result_preview
}, sort_keys=True, ensure_ascii=False).encode()
mac = hmac.new(AUDIT_KEY, body, hashlib.sha256).hexdigest()
rec.signature = mac
return rec
def audit_log(rec: AuditRecord):
signed = sign_record(rec)
print(json.dumps(asdict(signed), ensure_ascii=False))
# --------------------------- Utilities ---------------------------
def preview(data: Any, limit: int = 240) -> str:
s = json.dumps(data, ensure_ascii=False) if isinstance(data, (dict,list)) else str(data)
return (s[:limit] + "…") if len(s) > limit else s
# --------------------------- Grid Adapters (READ-ONLY) ---------------------------
class IEC61850Browser:
"""
IEC 61850 MMS browser — safe stub.
In a live setup, use a Python binding to 61850 MMS (or a gateway API) with READ access.
"""
def browse(self, host: str, ap: str, ln: str, da: str, timeout: int = 5) -> Dict[str, Any]:
require_live_and_allowed(host)
require_role_at_least("viewer")
# Stub: in real use, open MMS association, read DA value path: e.g., LD0/MMXU1.TotW.mag.f
data = {
"path": f"{ap}/{ln}.{da}",
"quality": "valid",
"timestamp": time.time(),
"value": 12_345.67
}
audit_log(AuditRecord(time.time(), role(), "iec61850-browse", host,
{"ap":ap,"ln":ln,"da":da}, preview(data)))
return data
class DNP3Reader:
"""
DNP3 outstation (slave) reader — safe stub.
In a live setup, use a DNP3 client binding to poll binary inputs/counters/analogs.
"""
def read_analogs(self, host: str, indices: List[int], port: int = 20000, timeout: int = 5) -> Dict[str, Any]:
require_live_and_allowed(host)
require_role_at_least("viewer")
data = {f"AI{idx}": (idx * 1.234) for idx in indices}
audit_log(AuditRecord(time.time(), role(), "dnp3-read-analogs", f"{host}:{port}",
{"indices":indices}, preview(data)))
return data
class OpenADRClient:
"""
OpenADR 2.0b / IEEE 2030.5 VEN → VTN event poller — read-only.
Real implementations exchange XML/JSON over TLS with VEN/VTN certificates.
"""
def poll_events(self, vtn_url: str, ven_id: str, token: str, timeout: int = 8) -> Dict[str, Any]:
if not is_live():
# simulate event list
data = {
"vtn": vtn_url, "ven": ven_id,
"events":[
{"eventID":"evt-123", "signalType":"simple", "signalName":"LOAD_SHED",
"start":"2025-08-18T20:00:00Z", "duration":"PT1H", "level":2}
]
}
else:
# For live use, perform HTTPS GET/POST per OpenADR spec (omitted here; read-only).
# Always verify TLS certificates and restrict to whitelisted domains.
if not is_host_allowed(re.sub(r"^https?://","", vtn_url).split("/")[0]):
raise PermissionError("VTN host not in allowlist.")
data = {"vtn": vtn_url, "ven": ven_id, "events":[]}
audit_log(AuditRecord(time.time(), role(), "openadr-events", vtn_url,
{"ven":ven_id}, preview(data)))
return data
# --------------------------- Telecom Adapters (READ-ONLY) ---------------------------
class SNMPMonitor:
"""
SNMPv2c/v3 GET — read-only polling.
Requires pysnmp; falls back to simulation if unavailable or LIVE off.
"""
def get(self, host: str, oid: str, community: Optional[str] = None,
v3_user: Optional[str] = None, v3_auth: Optional[str] = None, v3_priv: Optional[str] = None,
timeout: int = 5) -> Dict[str, Any]:
if not is_live() or not HAS_PYSNMP:
data = {"host": host, "oid": oid, "value": "simulated-uptime-123456"}
audit_log(AuditRecord(time.time(), role(), "snmp-get-sim", host,
{"oid":oid}, preview(data)))
return data
require_live_and_allowed(host)
require_role_at_least("viewer")
auth_proto = {"SHA": usmHMACSHAAuthProtocol, "MD5": usmHMACMD5AuthProtocol}.get(v3_auth, None)
priv_proto = {"AES": usmAesCfb128Protocol, "DES": usmDESPrivProtocol}.get(v3_priv, None)
if v3_user:
security = UsmUserData(v3_user, authProtocol=auth_proto, privProtocol=priv_proto)
else:
# v2c
community = community or "public"
security = CommunityData(community, mpModel=1)
errorIndication, errorStatus, errorIndex, varBinds = next(getCmd(
SnmpEngine(),
security,
UdpTransportTarget((host, 161), timeout=timeout, retries=1),
ContextData(),
ObjectType(ObjectIdentity(oid))
))
if errorIndication:
raise RuntimeError(f"SNMP error: {errorIndication}")
if errorStatus:
raise RuntimeError(f"SNMP error: {errorStatus.prettyPrint()} at {errorIndex and varBinds[int(errorIndex)-1][0] or '?'}")
value = {str(name): str(val) for name, val in varBinds}
audit_log(AuditRecord(time.time(), role(), "snmp-get", host,
{"oid":oid}, preview(value)))
return {"host": host, "oid": oid, "value": value.get(oid, value)}
class NETCONFClient:
"""
NETCONF over SSH — get/get-config (read-only).
Requires ncclient; falls back to simulation if unavailable or LIVE off.
"""
def get(self, host: str, port: int, user: str, password: str, filter_subtree: Optional[str] = None,
timeout: int = 8) -> Dict[str, Any]:
if not is_live() or not HAS_NCCLIENT:
data = {"host": host, "operation": "get", "filter": filter_subtree or "<interfaces/>", "payload":"<simulated/>"}
audit_log(AuditRecord(time.time(), role(), "netconf-get-sim", f"{host}:{port}",
{"filter": filter_subtree}, preview(data)))
return data
require_live_and_allowed(host)
require_role_at_least("viewer")
with nc_manager.connect(host=host, port=port, username=user, password=password,
hostkey_verify=True, allow_agent=False, look_for_keys=False,
timeout=timeout) as m:
if filter_subtree:
reply = m.get(('subtree', filter_subtree))
else:
reply = m.get()
xml = reply.xml
audit_log(AuditRecord(time.time(), role(), "netconf-get", f"{host}:{port}",
{"filter": filter_subtree}, preview(xml)))
return {"host": host, "operation":"get", "xml": xml}
class GNMIClient:
"""
gNMI (gRPC Network Management Interface) — telemetry subscribe/get (read-only).
Stub unless a concrete Python gNMI client is installed in your environment.
"""
def get(self, host: str, path: str, port: int = 57400, timeout: int = 8) -> Dict[str, Any]:
if not is_live() or not HAS_GNMI:
data = {"host":host, "path":path, "value":{"interfaces/interface[name=xe-0/0/0]/state/counters/in-octets": 123456789}}
audit_log(AuditRecord(time.time(), role(), "gnmi-get-sim", f"{host}:{port}",
{"path": path}, preview(data)))
return data
require_live_and_allowed(host)
require_role_at_least("viewer")
# Real client would dial TLS gRPC and issue GetRequest.
data = {"host":host, "path":path, "value":{}}
audit_log(AuditRecord(time.time(), role(), "gnmi-get", f"{host}:{port}",
{"path": path}, preview(data)))
return data
# --------------------------- Router ---------------------------
class UCLSInfraGateway:
def __init__(self):
self.grid_iec = IEC61850Browser()
self.grid_dnp3 = DNP3Reader()
self.grid_openadr = OpenADRClient()
self.tel_snmp = SNMPMonitor()
self.tel_netconf = NETCONFClient()
self.tel_gnmi = GNMIClient()
# Grid
def iec61850_browse(self, host: str, ap: str, ln: str, da: str, timeout: int = 5):
return self.grid_iec.browse(host, ap, ln, da, timeout)
def dnp3_read_analogs(self, host: str, indices: List[int], port: int = 20000, timeout: int = 5):
return self.grid_dnp3.read_analogs(host, indices, port, timeout)
def openadr_events(self, vtn: str, ven: str, token: str, timeout: int = 8):
return self.grid_openadr.poll_events(vtn, ven, token, timeout)
# Telecom
def snmp_get(self, host: str, oid: str, community: Optional[str] = None,
v3_user: Optional[str] = None, v3_auth: Optional[str] = None, v3_priv: Optional[str] = None,
timeout: int = 5):
return self.tel_snmp.get(host, oid, community, v3_user, v3_auth, v3_priv, timeout)
def netconf_get(self, host: str, port: int, user: str, password: str, filter_subtree: Optional[str] = None, timeout: int = 8):
return self.tel_netconf.get(host, port, user, password, filter_subtree, timeout)
def gnmi_get(self, host: str, path: str, port: int = 57400, timeout: int = 8):
return self.tel_gnmi.get(host, path, port, timeout)
# --------------------------- CLI ---------------------------
def main():
parser = argparse.ArgumentParser(description="UCLS Infra Gateway — Grid + Telecom (READ-ONLY, safe-by-default)")
sub = parser.add_subparsers(dest="cmd")
# Demo
sub.add_parser("demo", help="Run simulation demo flows")
# Grid
p_iec = sub.add_parser("iec61850-browse", help="IEC 61850 MMS browse (READ-ONLY)")
p_iec.add_argument("--host", required=True)
p_iec.add_argument("--ap", required=True, help="Access Point / Logical Device (e.g., LD0)")
p_iec.add_argument("--ln", required=True, help="Logical Node (e.g., MMXU1)")
p_iec.add_argument("--da", required=True, help="Data Attribute (e.g., TotW)")
p_iec.add_argument("--timeout", type=int, default=5)
p_dnp = sub.add_parser("dnp3-read", help="DNP3 read analog indices (READ-ONLY)")
p_dnp.add_argument("--host", required=True)
p_dnp.add_argument("--port", type=int, default=20000)
p_dnp.add_argument("--indices", required=True, help="Comma-separated indices, e.g., 0,1,2")
p_dnp.add_argument("--timeout", type=int, default=5)
p_oadr = sub.add_parser("openadr-events", help="OpenADR events poll (READ-ONLY)")
p_oadr.add_argument("--vtn", required=True, help="VTN base URL")
p_oadr.add_argument("--ven", required=True, help="VEN ID")
p_oadr.add_argument("--token", required=True)
p_oadr.add_argument("--timeout", type=int, default=8)
# Telecom
p_snmp = sub.add_parser("snmp-get", help="SNMP GET (READ-ONLY)")
p_snmp.add_argument("--host", required=True)
p_snmp.add_argument("--oid", required=True)
p_snmp.add_argument("--community")
p_snmp.add_argument("--v3-user")
p_snmp.add_argument("--v3-auth", choices=["SHA","MD5"])
p_snmp.add_argument("--v3-priv", choices=["AES","DES"])
p_snmp.add_argument("--timeout", type=int, default=5)
p_nc = sub.add_parser("netconf-get", help="NETCONF get/get-config (READ-ONLY)")
p_nc.add_argument("--host", required=True)
p_nc.add_argument("--port", type=int, default=830)
p_nc.add_argument("--user", required=True)
p_nc.add_argument("--password", required=True)
p_nc.add_argument("--filter-subtree")
p_nc.add_argument("--timeout", type=int, default=8)
p_gnmi = sub.add_parser("gnmi-get", help="gNMI Get (READ-ONLY, stub unless client installed)")
p_gnmi.add_argument("--host", required=True)
p_gnmi.add_argument("--path", required=True, help="YANG path, e.g., /interfaces/interface[name=xe-0/0/0]/state")
p_gnmi.add_argument("--port", type=int, default=57400)
p_gnmi.add_argument("--timeout", type=int, default=8)
args = parser.parse_args()
gw = UCLSInfraGateway()
if args.cmd == "demo" or args.cmd is None:
# Simulation-only flows to show structure
print("# DEMO (simulation)")
print(json.dumps(gw.openadr_events("https://vtn.sim", "VEN_DEMO", "demo-token"), ensure_ascii=False, indent=2))
print(json.dumps(gw.snmp_get("router.sim", "1.3.6.1.2.1.1.3.0"), ensure_ascii=False, indent=2))
print(json.dumps(gw.netconf_get("router.sim", 830, "user", "pass", "<interfaces/>"), ensure_ascii=False, indent=2))
print(json.dumps(gw.gnmi_get("router.sim", "/interfaces/interface[name=xe-0/0/0]/state"), ensure_ascii=False, indent=2))
print(json.dumps(gw.iec61850_browse("substation.sim", "LD0", "MMXU1", "TotW"), ensure_ascii=False, indent=2))
print(json.dumps(gw.dnp3_read_analogs("rtu.sim", [0,1,2]), ensure_ascii=False, indent=2))
return
if args.cmd == "iec61850-browse":
res = gw.iec61850_browse(args.host, args.ap, args.ln, args.da, args.timeout)
print(json.dumps(res, ensure_ascii=False, indent=2))
return
if args.cmd == "dnp3-read":
idx = [int(x.strip()) for x in args.indices.split(",") if x.strip().isdigit()]
res = gw.dnp3_read_analogs(args.host, idx, args.port, args.timeout)
print(json.dumps(res, ensure_ascii=False, indent=2))
return
if args.cmd == "openadr-events":
res = gw.openadr_events(args.vtn, args.ven, args.token, args.timeout)
print(json.dumps(res, ensure_ascii=False, indent=2))
return
if args.cmd == "snmp-get":
res = gw.snmp_get(args.host, args.oid, args.community, args.v3_user, args.v3_auth, args.v3_priv, args.timeout)
print(json.dumps(res, ensure_ascii=False, indent=2))
return
if args.cmd == "netconf-get":
res = gw.netconf_get(args.host, args.port, args.user, args.password, args.filter_subtree, args.timeout)
print(json.dumps(res, ensure_ascii=False, indent=2))
return
if args.cmd == "gnmi-get":
res = gw.gnmi_get(args.host, args.path, args.port, args.timeout)
print(json.dumps(res, ensure_ascii=False, indent=2))
return
if __name__ == "__main__":
main()
Master Isotope Table (Known vs Predicted, Z = 1 → 118) – SolveForce Communications
The Anatomy of Global Currencies – SolveForce Communications
Key design choices (safety-first):
- Read-only adapters only. No breaker operations, no NETCONF
edit-config, no SNMPset, no gNMISet. - LIVE mode off by default (
UCLS_LIVE=0), simulation responses demonstrate structure. - Host allowlist required for any live call (
UCLS_WHITELIST). - Role gate (
UCLS_ROLE) enforces least privilege; even “admin” here is read-only. - Signed audit records printed as JSON for every operation.
If you want, I can add OpenADR VEN registration flows, IEC-61850 path helpers (LN/DO/DA traversal), or TM Forum Open APIs (for OSS/BSS) as additional read-only adapters under the same gateway.