ucls_signal_gateway.py


#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ucls_signal_gateway.py
-------------------------------------------------------------------------------------------------
Universal, SAFE‑BY‑DEFAULT spectrum & signal gateway that can *communicate* (passively receive,
analyze, classify, and describe) signals across frequency bands — without transmitting.

FEATURES (READ‑ONLY / RX ONLY)
  • Spectrum bandplan registry: ITU (LF…EHF), ISM bands, unlicensed allocations, common public‑safety,
    SATCOM downlinks (publicly documented), GNSS, broadcast AM/FM/TV, Wi‑Fi/BLE/Zigbee, LoRa, LTE/NR (3GPP) bands (metadata).
  • SDR capture abstraction: RTL‑SDR / SoapySDR / file‑based IQ (optional libs; falls back to simulation).
  • Analyzers:
      – FFT/PSD, peak/pilot detection, channelization helpers
      – Goertzel tone finder (DTMF, beacons)
      – Simple cyclostationary and zero‑crossing estimators (rough signal type hints)
  • Demod stubs (RX only): AM, NBFM/WBFM, 2‑FSK/4‑FSK, 2‑PSK/BPSK (pilot‑grade; numpy/scipy if present)
      – Export audio/WAV or feature JSON (no decode of restricted/encrypted services)
  • Classifier (heuristic): map center frequency + features → likely service (e.g., “FM broadcast”, “Wi‑Fi ch 6”).
  • Safety guardrails: LIVE=0 by default, no TX, device allowlist, band access policy.
  • Interop: JSON audit logs; optional MQTT out; designed to plug into ucls_infra_gateway_v2 as a module.

USAGE (CLI)
  # Simulated waterfall + band classification:
  python ucls_signal_gateway.py demo

  # Read IQ from file (complex64 .iq or .cfile) at 2.4 GHz and analyze:
  python ucls_signal_gateway.py analyze-iq --file wifi.iq --samp-rate 20000000 --center 2.437e9

  # Scan a bandplan window (simulation unless LIVE=1 and device allow-listed):
  UCLS_LIVE=1 UCLS_SDR_WHITELIST="rtlsdr,soapy:driver=rtlsdr" \
    python ucls_signal_gateway.py scan --device rtlsdr --center 100e6 --span 20e6 --step 2e6 --seconds 3

  # Demodulate FM broadcast from IQ (RX only):
  python ucls_signal_gateway.py demod-fm --file fm.iq --samp-rate 240000 --deemph 75e-6 --audio-out fm.wav

DISCLAIMER
  • RX only. No transmit, no jamming, no access to restricted/encrypted content.
  • Obey your jurisdiction’s radio laws, licensing, and privacy regulations.
