# ucls_infra_gateway.py


#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ucls_infra_gateway.py
----------------------------------------------------------------------
A single, *safe-by-default* gateway that can communicate with:

• Electrical grid systems (READ-ONLY):
    - IEC 61850 MMS browser (substation data attributes)  [stub with optional lib]
    - DNP3 outstation reader                              [stub with optional lib]
    - OpenADR 2.0b/IEEE 2030.5 (SEP 2.0) event polling    [HTTPS, read-only]

• Telecommunication networks (READ-ONLY):
    - SNMPv2c/v3 polling (interfaces, uptime, health)     [pysnmp if present]
    - NETCONF get/get-config                              [ncclient if present]
    - gNMI telemetry subscribe/get                        [stub with optional lib]

The script implements strict guardrails:
    - LIVE mode is OFF by default (simulation only).
    - Read-only operations only; no control commands are implemented.
    - Host allowlist is required for live connections (UCLS_WHITELIST).
    - TLS verification is on, with optional CA/cert pinning.
    - Signed audit records for every operation.
    - Simple policy check (role-based) before any live network calls.

USAGE (CLI)
    # Simulation (default):
    python ucls_infra_gateway.py demo

    # Live READ-ONLY calls (you must set LIVE=1 and whitelist targets):
    export UCLS_LIVE=1
    export UCLS_WHITELIST="10.0.0.10,router.example.net,substation01.utility"
    export UCLS_ROLE="viewer"   # or "operator" (still read-only in this tool)

    # SNMP GET if pysnmp is installed:
    python ucls_infra_gateway.py snmp-get --host router.example.net --oid 1.3.6.1.2.1.1.3.0 --v3-user netops --v3-auth SHA --v3-priv AES --timeout 5

    # NETCONF get with an optional subtree filter (if ncclient is installed):
    python ucls_infra_gateway.py netconf-get --host router.example.net --port 830 --user admin --password '***' --filter-subtree '<interfaces/>'

    # IEC 61850 browse (stub if no lib):
    python ucls_infra_gateway.py iec61850-browse --host substation01.utility --ap LD0 --ln "MMXU1" --da "TotW"

    # OpenADR poll events (read-only):
    python ucls_infra_gateway.py openadr-events --vtn https://vtn.example/oadr --ven VEN_123 --token "***"

DISCLAIMER
    This is a high-integrity READ-ONLY integration scaffold for authorized environments.
    Do not use on systems you do not own or explicitly control. No control/operate APIs
    are provided here. Extending to controls requires rigorous safety, testing, and formal approval.
"""

from __future__ import annotations
import os, sys, json, time, hmac, hashlib, argparse, socket, ssl, re
from dataclasses import dataclass, asdict
from typing import Optional, Any, Dict, List, Tuple

# Optional dependencies (graceful degradation)
try:
    from pysnmp.hlapi import (
        SnmpEngine, UdpTransportTarget, CommunityData,
        ContextData, ObjectType, ObjectIdentity, getCmd,
        UsmUserData, usmHMACSHAAuthProtocol, usmHMACMD5AuthProtocol,
        usmAesCfb128Protocol, usmDESPrivProtocol
    )
    HAS_PYSNMP = True
except Exception:
    HAS_PYSNMP = False

try:
    from ncclient import manager as nc_manager
    HAS_NCCLIENT = True
except Exception:
    HAS_NCCLIENT = False

# gNMI client libs vary; we present a safe stub
HAS_GNMI = False

# --------------------------- Safety & Policy ---------------------------

def is_live() -> bool:
    """Report whether LIVE mode has been switched on through ``UCLS_LIVE``.

    Only the exact spellings listed below count as "on". An unset variable,
    or any other value, leaves the gateway in simulation mode.
    """
    enabled_spellings = {"1", "true", "TRUE", "yes", "YES"}
    flag = os.environ.get("UCLS_LIVE", "0")
    return flag in enabled_spellings

def whitelist() -> List[str]:
    """Return the host allowlist parsed from ``UCLS_WHITELIST``.

    The variable is read as a comma-separated list; surrounding whitespace is
    trimmed from each entry and empty entries are discarded. An unset or empty
    variable yields an empty list.
    """
    entries: List[str] = []
    for chunk in os.environ.get("UCLS_WHITELIST", "").split(","):
        trimmed = chunk.strip()
        if trimmed:
            entries.append(trimmed)
    return entries

def role() -> str:
    """Return the caller's role from ``UCLS_ROLE``, or ``"viewer"`` when unset."""
    environment = os.environ
    if "UCLS_ROLE" in environment:
        return environment["UCLS_ROLE"]
    return "viewer"

