ucls_unified_meter_gateway.py


#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ucls_unified_meter_gateway.py
-------------------------------------------------------------------------------------------------
UCLS Unified Meter Gateway — a compatible, standardized, SAFE-BY-DEFAULT bridge that can *read*
from diverse meters and standards across interdisciplinary sciences and fields, normalize all
values into a canonical UCLS Measurement Model (UCLS-MM), and publish to dashboards.

READ-ONLY by construction. No control/operate paths are implemented.

COVERAGE (adapters = optional libs; all degrade to simulation/dry-run):
  • Electric / multi-utility meters:
      - DLMS/COSEM (IEC 62056)          [gurux-dlms if installed; else simulate]
      - ANSI C12.x (C12.18/.19/.22)     [stub simulate]
      - M-Bus / OMS (EN 13757)          [pymbus/pymeterbus if installed; else simulate]
      - Modbus/TCP                      [pymodbus if installed; else simulate]
  • Building/industrial:
      - BACnet/IP (ASHRAE 135)          [BAC0/pybacnet if installed; else simulate]
      - OPC UA                          [python-opcua if installed; else simulate]
  • IoT / Web / Messaging:
      - OGC SensorThings API            [HTTP GET; requests if installed; else simulate]
      - MQTT (Sparkplug B & JSON)       [paho-mqtt if installed; else simulate]
      - WebSocket push (dashboards)     [websocket-client if installed; else simulate]

CANONICAL DATA MODEL (UCLS-MM)
  {
    "id": "uuid or ULID",
    "device_id": "string",
    "stream_id": "string (protocol/topic/obis/path)",
    "observed_property": "string (e.g., 'active_power')",
    "value": float,
    "unit_ucum": "UCUM code (e.g., 'W', 'kW', 'kWh', 'm3/h', 'Cel')",
    "unit_si": "SI base unit (e.g., 'W','J','m3/s','K')",
    "si_value": float,   # converted to SI if possible
    "quality": {"flags": ["valid","simulated","overflow",...], "status": "ok|warn|bad"},
    "time": "ISO8601 UTC",
    "location": {"lat": float, "lon": float, "elev_m": float} (optional),
    "provenance": {"protocol": "dlms|mbus|modbus|bacnet|opcua|sensorthings|mqtt", "adapter": "classname", "source": "..."},
    "tags": {"discipline":"energy|hydrology|air|bio|...","sector":"grid|bms|plant|lab|..."}
  }

SAFETY
  - LIVE=0 by default → simulation/dry-run only
  - Allowlist REQUIRED for any live host/device (UCLS_WHITELIST, UCLS_MBUS_SERIAL, UCLS_DLMS_SERIAL etc.)
  - READ-ONLY across all protocols (no write/operate)
  - HMAC-signed JSON audit log per operation

CLI (examples)
  # Simulated quick reads
  python ucls_unified_meter_gateway.py demo

  # DLMS read (DRY-RUN unless LIVE=1 and allow-listed)
  UCLS_LIVE=1 UCLS_WHITELIST="meter01.utility" \
    python ucls_unified_meter_gateway.py read-dlms --host meter01.utility --port 4059 --obis 1-0:1.8.0

  # M-Bus TCP
  UCLS_LIVE=1 UCLS_WHITELIST="mbus.gateway.local" \
    python ucls_unified_meter_gateway.py read-mbus-tcp --host mbus.gateway.local --port 502 --secondary-id 12345678

  # Modbus/TCP (holding registers)
  UCLS_LIVE=1 UCLS_WHITELIST="plc.site.local" \
    python ucls_unified_meter_gateway.py read-modbus --host plc.site.local --addr 30001 --count 2

  # OPC UA
  UCLS_LIVE=1 UCLS_WHITELIST="opcua.site.local" \
    python ucls_unified_meter_gateway.py read-opcua --endpoint opc.tcp://opcua.site.local:4840 --node "ns=2;i=10853"

  # SensorThings API
  UCLS_LIVE=1 UCLS_WHITELIST="sensors.example.com" \
    python ucls_unified_meter_gateway.py read-sensorthings --base https://sensors.example.com/v1.1 --observation 12345

  # MQTT (SparkplugB/JSON) subscribe for 10s
  UCLS_LIVE=1 UCLS_WHITELIST="mqtt.broker.local" \
    python ucls_unified_meter_gateway.py mqtt-sub --broker mqtt.broker.local --topic "spBv1.0/#" --seconds 10

  # Normalize any raw JSON into UCLS-MM
  python ucls_unified_meter_gateway.py normalize --json '{"device_id":"x","observed_property":"temp","value":72,"unit_ucum":"[degF]"}'

  # Publish a measurement to MQTT/WebSocket
  python ucls_unified_meter_gateway.py publish --json '{...ucls-mm...}' --mqtt-broker mqtt.local --mqtt-topic ucls/mm --ws-url ws://localhost:8765