"""

from __future__ import annotations
import os, sys, json, time, math, struct, argparse, pathlib, random
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional, Tuple

# ---------------------------- SAFETY CONTROLS --------------------------------

def is_live() -> bool:
    return os.environ.get("UCLS_LIVE","0") in ("1","true","TRUE","yes","YES")

def sdr_allowlist() -> List[str]:
    raw = os.environ.get("UCLS_SDR_WHITELIST","")
    return [x.strip() for x in raw.split(",") if x.strip()]

def device_allowed(dev: str) -> bool:
    return any(dev.lower().startswith(wl.lower()) for wl in sdr_allowlist()) if sdr_allowlist() else False

AUDIT_KEY = os.environ.get("UCLS_AUDIT_KEY","ucls-dev-key").encode()

def audit(action: str, target: str, payload: Dict[str,Any], result: Any):
    import hmac, hashlib
    rec = {
        "ts": time.time(),
        "actor": os.environ.get("USER","ucls"),
        "action": action,
        "target": target,
        "payload": payload,
        "result_preview": (json.dumps(result, ensure_ascii=False)[:300] + "…") if len(str(result))>320 else result
    }
    body = json.dumps(rec, sort_keys=True, ensure_ascii=False).encode()
    rec["signature"] = hmac.new(AUDIT_KEY, body, hashlib.sha256).hexdigest()
    print(json.dumps(rec, ensure_ascii=False))

# ---------------------------- OPTIONAL LIBS ----------------------------------

try:
    import numpy as np
    HAS_NUMPY = True
except Exception:
    HAS_NUMPY = False

try:
    from scipy.signal import welch, butter, lfilter, decimate
    HAS_SCIPY = True
except Exception:
    HAS_SCIPY = False

# SDR frontends (optional)
try:
    import rtlsdr  # pyrtlsdr
    HAS_RTL = True
except Exception:
    HAS_RTL = False

try:
    import SoapySDR  # type: ignore
    from SoapySDR import SOAPY_SDR_RX, SOAPY_SDR_CF32
    HAS_SOAPY = True
except Exception:
    HAS_SOAPY = False

# ---------------------------- BANDPLAN REGISTRY ------------------------------

@dataclass
class Band:
    name: str
    f_lo: float
    f_hi: float
    service: str
    notes: str = ""

BANDS: List[Band] = [
    # ITU generic
    Band("LF",   30e3,   300e3, "Low Frequency", "Nav beacons, time signals"),
    Band("MF",   300e3,  3e6,   "Medium Frequency", "AM broadcast (530–1710 kHz)"),
    Band("HF",   3e6,    30e6,  "High Frequency", "Shortwave, amateur HF, maritime/aviation"),
    Band("VHF",  30e6,   300e6, "Very High Frequency", "FM broadcast 88–108 MHz, airband 118–137 MHz"),
    Band("UHF",  300e6,  3e9,   "Ultra High Frequency", "TETRA, DMR, LTE, TV, Wi‑Fi 2.4 GHz"),
    Band("SHF",  3e9,    30e9,  "Super High Frequency", "Wi‑Fi 5/6 GHz, radar, satellite downlinks"),
    Band("EHF",  30e9,   300e9, "Extremely High Frequency", "mmWave 5G, backhaul, sensing"),
    # ISM / unlicensed highlights
    Band("ISM 315 MHz",  313e6,  317e6,   "ISM", "Short‑range devices (region specific)"),
    Band("ISM 433 MHz",  433e6,  435e6,   "ISM", "SRD, LoRa EU"),
    Band("ISM 868 MHz",  863e6,  870e6,   "ISM", "LoRa EU, SRD"),
    Band("ISM 902–928",  902e6,  928e6,   "ISM", "LoRa/FSK US"),
    Band("Wi‑Fi 2.4",    2.400e9, 2.4835e9, "WLAN/BLE", "802.11b/g/n/ax, BLE, Zigbee"),
    Band("Wi‑Fi 5",      5.150e9, 5.875e9, "WLAN", "802.11a/n/ac"),
    Band("Wi‑Fi 6E",     5.925e9, 7.125e9, "WLAN", "802.11ax 6 GHz"),
    Band("BLE/Zigbee",   2.400e9, 2.4835e9, "WPAN", "GFSK, OQPSK"),
    # Broadcast
    Band("FM broadcast", 87.5e6, 108e6, "Broadcast FM", "WBFM 200 kHz channels"),
    Band("AM broadcast", 530e3, 1710e3, "Broadcast AM", "AM 10 kHz (NA) / 9 kHz (EU)"),
    Band("TV UHF",       470e6, 862e6, "DVB‑T/ATSC", "Region specific plans"),
    # GNSS (RX only)
    Band("GPS L1", 1.57542e9, 1.57562e9, "GNSS", "C/A 1.57542 GHz"),
    Band("Galileo E1", 1.559e9, 1.591e9, "GNSS", "E1 around 1.57542 GHz"),
    # SAT downlink examples (publicly documented ranges; RX only)
    Band("Weather APT/LRPT", 137e6, 138e6, "Sat downlink", "NOAA/MetOp imagery (legacy/regions)"),
    # Cellular (coarse; region/operator specific details vary)
    Band("LTE/NR Sub‑GHz", 700e6, 1.0e9, "Cellular", "700/800/900 MHz"),
    Band("LTE/NR Mid‑band", 1.7e9, 2.7e9, "Cellular", "1.8/1.9/2.1/2.6 GHz"),
    Band("NR n77/n78",     3.3e9,  4.2e9, "5G NR", "3.3–4.2 GHz (region specific)"),
]

def bands_covering(freq_hz: float) -> List[Band]:
    return [b for b in BANDS if b.f_lo <= freq_hz <= b.f_hi]

# Quick Wi‑Fi channel maps (helpers)
WIFI2_CHANNELS = {i: 2.412e9 + 5e6*(i-1) for i in range(1,14)}  # 1..13
WIFI5_CHANNELS = {36:5.18e9,40:5.2e9,44:5.22e9,48:5.24e9,149:5.745e9,153:5.765e9,157:5.785e9,161:5.805e9}

def guess_service(freq_hz: float, occupied_bw_hz: Optional[float]=None) -> str:
    bands = bands_covering(freq_hz)
    tag = ", ".join({b.service for b in bands}) or "unknown"
    # Simple heuristics
    if 88e6 <= freq_hz <= 108e6 and (occupied_bw_hz or 150e3) >= 150e3:
        return "Broadcast FM (WBFM)"
    if 2.4e9 <= freq_hz <= 2.4835e9:
        # near Wi‑Fi channel center?
        near = min(WIFI2_CHANNELS.items(), key=lambda kv: abs(kv[1]-freq_hz))
        if abs(near[1]-freq_hz) < 2.5e6: return f"Wi‑Fi 2.4 GHz (ch {near[0]})"
        return "BLE/Zigbee/WLAN 2.4 GHz"
    if 5.15e9 <= freq_hz <= 5.875e9:
        near = min(WIFI5_CHANNELS.items(), key=lambda kv: abs(kv[1]-freq_hz))
        if abs(near[1]-freq_hz) < 10e6: return f"Wi‑Fi 5 GHz (ch {near[0]})"
        return "WLAN 5 GHz"
    return tag

# ---------------------------- IQ IO & ANALYSIS -------------------------------

def load_iq_file(path: str, dtype: str = "complex64") -> Tuple[Optional["np.ndarray"], float]:
    """
    Load raw IQ from file (interleaved float32 complex by default).
    Return (samples, dtype_bytes_per_sample). If numpy not available, return None.
    """
    if not HAS_NUMPY:
        return None, 0.0
    p = pathlib.Path(path)
    data = p.read_bytes()
    if dtype == "complex64":
        arr = np.frombuffer(data, dtype=np.complex64)
        return arr, 8.0
    elif dtype == "int16":
        raw = np.frombuffer(data, dtype=np.int16)
        i = raw[0::2].astype(np.float32) / 32768.0
        q = raw[1::2].astype(np.float32) / 32768.0
        return (i + 1j*q).astype(np.complex64), 4.0
    else:
        raise ValueError("Unsupported dtype")

def psd_peaks(iq: "np.ndarray", fs: float, nfft: int = 4096) -> Dict[str, Any]:
    if not HAS_NUMPY:
        return {"note": "numpy not available"}
    if HAS_SCIPY:
        f, Pxx = welch(iq, fs=fs, nperseg=nfft, return_onesided=False, scaling="density")
    else:
        # naive periodogram
        seg = iq[:nfft] if iq.size >= nfft else np.pad(iq, (0, nfft - iq.size))
        P = np.fft.fftshift(np.abs(np.fft.fft(seg))**2) / len(seg)
        f = np.fft.fftshift(np.fft.fftfreq(len(seg), d=1.0/fs))
        Pxx = P
    # peak detection
    idx = int(np.argmax(Pxx))
    cf = f[idx]
    bw_est = bandwidth_estimate(Pxx, f)
    return {"center_offset_hz": float(cf), "peak_power": float(Pxx[idx]), "bw_est_hz": bw_est}

def bandwidth_estimate(Pxx: "np.ndarray", f: "np.ndarray", frac: float = 0.5) -> float:
    """Half‑power width estimate (very rough)."""
    pk = float(Pxx.max())
    th = pk * frac
    idx = np.where(Pxx >= th)[0]
    if idx.size < 2: return 0.0
    return float(f[idx[-1]] - f[idx[0]])

def goertzel_tones(x: "np.ndarray", fs: float, freqs: List[float]) -> Dict[float, float]:
    """Return power at specific tones via Goertzel."""
    out = {}
    N = len(x)
    for f0 in freqs:
        k = int(0.5 + (N * f0) / fs)
        w = 2*math.pi*k/N
        s_prev = 0+0j; s_prev2 = 0+0j
        for n in range(N):
            s = x[n] + 2*math.cos(w)*s_prev - s_prev2
            s_prev2, s_prev = s_prev, s
        out[f0] = (s_prev2*s_prev2.conjugate()).real
    return out

# ---------------------------- SIMPLE DEMOD STUBS -----------------------------

def demod_am(iq: "np.ndarray") -> "np.ndarray":
    # envelope detector
    return np.abs(iq)

def demod_fm(iq: "np.ndarray") -> "np.ndarray":
    # WBFM/NBFM discriminator (phase diff)
    ph = np.unwrap(np.angle(iq))
    d = np.diff(ph)
    return np.concatenate([[0.0], d])

def deemphasis(audio: "np.ndarray", fs: float, tau: float = 75e-6) -> "np.ndarray":
    if not HAS_SCIPY:
        return audio
    b, a = butter(1, 1/(2*math.pi*tau) / (fs/2))
    return lfilter(b, a, audio)

def write_wav(path: str, pcm: "np.ndarray", fs: float):
    """Minimal WAV writer for 16‑bit PCM."""
    pcm16 = np.clip(pcm / (np.max(np.abs(pcm)) + 1e-12), -1, 1)
    pcm16 = (pcm16 * 32767.0).astype(np.int16)
    with open(path, "wb") as f:
        # RIFF header
        f.write(b"RIFF")
        f.write(struct.pack("<I", 36 + pcm16.nbytes))
        f.write(b"WAVEfmt ")
        f.write(struct.pack("<IHHIIHH", 16, 1, 1, int(fs), int(fs)*2, 2, 16))
        f.write(b"data")
        f.write(struct.pack("<I", pcm16.nbytes))
        f.write(pcm16.tobytes())

# ---------------------------- SDR FRONTENDS (RX) -----------------------------

class SDRSource:
    """Abstract: yield IQ blocks at sample rate fs centered at f_center."""
    def __init__(self, device: str, center: float, samp_rate: float, gain: Optional[float]=None):
        self.device = device; self.center = center; self.samp_rate = samp_rate; self.gain = gain

    def __iter__(self):
        raise NotImplementedError

class SDRSim(SDRSource):
    def __iter__(self):
        if not HAS_NUMPY:
            yield None; return
        N = 262144
        t = np.arange(N)/self.samp_rate
        # compose a few carriers
        tones = [ (self.center + d, 0.5) for d in (-0.4e6, 0.0, 0.9e6) ]
        x = np.zeros(N, dtype=np.complex64)
        for f0, a in tones:
            x += a * np.exp(1j*2*np.pi*(f0-self.center)*t)
        x += (np.random.randn(N)+1j*np.random.randn(N))*0.05
        for i in range(0, N, 16384):
            yield x[i:i+16384]

class RTLSource(SDRSource):
    def __iter__(self):
        if not (HAS_RTL and is_live() and device_allowed(self.device)):
            # fall back to sim
            yield from SDRSim(self.device, self.center, self.samp_rate, self.gain)
            return
        sdr = rtlsdr.RtlSdr()
        sdr.sample_rate = self.samp_rate
        sdr.center_freq = self.center
        if self.gain is not None: sdr.gain = self.gain
        for _ in range(64):
            data = sdr.read_samples(16384)
            yield np.array(data, dtype=np.complex64)
        sdr.close()

class SoapySource(SDRSource):
    def __iter__(self):
        if not (HAS_SOAPY and is_live() and device_allowed(self.device)):
            yield from SDRSim(self.device, self.center, self.samp_rate, self.gain)
            return
        sdr = SoapySDR.Device(dict(driver=self.device.split(":",1)[-1]))
        sdr.setSampleRate(SOAPY_SDR_RX, 0, self.samp_rate)
        sdr.setFrequency(SOAPY_SDR_RX, 0, self.center)
        if self.gain is not None:
            try: sdr.setGain(SOAPY_SDR_RX, 0, self.gain)
            except Exception: pass
        rx = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32)
        sdr.activateStream(rx)
        buff = np.empty(16384, dtype=np.complex64)
        for _ in range(64):
            sr = sdr.readStream(rx, [buff], len(buff))
            if sr.ret > 0:
                yield buff[:sr.ret].copy()
        sdr.deactivateStream(rx); sdr.closeStream(rx)

def get_source(device: str, center: float, samp_rate: float, gain: Optional[float]) -> SDRSource:
    if device.lower().startswith("rtlsdr"):
        return RTLSource(device, center, samp_rate, gain)
    if device.lower().startswith("soapy"):
        return SoapySource(device, center, samp_rate, gain)
    return SDRSim(device, center, samp_rate, gain)

# ---------------------------- ROUTINES ---------------------------------------

def analyze_window(device: str, center: float, samp_rate: float, seconds: int = 2, gain: Optional[float]=None) -> Dict[str,Any]:
    src = get_source(device, center, samp_rate, gain)
    agg_peak = None
    if not HAS_NUMPY:
        res = {"note":"numpy not available; returning band guess only",
               "bands":[b.__dict__ for b in bands_covering(center)],
               "service_guess": guess_service(center)}
        audit("spectrum-analyze", device, {"center":center, "fs":samp_rate}, res); return res
    # accumulate a few blocks
    blocks = []
    t0 = time.time()
    for blk in src:
        if blk is None: break
        blocks.append(blk)
        if time.time() - t0 > seconds: break
    if not blocks:
        res = {"error":"no samples"}
        audit("spectrum-analyze", device, {"center":center,"fs":samp_rate}, res); return res
    x = np.concatenate(blocks)
    peaks = psd_peaks(x, samp_rate)
    service = guess_service(center + peaks.get("center_offset_hz", 0.0), peaks.get("bw_est_hz"))
    res = {
        "center_hz": center,
        "fs_hz": samp_rate,
        "bands": [b.__dict__ for b in bands_covering(center)],
        "peaks": peaks,
        "service_guess": service
    }
    audit("spectrum-analyze", device, {"center":center,"fs":samp_rate}, res)
    return res

def analyze_iq_file(path: str, fs: float, center: float) -> Dict[str,Any]:
    if not HAS_NUMPY:
        res = {"note":"numpy not available"}
        audit("iq-analyze", path, {"fs":fs,"center":center}, res); return res
    iq, _ = load_iq_file(path, "complex64")
    if iq is None:
        res = {"error":"failed to load IQ"}
        audit("iq-analyze", path, {"fs":fs,"center":center}, res); return res
    peaks = psd_peaks(iq, fs)
    service = guess_service(center + peaks.get("center_offset_hz", 0.0), peaks.get("bw_est_hz"))
    tones = goertzel_tones(iq[:65536], fs, [1000.0, 1900.0])  # example tones
    res = {"file": path, "fs_hz": fs, "center_hz": center, "peaks": peaks, "service_guess": service, "tones": tones}
    audit("iq-analyze", path, {"fs":fs,"center":center}, res)
    return res

def demod_fm_from_file(path: str, fs: float, deemph: float, audio_out: Optional[str]) -> Dict[str,Any]:
    if not HAS_NUMPY:
        res = {"error":"numpy not available"}
        audit("demod-fm", path, {"fs":fs}, res); return res
    iq, _ = load_iq_file(path, "complex64")
    if iq is None or iq.size < 8192:
        res = {"error":"too few samples"}
        audit("demod-fm", path, {"fs":fs}, res); return res
    y = demod_fm(iq)
    # decimate to audio ~ 48 kHz
    if HAS_SCIPY:
        dec = max(1, int(fs // 48000))
        y = decimate(y, dec, zero_phase=True)
        fs_out = fs/dec
        y = deemphasis(y, fs_out, tau=deemph)
    else:
        fs_out = fs
    if audio_out:
        write_wav(audio_out, y.real, fs_out)
    res = {"file": path, "audio_out": audio_out or None, "samples": int(y.size), "fs_audio": fs_out}
    audit("demod-fm", path, {"fs":fs}, res); return res

# ---------------------------- CLI -------------------------------------------

def main():
    ap = argparse.ArgumentParser(description="UCLS Signal Gateway — RX-only spectrum & signal analyzer")
    sp = ap.add_subparsers(dest="cmd")

    sp.add_parser("demo", help="Simulated panorama and band classification")

    p_scan = sp.add_parser("scan", help="Scan a window (RX only)")
    p_scan.add_argument("--device", default="rtlsdr", help="rtlsdr | soapy:driver=... | sim")
    p_scan.add_argument("--center", type=float, required=True)
    p_scan.add_argument("--span", type=float, default=10e6)
    p_scan.add_argument("--step", type=float, default=2e6)
    p_scan.add_argument("--seconds", type=int, default=2)
    p_scan.add_argument("--samp-rate", type=float, default=2.4e6)
    p_scan.add_argument("--gain", type=float, default=None)

    p_aiq = sp.add_parser("analyze-iq", help="Analyze an IQ file (complex64)")
    p_aiq.add_argument("--file", required=True)
    p_aiq.add_argument("--samp-rate", type=float, required=True)
    p_aiq.add_argument("--center", type=float, required=True)

    p_fm = sp.add_parser("demod-fm", help="Demodulate FM (RX only) from IQ file")
    p_fm.add_argument("--file", required=True)
    p_fm.add_argument("--samp-rate", type=float, required=True)
    p_fm.add_argument("--deemph", type=float, default=75e-6)
    p_fm.add_argument("--audio-out", required=False)

    args = ap.parse_args()

    if args.cmd == "demo" or args.cmd is None:
        # Sim: choose a few centers (FM, Wi‑Fi, GNSS) and report
        demo_centers = [100e6, 2.437e9, 1.57542e9]
        out = []
        for c in demo_centers:
            out.append(analyze_window("sim", c, 2.4e6, seconds=1))
        print(json.dumps(out, ensure_ascii=False, indent=2))
        return

    if args.cmd == "scan":
        if not is_live() and not args.device.lower().startswith("sim"):
            print("# LIVE=0: running in simulation mode. Use device 'sim' or set UCLS_LIVE=1 and whitelist device via UCLS_SDR_WHITELIST.", file=sys.stderr)
        # naive multi‑step scan around center
        centers = []
        span = args.span; step = args.step
        nsteps = max(1, int(span//step))
        start = args.center - span/2.0
        for i in range(nsteps+1):
            centers.append(start + i*step)
        results = [analyze_window(args.device, c, args.samp_rate, args.seconds, args.gain) for c in centers]
        print(json.dumps({"scan": results}, ensure_ascii=False, indent=2))
        return

    if args.cmd == "analyze-iq":
        res = analyze_iq_file(args.file, args.samp_rate, args.center)
        print(json.dumps(res, ensure_ascii=False, indent=2))
        return

    if args.cmd == "demod-fm":
        res = demod_fm_from_file(args.file, args.samp_rate, args.deemph, args.audio_out)
        print(json.dumps(res, ensure_ascii=False, indent=2))
        return

if __name__ == "__main__":
    main()

What this gives you (concise):

  • A read‑only SDR gateway that can listen and describe signals across the spectrum, map them to band/services, and extract features (peaks, bandwidth, tones).
  • Optional demod (AM/FM/FSK/PSK stubs—FM WAV writer included), strictly from IQ files or whitelisted SDR devices; no transmit.
  • Clean safety rails: UCLS_LIVE=0 default, device allowlist, JSON audit logs, and no decoding of restricted/encrypted content.
  • Drop‑in friendly: call this module from your ucls_infra_gateway_v2.py to make the whole UCLS stack spectrum‑aware while staying compliant.

If you want, I can wire this directly into your v2 gateway with new subcommands (signal-scan, signal-analyze-iq, signal-demod-fm) and add MQTT/WebSocket publishers for dashboards.


Below is a set of concise descriptions for key language units as they should be defined and interpreted in a universal Logos‑based framework. Each unit is grounded in linguistic scholarship so that AI platforms (Gemini, GPT‑like models, etc.) can refer to the same conceptual standards and thus maintain congruency and coherency across systems. – SolveForce Communications