def is_host_allowed(host: str) -> bool:
    """Decide whether *host* may be contacted in LIVE mode.

    A host is allowed when the allowlist is non-empty and either the host
    string itself or the IPv4 address it resolves to appears in the list.

    Args:
        host: Hostname or IP address exactly as supplied by the caller.

    Returns:
        ``True`` when the host is on the allowlist, ``False`` otherwise
        (including when the allowlist is empty, the host is empty, or name
        resolution fails).
    """
    allowed = set(whitelist())
    # Fail closed: no allowlist (or no host) means nothing is permitted.
    if not allowed or not host:
        return False
    # A literal match needs no DNS round-trip, so check it first.
    if host in allowed:
        return True
    try:
        resolved_ip = socket.gethostbyname(host)
    except (OSError, UnicodeError):
        # Resolution failed (or the name cannot be IDNA-encoded); the literal
        # check above already missed, so the host is not allowed.
        return False
    return resolved_ip in allowed

def require_live_and_allowed(host: str):
    """Guard for live network calls: LIVE mode must be on and *host* allowlisted.

    Raises:
        PermissionError: When LIVE mode is disabled, or when LIVE mode is
            enabled but ``is_host_allowed(host)`` is false.
    """
    live = is_live()
    # The allowlist is only consulted once LIVE mode is confirmed.
    if live and is_host_allowed(host):
        return
    if not live:
        raise PermissionError("LIVE mode is disabled. Set UCLS_LIVE=1 to enable live network calls.")
    raise PermissionError(f"Host '{host}' not in UCLS_WHITELIST. Set UCLS_WHITELIST to a comma-separated list of allowed hosts/IPs.")

def require_role_at_least(min_role: str = "viewer"):
    """Ensure the configured role is at least *min_role*.

    Roles are ordered ``viewer < operator < admin``. A configured role that is
    not in that ordering is treated as ``viewer`` (the lowest privilege).

    Args:
        min_role: Minimum role the caller requires.

    Raises:
        ValueError: If *min_role* is not a known role. Previously an unknown
            requirement silently ranked as ``viewer``, so a typo at a call
            site disabled the check.
        PermissionError: If the configured role ranks below *min_role*.
    """
    # simple role lattice: viewer < operator < admin
    lattice = {"viewer": 0, "operator": 1, "admin": 2}
    if min_role not in lattice:
        known = ", ".join(sorted(lattice, key=lattice.get))
        raise ValueError(f"Unknown minimum role '{min_role}'; expected one of: {known}.")
    # Read the role once so the comparison and the message agree.
    current = role()
    if lattice.get(current, 0) < lattice[min_role]:
        raise PermissionError(f"Insufficient role: need '{min_role}', have '{current}'.")

# --------------------------- Audit Trail ---------------------------

# HMAC key used by sign_record(): read from UCLS_AUDIT_KEY and encoded to bytes.
# NOTE(review): the fallback "ucls-dev-key" is visible in this source, so any
# signature produced with it can be forged — set UCLS_AUDIT_KEY outside development.
AUDIT_KEY = os.environ.get("UCLS_AUDIT_KEY","ucls-dev-key").encode()

@dataclass
class AuditRecord:
    """One audit-trail entry describing a single gateway operation.

    Every field except ``signature`` is covered by the HMAC computed in
    ``sign_record``; ``signature`` starts empty and is filled in there.
    """
    # Timestamp of the operation; the adapters in this file pass time.time().
    ts: float
    # Who performed the operation; the adapters in this file pass role().
    actor: str
    # Operation label, e.g. "snmp-get" or "snmp-get-sim".
    action: str
    # What was addressed: a host, "host:port", or a URL, depending on the adapter.
    target: str
    # Request parameters worth recording (e.g. the OID or filter).
    payload: Dict[str, Any]
    # Shortened rendering of the result; the adapters in this file pass preview(...).
    result_preview: str
    # Hex HMAC-SHA256 digest, set by sign_record().
    signature: str = ""

