#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ucls_signal_gateway.py
-------------------------------------------------------------------------------------------------
Universal, SAFE‑BY‑DEFAULT spectrum & signal gateway that can *communicate* (passively receive,
analyze, classify, and describe) signals across frequency bands — without transmitting.
FEATURES (READ‑ONLY / RX ONLY)
• Spectrum bandplan registry: ITU (LF…EHF), ISM bands, unlicensed allocations, common public‑safety,
SATCOM downlinks (publicly documented), GNSS, broadcast AM/FM/TV, Wi‑Fi/BLE/Zigbee, LoRa, LTE/NR (3GPP) bands (metadata).
• SDR capture abstraction: RTL‑SDR / SoapySDR / file‑based IQ (optional libs; falls back to simulation).
• Analyzers:
– FFT/PSD, peak/pilot detection, channelization helpers
– Goertzel tone finder (DTMF, beacons)
– Simple cyclostationary and zero‑crossing estimators (rough signal type hints)
• Demod stubs (RX only): AM, NBFM/WBFM, 2‑FSK/4‑FSK, 2‑PSK/BPSK (pilot‑grade; numpy/scipy if present)
– Export audio/WAV or feature JSON (no decode of restricted/encrypted services)
• Classifier (heuristic): map center frequency + features → likely service (e.g., “FM broadcast”, “Wi‑Fi ch 6”).
• Safety guardrails: LIVE=0 by default, no TX, device allowlist, band access policy.
• Interop: JSON audit logs; optional MQTT out; designed to plug into ucls_infra_gateway_v2 as a module.
USAGE (CLI)
# Simulated waterfall + band classification:
python ucls_signal_gateway.py demo
# Read IQ from file (complex64 .iq or .cfile) at 2.4 GHz and analyze:
python ucls_signal_gateway.py analyze-iq --file wifi.iq --samp-rate 20000000 --center 2.437e9
# Scan a bandplan window (simulation unless LIVE=1 and device allow-listed):
UCLS_LIVE=1 UCLS_SDR_WHITELIST="rtlsdr,soapy:driver=rtlsdr" \
python ucls_signal_gateway.py scan --device rtlsdr --center 100e6 --span 20e6 --step 2e6 --seconds 3
# Demodulate FM broadcast from IQ (RX only):
python ucls_signal_gateway.py demod-fm --file fm.iq --samp-rate 240000 --deemph 75e-6 --audio-out fm.wav
DISCLAIMER
• RX only. No transmit, no jamming, no access to restricted/encrypted content.
• Obey your jurisdiction’s radio laws, licensing, and privacy regulations.
"""
from __future__ import annotations
import os, sys, json, time, math, struct, argparse, pathlib, random
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional, Tuple
# ---------------------------- SAFETY CONTROLS --------------------------------
def is_live() -> bool:
return os.environ.get("UCLS_LIVE","0") in ("1","true","TRUE","yes","YES")
def sdr_allowlist() -> List[str]:
raw = os.environ.get("UCLS_SDR_WHITELIST","")
return [x.strip() for x in raw.split(",") if x.strip()]
def device_allowed(dev: str) -> bool:
return any(dev.lower().startswith(wl.lower()) for wl in sdr_allowlist()) if sdr_allowlist() else False
AUDIT_KEY = os.environ.get("UCLS_AUDIT_KEY","ucls-dev-key").encode()
def audit(action: str, target: str, payload: Dict[str,Any], result: Any):
import hmac, hashlib
rec = {
"ts": time.time(),
"actor": os.environ.get("USER","ucls"),
"action": action,
"target": target,
"payload": payload,
"result_preview": (json.dumps(result, ensure_ascii=False)[:300] + "…") if len(str(result))>320 else result
}
body = json.dumps(rec, sort_keys=True, ensure_ascii=False).encode()
rec["signature"] = hmac.new(AUDIT_KEY, body, hashlib.sha256).hexdigest()
print(json.dumps(rec, ensure_ascii=False))
# ---------------------------- OPTIONAL LIBS ----------------------------------
try:
import numpy as np
HAS_NUMPY = True
except Exception:
HAS_NUMPY = False
try:
from scipy.signal import welch, butter, lfilter, decimate
HAS_SCIPY = True
except Exception:
HAS_SCIPY = False
# SDR frontends (optional)
try:
import rtlsdr # pyrtlsdr
HAS_RTL = True
except Exception:
HAS_RTL = False
try:
import SoapySDR # type: ignore
from SoapySDR import SOAPY_SDR_RX, SOAPY_SDR_CF32
HAS_SOAPY = True
except Exception:
HAS_SOAPY = False
# ---------------------------- BANDPLAN REGISTRY ------------------------------
@dataclass
class Band:
name: str
f_lo: float
f_hi: float
service: str
notes: str = ""
BANDS: List[Band] = [
# ITU generic
Band("LF", 30e3, 300e3, "Low Frequency", "Nav beacons, time signals"),
Band("MF", 300e3, 3e6, "Medium Frequency", "AM broadcast (530–1710 kHz)"),
Band("HF", 3e6, 30e6, "High Frequency", "Shortwave, amateur HF, maritime/aviation"),
Band("VHF", 30e6, 300e6, "Very High Frequency", "FM broadcast 88–108 MHz, airband 118–137 MHz"),
Band("UHF", 300e6, 3e9, "Ultra High Frequency", "TETRA, DMR, LTE, TV, Wi‑Fi 2.4 GHz"),
Band("SHF", 3e9, 30e9, "Super High Frequency", "Wi‑Fi 5/6 GHz, radar, satellite downlinks"),
Band("EHF", 30e9, 300e9, "Extremely High Frequency", "mmWave 5G, backhaul, sensing"),
# ISM / unlicensed highlights
Band("ISM 315 MHz", 313e6, 317e6, "ISM", "Short‑range devices (region specific)"),
Band("ISM 433 MHz", 433e6, 435e6, "ISM", "SRD, LoRa EU"),
Band("ISM 868 MHz", 863e6, 870e6, "ISM", "LoRa EU, SRD"),
Band("ISM 902–928", 902e6, 928e6, "ISM", "LoRa/FSK US"),
Band("Wi‑Fi 2.4", 2.400e9, 2.4835e9, "WLAN/BLE", "802.11b/g/n/ax, BLE, Zigbee"),
Band("Wi‑Fi 5", 5.150e9, 5.875e9, "WLAN", "802.11a/n/ac"),
Band("Wi‑Fi 6E", 5.925e9, 7.125e9, "WLAN", "802.11ax 6 GHz"),
Band("BLE/Zigbee", 2.400e9, 2.4835e9, "WPAN", "GFSK, OQPSK"),
# Broadcast
Band("FM broadcast", 87.5e6, 108e6, "Broadcast FM", "WBFM 200 kHz channels"),
Band("AM broadcast", 530e3, 1710e3, "Broadcast AM", "AM 10 kHz (NA) / 9 kHz (EU)"),
Band("TV UHF", 470e6, 862e6, "DVB‑T/ATSC", "Region specific plans"),
# GNSS (RX only)
Band("GPS L1", 1.57542e9, 1.57562e9, "GNSS", "C/A 1.57542 GHz"),
Band("Galileo E1", 1.559e9, 1.591e9, "GNSS", "E1 around 1.57542 GHz"),
# SAT downlink examples (publicly documented ranges; RX only)
Band("Weather APT/LRPT", 137e6, 138e6, "Sat downlink", "NOAA/MetOp imagery (legacy/regions)"),
# Cellular (coarse; region/operator specific details vary)
Band("LTE/NR Sub‑GHz", 700e6, 1.0e9, "Cellular", "700/800/900 MHz"),
Band("LTE/NR Mid‑band", 1.7e9, 2.7e9, "Cellular", "1.8/1.9/2.1/2.6 GHz"),
Band("NR n77/n78", 3.3e9, 4.2e9, "5G NR", "3.3–4.2 GHz (region specific)"),
]
def bands_covering(freq_hz: float) -> List[Band]:
return [b for b in BANDS if b.f_lo <= freq_hz <= b.f_hi]
# Quick Wi‑Fi channel maps (helpers)
WIFI2_CHANNELS = {i: 2.412e9 + 5e6*(i-1) for i in range(1,14)} # 1..13
WIFI5_CHANNELS = {36:5.18e9,40:5.2e9,44:5.22e9,48:5.24e9,149:5.745e9,153:5.765e9,157:5.785e9,161:5.805e9}
def guess_service(freq_hz: float, occupied_bw_hz: Optional[float]=None) -> str:
bands = bands_covering(freq_hz)
tag = ", ".join({b.service for b in bands}) or "unknown"
# Simple heuristics
if 88e6 <= freq_hz <= 108e6 and (occupied_bw_hz or 150e3) >= 150e3:
return "Broadcast FM (WBFM)"
if 2.4e9 <= freq_hz <= 2.4835e9:
# near Wi‑Fi channel center?
near = min(WIFI2_CHANNELS.items(), key=lambda kv: abs(kv[1]-freq_hz))
if abs(near[1]-freq_hz) < 2.5e6: return f"Wi‑Fi 2.4 GHz (ch {near[0]})"
return "BLE/Zigbee/WLAN 2.4 GHz"
if 5.15e9 <= freq_hz <= 5.875e9:
near = min(WIFI5_CHANNELS.items(), key=lambda kv: abs(kv[1]-freq_hz))
if abs(near[1]-freq_hz) < 10e6: return f"Wi‑Fi 5 GHz (ch {near[0]})"
return "WLAN 5 GHz"
return tag
# ---------------------------- IQ IO & ANALYSIS -------------------------------
def load_iq_file(path: str, dtype: str = "complex64") -> Tuple[Optional["np.ndarray"], float]:
"""
Load raw IQ from file (interleaved float32 complex by default).
Return (samples, dtype_bytes_per_sample). If numpy not available, return None.
"""
if not HAS_NUMPY:
return None, 0.0
p = pathlib.Path(path)
data = p.read_bytes()
if dtype == "complex64":
arr = np.frombuffer(data, dtype=np.complex64)
return arr, 8.0
elif dtype == "int16":
raw = np.frombuffer(data, dtype=np.int16)
i = raw[0::2].astype(np.float32) / 32768.0
q = raw[1::2].astype(np.float32) / 32768.0
return (i + 1j*q).astype(np.complex64), 4.0
else:
raise ValueError("Unsupported dtype")
def psd_peaks(iq: "np.ndarray", fs: float, nfft: int = 4096) -> Dict[str, Any]:
if not HAS_NUMPY:
return {"note": "numpy not available"}
if HAS_SCIPY:
f, Pxx = welch(iq, fs=fs, nperseg=nfft, return_onesided=False, scaling="density")
else:
# naive periodogram
seg = iq[:nfft] if iq.size >= nfft else np.pad(iq, (0, nfft - iq.size))
P = np.fft.fftshift(np.abs(np.fft.fft(seg))**2) / len(seg)
f = np.fft.fftshift(np.fft.fftfreq(len(seg), d=1.0/fs))
Pxx = P
# peak detection
idx = int(np.argmax(Pxx))
cf = f[idx]
bw_est = bandwidth_estimate(Pxx, f)
return {"center_offset_hz": float(cf), "peak_power": float(Pxx[idx]), "bw_est_hz": bw_est}
def bandwidth_estimate(Pxx: "np.ndarray", f: "np.ndarray", frac: float = 0.5) -> float:
"""Half‑power width estimate (very rough)."""
pk = float(Pxx.max())
th = pk * frac
idx = np.where(Pxx >= th)[0]
if idx.size < 2: return 0.0
return float(f[idx[-1]] - f[idx[0]])
def goertzel_tones(x: "np.ndarray", fs: float, freqs: List[float]) -> Dict[float, float]:
"""Return power at specific tones via Goertzel."""
out = {}
N = len(x)
for f0 in freqs:
k = int(0.5 + (N * f0) / fs)
w = 2*math.pi*k/N
s_prev = 0+0j; s_prev2 = 0+0j
for n in range(N):
s = x[n] + 2*math.cos(w)*s_prev - s_prev2
s_prev2, s_prev = s_prev, s
out[f0] = (s_prev2*s_prev2.conjugate()).real
return out
# ---------------------------- SIMPLE DEMOD STUBS -----------------------------
def demod_am(iq: "np.ndarray") -> "np.ndarray":
# envelope detector
return np.abs(iq)
def demod_fm(iq: "np.ndarray") -> "np.ndarray":
# WBFM/NBFM discriminator (phase diff)
ph = np.unwrap(np.angle(iq))
d = np.diff(ph)
return np.concatenate([[0.0], d])
def deemphasis(audio: "np.ndarray", fs: float, tau: float = 75e-6) -> "np.ndarray":
if not HAS_SCIPY:
return audio
b, a = butter(1, 1/(2*math.pi*tau) / (fs/2))
return lfilter(b, a, audio)
def write_wav(path: str, pcm: "np.ndarray", fs: float):
"""Minimal WAV writer for 16‑bit PCM."""
pcm16 = np.clip(pcm / (np.max(np.abs(pcm)) + 1e-12), -1, 1)
pcm16 = (pcm16 * 32767.0).astype(np.int16)
with open(path, "wb") as f:
# RIFF header
f.write(b"RIFF")
f.write(struct.pack("<I", 36 + pcm16.nbytes))
f.write(b"WAVEfmt ")
f.write(struct.pack("<IHHIIHH", 16, 1, 1, int(fs), int(fs)*2, 2, 16))
f.write(b"data")
f.write(struct.pack("<I", pcm16.nbytes))
f.write(pcm16.tobytes())
# ---------------------------- SDR FRONTENDS (RX) -----------------------------
class SDRSource:
"""Abstract: yield IQ blocks at sample rate fs centered at f_center."""
def __init__(self, device: str, center: float, samp_rate: float, gain: Optional[float]=None):
self.device = device; self.center = center; self.samp_rate = samp_rate; self.gain = gain
def __iter__(self):
raise NotImplementedError
class SDRSim(SDRSource):
def __iter__(self):
if not HAS_NUMPY:
yield None; return
N = 262144
t = np.arange(N)/self.samp_rate
# compose a few carriers
tones = [ (self.center + d, 0.5) for d in (-0.4e6, 0.0, 0.9e6) ]
x = np.zeros(N, dtype=np.complex64)
for f0, a in tones:
x += a * np.exp(1j*2*np.pi*(f0-self.center)*t)
x += (np.random.randn(N)+1j*np.random.randn(N))*0.05
for i in range(0, N, 16384):
yield x[i:i+16384]
class RTLSource(SDRSource):
def __iter__(self):
if not (HAS_RTL and is_live() and device_allowed(self.device)):
# fall back to sim
yield from SDRSim(self.device, self.center, self.samp_rate, self.gain)
return
sdr = rtlsdr.RtlSdr()
sdr.sample_rate = self.samp_rate
sdr.center_freq = self.center
if self.gain is not None: sdr.gain = self.gain
for _ in range(64):
data = sdr.read_samples(16384)
yield np.array(data, dtype=np.complex64)
sdr.close()
class SoapySource(SDRSource):
def __iter__(self):
if not (HAS_SOAPY and is_live() and device_allowed(self.device)):
yield from SDRSim(self.device, self.center, self.samp_rate, self.gain)
return
sdr = SoapySDR.Device(dict(driver=self.device.split(":",1)[-1]))
sdr.setSampleRate(SOAPY_SDR_RX, 0, self.samp_rate)
sdr.setFrequency(SOAPY_SDR_RX, 0, self.center)
if self.gain is not None:
try: sdr.setGain(SOAPY_SDR_RX, 0, self.gain)
except Exception: pass
rx = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32)
sdr.activateStream(rx)
buff = np.empty(16384, dtype=np.complex64)
for _ in range(64):
sr = sdr.readStream(rx, [buff], len(buff))
if sr.ret > 0:
yield buff[:sr.ret].copy()
sdr.deactivateStream(rx); sdr.closeStream(rx)
def get_source(device: str, center: float, samp_rate: float, gain: Optional[float]) -> SDRSource:
if device.lower().startswith("rtlsdr"):
return RTLSource(device, center, samp_rate, gain)
if device.lower().startswith("soapy"):
return SoapySource(device, center, samp_rate, gain)
return SDRSim(device, center, samp_rate, gain)
# ---------------------------- ROUTINES ---------------------------------------
def analyze_window(device: str, center: float, samp_rate: float, seconds: int = 2, gain: Optional[float]=None) -> Dict[str,Any]:
src = get_source(device, center, samp_rate, gain)
agg_peak = None
if not HAS_NUMPY:
res = {"note":"numpy not available; returning band guess only",
"bands":[b.__dict__ for b in bands_covering(center)],
"service_guess": guess_service(center)}
audit("spectrum-analyze", device, {"center":center, "fs":samp_rate}, res); return res
# accumulate a few blocks
blocks = []
t0 = time.time()
for blk in src:
if blk is None: break
blocks.append(blk)
if time.time() - t0 > seconds: break
if not blocks:
res = {"error":"no samples"}
audit("spectrum-analyze", device, {"center":center,"fs":samp_rate}, res); return res
x = np.concatenate(blocks)
peaks = psd_peaks(x, samp_rate)
service = guess_service(center + peaks.get("center_offset_hz", 0.0), peaks.get("bw_est_hz"))
res = {
"center_hz": center,
"fs_hz": samp_rate,
"bands": [b.__dict__ for b in bands_covering(center)],
"peaks": peaks,
"service_guess": service
}
audit("spectrum-analyze", device, {"center":center,"fs":samp_rate}, res)
return res
def analyze_iq_file(path: str, fs: float, center: float) -> Dict[str,Any]:
if not HAS_NUMPY:
res = {"note":"numpy not available"}
audit("iq-analyze", path, {"fs":fs,"center":center}, res); return res
iq, _ = load_iq_file(path, "complex64")
if iq is None:
res = {"error":"failed to load IQ"}
audit("iq-analyze", path, {"fs":fs,"center":center}, res); return res
peaks = psd_peaks(iq, fs)
service = guess_service(center + peaks.get("center_offset_hz", 0.0), peaks.get("bw_est_hz"))
tones = goertzel_tones(iq[:65536], fs, [1000.0, 1900.0]) # example tones
res = {"file": path, "fs_hz": fs, "center_hz": center, "peaks": peaks, "service_guess": service, "tones": tones}
audit("iq-analyze", path, {"fs":fs,"center":center}, res)
return res
def demod_fm_from_file(path: str, fs: float, deemph: float, audio_out: Optional[str]) -> Dict[str,Any]:
if not HAS_NUMPY:
res = {"error":"numpy not available"}
audit("demod-fm", path, {"fs":fs}, res); return res
iq, _ = load_iq_file(path, "complex64")
if iq is None or iq.size < 8192:
res = {"error":"too few samples"}
audit("demod-fm", path, {"fs":fs}, res); return res
y = demod_fm(iq)
# decimate to audio ~ 48 kHz
if HAS_SCIPY:
dec = max(1, int(fs // 48000))
y = decimate(y, dec, zero_phase=True)
fs_out = fs/dec
y = deemphasis(y, fs_out, tau=deemph)
else:
fs_out = fs
if audio_out:
write_wav(audio_out, y.real, fs_out)
res = {"file": path, "audio_out": audio_out or None, "samples": int(y.size), "fs_audio": fs_out}
audit("demod-fm", path, {"fs":fs}, res); return res
# ---------------------------- CLI -------------------------------------------
def main():
ap = argparse.ArgumentParser(description="UCLS Signal Gateway — RX-only spectrum & signal analyzer")
sp = ap.add_subparsers(dest="cmd")
sp.add_parser("demo", help="Simulated panorama and band classification")
p_scan = sp.add_parser("scan", help="Scan a window (RX only)")
p_scan.add_argument("--device", default="rtlsdr", help="rtlsdr | soapy:driver=... | sim")
p_scan.add_argument("--center", type=float, required=True)
p_scan.add_argument("--span", type=float, default=10e6)
p_scan.add_argument("--step", type=float, default=2e6)
p_scan.add_argument("--seconds", type=int, default=2)
p_scan.add_argument("--samp-rate", type=float, default=2.4e6)
p_scan.add_argument("--gain", type=float, default=None)
p_aiq = sp.add_parser("analyze-iq", help="Analyze an IQ file (complex64)")
p_aiq.add_argument("--file", required=True)
p_aiq.add_argument("--samp-rate", type=float, required=True)
p_aiq.add_argument("--center", type=float, required=True)
p_fm = sp.add_parser("demod-fm", help="Demodulate FM (RX only) from IQ file")
p_fm.add_argument("--file", required=True)
p_fm.add_argument("--samp-rate", type=float, required=True)
p_fm.add_argument("--deemph", type=float, default=75e-6)
p_fm.add_argument("--audio-out", required=False)
args = ap.parse_args()
if args.cmd == "demo" or args.cmd is None:
# Sim: choose a few centers (FM, Wi‑Fi, GNSS) and report
demo_centers = [100e6, 2.437e9, 1.57542e9]
out = []
for c in demo_centers:
out.append(analyze_window("sim", c, 2.4e6, seconds=1))
print(json.dumps(out, ensure_ascii=False, indent=2))
return
if args.cmd == "scan":
if not is_live() and not args.device.lower().startswith("sim"):
print("# LIVE=0: running in simulation mode. Use device 'sim' or set UCLS_LIVE=1 and whitelist device via UCLS_SDR_WHITELIST.", file=sys.stderr)
# naive multi‑step scan around center
centers = []
span = args.span; step = args.step
nsteps = max(1, int(span//step))
start = args.center - span/2.0
for i in range(nsteps+1):
centers.append(start + i*step)
results = [analyze_window(args.device, c, args.samp_rate, args.seconds, args.gain) for c in centers]
print(json.dumps({"scan": results}, ensure_ascii=False, indent=2))
return
if args.cmd == "analyze-iq":
res = analyze_iq_file(args.file, args.samp_rate, args.center)
print(json.dumps(res, ensure_ascii=False, indent=2))
return
if args.cmd == "demod-fm":
res = demod_fm_from_file(args.file, args.samp_rate, args.deemph, args.audio_out)
print(json.dumps(res, ensure_ascii=False, indent=2))
return
if __name__ == "__main__":
main()
What this gives you (concise):
- A read‑only SDR gateway that can listen and describe signals across the spectrum, map them to band/services, and extract features (peaks, bandwidth, tones).
- Optional demod (AM/FM/FSK/PSK stubs—FM WAV writer included), strictly from IQ files or whitelisted SDR devices; no transmit.
- Clean safety rails:
UCLS_LIVE=0default, device allowlist, JSON audit logs, and no decoding of restricted/encrypted content. - Drop‑in friendly: call this module from your
ucls_infra_gateway_v2.pyto make the whole UCLS stack spectrum‑aware while staying compliant.
If you want, I can wire this directly into your v2 gateway with new subcommands (signal-scan, signal-analyze-iq, signal-demod-fm) and add MQTT/WebSocket publishers for dashboards.