#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ucls_unified_meter_gateway.py
-------------------------------------------------------------------------------------------------
UCLS Unified Meter Gateway — a compatible, standardized, SAFE-BY-DEFAULT bridge that can *read*
from diverse meters and standards across interdisciplinary sciences and fields, normalize all
values into a canonical UCLS Measurement Model (UCLS-MM), and publish to dashboards.
READ-ONLY by construction. No control/operate paths are implemented.
COVERAGE (adapters = optional libs; all degrade to simulation/dry-run):
• Electric / multi-utility meters:
- DLMS/COSEM (IEC 62056) [gurux-dlms if installed; else simulate]
- ANSI C12.x (C12.18/.19/.22) [stub simulate]
- M-Bus / OMS (EN 13757) [pymbus/pymeterbus if installed; else simulate]
- Modbus/TCP [pymodbus if installed; else simulate]
• Building/industrial:
- BACnet/IP (ASHRAE 135) [BAC0/pybacnet if installed; else simulate]
- OPC UA [python-opcua if installed; else simulate]
• IoT / Web / Messaging:
- OGC SensorThings API [HTTP GET; requests if installed; else simulate]
- MQTT (Sparkplug B & JSON) [paho-mqtt if installed; else simulate]
- WebSocket push (dashboards) [websocket-client if installed; else simulate]
CANONICAL DATA MODEL (UCLS-MM)
{
"id": "uuid or ULID",
"device_id": "string",
"stream_id": "string (protocol/topic/obis/path)",
"observed_property": "string (e.g., 'active_power')",
"value": float,
"unit_ucum": "UCUM code (e.g., 'W', 'kW', 'kWh', 'm3/h', 'Cel')",
"unit_si": "SI base unit (e.g., 'W','J','m3/s','K')",
"si_value": float, # converted to SI if possible
"quality": {"flags": ["valid","simulated","overflow",...], "status": "ok|warn|bad"},
"time": "ISO8601 UTC",
"location": {"lat": float, "lon": float, "elev_m": float} (optional),
"provenance": {"protocol": "dlms|mbus|modbus|bacnet|opcua|sensorthings|mqtt", "adapter": "classname", "source": "..."},
"tags": {"discipline":"energy|hydrology|air|bio|...","sector":"grid|bms|plant|lab|..."}
}
SAFETY
- LIVE=0 by default → simulation/dry-run only
- Allowlist REQUIRED for any live host/device (UCLS_WHITELIST, UCLS_MBUS_SERIAL, UCLS_DLMS_SERIAL etc.)
- READ-ONLY across all protocols (no write/operate)
- HMAC-signed JSON audit log per operation
CLI (examples)
# Simulated quick reads
python ucls_unified_meter_gateway.py demo
# DLMS read (DRY-RUN unless LIVE=1 and allow-listed)
UCLS_LIVE=1 UCLS_WHITELIST="meter01.utility" \
python ucls_unified_meter_gateway.py read-dlms --host meter01.utility --port 4059 --obis 1-0:1.8.0
# M-Bus TCP
UCLS_LIVE=1 UCLS_WHITELIST="mbus.gateway.local" \
python ucls_unified_meter_gateway.py read-mbus-tcp --host mbus.gateway.local --port 502 --secondary-id 12345678
# Modbus/TCP (holding registers)
UCLS_LIVE=1 UCLS_WHITELIST="plc.site.local" \
python ucls_unified_meter_gateway.py read-modbus --host plc.site.local --addr 30001 --count 2
# OPC UA
UCLS_LIVE=1 UCLS_WHITELIST="opcua.site.local" \
python ucls_unified_meter_gateway.py read-opcua --endpoint opc.tcp://opcua.site.local:4840 --node "ns=2;i=10853"
# SensorThings API
UCLS_LIVE=1 UCLS_WHITELIST="sensors.example.com" \
python ucls_unified_meter_gateway.py read-sensorthings --base https://sensors.example.com/v1.1 --observation 12345
# MQTT (SparkplugB/JSON) subscribe for 10s
UCLS_LIVE=1 UCLS_WHITELIST="mqtt.broker.local" \
python ucls_unified_meter_gateway.py mqtt-sub --broker mqtt.broker.local --topic "spBv1.0/#" --seconds 10
# Normalize any raw JSON into UCLS-MM
python ucls_unified_meter_gateway.py normalize --json '{"device_id":"x","observed_property":"temp","value":72,"unit_ucum":"[degF]"}'
# Publish a measurement to MQTT/WebSocket
python ucls_unified_meter_gateway.py publish --json '{...ucls-mm...}' --mqtt-broker mqtt.local --mqtt-topic ucls/mm --ws-url ws://localhost:8765
"""
from __future__ import annotations
import os, sys, json, time, re, socket, hmac, hashlib, uuid, math, argparse
from dataclasses import dataclass, asdict
from typing import Optional, Dict, Any, List, Tuple
# ---------- Optional dependencies (graceful) ----------
try:
import requests
HAS_REQUESTS = True
except Exception:
HAS_REQUESTS = False
try:
from pymodbus.client import ModbusTcpClient
HAS_PYMODBUS = True
except Exception:
HAS_PYMODBUS = False
try:
from opcua import Client as OPCUAClient
HAS_OPCUA = True
except Exception:
HAS_OPCUA = False
try:
import paho.mqtt.client as paho_mqtt
HAS_PAHO = True
except Exception:
HAS_PAHO = False
try:
import websocket as ws_client
HAS_WSCLIENT = True
except Exception:
HAS_WSCLIENT = False
# DLMS/COSEM + M-Bus + BACnet: provide stubs unless libs available
try:
import gurux_dlms # type: ignore # placeholder; typical package name is gurux_dlms (pip)
HAS_DLMS = True
except Exception:
HAS_DLMS = False
try:
import meterbus # type: ignore
HAS_MBUS = True
except Exception:
HAS_MBUS = False
try:
import BAC0 # type: ignore
HAS_BACNET = True
except Exception:
HAS_BACNET = False
# ===================== Safety & Audit =====================
def is_live() -> bool:
return os.environ.get("UCLS_LIVE","0") in ("1","true","TRUE","yes","YES")
def allowlist() -> List[str]:
raw = os.environ.get("UCLS_WHITELIST","")
return [h.strip() for h in raw.split(",") if h.strip()]
def host_allowed(host_or_url: str) -> bool:
host = host_or_url
m = re.match(r"^https?://([^/:]+)", host_or_url, re.I)
if m: host = m.group(1)
wl = allowlist()
if not wl: return False
try:
ip = socket.gethostbyname(host)
except Exception:
ip = None
return host in wl or (ip and ip in wl)
AUDIT_KEY = os.environ.get("UCLS_AUDIT_KEY","ucls-dev-key").encode()
def audit(action: str, target: str, payload: Dict[str,Any], result: Any):
rec = {
"ts": time.time(), "actor": os.environ.get("USER","ucls"),
"action": action, "target": target, "payload": payload,
"result_preview": (json.dumps(result, ensure_ascii=False)[:400] + "…") if len(str(result))>420 else result
}
body = json.dumps(rec, sort_keys=True, ensure_ascii=False).encode()
rec["signature"] = hmac.new(AUDIT_KEY, body, hashlib.sha256).hexdigest()
print(json.dumps(rec, ensure_ascii=False))
# ===================== Canonical Units & Conversion (UCUM→SI) =====================
# Minimal UCUM→SI mapping for common disciplines (extend as needed)
UNIT_MAP: Dict[str, Tuple[str, float, float]] = {
# ucum : (si_unit, scale, offset)
"W": ("W", 1.0, 0.0),
"kW": ("W", 1000.0, 0.0),
"MW": ("W", 1e6, 0.0),
"Wh": ("J", 3600.0, 0.0),
"kWh": ("J", 3.6e6, 0.0),
"m3": ("m3", 1.0, 0.0),
"m3/h": ("m3/s", 1.0/3600.0, 0.0),
"L": ("m3", 1e-3, 0.0),
"Pa": ("Pa", 1.0, 0.0),
"bar": ("Pa", 1e5, 0.0),
"°C": ("K", 1.0, 273.15),
"Cel": ("K", 1.0, 273.15),
"[degF]": ("K", 5.0/9.0, 255.3722222), # (F-32)*5/9 + 273.15
"%RH": ("%", 1.0, 0.0),
"Hz": ("Hz", 1.0, 0.0),
"V": ("V", 1.0, 0.0),
"A": ("A", 1.0, 0.0),
"VAR": ("VAR", 1.0, 0.0),
"VA": ("VA", 1.0, 0.0),
"pf": ("ratio", 1.0, 0.0),
"ppm": ("ppm", 1.0, 0.0),
"mg/m3":("kg/m3", 1e-6, 0.0),
"ug/m3":("kg/m3", 1e-9, 0.0)
}
def to_si(value: float, unit_ucum: str) -> Tuple[float, str]:
u = UNIT_MAP.get(unit_ucum)
if not u: return value, unit_ucum # unknown, keep
si_u, scale, offset = u
# handle temperature with offset (C/F)
if unit_ucum in ("°C","Cel","[degF]"):
return value*scale + offset, si_u
return value*scale + offset, si_u
# ===================== JSON-LD Context & SHACL (validation payloads) =====================
UCLS_MM_CONTEXT = {
"@context": {
"@vocab": "https://ucls.org/mm/",
"id": "@id", "type": "@type",
"device_id": "https://ucls.org/mm/device_id",
"stream_id": "https://ucls.org/mm/stream_id",
"observed_property": "https://ucls.org/mm/observed_property",
"value": "https://ucls.org/mm/value",
"unit_ucum": "https://ucls.org/mm/unit_ucum",
"unit_si": "https://ucls.org/mm/unit_si",
"si_value": "https://ucls.org/mm/si_value",
"quality": "https://ucls.org/mm/quality",
"time": "https://ucls.org/mm/time",
"location": "https://ucls.org/mm/location",
"provenance": "https://ucls.org/mm/provenance",
"tags": "https://ucls.org/mm/tags"
}
}
UCLS_MM_SHACL_TTL = """@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix ucls: <https://ucls.org/mm/> .
ucls:MeasurementShape a sh:NodeShape ;
sh:targetClass ucls:Measurement ;
sh:property [ sh:path ucls:device_id ; sh:minCount 1 ] ;
sh:property [ sh:path ucls:observed_property ; sh:minCount 1 ] ;
sh:property [ sh:path ucls:value ; sh:minCount 1 ] ;
sh:property [ sh:path ucls:unit_ucum ; sh:minCount 1 ] ;
sh:property [ sh:path ucls:time ; sh:minCount 1 ] .
"""
# ===================== Canonical Builder =====================
def make_mm(device_id: str, stream_id: str, observed_property: str, value: float,
unit_ucum: str, protocol: str, adapter: str, source: str,
tags: Optional[Dict[str,Any]]=None, quality_flags: Optional[List[str]]=None) -> Dict[str,Any]:
si_value, unit_si = to_si(value, unit_ucum)
meas = {
"id": str(uuid.uuid4()),
"device_id": device_id,
"stream_id": stream_id,
"observed_property": observed_property,
"value": float(value),
"unit_ucum": unit_ucum,
"unit_si": unit_si,
"si_value": float(si_value),
"quality": {"flags": quality_flags or ["valid"], "status": "ok"},
"time": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"provenance": {"protocol": protocol, "adapter": adapter, "source": source},
"tags": tags or {}
}
return meas
# ===================== Adapters (READ-ONLY; SIM fallback) =====================
class DLMSReader:
"""DLMS/COSEM OBIS read (IEC 62056). Sim fallback if gurux_dlms not present or LIVE=0."""
def read_obis(self, host: str, port: int, obis: str, device_id: Optional[str]=None) -> Dict[str,Any]:
if not is_live() or not host_allowed(host) or not HAS_DLMS:
# Simulate common OBIS 1-0:1.8.0 (active energy import, total)
val = 12345.678 # kWh
mm = make_mm(device_id or host, obis, "active_energy_import_total",
val, "kWh", "dlms", "DLMSReader", f"{host}:{port}",
tags={"discipline":"energy","sector":"grid"}, quality_flags=["simulated"])
audit("dlms-read-sim", host, {"obis":obis,"port":port}, mm); return mm
# Real DLMS call would go here (association, GET by OBIS) — omitted for safety.
mm = make_mm(device_id or host, obis, "unknown_property",
float("nan"), "", "dlms", "DLMSReader", f"{host}:{port}", quality_flags=["unknown"])
audit("dlms-read", host, {"obis":obis,"port":port}, mm); return mm
class MBusReader:
"""M-Bus EN 13757/OMS — TCP or serial. Sim fallback."""
def read_tcp(self, host: str, port: int, secondary_id: Optional[str]) -> Dict[str,Any]:
if not is_live() or not host_allowed(host) or not HAS_MBUS:
mm = make_mm(secondary_id or host, f"mbus:{secondary_id or 'sim'}",
"volume_total", 12.345, "m3", "mbus", "MBusReader", f"{host}:{port}",
tags={"discipline":"hydrology","sector":"utility"}, quality_flags=["simulated"])
audit("mbus-read-sim", host, {"port":port,"sec_id":secondary_id}, mm); return mm
mm = make_mm(secondary_id or host, f"mbus:{secondary_id}",
"unknown_property", float("nan"), "", "mbus","MBusReader", f"{host}:{port}", quality_flags=["unknown"])
audit("mbus-read", host, {"port":port,"sec_id":secondary_id}, mm); return mm
class ANSI_C12Reader:
"""ANSI C12.x stub — simulate common reads (demand/energy)."""
def read(self, port: str, register: str, device_id: Optional[str]=None) -> Dict[str,Any]:
# Simulation only in this scaffold
mm = make_mm(device_id or port, f"c12:{register}", "active_energy_import_total",
6789.01, "kWh", "ansi_c12", "ANSI_C12Reader", port,
tags={"discipline":"energy","sector":"grid"}, quality_flags=["simulated"])
audit("ansi-c12-read-sim", port, {"register":register}, mm); return mm
class ModbusReader:
def read_holding(self, host: str, addr: int, count: int, device_id: Optional[str]=None, port: int = 502) -> Dict[str,Any]:
if not is_live() or not host_allowed(host) or not HAS_PYMODBUS:
# Simulate two registers composing a 32-bit float (e.g., flow)
value = 123.4
mm = make_mm(device_id or host, f"modbus:{addr}/{count}", "flow_rate",
value, "m3/h", "modbus", "ModbusReader", f"{host}:{port}",
tags={"discipline":"process","sector":"plant"}, quality_flags=["simulated"])
audit("modbus-read-sim", host, {"addr":addr,"count":count}, mm); return mm
client = ModbusTcpClient(host=host, port=port)
client.connect()
rr = client.read_holding_registers(addr, count, slave=1)
client.close()
value = float(rr.registers[0]) if rr and hasattr(rr,"registers") else float("nan")
mm = make_mm(device_id or host, f"modbus:{addr}/{count}", "value_raw",
value, "", "modbus","ModbusReader", f"{host}:{port}")
audit("modbus-read", host, {"addr":addr,"count":count}, mm); return mm
class BACnetReader:
def read(self, device: str, obj: str, prop: str, device_id: Optional[str]=None) -> Dict[str,Any]:
if not is_live() or not HAS_BACNET:
mm = make_mm(device_id or device, f"bacnet:{obj}/{prop}", "air_temperature",
22.4, "Cel", "bacnet", "BACnetReader", device,
tags={"discipline":"hvac","sector":"bms"}, quality_flags=["simulated"])
audit("bacnet-read-sim", device, {"obj":obj,"prop":prop}, mm); return mm
mm = make_mm(device_id or device, f"bacnet:{obj}/{prop}", "value_raw",
float("nan"), "", "bacnet","BACnetReader", device, quality_flags=["unknown"])
audit("bacnet-read", device, {"obj":obj,"prop":prop}, mm); return mm
class OPCUAReader:
def read_node(self, endpoint: str, node: str, device_id: Optional[str]=None) -> Dict[str,Any]:
if not is_live() or not HAS_OPCUA:
mm = make_mm(device_id or endpoint, f"opcua:{node}", "pressure",
2.1, "bar", "opcua","OPCUAReader", endpoint,
tags={"discipline":"process","sector":"plant"}, quality_flags=["simulated"])
audit("opcua-read-sim", endpoint, {"node":node}, mm); return mm
host = re.sub(r"^opc\.tcp://","", endpoint).split(":")[0]
if not host_allowed(host):
raise PermissionError("Endpoint host not allowed")
client = OPCUAClient(endpoint); client.connect()
v = client.get_node(node).get_value(); client.disconnect()
val = float(v) if isinstance(v,(int,float)) else float("nan")
mm = make_mm(device_id or endpoint, f"opcua:{node}", "value_raw",
val, "", "opcua","OPCUAReader", endpoint)
audit("opcua-read", endpoint, {"node":node}, mm); return mm
class SensorThingsReader:
def read_observation(self, base: str, observation_id: str, device_id: Optional[str]=None) -> Dict[str,Any]:
if not is_live() or not host_allowed(base) or not HAS_REQUESTS:
mm = make_mm(device_id or base, f"sensorthings:obs/{observation_id}", "pm25",
12.7, "ug/m3", "sensorthings","SensorThingsReader", base,
tags={"discipline":"air","sector":"env"}, quality_flags=["simulated"])
audit("sensorthings-read-sim", base, {"obs_id":observation_id}, mm); return mm
url = f"{base.rstrip('/')}/Observations({observation_id})"
r = requests.get(url, timeout=10)
if r.status_code != 200:
mm = make_mm(device_id or base, f"sensorthings:obs/{observation_id}", "value_raw",
float("nan"), "", "sensorthings","SensorThingsReader", base, quality_flags=["bad"])
audit("sensorthings-read", base, {"obs_id":observation_id}, {"status": r.status_code}); return mm
data = r.json()
value = data.get("result")
prop = (data.get("Datastream",{}) or {}).get("name","observation")
unit = ((data.get("Datastream",{}) or {}).get("unitOfMeasurement",{}) or {}).get("symbol","")
mm = make_mm(device_id or base, f"sensorthings:obs/{observation_id}", prop,
float(value) if isinstance(value,(int,float)) else float("nan"),
unit or "", "sensorthings","SensorThingsReader", url)
audit("sensorthings-read", base, {"obs_id":observation_id}, mm); return mm
class MQTTMonitor:
def subscribe(self, broker: str, topic: str, seconds: int = 10) -> List[Dict[str,Any]]:
if not is_live() or not HAS_PAHO or not host_allowed(broker):
mm = make_mm(broker, f"mqtt:{topic}", "message_count",
1, "", "mqtt","MQTTMonitor", broker,
tags={"discipline":"messaging"}, quality_flags=["simulated"])
audit("mqtt-sub-sim", broker, {"topic":topic}, mm); return [mm]
messages: List[Dict[str,Any]] = []
def on_message(client, userdata, msg):
try:
payload = json.loads(msg.payload.decode(errors="replace"))
# If SparkplugB or JSON with value/unit, attempt normalization
if isinstance(payload, dict) and "value" in payload and "unit" in payload:
mm = make_mm(broker, f"mqtt:{msg.topic}", payload.get("observed_property","value"),
float(payload["value"]), str(payload["unit"]), "mqtt","MQTTMonitor", broker)
else:
mm = make_mm(broker, f"mqtt:{msg.topic}", "message",
1.0, "", "mqtt","MQTTMonitor", broker, quality_flags=["raw"])
except Exception:
mm = make_mm(broker, f"mqtt:{msg.topic}", "message", 1.0, "", "mqtt","MQTTMonitor", broker, quality_flags=["raw"])
messages.append(mm)
client = paho_mqtt.Client()
client.on_message = on_message; client.connect(broker, 1883, 60)
client.subscribe(topic); client.loop_start()
t0 = time.time()
while time.time() - t0 < seconds: time.sleep(0.1)
client.loop_stop(); client.disconnect()
for m in messages: audit("mqtt-sub", broker, {"topic":topic}, m)
return messages
# ===================== Publishers =====================
def publish_mqtt(obj: Dict[str,Any], broker: Optional[str], topic: Optional[str]) -> Dict[str,Any]:
if not broker or not topic:
return {"published": False, "reason": "no broker/topic"}
if not is_live() or not HAS_PAHO or not host_allowed(broker):
res = {"published": False, "simulated": True, "broker": broker, "topic": topic}
audit("mqtt-publish-sim", broker, {"topic":topic}, res); return res
client = paho_mqtt.Client(); client.connect(broker, 1883, 60)
payload = json.dumps(obj, ensure_ascii=False); rc = client.publish(topic, payload, qos=0, retain=False)
client.disconnect()
res = {"published": rc.rc == 0, "broker": broker, "topic": topic}
audit("mqtt-publish", broker, {"topic":topic}, res); return res
def publish_ws(obj: Dict[str,Any], ws_url: Optional[str]) -> Dict[str,Any]:
if not ws_url:
return {"published": False, "reason": "no ws_url"}
payload = json.dumps(obj, ensure_ascii=False)
if not is_live() or not HAS_WSCLIENT or not host_allowed(ws_url):
res = {"published": False, "simulated": True, "ws_url": ws_url}
audit("ws-publish-sim", ws_url, {}, res); return res
ws = ws_client.create_connection(ws_url, timeout=5)
ws.send(payload)
try:
ack = ws.recv()
except Exception:
ack = None
ws.close()
res = {"published": True, "ws_url": ws_url, "ack": (ack[:200]+"…") if isinstance(ack,str) and len(ack)>200 else ack}
audit("ws-publish", ws_url, {}, res); return res
# ===================== CLI =====================
def main():
dlms = DLMSReader(); mbus = MBusReader(); c12 = ANSI_C12Reader()
modbus = ModbusReader(); bacnet = BACnetReader(); opcua = OPCUAReader()
sens = SensorThingsReader(); mqttm = MQTTMonitor()
p = argparse.ArgumentParser(description="UCLS Unified Meter Gateway — standardized, read-only")
sub = p.add_subparsers(dest="cmd")
sub.add_parser("demo", help="Run simulated reads across adapters")
# DLMS
d = sub.add_parser("read-dlms"); d.add_argument("--host", required=True); d.add_argument("--port", type=int, default=4059)
d.add_argument("--obis", required=True); d.add_argument("--device-id")
# M-Bus TCP
mt = sub.add_parser("read-mbus-tcp"); mt.add_argument("--host", required=True); mt.add_argument("--port", type=int, default=502)
mt.add_argument("--secondary-id"); mt.add_argument("--device-id")
# ANSI C12 stub (serial id)
c = sub.add_parser("read-c12"); c.add_argument("--port", required=True); c.add_argument("--register", default="1.0.0"); c.add_argument("--device-id")
# Modbus/TCP
m = sub.add_parser("read-modbus"); m.add_argument("--host", required=True); m.add_argument("--addr", type=int, required=True)
m.add_argument("--count", type=int, default=2); m.add_argument("--port", type=int, default=502); m.add_argument("--device-id")
# BACnet/IP
b = sub.add_parser("read-bacnet"); b.add_argument("--device", required=True); b.add_argument("--object", required=True); b.add_argument("--property", required=True); b.add_argument("--device-id")
# OPC UA
o = sub.add_parser("read-opcua"); o.add_argument("--endpoint", required=True); o.add_argument("--node", required=True); o.add_argument("--device-id")
# SensorThings
s = sub.add_parser("read-sensorthings"); s.add_argument("--base", required=True); s.add_argument("--observation", required=True); s.add_argument("--device-id")
# MQTT subscribe
mq = sub.add_parser("mqtt-sub"); mq.add_argument("--broker", required=True); mq.add_argument("--topic", required=True); mq.add_argument("--seconds", type=int, default=10)
# Normalize / Publish
n = sub.add_parser("normalize"); n.add_argument("--json", required=True)
pub = sub.add_parser("publish"); pub.add_argument("--json", required=True); pub.add_argument("--mqtt-broker"); pub.add_argument("--mqtt-topic"); pub.add_argument("--ws-url")
args = p.parse_args()
if args.cmd in (None, "demo"):
out = []
out.append(dlms.read_obis("meter.sim", 4059, "1-0:1.8.0"))
out.append(mbus.read_tcp("mbus.sim", 502, "12345678"))
out.append(c12.read("/dev/ttyS0", "1.0.0"))
out.append(modbus.read_holding("plc.sim", 30001, 2))
out.append(bacnet.read("bacnet.sim", "analogInput,1", "presentValue"))
out.append(opcua.read_node("opc.tcp://opcua.sim:4840", "ns=2;i=10853"))
out.append(sens.read_observation("https://sens.sim/v1.1", "1"))
msgs = mqttm.subscribe("mqtt.sim","ucls/#", seconds=2); out.extend(msgs)
print(json.dumps(out, ensure_ascii=False, indent=2)); return
if args.cmd == "read-dlms":
print(json.dumps(dlms.read_obis(args.host, args.port, args.obis, args.device_id), ensure_ascii=False, indent=2)); return
if args.cmd == "read-mbus-tcp":
print(json.dumps(mbus.read_tcp(args.host, args.port, args.secondary_id), ensure_ascii=False, indent=2)); return
if args.cmd == "read-c12":
print(json.dumps(c12.read(args.port, args.register, args.device_id), ensure_ascii=False, indent=2)); return
if args.cmd == "read-modbus":
print(json.dumps(modbus.read_holding(args.host, args.addr, args.count, args.device_id, args.port), ensure_ascii=False, indent=2)); return
if args.cmd == "read-bacnet":
print(json.dumps(bacnet.read(args.device, args.object, args.property, args.device_id), ensure_ascii=False, indent=2)); return
if args.cmd == "read-opcua":
print(json.dumps(opcua.read_node(args.endpoint, args.node, args.device_id), ensure_ascii=False, indent=2)); return
if args.cmd == "read-sensorthings":
print(json.dumps(sens.read_observation(args.base, args.observation, args.device_id), ensure_ascii=False, indent=2)); return
if args.cmd == "mqtt-sub":
print(json.dumps(mqttm.subscribe(args.broker, args.topic, args.seconds), ensure_ascii=False, indent=2)); return
if args.cmd == "normalize":
raw = json.loads(args.json)
# If already UCLS-MM, just echo; else try to map common patterns
if {"device_id","observed_property","value","unit_ucum","time"} <= set(raw.keys()):
mm = raw
else:
# Heuristic mapping
device_id = raw.get("device") or raw.get("device_id") or "unknown"
prop = raw.get("observed_property") or raw.get("property") or "value"
val = float(raw.get("value", "nan"))
unit = raw.get("unit") or raw.get("unit_ucum") or ""
mm = make_mm(device_id, raw.get("stream","raw"), prop, val, unit, "normalize","mapper","local", quality_flags=["normalized"])
print(json.dumps(mm, ensure_ascii=False, indent=2)); return
if args.cmd == "publish":
obj = json.loads(args.json)
if not isinstance(obj, dict):
print(json.dumps({"error":"payload must be an object"}, ensure_ascii=False)); return
if args.mqtt_broker and args.mqtt_topic:
publish_mqtt(obj, args.mqtt_broker, args.mqtt_topic)
if args.ws_url:
publish_ws(obj, args.ws_url)
print(json.dumps({"ok": True}, ensure_ascii=False)); return
if __name__ == "__main__":
main()