def sign_record(rec: AuditRecord) -> AuditRecord:
    """Attach an HMAC-SHA256 signature to *rec* and return the same object.

    The signed message is the UTF-8 encoding of a key-sorted JSON object made
    of every field except ``signature``. The record is modified in place.
    """
    signed_fields = ("ts", "actor", "action", "target", "payload", "result_preview")
    canonical = json.dumps(
        {name: getattr(rec, name) for name in signed_fields},
        sort_keys=True,
        ensure_ascii=False,
    )
    digest = hmac.new(AUDIT_KEY, canonical.encode(), hashlib.sha256)
    rec.signature = digest.hexdigest()
    return rec

def audit_log(rec: AuditRecord):
    """Sign *rec* and print it to standard output as one line of JSON."""
    line = json.dumps(asdict(sign_record(rec)), ensure_ascii=False)
    print(line)

# --------------------------- Utilities ---------------------------

def preview(data: Any, limit: int = 240) -> str:
    """Return a short, printable rendering of *data* for audit records.

    Dicts and lists are rendered as JSON; everything else goes through
    ``str()``. Text longer than *limit* characters is cut to *limit* and
    suffixed with an ellipsis.

    Args:
        data: Any value to summarise.
        limit: Maximum number of characters kept before the ellipsis.

    Returns:
        The (possibly truncated) text.
    """
    if isinstance(data, (dict, list)):
        try:
            # default=str keeps non-JSON values (bytes, datetimes, ...) from
            # aborting the operation that is merely being logged.
            text = json.dumps(data, ensure_ascii=False, default=str)
        except (TypeError, ValueError):
            # Unsupported key types or circular references: fall back to str().
            text = str(data)
    else:
        text = str(data)
    return text[:limit] + "…" if len(text) > limit else text

# --------------------------- Grid Adapters (READ-ONLY) ---------------------------

class IEC61850Browser:
    """
    IEC 61850 MMS browser — safe stub (READ-ONLY).

    No MMS association is opened by this class: the simulation path and the
    LIVE path both return the same placeholder reading. In a live setup, use a
    Python binding to 61850 MMS (or a gateway API) with READ access.
    """
    def browse(self, host: str, ap: str, ln: str, da: str, timeout: int = 5) -> Dict[str, Any]:
        """Read one data attribute and write an audit record.

        With LIVE mode off the call is simulated and audited as
        ``iec61850-browse-sim``. Previously it raised ``PermissionError``,
        which made the built-in demo fail. With LIVE mode on, the host
        allowlist and role gate are enforced first.

        Args:
            host: Target device address.
            ap: Access point / logical device (e.g. ``LD0``).
            ln: Logical node (e.g. ``MMXU1``).
            da: Data attribute (e.g. ``TotW``).
            timeout: Accepted for interface stability; unused by the stub.

        Returns:
            Dict with ``path``, ``quality``, ``timestamp`` and ``value``.

        Raises:
            PermissionError: In LIVE mode, if the host is not allowlisted or
                the role is insufficient.
        """
        live = is_live()
        if live:
            require_live_and_allowed(host)
            require_role_at_least("viewer")
        # Stub: in real use, open MMS association, read DA value path: e.g., LD0/MMXU1.TotW.mag.f
        # NOTE(review): the value below is a placeholder even in LIVE mode.
        data = {
            "path": f"{ap}/{ln}.{da}",
            "quality": "valid",
            "timestamp": time.time(),
            "value": 12_345.67
        }
        action = "iec61850-browse" if live else "iec61850-browse-sim"
        audit_log(AuditRecord(time.time(), role(), action, host,
                              {"ap": ap, "ln": ln, "da": da}, preview(data)))
        return data

class DNP3Reader:
    """
    DNP3 outstation (slave) reader — safe stub (READ-ONLY).

    No DNP3 session is opened by this class: the simulation path and the LIVE
    path both return the same synthetic values. In a live setup, use a DNP3
    client binding to poll binary inputs/counters/analogs.
    """
    def read_analogs(self, host: str, indices: List[int], port: int = 20000, timeout: int = 5) -> Dict[str, Any]:
        """Read analog input points and write an audit record.

        With LIVE mode off the call is simulated and audited as
        ``dnp3-read-analogs-sim``. Previously it raised ``PermissionError``,
        which made the built-in demo fail. With LIVE mode on, the host
        allowlist and role gate are enforced first.

        Args:
            host: Outstation address.
            indices: Analog input point indices to read.
            port: TCP port recorded in the audit target.
            timeout: Accepted for interface stability; unused by the stub.

        Returns:
            Dict mapping ``"AI<index>"`` to a synthetic value.

        Raises:
            PermissionError: In LIVE mode, if the host is not allowlisted or
                the role is insufficient.
        """
        live = is_live()
        if live:
            require_live_and_allowed(host)
            require_role_at_least("viewer")
        # NOTE(review): values are placeholders (index * 1.234) even in LIVE mode.
        data = {f"AI{idx}": (idx * 1.234) for idx in indices}
        action = "dnp3-read-analogs" if live else "dnp3-read-analogs-sim"
        audit_log(AuditRecord(time.time(), role(), action, f"{host}:{port}",
                              {"indices": indices}, preview(data)))
        return data

