import numpy as np
from scipy import ndimage as ndi
[docs]
def perturb_mask_iou_3d(
G,
target_iou,
band=2,
p_fn=0.5,
rng=None,
empty_k=64, # how many voxels to turn on if G is empty
empty_blob_radius=1, # blob radius in voxels (1 => 3x3x3-ish ball)
max_tries=4000,
):
"""
3D spatially-realistic perturbation:
P = G, then remove FN mainly from inner boundary band, add FP mainly to outer boundary band.
Attempts to achieve IoU ~= target_iou; if exact feasibility is tricky, chooses the closest feasible pair.
Graceful behavior:
- If G is empty: return a small random blob of positives (size ~ empty_k).
"""
G = np.asarray(G, dtype=bool)
if rng is None:
rng = np.random.default_rng()
t = float(target_iou)
if not (0 < t <= 1):
raise ValueError("target_iou must be in (0, 1].")
# ---- Empty GT: fail gracefully by returning some predicted positives ----
g = int(G.sum())
if g == 0:
P = np.zeros_like(G, dtype=bool)
# make a small spherical-ish blob at a random center
zdim, ydim, xdim = P.shape
cz = rng.integers(0, zdim)
cy = rng.integers(0, ydim)
cx = rng.integers(0, xdim)
r = int(max(0, empty_blob_radius))
zz, yy, xx = np.ogrid[-r : r + 1, -r : r + 1, -r : r + 1]
se = (xx * xx + yy * yy + zz * zz) <= r * r
coords = np.array(np.nonzero(se)).T - r # offsets
# place offsets, clipping to volume
pts = []
for dz, dy, dx in coords:
z, y, x = cz + dz, cy + dy, cx + dx
if 0 <= z < zdim and 0 <= y < ydim and 0 <= x < xdim:
pts.append((z, y, x))
# if blob is too small/large relative to empty_k, sprinkle random voxels too
for z, y, x in pts:
P[z, y, x] = True
need = max(0, int(empty_k) - int(P.sum()))
if need > 0:
flat = rng.choice(P.size, size=min(need, P.size), replace=False)
P.flat[flat] = True
return P
if t == 1:
return G.copy()
bg = ~G
b = int(bg.sum())
def iou_from_ra(r, a):
# For this construction: |G∩P| = g-r, |G∪P| = g+a
return (g - r) / (g + a) if (g + a) > 0 else 0.0
def a_from_r(r):
# integer FP count induced by r to hit target (subject to rounding)
return int(round((g - r) / t - g))
# ---- Choose an initial r near expected error split ----
# total “error mass” ~ g*(1-t); allocate p_fn to FN portion heuristically
r0 = int(np.clip(round(p_fn * g * (1 - t)), 0, g))
jitter = max(5, g // 80)
best = None # (abs_error, r, a, achieved_iou)
found_exactish = False
# ---- 1) Local randomized search around r0 (fast path) ----
for _ in range(int(max_tries)):
r = int(np.clip(r0 + rng.integers(-jitter, jitter + 1), 0, g))
a = a_from_r(r)
if 0 <= a <= b:
achieved = iou_from_ra(r, a)
err = abs(achieved - t)
cand = (err, r, a, achieved)
if best is None or cand < best:
best = cand
# If we're extremely close, stop early.
if err <= (1.0 / max(1, g)): # “one-voxel” scale tolerance
found_exactish = True
break
# ---- 2) Guaranteed fallback: broaden search deterministically if needed ----
if best is None or (not found_exactish and best[0] > 1.0 / max(1, g)):
# Try a coarse sweep of r values (bounded cost) to guarantee feasibility.
# We avoid O(g) when g is huge by sampling r candidates across [0,g].
n = 2048 # cap candidates
if g <= n:
r_candidates = np.arange(g + 1, dtype=int)
else:
r_candidates = np.unique(
np.concatenate(
[
np.linspace(0, g, n, dtype=int),
np.array(
[max(0, r0 - 4 * jitter), r0, min(g, r0 + 4 * jitter)],
dtype=int,
),
]
)
)
for r in r_candidates:
a = a_from_r(int(r))
if 0 <= a <= b:
achieved = iou_from_ra(int(r), int(a))
err = abs(achieved - t)
cand = (err, int(r), int(a), achieved)
if best is None or cand < best:
best = cand
if best is None:
# Absolute last resort: clamp a into feasible range and pick r to minimize error.
# This guarantees we return something rather than crash.
r = int(np.clip(r0, 0, g))
a = int(np.clip(a_from_r(r), 0, b))
best = (abs(iou_from_ra(r, a) - t), r, a, iou_from_ra(r, a))
_, r, a, achieved_iou = best
# ---- Build boundary bands for spatial realism ----
band = int(band)
if band > 0:
zz, yy, xx = np.ogrid[-band : band + 1, -band : band + 1, -band : band + 1]
se = (xx * xx + yy * yy + zz * zz) <= band * band
inner = G & ~ndi.binary_erosion(G, structure=se)
outer = ndi.binary_dilation(G, structure=se) & ~G
else:
inner, outer = G, ~G
P = G.copy()
def sample(mask, k):
idx = np.flatnonzero(mask)
if k <= 0 or idx.size == 0:
return np.array([], dtype=np.int64)
k = min(int(k), idx.size)
return rng.choice(idx, size=k, replace=False)
# FN removals (prefer inner band)
drop = sample(inner, r)
P.flat[drop] = False
rem = r - drop.size
if rem > 0:
drop2 = sample(P & G, rem)
P.flat[drop2] = False
# FP additions (prefer outer band)
add = sample(outer & ~P, a)
P.flat[add] = True
rem = a - add.size
if rem > 0:
add2 = sample((~G) & ~P, rem)
P.flat[add2] = True
return P
# ----------------- normalization -----------------
[docs]
def normalize_distance(distance: float, voxel_size) -> float:
if distance == np.inf:
return 0.0
return float(1.01 ** (-distance / np.linalg.norm(voxel_size)))
[docs]
def inv_normalize_distance(score: float, voxel_size) -> float:
"""Inverse of normalize_distance for score in (0,1]. Returns physical distance."""
score = float(score)
if score <= 0.0:
return np.inf
if score >= 1.0:
return 0.0
return -np.linalg.norm(voxel_size) * (np.log(score) / np.log(1.01))
# ----------------- geometry helpers -----------------
def _surface_3d(M: np.ndarray) -> np.ndarray:
st = ndi.generate_binary_structure(3, 1) # 6-connect
return M & ~ndi.binary_erosion(M, structure=st, iterations=1, border_value=0)
def _ball_se(r: int) -> np.ndarray:
r = int(r)
zz, yy, xx = np.ogrid[-r:r+1, -r:r+1, -r:r+1]
return (xx*xx + yy*yy + zz*zz) <= r*r
def _edge_margin_mm(shape, voxel_size) -> np.ndarray:
"""Per-voxel minimum physical distance to array boundary."""
Z, Y, X = shape
vz, vy, vx = voxel_size
z = np.arange(Z)[:, None, None]
y = np.arange(Y)[None, :, None]
x = np.arange(X)[None, None, :]
dz = np.minimum(z, (Z - 1) - z) * vz
dy = np.minimum(y, (Y - 1) - y) * vy
dx = np.minimum(x, (X - 1) - x) * vx
return np.minimum(np.minimum(dz, dy), dx)
def _pick_seed_on_surface_max_margin(S: np.ndarray, voxel_size, rng: np.random.Generator) -> int | None:
"""Pick a surface voxel flat-index, preferring max boundary margin (stable for 'out' bumps)."""
sidx = np.flatnonzero(S)
if sidx.size == 0:
return None
margin = _edge_margin_mm(S.shape, voxel_size).ravel()[sidx]
mmax = margin.max()
top = sidx[margin >= (mmax - 1e-6)]
return int(rng.choice(top))
def _max_radius_out_mm(M: np.ndarray, voxel_size) -> float:
"""Max outward bump radius limited by distance to volume boundary."""
S = _surface_3d(M)
if not S.any():
return 0.0
margin = _edge_margin_mm(M.shape, voxel_size)
return float(margin[S].max()) * 0.95
def _max_radius_in_mm(M: np.ndarray, voxel_size) -> float:
"""Max inward dent radius limited by object thickness."""
if not M.any():
return 0.0
dt_in = ndi.distance_transform_edt(M, sampling=voxel_size)
return float(dt_in.max()) * 0.95
# ----------------- hausdorff on voxels (your definition, no ROI needed for report) -----------------
[docs]
def hausdorff_voxels_mm(A: np.ndarray, B: np.ndarray, voxel_size) -> float:
"""
Symmetric Hausdorff distance between full voxel sets A and B (physical units).
Mirrors your "standard" method (max of directed maxima).
"""
A = A.astype(bool)
B = B.astype(bool)
a_n = int(A.sum())
b_n = int(B.sum())
if a_n == 0 and b_n == 0:
return 0.0
if a_n == 0 or b_n == 0:
return np.inf
dt_to_B = ndi.distance_transform_edt(~B, sampling=voxel_size)
dt_to_A = ndi.distance_transform_edt(~A, sampling=voxel_size)
fwd = dt_to_B[A]
bwd = dt_to_A[B]
return float(max(fwd.max(initial=0.0), bwd.max(initial=0.0)))
# ----------------- main perturbation -----------------
[docs]
def perturb_gt_instances_to_mean_norm_hd(
gt_labels: np.ndarray,
target_mean_norm: float,
voxel_size=(1.0, 1.0, 1.0),
mode: str = "out", # "out" bump (FP), "in" dent (FN), "random"
band_vox: int = 2, # thickness of patch selection around seed (in voxels)
avoid_instance_overlap: bool = True, # when bumping out, only add into background
report: bool = True, # compute achieved mean via hausdorff_voxels_mm (slower)
rng: np.random.Generator | None = None,
):
"""
Perturb GT instance labels so that the mean over instances of normalize_distance(HD_i) is ~ target_mean_norm.
- Equal weight per instance (mean of normalized per-instance HD).
- Creates spatially realistic local bumps/dents using EDT.
- Returns (perturbed_labels, info_dict) if report else perturbed_labels.
Known divergence sources:
- per-instance feasibility caps (r_max)
- voxel discretization
- overlap avoidance (bumps restricted to background)
"""
if rng is None:
rng = np.random.default_rng()
gt_labels = np.asarray(gt_labels)
if gt_labels.ndim != 3:
raise ValueError("gt_labels must be a 3D volume.")
voxel_size = np.asarray(voxel_size, dtype=float)
if voxel_size.size != 3:
voxel_size = voxel_size[-3:]
T = float(target_mean_norm)
if not (0.0 < T <= 1.0):
raise ValueError("target_mean_norm must be in (0, 1].")
ids = [int(i) for i in np.unique(gt_labels) if i != 0]
if not ids:
out = gt_labels.copy()
info = {
"target_mean_norm": T,
"achieved_mean_norm": 1.0,
"mean_divergence": 1.0 - T,
"note": "No instances in GT; nothing perturbed.",
}
return (out, info) if report else out
# ---- choose per-instance feasible minima (in normalized space) ----
# If mode=="random", use a conservative cap that works for either direction.
rmax = {}
smin = {}
for i in ids:
M = (gt_labels == i)
if mode == "in":
r = _max_radius_in_mm(M, voxel_size)
elif mode == "out":
r = _max_radius_out_mm(M, voxel_size)
else:
r = min(_max_radius_in_mm(M, voxel_size), _max_radius_out_mm(M, voxel_size))
rmax[i] = float(r)
smin[i] = normalize_distance(rmax[i], voxel_size) # smallest achievable score (worst)
best_possible_mean = float(np.mean([smin[i] for i in ids]))
# If target is below what is feasible, best effort = drive all to smin
if T < best_possible_mean:
target_s = {i: smin[i] for i in ids}
else:
# Allocate drops from 1.0 down toward T, using instances with most drop capacity first
total_drop = len(ids) * (1.0 - T)
target_s = {i: 1.0 for i in ids}
order = sorted(ids, key=lambda i: (1.0 - smin[i]), reverse=True)
for i in order:
cap = 1.0 - smin[i]
use = min(cap, total_drop)
target_s[i] = 1.0 - use
total_drop -= use
if total_drop <= 1e-12:
break
# ---- apply perturbations ----
out = gt_labels.copy()
background = (out == 0)
se = _ball_se(max(1, int(band_vox)))
st = ndi.generate_binary_structure(3, 1)
per_instance_r = {}
for i in ids:
M = (out == i) # note: uses current out; overlap avoidance keeps this stable
if not M.any():
per_instance_r[i] = 0.0
continue
# pick direction if random
dir_mode = mode
if dir_mode == "random":
dir_mode = "out" if rng.random() < 0.5 else "in"
# target radius in mm, capped by feasibility
r = inv_normalize_distance(target_s[i], voxel_size)
if r == np.inf:
# "score 0" request -> use max feasible for that instance
r = rmax[i]
else:
r = min(float(r), rmax[i])
per_instance_r[i] = float(r)
if r <= 0.0:
continue
S = M & ~ndi.binary_erosion(M, structure=st, iterations=1, border_value=0)
if not S.any():
continue
# seed selection
if dir_mode == "out":
seed_flat = _pick_seed_on_surface_max_margin(S, voxel_size, rng)
else:
sidx = np.flatnonzero(S)
seed_flat = int(rng.choice(sidx)) if sidx.size else None
if seed_flat is None:
continue
seeds = np.zeros_like(out, dtype=bool)
seeds.ravel()[seed_flat] = True
# patch around seed (band-limited in voxels for locality, then pushed by r in mm)
# 1) restrict to a local surface patch using voxel-based dilation (cheap + stable)
patch = S & ndi.binary_dilation(seeds, structure=se, iterations=1)
# 2) push region by physical radius r using EDT to patch
dt_patch = ndi.distance_transform_edt(~patch, sampling=voxel_size)
if dir_mode == "out":
add = (dt_patch <= r)
if avoid_instance_overlap:
add = add & background # only grow into background
out[add] = i
background = (out == 0)
else:
rem = (dt_patch <= r) & (out == i)
out[rem] = 0
background = (out == 0)
if not report:
return out
# ---- report achieved score + divergence ----
achieved_scores = []
requested_scores = []
achieved_r = {}
for i in ids:
A = (gt_labels == i)
B = (out == i)
d = hausdorff_voxels_mm(A, B, voxel_size)
s = normalize_distance(d, voxel_size)
achieved_scores.append(s)
requested_scores.append(float(target_s[i]))
achieved_r[i] = float(d)
achieved_mean = float(np.mean(achieved_scores)) if achieved_scores else 1.0
info = {
"target_mean_norm": T,
"best_possible_mean_norm": best_possible_mean,
"achieved_mean_norm": achieved_mean,
"mean_divergence": achieved_mean - T,
"per_instance_requested_norm": target_s,
"per_instance_target_radius_mm": per_instance_r,
"per_instance_achieved_hd_mm": achieved_r,
"per_instance_achieved_norm": {i: float(s) for i, s in zip(ids, achieved_scores)},
"note": (
"If target < best_possible_mean_norm, target is infeasible; output is best-effort. "
"Nonzero divergence can also come from discretization and overlap-avoidance."
),
}
return out, info