"""
from __future__ import annotations

import os, sys, json, time, re, socket, hmac, hashlib, uuid, math, argparse
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any, List, Tuple

# ---------- Optional dependencies (graceful) ----------
try:
    import requests
    HAS_REQUESTS = True
except Exception:
    HAS_REQUESTS = False

try:
    from pymodbus.client import ModbusTcpClient
    HAS_PYMODBUS = True
except Exception:
    HAS_PYMODBUS = False

try:
    from opcua import Client as OPCUAClient
    HAS_OPCUA = True
except Exception:
    HAS_OPCUA = False

try:
    import paho.mqtt.client as paho_mqtt
    HAS_PAHO = True
except Exception:
    HAS_PAHO = False

try:
    import websocket as ws_client
    HAS_WSCLIENT = True
except Exception:
    HAS_WSCLIENT = False

# DLMS/COSEM + M-Bus + BACnet: provide stubs unless libs available
try:
    import gurux_dlms  # type: ignore  # placeholder; typical package name is gurux_dlms (pip)
    HAS_DLMS = True
except Exception:
    HAS_DLMS = False

try:
    import meterbus  # type: ignore
    HAS_MBUS = True
except Exception:
    HAS_MBUS = False

try:
    import BAC0  # type: ignore
    HAS_BACNET = True
except Exception:
    HAS_BACNET = False

# ===================== Safety & Audit =====================

def is_live() -> bool:
    return os.environ.get("UCLS_LIVE","0") in ("1","true","TRUE","yes","YES")

def allowlist() -> List[str]:
    raw = os.environ.get("UCLS_WHITELIST","")
    return [h.strip() for h in raw.split(",") if h.strip()]

def host_allowed(host_or_url: str) -> bool:
    host = host_or_url
    m = re.match(r"^https?://([^/:]+)", host_or_url, re.I)
    if m: host = m.group(1)
    wl = allowlist()
    if not wl: return False
    try:
        ip = socket.gethostbyname(host)
    except Exception:
        ip = None
    return host in wl or (ip and ip in wl)

AUDIT_KEY = os.environ.get("UCLS_AUDIT_KEY","ucls-dev-key").encode()

def audit(action: str, target: str, payload: Dict[str,Any], result: Any):
    rec = {
        "ts": time.time(), "actor": os.environ.get("USER","ucls"),
        "action": action, "target": target, "payload": payload,
        "result_preview": (json.dumps(result, ensure_ascii=False)[:400] + "…") if len(str(result))>420 else result
    }
    body = json.dumps(rec, sort_keys=True, ensure_ascii=False).encode()
    rec["signature"] = hmac.new(AUDIT_KEY, body, hashlib.sha256).hexdigest()
    print(json.dumps(rec, ensure_ascii=False))

# ===================== Canonical Units & Conversion (UCUM→SI) =====================

# Minimal UCUM→SI mapping for common disciplines (extend as needed)
UNIT_MAP: Dict[str, Tuple[str, float, float]] = {
    # ucum : (si_unit, scale, offset)
    "W":    ("W", 1.0, 0.0),
    "kW":   ("W", 1000.0, 0.0),
    "MW":   ("W", 1e6, 0.0),
    "Wh":   ("J", 3600.0, 0.0),
    "kWh":  ("J", 3.6e6, 0.0),
    "m3":   ("m3", 1.0, 0.0),
    "m3/h": ("m3/s", 1.0/3600.0, 0.0),
    "L":    ("m3", 1e-3, 0.0),
    "Pa":   ("Pa", 1.0, 0.0),
    "bar":  ("Pa", 1e5, 0.0),
    "°C":   ("K", 1.0, 273.15),
    "Cel":  ("K", 1.0, 273.15),
    "[degF]": ("K", 5.0/9.0, 255.3722222),  # (F-32)*5/9 + 273.15
    "%RH":  ("%", 1.0, 0.0),
    "Hz":   ("Hz", 1.0, 0.0),
    "V":    ("V", 1.0, 0.0),
    "A":    ("A", 1.0, 0.0),
    "VAR":  ("VAR", 1.0, 0.0),
    "VA":   ("VA", 1.0, 0.0),
    "pf":   ("ratio", 1.0, 0.0),
    "ppm":  ("ppm", 1.0, 0.0),
    "mg/m3":("kg/m3", 1e-6, 0.0),
    "ug/m3":("kg/m3", 1e-9, 0.0)
}

def to_si(value: float, unit_ucum: str) -> Tuple[float, str]:
    u = UNIT_MAP.get(unit_ucum)
    if not u: return value, unit_ucum  # unknown, keep
    si_u, scale, offset = u
    # handle temperature with offset (C/F)
    if unit_ucum in ("°C","Cel","[degF]"):
        return value*scale + offset, si_u
    return value*scale + offset, si_u

# ===================== JSON-LD Context & SHACL (validation payloads) =====================

UCLS_MM_CONTEXT = {
  "@context": {
    "@vocab": "https://ucls.org/mm/",
    "id": "@id", "type": "@type",
    "device_id": "https://ucls.org/mm/device_id",
    "stream_id": "https://ucls.org/mm/stream_id",
    "observed_property": "https://ucls.org/mm/observed_property",
    "value": "https://ucls.org/mm/value",
    "unit_ucum": "https://ucls.org/mm/unit_ucum",
    "unit_si": "https://ucls.org/mm/unit_si",
    "si_value": "https://ucls.org/mm/si_value",
    "quality": "https://ucls.org/mm/quality",
    "time": "https://ucls.org/mm/time",
    "location": "https://ucls.org/mm/location",
    "provenance": "https://ucls.org/mm/provenance",
    "tags": "https://ucls.org/mm/tags"
  }
}

UCLS_MM_SHACL_TTL = """@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix ucls: <https://ucls.org/mm/> .
ucls:MeasurementShape a sh:NodeShape ;
  sh:targetClass ucls:Measurement ;
  sh:property [ sh:path ucls:device_id ; sh:minCount 1 ] ;
  sh:property [ sh:path ucls:observed_property ; sh:minCount 1 ] ;
  sh:property [ sh:path ucls:value ; sh:minCount 1 ] ;
  sh:property [ sh:path ucls:unit_ucum ; sh:minCount 1 ] ;
  sh:property [ sh:path ucls:time ; sh:minCount 1 ] .