class OpenADRClient:
    """
    OpenADR 2.0b / IEEE 2030.5 VEN → VTN event poller — read-only.
    Real implementations exchange XML/JSON over TLS with VEN/VTN certificates.
    This class performs no network I/O: the LIVE path only runs the guardrail
    checks and returns an empty event list.
    """
    def poll_events(self, vtn_url: str, ven_id: str, token: str, timeout: int = 8) -> Dict[str, Any]:
        """Poll the VTN for events and write an audit record.

        Args:
            vtn_url: VTN base URL.
            ven_id: VEN identifier.
            token: Credential for the VTN; unused by the stub and never logged.
            timeout: Accepted for interface stability; unused by the stub.

        Returns:
            Dict with ``vtn``, ``ven`` and ``events``.

        Raises:
            PermissionError: In LIVE mode, if the URL is not ``https://``, if
                its host is not allowlisted, or if the role is insufficient.
        """
        if not is_live():
            # simulate event list
            data = {
                "vtn": vtn_url, "ven": ven_id,
                "events":[
                    {"eventID":"evt-123", "signalType":"simple", "signalName":"LOAD_SHED",
                     "start":"2025-08-18T20:00:00Z", "duration":"PT1H", "level":2}
                ]
            }
        else:
            # For live use, perform HTTPS GET/POST per OpenADR spec (omitted here; read-only).
            # Always verify TLS certificates and restrict to whitelisted domains.
            from urllib.parse import urlsplit

            try:
                parts = urlsplit(vtn_url)
            except ValueError:
                # Malformed authority (e.g. unbalanced IPv6 brackets): fail closed.
                raise PermissionError("VTN host not in allowlist.") from None
            if parts.scheme != "https":
                raise PermissionError("Live OpenADR polling requires an https:// VTN URL.")
            # Reduce the authority to the bare host so "host:8443" or
            # "user@host" is compared against the allowlist as "host".
            # The original casing is kept so existing allowlist entries still match.
            authority = parts.netloc.rpartition("@")[2]
            if authority.startswith("["):
                vtn_host = authority[1:].partition("]")[0]
            else:
                vtn_host = authority.partition(":")[0]
            if not vtn_host or not is_host_allowed(vtn_host):
                raise PermissionError("VTN host not in allowlist.")
            # Same role gate as the other live adapters.
            require_role_at_least("viewer")
            data = {"vtn": vtn_url, "ven": ven_id, "events":[]}
        audit_log(AuditRecord(time.time(), role(), "openadr-events", vtn_url,
                              {"ven":ven_id}, preview(data)))
        return data

# --------------------------- Telecom Adapters (READ-ONLY) ---------------------------

class SNMPMonitor:
    """
    SNMPv2c/v3 GET — read-only polling.
    Requires pysnmp; falls back to simulation if unavailable or LIVE off.
    """
    def get(self, host: str, oid: str, community: Optional[str] = None,
            v3_user: Optional[str] = None, v3_auth: Optional[str] = None, v3_priv: Optional[str] = None,
            timeout: int = 5, v3_auth_key: Optional[str] = None,
            v3_priv_key: Optional[str] = None) -> Dict[str, Any]:
        """Issue one SNMP GET and write an audit record.

        SNMPv3 is used when *v3_user* is given, otherwise SNMPv2c with
        *community* (default ``"public"``).

        Args:
            host: Agent address (UDP port 161).
            oid: Object identifier to read.
            community: SNMPv2c community string.
            v3_user: SNMPv3 USM user name.
            v3_auth: ``"SHA"`` or ``"MD5"``; ``None`` for no authentication.
            v3_priv: ``"AES"`` or ``"DES"``; ``None`` for no privacy.
            timeout: Per-request timeout in seconds.
            v3_auth_key: Authentication passphrase; falls back to the
                ``UCLS_SNMP_AUTH_KEY`` environment variable.
            v3_priv_key: Privacy passphrase; falls back to the
                ``UCLS_SNMP_PRIV_KEY`` environment variable.

        Returns:
            Dict with ``host``, ``oid`` and ``value``.

        Raises:
            PermissionError: If the host is not allowlisted or the role is
                insufficient.
            ValueError: If the SNMPv3 security settings are inconsistent, e.g.
                a protocol is requested without its key. Without the key the
                request could not be authenticated or encrypted as asked.
            RuntimeError: If the agent or the transport reports an error.
        """
        if not is_live() or not HAS_PYSNMP:
            data = {"host": host, "oid": oid, "value": "simulated-uptime-123456"}
            audit_log(AuditRecord(time.time(), role(), "snmp-get-sim", host,
                                  {"oid":oid}, preview(data)))
            return data

        require_live_and_allowed(host)
        require_role_at_least("viewer")

        if v3_user:
            security = self._build_usm_user(v3_user, v3_auth, v3_priv, v3_auth_key, v3_priv_key)
        else:
            # v2c
            security = CommunityData(community or "public", mpModel=1)

        errorIndication, errorStatus, errorIndex, varBinds = next(getCmd(
            SnmpEngine(),
            security,
            UdpTransportTarget((host, 161), timeout=timeout, retries=1),
            ContextData(),
            ObjectType(ObjectIdentity(oid))
        ))

        if errorIndication:
            raise RuntimeError(f"SNMP error: {errorIndication}")
        if errorStatus:
            # errorIndex is 1-based; zero means no specific varbind is blamed.
            culprit = varBinds[int(errorIndex) - 1][0] if errorIndex else "?"
            raise RuntimeError(f"SNMP error: {errorStatus.prettyPrint()} at {culprit}")

        value = {str(name): str(val) for name, val in varBinds}
        audit_log(AuditRecord(time.time(), role(), "snmp-get", host,
                              {"oid":oid}, preview(value)))
        # Return the single value when the reply is keyed by the requested OID,
        # otherwise the whole mapping.
        return {"host": host, "oid": oid, "value": value.get(oid, value)}

    @staticmethod
    def _build_usm_user(v3_user: str, v3_auth: Optional[str], v3_priv: Optional[str],
                        v3_auth_key: Optional[str], v3_priv_key: Optional[str]):
        """Build the pysnmp USM user, refusing inconsistent security settings."""
        auth_protocols = {"SHA": usmHMACSHAAuthProtocol, "MD5": usmHMACMD5AuthProtocol}
        priv_protocols = {"AES": usmAesCfb128Protocol, "DES": usmDESPrivProtocol}
        if v3_auth is not None and v3_auth not in auth_protocols:
            raise ValueError(f"Unsupported SNMPv3 auth protocol '{v3_auth}'.")
        if v3_priv is not None and v3_priv not in priv_protocols:
            raise ValueError(f"Unsupported SNMPv3 privacy protocol '{v3_priv}'.")
        if v3_priv and not v3_auth:
            raise ValueError("SNMPv3 privacy requires an authentication protocol as well.")

        # Keys fall back to the environment so they need not appear on a command line.
        auth_key = v3_auth_key or os.environ.get("UCLS_SNMP_AUTH_KEY")
        priv_key = v3_priv_key or os.environ.get("UCLS_SNMP_PRIV_KEY")
        if v3_auth and not auth_key:
            raise ValueError("SNMPv3 authentication requested but no key given "
                             "(pass v3_auth_key or set UCLS_SNMP_AUTH_KEY).")
        if v3_priv and not priv_key:
            raise ValueError("SNMPv3 privacy requested but no key given "
                             "(pass v3_priv_key or set UCLS_SNMP_PRIV_KEY).")

        usm_options: Dict[str, Any] = {}
        if v3_auth:
            usm_options.update(authKey=auth_key, authProtocol=auth_protocols[v3_auth])
        if v3_priv:
            usm_options.update(privKey=priv_key, privProtocol=priv_protocols[v3_priv])
        return UsmUserData(v3_user, **usm_options)