"""

# ===================== Canonical Builder =====================

def make_mm(device_id: str, stream_id: str, observed_property: str, value: float,
            unit_ucum: str, protocol: str, adapter: str, source: str,
            tags: Optional[Dict[str,Any]]=None, quality_flags: Optional[List[str]]=None) -> Dict[str,Any]:
    si_value, unit_si = to_si(value, unit_ucum)
    meas = {
        "id": str(uuid.uuid4()),
        "device_id": device_id,
        "stream_id": stream_id,
        "observed_property": observed_property,
        "value": float(value),
        "unit_ucum": unit_ucum,
        "unit_si": unit_si,
        "si_value": float(si_value),
        "quality": {"flags": quality_flags or ["valid"], "status": "ok"},
        "time": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "provenance": {"protocol": protocol, "adapter": adapter, "source": source},
        "tags": tags or {}
    }
    return meas

# ===================== Adapters (READ-ONLY; SIM fallback) =====================

class DLMSReader:
    """DLMS/COSEM OBIS read (IEC 62056). Sim fallback if gurux_dlms not present or LIVE=0."""
    def read_obis(self, host: str, port: int, obis: str, device_id: Optional[str]=None) -> Dict[str,Any]:
        if not is_live() or not host_allowed(host) or not HAS_DLMS:
            # Simulate common OBIS 1-0:1.8.0 (active energy import, total)
            val = 12345.678  # kWh
            mm = make_mm(device_id or host, obis, "active_energy_import_total",
                         val, "kWh", "dlms", "DLMSReader", f"{host}:{port}",
                         tags={"discipline":"energy","sector":"grid"}, quality_flags=["simulated"])
            audit("dlms-read-sim", host, {"obis":obis,"port":port}, mm); return mm
        # Real DLMS call would go here (association, GET by OBIS) — omitted for safety.
        mm = make_mm(device_id or host, obis, "unknown_property",
                     float("nan"), "", "dlms", "DLMSReader", f"{host}:{port}", quality_flags=["unknown"])
        audit("dlms-read", host, {"obis":obis,"port":port}, mm); return mm

class MBusReader:
    """M-Bus EN 13757/OMS — TCP or serial. Sim fallback."""
    def read_tcp(self, host: str, port: int, secondary_id: Optional[str]) -> Dict[str,Any]:
        if not is_live() or not host_allowed(host) or not HAS_MBUS:
            mm = make_mm(secondary_id or host, f"mbus:{secondary_id or 'sim'}",
                         "volume_total", 12.345, "m3", "mbus", "MBusReader", f"{host}:{port}",
                         tags={"discipline":"hydrology","sector":"utility"}, quality_flags=["simulated"])
            audit("mbus-read-sim", host, {"port":port,"sec_id":secondary_id}, mm); return mm
        mm = make_mm(secondary_id or host, f"mbus:{secondary_id}",
                     "unknown_property", float("nan"), "", "mbus","MBusReader", f"{host}:{port}", quality_flags=["unknown"])
        audit("mbus-read", host, {"port":port,"sec_id":secondary_id}, mm); return mm

class ANSI_C12Reader:
    """ANSI C12.x stub — simulate common reads (demand/energy)."""
    def read(self, port: str, register: str, device_id: Optional[str]=None) -> Dict[str,Any]:
        # Simulation only in this scaffold
        mm = make_mm(device_id or port, f"c12:{register}", "active_energy_import_total",
                     6789.01, "kWh", "ansi_c12", "ANSI_C12Reader", port,
                     tags={"discipline":"energy","sector":"grid"}, quality_flags=["simulated"])
        audit("ansi-c12-read-sim", port, {"register":register}, mm); return mm

class ModbusReader:
    def read_holding(self, host: str, addr: int, count: int, device_id: Optional[str]=None, port: int = 502) -> Dict[str,Any]:
        if not is_live() or not host_allowed(host) or not HAS_PYMODBUS:
            # Simulate two registers composing a 32-bit float (e.g., flow)
            value = 123.4
            mm = make_mm(device_id or host, f"modbus:{addr}/{count}", "flow_rate",
                         value, "m3/h", "modbus", "ModbusReader", f"{host}:{port}",
                         tags={"discipline":"process","sector":"plant"}, quality_flags=["simulated"])
            audit("modbus-read-sim", host, {"addr":addr,"count":count}, mm); return mm
        client = ModbusTcpClient(host=host, port=port)
        client.connect()
        rr = client.read_holding_registers(addr, count, slave=1)
        client.close()
        value = float(rr.registers[0]) if rr and hasattr(rr,"registers") else float("nan")
        mm = make_mm(device_id or host, f"modbus:{addr}/{count}", "value_raw",
                     value, "", "modbus","ModbusReader", f"{host}:{port}")
        audit("modbus-read", host, {"addr":addr,"count":count}, mm); return mm

class BACnetReader:
    def read(self, device: str, obj: str, prop: str, device_id: Optional[str]=None) -> Dict[str,Any]:
        if not is_live() or not HAS_BACNET:
            mm = make_mm(device_id or device, f"bacnet:{obj}/{prop}", "air_temperature",
                         22.4, "Cel", "bacnet", "BACnetReader", device,
                         tags={"discipline":"hvac","sector":"bms"}, quality_flags=["simulated"])
            audit("bacnet-read-sim", device, {"obj":obj,"prop":prop}, mm); return mm
        mm = make_mm(device_id or device, f"bacnet:{obj}/{prop}", "value_raw",
                     float("nan"), "", "bacnet","BACnetReader", device, quality_flags=["unknown"])
        audit("bacnet-read", device, {"obj":obj,"prop":prop}, mm); return mm

class OPCUAReader:
    def read_node(self, endpoint: str, node: str, device_id: Optional[str]=None) -> Dict[str,Any]:
        if not is_live() or not HAS_OPCUA:
            mm = make_mm(device_id or endpoint, f"opcua:{node}", "pressure",
                         2.1, "bar", "opcua","OPCUAReader", endpoint,
                         tags={"discipline":"process","sector":"plant"}, quality_flags=["simulated"])
            audit("opcua-read-sim", endpoint, {"node":node}, mm); return mm
        host = re.sub(r"^opc\.tcp://","", endpoint).split(":")[0]
        if not host_allowed(host):
            raise PermissionError("Endpoint host not allowed")
        client = OPCUAClient(endpoint); client.connect()
        v = client.get_node(node).get_value(); client.disconnect()
        val = float(v) if isinstance(v,(int,float)) else float("nan")
        mm = make_mm(device_id or endpoint, f"opcua:{node}", "value_raw",
                     val, "", "opcua","OPCUAReader", endpoint)
        audit("opcua-read", endpoint, {"node":node}, mm); return mm

class SensorThingsReader:
    def read_observation(self, base: str, observation_id: str, device_id: Optional[str]=None) -> Dict[str,Any]:
        if not is_live() or not host_allowed(base) or not HAS_REQUESTS:
            mm = make_mm(device_id or base, f"sensorthings:obs/{observation_id}", "pm25",
                         12.7, "ug/m3", "sensorthings","SensorThingsReader", base,
                         tags={"discipline":"air","sector":"env"}, quality_flags=["simulated"])
            audit("sensorthings-read-sim", base, {"obs_id":observation_id}, mm); return mm
        url = f"{base.rstrip('/')}/Observations({observation_id})"
        r = requests.get(url, timeout=10)
        if r.status_code != 200:
            mm = make_mm(device_id or base, f"sensorthings:obs/{observation_id}", "value_raw",
                         float("nan"), "", "sensorthings","SensorThingsReader", base, quality_flags=["bad"])
            audit("sensorthings-read", base, {"obs_id":observation_id}, {"status": r.status_code}); return mm
        data = r.json()
        value = data.get("result")
        prop = (data.get("Datastream",{}) or {}).get("name","observation")
        unit = ((data.get("Datastream",{}) or {}).get("unitOfMeasurement",{}) or {}).get("symbol","")
        mm = make_mm(device_id or base, f"sensorthings:obs/{observation_id}", prop,
                     float(value) if isinstance(value,(int,float)) else float("nan"),
                     unit or "", "sensorthings","SensorThingsReader", url)
        audit("sensorthings-read", base, {"obs_id":observation_id}, mm); return mm

class MQTTMonitor:
    def subscribe(self, broker: str, topic: str, seconds: int = 10) -> List[Dict[str,Any]]:
        if not is_live() or not HAS_PAHO or not host_allowed(broker):
            mm = make_mm(broker, f"mqtt:{topic}", "message_count",
                         1, "", "mqtt","MQTTMonitor", broker,
                         tags={"discipline":"messaging"}, quality_flags=["simulated"])
            audit("mqtt-sub-sim", broker, {"topic":topic}, mm); return [mm]
        messages: List[Dict[str,Any]] = []
        def on_message(client, userdata, msg):
            try:
                payload = json.loads(msg.payload.decode(errors="replace"))
                # If SparkplugB or JSON with value/unit, attempt normalization
                if isinstance(payload, dict) and "value" in payload and "unit" in payload:
                    mm = make_mm(broker, f"mqtt:{msg.topic}", payload.get("observed_property","value"),
                                 float(payload["value"]), str(payload["unit"]), "mqtt","MQTTMonitor", broker)
                else:
                    mm = make_mm(broker, f"mqtt:{msg.topic}", "message",
                                 1.0, "", "mqtt","MQTTMonitor", broker, quality_flags=["raw"])
            except Exception:
                mm = make_mm(broker, f"mqtt:{msg.topic}", "message", 1.0, "", "mqtt","MQTTMonitor", broker, quality_flags=["raw"])
            messages.append(mm)
        client = paho_mqtt.Client()
        client.on_message = on_message; client.connect(broker, 1883, 60)
        client.subscribe(topic); client.loop_start()
        t0 = time.time()
        while time.time() - t0 < seconds: time.sleep(0.1)
        client.loop_stop(); client.disconnect()
        for m in messages: audit("mqtt-sub", broker, {"topic":topic}, m)
        return messages

# ===================== Publishers =====================

def publish_mqtt(obj: Dict[str,Any], broker: Optional[str], topic: Optional[str]) -> Dict[str,Any]:
    if not broker or not topic:
        return {"published": False, "reason": "no broker/topic"}
    if not is_live() or not HAS_PAHO or not host_allowed(broker):
        res = {"published": False, "simulated": True, "broker": broker, "topic": topic}
        audit("mqtt-publish-sim", broker, {"topic":topic}, res); return res
    client = paho_mqtt.Client(); client.connect(broker, 1883, 60)
    payload = json.dumps(obj, ensure_ascii=False); rc = client.publish(topic, payload, qos=0, retain=False)
    client.disconnect()
    res = {"published": rc.rc == 0, "broker": broker, "topic": topic}
    audit("mqtt-publish", broker, {"topic":topic}, res); return res

def publish_ws(obj: Dict[str,Any], ws_url: Optional[str]) -> Dict[str,Any]:
    if not ws_url:
        return {"published": False, "reason": "no ws_url"}
    payload = json.dumps(obj, ensure_ascii=False)
    if not is_live() or not HAS_WSCLIENT or not host_allowed(ws_url):
        res = {"published": False, "simulated": True, "ws_url": ws_url}
        audit("ws-publish-sim", ws_url, {}, res); return res
    ws = ws_client.create_connection(ws_url, timeout=5)
    ws.send(payload)
    try:
        ack = ws.recv()
    except Exception:
        ack = None
    ws.close()
    res = {"published": True, "ws_url": ws_url, "ack": (ack[:200]+"…") if isinstance(ack,str) and len(ack)>200 else ack}
    audit("ws-publish", ws_url, {}, res); return res

# ===================== CLI =====================

def main():
    dlms = DLMSReader(); mbus = MBusReader(); c12 = ANSI_C12Reader()
    modbus = ModbusReader(); bacnet = BACnetReader(); opcua = OPCUAReader()
    sens = SensorThingsReader(); mqttm = MQTTMonitor()

    p = argparse.ArgumentParser(description="UCLS Unified Meter Gateway — standardized, read-only")
    sub = p.add_subparsers(dest="cmd")

    sub.add_parser("demo", help="Run simulated reads across adapters")

    # DLMS
    d = sub.add_parser("read-dlms"); d.add_argument("--host", required=True); d.add_argument("--port", type=int, default=4059)
    d.add_argument("--obis", required=True); d.add_argument("--device-id")

    # M-Bus TCP
    mt = sub.add_parser("read-mbus-tcp"); mt.add_argument("--host", required=True); mt.add_argument("--port", type=int, default=502)
    mt.add_argument("--secondary-id"); mt.add_argument("--device-id")

    # ANSI C12 stub (serial id)
    c = sub.add_parser("read-c12"); c.add_argument("--port", required=True); c.add_argument("--register", default="1.0.0"); c.add_argument("--device-id")

    # Modbus/TCP
    m = sub.add_parser("read-modbus"); m.add_argument("--host", required=True); m.add_argument("--addr", type=int, required=True)
    m.add_argument("--count", type=int, default=2); m.add_argument("--port", type=int, default=502); m.add_argument("--device-id")

    # BACnet/IP
    b = sub.add_parser("read-bacnet"); b.add_argument("--device", required=True); b.add_argument("--object", required=True); b.add_argument("--property", required=True); b.add_argument("--device-id")

    # OPC UA
    o = sub.add_parser("read-opcua"); o.add_argument("--endpoint", required=True); o.add_argument("--node", required=True); o.add_argument("--device-id")

    # SensorThings
    s = sub.add_parser("read-sensorthings"); s.add_argument("--base", required=True); s.add_argument("--observation", required=True); s.add_argument("--device-id")

    # MQTT subscribe
    mq = sub.add_parser("mqtt-sub"); mq.add_argument("--broker", required=True); mq.add_argument("--topic", required=True); mq.add_argument("--seconds", type=int, default=10)

    # Normalize / Publish
    n = sub.add_parser("normalize"); n.add_argument("--json", required=True)
    pub = sub.add_parser("publish"); pub.add_argument("--json", required=True); pub.add_argument("--mqtt-broker"); pub.add_argument("--mqtt-topic"); pub.add_argument("--ws-url")

    args = p.parse_args()

    if args.cmd in (None, "demo"):
        out = []
        out.append(dlms.read_obis("meter.sim", 4059, "1-0:1.8.0"))
        out.append(mbus.read_tcp("mbus.sim", 502, "12345678"))
        out.append(c12.read("/dev/ttyS0", "1.0.0"))
        out.append(modbus.read_holding("plc.sim", 30001, 2))
        out.append(bacnet.read("bacnet.sim", "analogInput,1", "presentValue"))
        out.append(opcua.read_node("opc.tcp://opcua.sim:4840", "ns=2;i=10853"))
        out.append(sens.read_observation("https://sens.sim/v1.1", "1"))
        msgs = mqttm.subscribe("mqtt.sim","ucls/#", seconds=2); out.extend(msgs)
        print(json.dumps(out, ensure_ascii=False, indent=2)); return

    if args.cmd == "read-dlms":
        print(json.dumps(dlms.read_obis(args.host, args.port, args.obis, args.device_id), ensure_ascii=False, indent=2)); return

    if args.cmd == "read-mbus-tcp":
        print(json.dumps(mbus.read_tcp(args.host, args.port, args.secondary_id), ensure_ascii=False, indent=2)); return

    if args.cmd == "read-c12":
        print(json.dumps(c12.read(args.port, args.register, args.device_id), ensure_ascii=False, indent=2)); return

    if args.cmd == "read-modbus":
        print(json.dumps(modbus.read_holding(args.host, args.addr, args.count, args.device_id, args.port), ensure_ascii=False, indent=2)); return

    if args.cmd == "read-bacnet":
        print(json.dumps(bacnet.read(args.device, args.object, args.property, args.device_id), ensure_ascii=False, indent=2)); return

    if args.cmd == "read-opcua":
        print(json.dumps(opcua.read_node(args.endpoint, args.node, args.device_id), ensure_ascii=False, indent=2)); return

    if args.cmd == "read-sensorthings":
        print(json.dumps(sens.read_observation(args.base, args.observation, args.device_id), ensure_ascii=False, indent=2)); return

    if args.cmd == "mqtt-sub":
        print(json.dumps(mqttm.subscribe(args.broker, args.topic, args.seconds), ensure_ascii=False, indent=2)); return

    if args.cmd == "normalize":
        raw = json.loads(args.json)
        # If already UCLS-MM, just echo; else try to map common patterns
        if {"device_id","observed_property","value","unit_ucum","time"} <= set(raw.keys()):
            mm = raw
        else:
            # Heuristic mapping
            device_id = raw.get("device") or raw.get("device_id") or "unknown"
            prop = raw.get("observed_property") or raw.get("property") or "value"
            val = float(raw.get("value", "nan"))
            unit = raw.get("unit") or raw.get("unit_ucum") or ""
            mm = make_mm(device_id, raw.get("stream","raw"), prop, val, unit, "normalize","mapper","local", quality_flags=["normalized"])
        print(json.dumps(mm, ensure_ascii=False, indent=2)); return

    if args.cmd == "publish":
        obj = json.loads(args.json)
        if not isinstance(obj, dict):
            print(json.dumps({"error":"payload must be an object"}, ensure_ascii=False)); return
        if args.mqtt_broker and args.mqtt_topic:
            publish_mqtt(obj, args.mqtt_broker, args.mqtt_topic)
        if args.ws_url:
            publish_ws(obj, args.ws_url)
        print(json.dumps({"ok": True}, ensure_ascii=False)); return

if __name__ == "__main__":
    main()

- SolveForce -

🗂️ Quick Links

Home

Fiber Lookup Tool

Suppliers

Services

Technology

Quote Request

Contact

🌐 Solutions by Sector

Communications & Connectivity

Information Technology (IT)

Industry 4.0 & Automation

Cross-Industry Enabling Technologies

🛠️ Our Services

Managed IT Services

Cloud Services

Cybersecurity Solutions

Unified Communications (UCaaS)

Internet of Things (IoT)

🔍 Technology Solutions

Cloud Computing

AI & Machine Learning

Edge Computing

Blockchain

VR/AR Solutions

💼 Industries Served

Healthcare

Finance & Insurance

Manufacturing

Education

Retail & Consumer Goods

Energy & Utilities

🌍 Worldwide Coverage

North America

South America

Europe

Asia

Africa

Australia

Oceania

📚 Resources

Blog & Articles

Case Studies

Industry Reports

Whitepapers

FAQs

🤝 Partnerships & Affiliations

Industry Partners

Technology Partners

Affiliations

Awards & Certifications

📄 Legal & Privacy

Privacy Policy

Terms of Service

Cookie Policy

Accessibility

Site Map


📞 Contact SolveForce
Toll-Free: (888) 765-8301
Email: support@solveforce.com

Follow Us: LinkedIn | Twitter/X | Facebook | YouTube