class NETCONFClient:
    """
    NETCONF over SSH — read-only ``get`` with an optional subtree filter.
    Simulated whenever LIVE mode is off or ncclient is not installed.
    """
    def get(self, host: str, port: int, user: str, password: str, filter_subtree: Optional[str] = None,
            timeout: int = 8) -> Dict[str, Any]:
        """Run a NETCONF ``get`` against ``host:port`` and write an audit record.

        In simulation the result carries ``host``, ``operation``, ``filter``
        and ``payload``; in LIVE mode it carries ``host``, ``operation`` and
        the reply ``xml``. The password is never written to the audit record.

        Raises:
            PermissionError: In LIVE mode, if the host is not allowlisted or
                the role is insufficient.
        """
        endpoint = f"{host}:{port}"
        audit_payload = {"filter": filter_subtree}

        simulate = not (is_live() and HAS_NCCLIENT)
        if simulate:
            data = {
                "host": host,
                "operation": "get",
                "filter": filter_subtree or "<interfaces/>",
                "payload": "<simulated/>",
            }
            audit_log(AuditRecord(time.time(), role(), "netconf-get-sim", endpoint,
                                  audit_payload, preview(data)))
            return data

        require_live_and_allowed(host)
        require_role_at_least("viewer")

        # Password-only login with host-key verification; no agent, no key files.
        session_args = dict(host=host, port=port, username=user, password=password,
                            hostkey_verify=True, allow_agent=False, look_for_keys=False,
                            timeout=timeout)
        with nc_manager.connect(**session_args) as session:
            reply = session.get(("subtree", filter_subtree)) if filter_subtree else session.get()
            xml = reply.xml

        audit_log(AuditRecord(time.time(), role(), "netconf-get", endpoint,
                              audit_payload, preview(xml)))
        return {"host": host, "operation": "get", "xml": xml}

class GNMIClient:
    """
    gNMI (gRPC Network Management Interface) — read-only Get.
    A stub: simulated unless LIVE mode is on and ``HAS_GNMI`` is true, and even
    then no gRPC call is made — an empty value is returned.
    """
    def get(self, host: str, path: str, port: int = 57400, timeout: int = 8) -> Dict[str, Any]:
        """Return the value at *path* on ``host:port`` and write an audit record.

        Raises:
            PermissionError: On the live path, if the host is not allowlisted
                or the role is insufficient.
        """
        endpoint = f"{host}:{port}"
        if is_live() and HAS_GNMI:
            require_live_and_allowed(host)
            require_role_at_least("viewer")
            # Real client would dial TLS gRPC and issue GetRequest.
            action = "gnmi-get"
            data = {"host": host, "path": path, "value": {}}
        else:
            action = "gnmi-get-sim"
            counters = {"interfaces/interface[name=xe-0/0/0]/state/counters/in-octets": 123456789}
            data = {"host": host, "path": path, "value": counters}
        audit_log(AuditRecord(time.time(), role(), action, endpoint,
                              {"path": path}, preview(data)))
        return data

# --------------------------- Router ---------------------------

class UCLSInfraGateway:
    """Facade that owns one instance of each adapter and forwards calls to it."""

    def __init__(self):
        # Grid adapters
        self.grid_iec = IEC61850Browser()
        self.grid_dnp3 = DNP3Reader()
        self.grid_openadr = OpenADRClient()
        # Telecom adapters
        self.tel_snmp = SNMPMonitor()
        self.tel_netconf = NETCONFClient()
        self.tel_gnmi = GNMIClient()

    # ---- Grid ----
    def iec61850_browse(self, host: str, ap: str, ln: str, da: str, timeout: int = 5):
        """Forward to ``IEC61850Browser.browse``."""
        return self.grid_iec.browse(host=host, ap=ap, ln=ln, da=da, timeout=timeout)

    def dnp3_read_analogs(self, host: str, indices: List[int], port: int = 20000, timeout: int = 5):
        """Forward to ``DNP3Reader.read_analogs``."""
        return self.grid_dnp3.read_analogs(host=host, indices=indices, port=port, timeout=timeout)

    def openadr_events(self, vtn: str, ven: str, token: str, timeout: int = 8):
        """Forward to ``OpenADRClient.poll_events``."""
        return self.grid_openadr.poll_events(vtn_url=vtn, ven_id=ven, token=token, timeout=timeout)

    # ---- Telecom ----
    def snmp_get(self, host: str, oid: str, community: Optional[str] = None,
                 v3_user: Optional[str] = None, v3_auth: Optional[str] = None, v3_priv: Optional[str] = None,
                 timeout: int = 5):
        """Forward to ``SNMPMonitor.get``."""
        return self.tel_snmp.get(
            host=host,
            oid=oid,
            community=community,
            v3_user=v3_user,
            v3_auth=v3_auth,
            v3_priv=v3_priv,
            timeout=timeout,
        )

    def netconf_get(self, host: str, port: int, user: str, password: str, filter_subtree: Optional[str] = None, timeout: int = 8):
        """Forward to ``NETCONFClient.get``."""
        return self.tel_netconf.get(
            host=host,
            port=port,
            user=user,
            password=password,
            filter_subtree=filter_subtree,
            timeout=timeout,
        )

    def gnmi_get(self, host: str, path: str, port: int = 57400, timeout: int = 8):
        """Forward to ``GNMIClient.get``."""
        return self.tel_gnmi.get(host=host, path=path, port=port, timeout=timeout)

# --------------------------- CLI ---------------------------

def main():
    """Command-line entry point.

    Parses the arguments, builds a ``UCLSInfraGateway`` and runs either the
    demo flows or the selected read-only subcommand, printing the result as
    indented JSON. Audit records are printed by the adapters themselves.

    Changes from the earlier version:
      * ``--demo`` (shown in the usage text) is accepted alongside the
        ``demo`` subcommand.
      * A demo flow blocked by a guardrail is reported and skipped instead of
        aborting the remaining flows.
      * ``--indices`` is validated; invalid tokens are a usage error rather
        than being silently dropped.
      * A ``PermissionError`` from a guardrail is reported as a one-line
        error with exit status 1 instead of a traceback.
    """
    parser = argparse.ArgumentParser(description="UCLS Infra Gateway — Grid + Telecom (READ-ONLY, safe-by-default)")
    parser.add_argument("--demo", action="store_true",
                        help="Run simulation demo flows (same as the 'demo' subcommand)")
    sub = parser.add_subparsers(dest="cmd")

    # Demo
    sub.add_parser("demo", help="Run simulation demo flows")

    # Grid
    p_iec = sub.add_parser("iec61850-browse", help="IEC 61850 MMS browse (READ-ONLY)")
    p_iec.add_argument("--host", required=True)
    p_iec.add_argument("--ap", required=True, help="Access Point / Logical Device (e.g., LD0)")
    p_iec.add_argument("--ln", required=True, help="Logical Node (e.g., MMXU1)")
    p_iec.add_argument("--da", required=True, help="Data Attribute (e.g., TotW)")
    p_iec.add_argument("--timeout", type=int, default=5)

    p_dnp = sub.add_parser("dnp3-read", help="DNP3 read analog indices (READ-ONLY)")
    p_dnp.add_argument("--host", required=True)
    p_dnp.add_argument("--port", type=int, default=20000)
    p_dnp.add_argument("--indices", required=True, help="Comma-separated indices, e.g., 0,1,2")
    p_dnp.add_argument("--timeout", type=int, default=5)

    p_oadr = sub.add_parser("openadr-events", help="OpenADR events poll (READ-ONLY)")
    p_oadr.add_argument("--vtn", required=True, help="VTN base URL")
    p_oadr.add_argument("--ven", required=True, help="VEN ID")
    p_oadr.add_argument("--token", required=True)
    p_oadr.add_argument("--timeout", type=int, default=8)

    # Telecom
    p_snmp = sub.add_parser("snmp-get", help="SNMP GET (READ-ONLY)")
    p_snmp.add_argument("--host", required=True)
    p_snmp.add_argument("--oid", required=True)
    p_snmp.add_argument("--community")
    p_snmp.add_argument("--v3-user")
    p_snmp.add_argument("--v3-auth", choices=["SHA","MD5"])
    p_snmp.add_argument("--v3-priv", choices=["AES","DES"])
    p_snmp.add_argument("--timeout", type=int, default=5)

    p_nc = sub.add_parser("netconf-get", help="NETCONF get/get-config (READ-ONLY)")
    p_nc.add_argument("--host", required=True)
    p_nc.add_argument("--port", type=int, default=830)
    p_nc.add_argument("--user", required=True)
    # NOTE(review): a password given on the command line is visible in process
    # listings and shell history.
    p_nc.add_argument("--password", required=True)
    p_nc.add_argument("--filter-subtree")
    p_nc.add_argument("--timeout", type=int, default=8)

    p_gnmi = sub.add_parser("gnmi-get", help="gNMI Get (READ-ONLY, stub unless client installed)")
    p_gnmi.add_argument("--host", required=True)
    p_gnmi.add_argument("--path", required=True, help="YANG path, e.g., /interfaces/interface[name=xe-0/0/0]/state")
    p_gnmi.add_argument("--port", type=int, default=57400)
    p_gnmi.add_argument("--timeout", type=int, default=8)

    args = parser.parse_args()
    gw = UCLSInfraGateway()

    def emit(result):
        """Print one result as indented JSON."""
        print(json.dumps(result, ensure_ascii=False, indent=2))

    if args.demo or args.cmd in (None, "demo"):
        # Simulation-only flows to show structure
        print("# DEMO (simulation)")
        demo_flows = [
            ("openadr-events", lambda: gw.openadr_events("https://vtn.sim", "VEN_DEMO", "demo-token")),
            ("snmp-get", lambda: gw.snmp_get("router.sim", "1.3.6.1.2.1.1.3.0")),
            ("netconf-get", lambda: gw.netconf_get("router.sim", 830, "user", "pass", "<interfaces/>")),
            ("gnmi-get", lambda: gw.gnmi_get("router.sim", "/interfaces/interface[name=xe-0/0/0]/state")),
            ("iec61850-browse", lambda: gw.iec61850_browse("substation.sim", "LD0", "MMXU1", "TotW")),
            ("dnp3-read", lambda: gw.dnp3_read_analogs("rtu.sim", [0, 1, 2])),
        ]
        for label, flow in demo_flows:
            try:
                result = flow()
            except PermissionError as exc:
                # One blocked flow must not hide the others.
                print(f"# {label}: blocked by guardrail ({exc})")
            else:
                emit(result)
        return

    idx: List[int] = []
    if args.cmd == "dnp3-read":
        tokens = [tok.strip() for tok in args.indices.split(",")]
        tokens = [tok for tok in tokens if tok]
        invalid = [tok for tok in tokens if not (tok.isascii() and tok.isdigit())]
        if invalid or not tokens:
            parser.error(f"--indices must be comma-separated non-negative integers, got {args.indices!r}")
        idx = [int(tok) for tok in tokens]

    try:
        if args.cmd == "iec61850-browse":
            res = gw.iec61850_browse(args.host, args.ap, args.ln, args.da, args.timeout)
        elif args.cmd == "dnp3-read":
            res = gw.dnp3_read_analogs(args.host, idx, args.port, args.timeout)
        elif args.cmd == "openadr-events":
            res = gw.openadr_events(args.vtn, args.ven, args.token, args.timeout)
        elif args.cmd == "snmp-get":
            res = gw.snmp_get(args.host, args.oid, args.community, args.v3_user, args.v3_auth, args.v3_priv, args.timeout)
        elif args.cmd == "netconf-get":
            res = gw.netconf_get(args.host, args.port, args.user, args.password, args.filter_subtree, args.timeout)
        elif args.cmd == "gnmi-get":
            res = gw.gnmi_get(args.host, args.path, args.port, args.timeout)
        else:
            # argparse rejects unknown subcommands, so this is not expected.
            return
    except PermissionError as exc:
        # Guardrail denials are an expected outcome, not a crash.
        parser.exit(1, f"error: {exc}\n")
    emit(res)

# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()

When designing a “Logos‑based” language system, each unit of language must be defined by its role in the system rather than by vague usage so that machines build and interpret utterances coherently. – SolveForce Communications


Master Isotope Table (Known vs Predicted, Z = 1 → 118) – SolveForce Communications


Perfect, Ron — we’ll extend your universal math/operator interpreter to cover all major currency symbols. That way, the script will recognize not just +, ≤, ∑, etc., but also €, $, ¥, ₹, ₿, etc., and return their interpretations: symbol, LaTeX, HTML/Unicode, ASCII fallback, explanation, aliases, and language variants. – SolveForce Communications


Analysis and Resolution of Recurring Python Exceptions in a Notebook Environment – SolveForce Communications


The Anatomy of Global Currencies – SolveForce Communications


Key design choices (safety-first):

  • Read-only adapters only. No breaker operations, no NETCONF edit-config, no SNMP set, no gNMI Set.
  • LIVE mode is off by default (UCLS_LIVE=0); simulated responses demonstrate the structure of each result.
  • Host allowlist required for any live call (UCLS_WHITELIST).
  • Role gate (UCLS_ROLE) enforces least privilege; even “admin” here is read-only.
  • Signed audit records printed as JSON for every operation.

If you want, I can add OpenADR VEN registration flows, IEC-61850 path helpers (LN/DO/DA traversal), or TM Forum Open APIs (for OSS/BSS) as additional read-only adapters under the same gateway.