import argparse
import json
import os
import zipfile
import numpy as np
import zarr
from scipy.optimize import linear_sum_assignment
from scipy.spatial.distance import dice # , jaccard
from skimage.measure import label as relabel
from skimage.transform import rescale
from scipy.spatial import cKDTree
from sklearn.metrics import accuracy_score, jaccard_score
from tqdm import tqdm
from upath import UPath
from cellmap_data import CellMapImage
import zarr.errors
from .config import PROCESSED_PATH, SUBMISSION_PATH, BASE_DATA_PATH
from .utils import TEST_CROPS, TEST_CROPS_DICT
INSTANCE_CLASSES = [
"nuc",
"vim",
"ves",
"endo",
"lyso",
"ld",
"perox",
"mito",
"np",
"mt",
"cell",
"instance",
]
HAUSDORFF_DISTANCE_MAX = np.inf
TRUTH_PATH = (BASE_DATA_PATH / "ground_truth.zarr").path
[docs]
def unzip_file(zip_path):
"""
Unzip a zip file to a specified directory.
Args:
zip_path (str): The path to the zip file.
Example usage:
unzip_file('submission.zip')
"""
saved_path = UPath(zip_path).with_suffix(".zarr").path
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(saved_path)
print(f"Unzipped {zip_path} to {saved_path}")
return UPath(saved_path)
[docs]
def save_numpy_class_labels_to_zarr(
save_path, test_volume_name, label_name, labels, overwrite=False, attrs=None
):
"""
Save a single 3D numpy array of class labels to a
Zarr-2 file with the required structure.
Args:
save_path (str): The path to save the Zarr-2 file (ending with <filename>.zarr).
test_volume_name (str): The name of the test volume.
label_names (str): The names of the labels.
labels (np.ndarray): A 3D numpy array of class labels.
overwrite (bool): Whether to overwrite the Zarr-2 file if it already exists.
attrs (dict): A dictionary of attributes to save with the Zarr-2 file.
Example usage:
# Generate random class labels, with 0 as background
labels = np.random.randint(0, 4, (128, 128, 128))
save_numpy_labels_to_zarr('submission.zarr', 'test_volume', ['label1', 'label2', 'label3'], labels)
"""
# Create a Zarr-2 file
if not UPath(save_path).exists():
os.makedirs(UPath(save_path).parent, exist_ok=True)
print(f"Saving to {save_path}")
store = zarr.DirectoryStore(save_path)
zarr_group = zarr.group(store)
# Save the test volume group
zarr_group.create_group(test_volume_name, overwrite=overwrite)
# Save the labels
for i, label_name in enumerate(label_name):
print(f"Saving {label_name}")
ds = zarr_group[test_volume_name].create_dataset(
label_name,
data=(labels == i + 1),
chunks=64,
# compressor=zarr.Blosc(cname='zstd', clevel=3, shuffle=2),
)
for k, v in (attrs or {}).items():
ds.attrs[k] = v
print("Done saving")
[docs]
def save_numpy_class_arrays_to_zarr(
save_path, test_volume_name, label_names, labels, mode="append", attrs=None
):
"""
Save a list of 3D numpy arrays of binary or instance labels to a
Zarr-2 file with the required structure.
Args:
save_path (str): The path to save the Zarr-2 file (ending with <filename>.zarr).
test_volume_name (str): The name of the test volume.
label_names (list): A list of label names corresponding to the list of 3D numpy arrays.
labels (list): A list of 3D numpy arrays of binary labels.
mode (str): The mode to use when saving the Zarr-2 file. Options are 'append' or 'overwrite'.
attrs (dict): A dictionary of attributes to save with the Zarr-2 file.
Example usage:
label_names = ['label1', 'label2', 'label3']
# Generate random binary volumes for each label
labels = [np.random.randint(0, 2, (128, 128, 128)) for _ in range len(label_names)]
save_numpy_binary_to_zarr('submission.zarr', 'test_volume', label_names, labels)
"""
# Create a Zarr-2 file
if not UPath(save_path).exists():
os.makedirs(UPath(save_path).parent, exist_ok=True)
print(f"Saving to {save_path}")
store = zarr.DirectoryStore(save_path)
zarr_group = zarr.group(store)
# Save the test volume group
try:
zarr_group.create_group(test_volume_name, overwrite=(mode == "overwrite"))
except zarr.errors.ContainsGroupError:
print(f"Appending to existing group {test_volume_name}")
# Save the labels
for i, label_name in enumerate(label_names):
print(f"Saving {label_name}")
ds = zarr_group[test_volume_name].create_dataset(
label_name,
data=labels[i],
chunks=64,
# compressor=zarr.Blosc(cname='zstd', clevel=3, shuffle=2),
)
for k, v in (attrs or {}).items():
ds.attrs[k] = v
print("Done saving")
[docs]
def resize_array(arr, target_shape, pad_value=0):
"""
Resize an array to a target shape by padding or cropping as needed.
Parameters:
arr (np.ndarray): Input array to resize.
target_shape (tuple): Desired shape for the output array.
pad_value (int, float, etc.): Value to use for padding if the array is smaller than the target shape.
Returns:
np.ndarray: Resized array with the specified target shape.
"""
arr_shape = arr.shape
resized_arr = arr
# Pad if the array is smaller than the target shape
pad_width = []
for i in range(len(target_shape)):
if arr_shape[i] < target_shape[i]:
# Padding needed: calculate amount for both sides
pad_before = (target_shape[i] - arr_shape[i]) // 2
pad_after = target_shape[i] - arr_shape[i] - pad_before
pad_width.append((pad_before, pad_after))
else:
# No padding needed for this dimension
pad_width.append((0, 0))
if any(pad > 0 for pads in pad_width for pad in pads):
resized_arr = np.pad(
resized_arr, pad_width, mode="constant", constant_values=pad_value
)
# Crop if the array is larger than the target shape
slices = []
for i in range(len(target_shape)):
if arr_shape[i] > target_shape[i]:
# Calculate cropping slices to center the crop
start = (arr_shape[i] - target_shape[i]) // 2
end = start + target_shape[i]
slices.append(slice(start, end))
else:
# No cropping needed for this dimension
slices.append(slice(None))
return resized_arr[tuple(slices)]
[docs]
def hausdorff_distance(
image0, image1, voxel_size, max_distance=np.inf, method="standard"
):
"""Calculate the Hausdorff distance between nonzero elements of given images.
Modified from the `scipy.spatial.distance.hausdorff` function by Jeff Rhoades (2024) to handle non-isotropic resolutions.
Parameters
----------
image0, image1 : ndarray
Arrays where ``True`` represents a point that is included in a
set of points. Both arrays must have the same shape.
voxel_size : tuple
The size of a voxel in each dimension. Assumes the same resolution for both images.
max_distance : float, optional
The maximum distance to consider. Default is infinity.
method : {'standard', 'modified'}, optional, default = 'standard'
The method to use for calculating the Hausdorff distance.
``standard`` is the standard Hausdorff distance, while ``modified``
is the modified Hausdorff distance.
Returns
-------
distance : float
The Hausdorff distance between coordinates of nonzero pixels in
``image0`` and ``image1``, using the Euclidean distance.
Notes
-----
The Hausdorff distance [1]_ is the maximum distance between any point on
``image0`` and its nearest point on ``image1``, and vice-versa.
The Modified Hausdorff Distance (MHD) has been shown to perform better
than the directed Hausdorff Distance (HD) in the following work by
Dubuisson et al. [2]_. The function calculates forward and backward
mean distances and returns the largest of the two.
References
----------
.. [1] http://en.wikipedia.org/wiki/Hausdorff_distance
.. [2] M. P. Dubuisson and A. K. Jain. A Modified Hausdorff distance for object
matching. In ICPR94, pages A:566-568, Jerusalem, Israel, 1994.
:DOI:`10.1109/ICPR.1994.576361`
http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.1.8155
"""
if method not in ("standard", "modified"):
raise ValueError(f"unrecognized method {method}")
a_points = np.argwhere(image0)
b_points = np.argwhere(image1)
# Handle empty sets properly:
# - if both sets are empty, return zero
# - if only one set is empty, return infinity
if len(a_points) == 0:
return 0 if len(b_points) == 0 else np.inf
elif len(b_points) == 0:
return np.inf
# Scale the points by the voxel size
a_points = a_points * np.array(voxel_size)
b_points = b_points * np.array(voxel_size)
fwd, bwd = (
cKDTree(a_points).query(b_points, k=1, distance_upper_bound=max_distance)[0],
cKDTree(b_points).query(a_points, k=1, distance_upper_bound=max_distance)[0],
)
if method == "standard": # standard Hausdorff distance
return max(max(fwd), max(bwd))
elif method == "modified": # modified Hausdorff distance
return max(np.mean(fwd), np.mean(bwd))
[docs]
def score_instance(
pred_label, truth_label, voxel_size, hausdorff_distance_max=HAUSDORFF_DISTANCE_MAX
) -> dict[str, float]:
"""
Score a single instance label volume against the ground truth instance label volume.
Args:
pred_label (np.ndarray): The predicted instance label volume.
truth_label (np.ndarray): The ground truth instance label volume.
voxel_size (tuple): The size of a voxel in each dimension.
hausdorff_distance_max (float): The maximum distance to consider for the Hausdorff distance.
Returns:
dict: A dictionary of scores for the instance label volume.
Example usage:
scores = score_instance(pred_label, truth_label)
"""
# Relabel the predicted instance labels to be consistent with the ground truth instance labels
print("Scoring instance segmentation...")
pred_label = relabel(pred_label, connectivity=len(pred_label.shape))
# pred_label = label(pred_label)
# Construct the cost matrix for Hungarian matching
pred_ids = np.unique(pred_label)
truth_ids = np.unique(truth_label)
cost_matrix = np.zeros((len(truth_ids), len(pred_ids)))
bar = tqdm(pred_ids, desc="Computing cost matrix", leave=True, dynamic_ncols=True)
for j, pred_id in enumerate(bar):
if pred_id == 0:
# Don't score the background
continue
pred_mask = pred_label == pred_id
these_truth_ids = np.unique(truth_label[pred_mask])
truth_indices = [
np.argmax(truth_ids == truth_id) for truth_id in these_truth_ids
]
for i, truth_id in zip(truth_indices, these_truth_ids):
if truth_id == 0:
# Don't score the background
continue
truth_mask = truth_label == truth_id
cost_matrix[i, j] = jaccard_score(truth_mask.flatten(), pred_mask.flatten())
# Match the predicted instances to the ground truth instances
row_inds, col_inds = linear_sum_assignment(cost_matrix, maximize=True)
# Contruct the volume for the matched instances
matched_pred_label = np.zeros_like(pred_label)
for i, j in tqdm(
zip(col_inds, row_inds), desc="Relabeled matched instances", dynamic_ncols=True
):
if pred_ids[i] == 0 or truth_ids[j] == 0:
# Don't score the background
continue
pred_mask = pred_label == pred_ids[i]
matched_pred_label[pred_mask] = truth_ids[j]
hausdorff_distances = []
for truth_id in tqdm(
truth_ids, desc="Computing Hausdorff distances", dynamic_ncols=True
):
if truth_id == 0:
# Don't score the background
continue
h_dist = hausdorff_distance(
truth_label == truth_id,
matched_pred_label == truth_id,
voxel_size,
)
h_dist = min(h_dist, hausdorff_distance_max)
hausdorff_distances.append(h_dist)
# Compute the scores
accuracy = accuracy_score(truth_label.flatten(), matched_pred_label.flatten())
hausdorff_dist = np.mean(hausdorff_distances) if hausdorff_distances else 0
normalized_hausdorff_dist = 1.01 ** (
-hausdorff_dist / np.linalg.norm(voxel_size)
) # normalize Hausdorff distance to [0, 1] using the maximum distance represented by a voxel. 32 is abritrary chosen to have a reasonable range
combined_score = (accuracy * normalized_hausdorff_dist) ** 0.5
print(f"Accuracy: {accuracy:.4f}")
print(f"Hausdorff Distance: {hausdorff_dist:.4f}")
print(f"Normalized Hausdorff Distance: {normalized_hausdorff_dist:.4f}")
print(f"Combined Score: {combined_score:.4f}")
return {
"accuracy": accuracy,
"hausdorff_distance": hausdorff_dist,
"normalized_hausdorff_distance": normalized_hausdorff_dist,
"combined_score": combined_score,
}
[docs]
def score_semantic(pred_label, truth_label) -> dict[str, float]:
"""
Score a single semantic label volume against the ground truth semantic label volume.
Args:
pred_label (np.ndarray): The predicted semantic label volume.
truth_label (np.ndarray): The ground truth semantic label volume.
Returns:
dict: A dictionary of scores for the semantic label volume.
Example usage:
scores = score_semantic(pred_label, truth_label)
"""
print("Scoring semantic segmentation...")
# Flatten the label volumes and convert to binary
pred_label = (pred_label > 0.0).flatten()
truth_label = (truth_label > 0.0).flatten()
# Compute the scores
dice_score = 1 - dice(truth_label, pred_label)
scores = {
"iou": jaccard_score(truth_label, pred_label, zero_division=1),
"dice_score": dice_score if not np.isnan(dice_score) else 1,
}
print(f"IoU: {scores['iou']:.4f}")
print(f"Dice Score: {scores['dice_score']:.4f}")
return scores
[docs]
def score_label(
pred_label_path, truth_path=TRUTH_PATH, instance_classes=INSTANCE_CLASSES
) -> dict[str, float]:
"""
Score a single label volume against the ground truth label volume.
Args:
pred_label_path (str): The path to the predicted label volume.
Returns:
dict: A dictionary of scores for the label volume.
Example usage:
scores = score_label('pred.zarr/test_volume/label1')
"""
print(f"Scoring {pred_label_path}...")
truth_path = UPath(truth_path)
# Load the predicted and ground truth label volumes
label_name = UPath(pred_label_path).name
crop_name = UPath(pred_label_path).parent.name
truth_label_path = (truth_path / crop_name / label_name).path
truth_label_ds = zarr.open(truth_label_path, mode="r")
truth_label = truth_label_ds[:]
crop = TEST_CROPS_DICT[int(crop_name.removeprefix("crop")), label_name]
pred_label = match_crop_space(
pred_label_path,
label_name,
crop.voxel_size,
crop.shape,
crop.translation,
)
mask_path = truth_path / crop_name / f"{label_name}_mask"
if mask_path.exists():
# Mask out uncertain regions resulting from low-res ground truth annotations
print(f"Masking {label_name} with {mask_path}...")
mask = zarr.open(mask_path.path, mode="r")[:]
pred_label = pred_label * mask
truth_label = truth_label * mask
# Compute the scores
if label_name in instance_classes:
results = score_instance(pred_label, truth_label, crop.voxel_size)
else:
results = score_semantic(pred_label, truth_label)
results["num_voxels"] = int(np.prod(truth_label.shape))
results["voxel_size"] = crop.voxel_size
results["is_missing"] = False
return results
[docs]
def score_volume(
pred_volume_path, truth_path=TRUTH_PATH, instance_classes=INSTANCE_CLASSES
) -> dict[str, dict[str, float]]:
"""
Score a single volume against the ground truth volume.
Args:
pred_volume_path (str): The path to the predicted volume.
Returns:
dict: A dictionary of scores for the volume.
Example usage:
scores = score_volume('pred.zarr/test_volume')
"""
print(f"Scoring {pred_volume_path}...")
pred_volume_path = UPath(pred_volume_path)
truth_path = UPath(truth_path)
# Find labels to score
pred_labels = [a for a in zarr.open(pred_volume_path.path, mode="r").array_keys()]
volume_name = pred_volume_path.name
truth_labels = [
a for a in zarr.open((truth_path / volume_name).path, mode="r").array_keys()
]
found_labels = list(set(pred_labels) & set(truth_labels))
missing_labels = list(set(truth_labels) - set(pred_labels))
# Score each label
scores = {
label: score_label(
pred_volume_path / label,
truth_path=truth_path,
instance_classes=instance_classes,
)
for label in found_labels
}
scores.update(
{
label: (
{
"accuracy": 0,
"hausdorff_distance": 0,
"normalized_hausdorff_distance": 0,
"combined_score": 0,
"num_voxels": int(
np.prod(
zarr.open(
(truth_path / volume_name / label).path, mode="r"
).shape
)
),
"voxel_size": zarr.open(
(truth_path / volume_name / label).path, mode="r"
).attrs["voxel_size"],
"is_missing": True,
}
if label in instance_classes
else {
"iou": 0,
"dice_score": 0,
"num_voxels": int(
np.prod(
zarr.open(
(truth_path / volume_name / label).path, mode="r"
).shape
)
),
"voxel_size": zarr.open(
(truth_path / volume_name / label).path, mode="r"
).attrs["voxel_size"],
"is_missing": True,
}
)
for label in missing_labels
}
)
print(f"Missing labels: {missing_labels}")
return scores
[docs]
def missing_volume_score(
truth_volume_path, instance_classes=INSTANCE_CLASSES
) -> dict[str, dict[str, float]]:
"""
Score a missing volume as 0's, congruent with the score_volume function.
Args:
truth_volume_path (str): The path to the ground truth volume.
Returns:
dict: A dictionary of scores for the volume.
Example usage:
scores = missing_volume_score('truth.zarr/test_volume')
"""
print(f"Scoring missing volume {truth_volume_path}...")
truth_volume_path = UPath(truth_volume_path)
# Find labels to score
truth_labels = [a for a in zarr.open(truth_volume_path.path, mode="r").array_keys()]
# Score each label
scores = {
label: (
{
"accuracy": 0.0,
"hausdorff_distance": 0.0,
"normalized_hausdorff_distance": 0.0,
"combined_score": 0.0,
"num_voxels": int(
np.prod(zarr.open((truth_volume_path / label).path, mode="r").shape)
),
"voxel_size": zarr.open(
(truth_volume_path / label).path, mode="r"
).attrs["voxel_size"],
"is_missing": True,
}
if label in instance_classes
else {
"iou": 0.0,
"dice_score": 0.0,
"num_voxels": int(
np.prod(zarr.open((truth_volume_path / label).path, mode="r").shape)
),
"voxel_size": zarr.open(
(truth_volume_path / label).path, mode="r"
).attrs["voxel_size"],
"is_missing": True,
}
)
for label in truth_labels
}
return scores
[docs]
def combine_scores(scores, include_missing=True, instance_classes=INSTANCE_CLASSES):
"""
Combine scores across volumes, normalizing by the number of voxels.
Args:
scores (dict): A dictionary of scores for each volume, as returned by `score_volume`.
include_missing (bool): Whether to include missing volumes in the combined scores.
instance_classes (list): A list of instance classes.
Returns:
dict: A dictionary of combined scores across all volumes.
Example usage:
combined_scores = combine_scores(scores)
"""
# Combine label scores across volumes, normalizing by the number of voxels
print(f"Combining label scores...")
scores = scores.copy()
label_scores = {}
total_volumes = {}
for these_scores in scores.values():
for label, this_score in these_scores.items():
print(this_score)
if this_score["is_missing"] and not include_missing:
continue
total_volume = np.prod(this_score["voxel_size"]) * this_score["num_voxels"]
if label in instance_classes:
if label not in label_scores:
label_scores[label] = {
"accuracy": 0,
"hausdorff_distance": 0,
"normalized_hausdorff_distance": 0,
"combined_score": 0,
}
total_volumes[label] = 0
label_scores[label]["accuracy"] += this_score["accuracy"] * total_volume
label_scores[label]["hausdorff_distance"] += (
this_score["hausdorff_distance"] * total_volume
)
label_scores[label]["normalized_hausdorff_distance"] += (
this_score["normalized_hausdorff_distance"] * total_volume
)
label_scores[label]["combined_score"] += (
this_score["combined_score"] * total_volume
)
total_volumes[label] += total_volume
else:
if label not in label_scores:
label_scores[label] = {"iou": 0, "dice_score": 0}
total_volumes[label] = 0
label_scores[label]["iou"] += this_score["iou"] * total_volume
label_scores[label]["dice_score"] += (
this_score["dice_score"] * total_volume
)
total_volumes[label] += total_volume
# Normalize back to the total number of voxels
for label in label_scores:
if label in instance_classes:
label_scores[label]["accuracy"] /= total_volumes[label]
label_scores[label]["hausdorff_distance"] /= total_volumes[label]
label_scores[label]["normalized_hausdorff_distance"] /= total_volumes[label]
label_scores[label]["combined_score"] /= total_volumes[label]
else:
label_scores[label]["iou"] /= total_volumes[label]
label_scores[label]["dice_score"] /= total_volumes[label]
scores["label_scores"] = label_scores
# Compute the overall score
print("Computing overall scores...")
overall_instance_scores = []
overall_semantic_scores = []
for label in label_scores:
if label in instance_classes:
overall_instance_scores += [label_scores[label]["combined_score"]]
else:
overall_semantic_scores += [label_scores[label]["iou"]]
scores["overall_instance_score"] = np.mean(overall_instance_scores)
scores["overall_semantic_score"] = np.mean(overall_semantic_scores)
scores["overall_score"] = (
scores["overall_instance_score"] * scores["overall_semantic_score"]
) ** 0.5 # geometric mean
return scores
[docs]
def score_submission(
submission_path=UPath(SUBMISSION_PATH).with_suffix(".zip").path,
result_file=None,
truth_path=TRUTH_PATH,
instance_classes=INSTANCE_CLASSES,
):
"""
Score a submission against the ground truth data.
Args:
submission_path (str): The path to the zipped submission Zarr-2 file.
result_file (str): The path to save the scores.
Returns:
dict: A dictionary of scores for the submission.
Example usage:
scores = score_submission('submission.zip')
The results json is a dictionary with the following structure:
{
"volume" (the name of the ground truth volume): {
"label" (the name of the predicted class): {
(For semantic segmentation)
"iou": (the intersection over union score),
"dice_score": (the dice score),
OR
(For instance segmentation)
"accuracy": (the accuracy score),
"haussdorf_distance": (the haussdorf distance),
"normalized_haussdorf_distance": (the normalized haussdorf distance),
"combined_score": (the geometric mean of the accuracy and normalized haussdorf distance),
}
"num_voxels": (the number of voxels in the ground truth volume),
}
"label_scores": {
(the name of the predicted class): {
(For semantic segmentation)
"iou": (the mean intersection over union score),
"dice_score": (the mean dice score),
OR
(For instance segmentation)
"accuracy": (the mean accuracy score),
"haussdorf_distance": (the mean haussdorf distance),
"combined_score": (the mean geometric mean of the accuracy and haussdorf distance),
}
"overall_score": (the mean of the combined scores across all classes),
}
"""
print(f"Scoring {submission_path}...")
# Unzip the submission
submission_path = unzip_file(submission_path)
# Find volumes to score
print(f"Scoring volumes in {submission_path}...")
pred_volumes = [d.name for d in UPath(submission_path).glob("*") if d.is_dir()]
truth_path = UPath(truth_path)
print(f"Volumes: {pred_volumes}")
print(f"Truth path: {truth_path}")
truth_volumes = [d.name for d in truth_path.glob("*") if d.is_dir()]
print(f"Truth volumes: {truth_volumes}")
found_volumes = list(
set(pred_volumes) & set(truth_volumes)
) # TODO: Score all volumes, and just return 0 scores for missing volumes
missing_volumes = list(set(truth_volumes) - set(pred_volumes))
if len(found_volumes) == 0:
raise ValueError(
"No volumes found to score. Make sure the submission is formatted correctly."
)
print(f"Scoring volumes: {found_volumes}")
if len(missing_volumes) > 0:
print(f"Missing volumes: {missing_volumes}")
print("Scoring missing volumes as 0's")
# Score each volume
scores = {
volume: score_volume(
submission_path / volume,
truth_path=truth_path,
instance_classes=instance_classes,
)
for volume in found_volumes
}
scores.update(
{
volume: missing_volume_score(
truth_path / volume, instance_classes=instance_classes
)
for volume in missing_volumes
}
)
# Combine label scores across volumes, normalizing by the number of voxels
all_scores = combine_scores(
scores, include_missing=True, instance_classes=instance_classes
)
found_scores = combine_scores(
scores, include_missing=False, instance_classes=instance_classes
)
print("Scores combined across all test volumes:")
print(f"\tOverall Instance Score: {all_scores['overall_instance_score']:.4f}")
print(f"\tOverall Semantic Score: {all_scores['overall_semantic_score']:.4f}")
print(f"\tOverall Score: {all_scores['overall_score']:.4f}")
print("Scores combined across test volumes with data submitted:")
print(f"\tOverall Instance Score: {found_scores['overall_instance_score']:.4f}")
print(f"\tOverall Semantic Score: {found_scores['overall_semantic_score']:.4f}")
print(f"\tOverall Score: {found_scores['overall_score']:.4f}")
# Save the scores
if result_file:
print(f"Saving collected scores to {result_file}...")
with open(result_file, "w") as f:
json.dump(all_scores, f)
found_result_file = str(result_file).replace(
UPath(result_file).suffix, "_submitted_only" + UPath(result_file).suffix
)
print(f"Saving scores for only submitted data to {found_result_file}...")
with open(found_result_file, "w") as f:
json.dump(found_scores, f)
else:
return all_scores
[docs]
def package_submission(
input_search_path: str | UPath = PROCESSED_PATH,
output_path: str | UPath = SUBMISSION_PATH,
overwrite: bool = False,
):
"""
Package a submission for the CellMap challenge. This will create a zarr file, combining all the processed volumes, and then zip it.
Args:
input_search_path (str): The base path to the processed volumes, with placeholders for dataset and crops.
output_path (str | UPath): The path to save the submission zarr to. (ending with `<filename>.zarr`; `.zarr` will be appended if not present, and replaced with `.zip` when zipped).
overwrite (bool): Whether to overwrite the submission zarr if it already exists.
"""
input_search_path = str(input_search_path)
output_path = UPath(output_path)
output_path = output_path.with_suffix(".zarr")
# Create a zarr file to store the submission
if not output_path.exists():
os.makedirs(output_path.parent, exist_ok=True)
store = zarr.DirectoryStore(output_path)
zarr_group = zarr.group(store, overwrite=True)
# Find all the processed test volumes
for crop in TEST_CROPS:
crop_path = (
UPath(input_search_path.format(dataset=crop.dataset, crop=f"crop{crop.id}"))
/ crop.class_label
)
if not crop_path.exists():
print(f"Skipping {crop_path} as it does not exist.")
continue
if f"crop{crop.id}" not in zarr_group:
crop_group = zarr_group.create_group(f"crop{crop.id}")
else:
crop_group = zarr_group[f"crop{crop.id}"]
# Rescale the processed volumes to match the expected submission resolution if required
label_array = crop_group.create_dataset(
crop.class_label,
overwrite=overwrite,
shape=crop.shape,
)
print(f"Scaling {crop_path} to {crop.voxel_size} nm")
# Match the resolution of the processed volume to the test volume
image = match_crop_space(
path=crop_path.path,
class_label=crop.class_label,
voxel_size=crop.voxel_size,
shape=crop.shape,
translation=crop.translation,
)
# Save the processed labels to the submission zarr
label_array[:] = image
# Add the metadata
label_array.attrs["voxel_size"] = crop.voxel_size
label_array.attrs["translation"] = crop.translation
print(f"Saved submission to {output_path}")
print("Zipping submission...")
zip_submission(output_path)
print("Done packaging submission")
[docs]
def match_crop_space(path, class_label, voxel_size, shape, translation) -> np.ndarray:
"""
Match the resolution of a zarr array to a target resolution and shape, resampling as necessary with interpolation dependent on the class label. Instance segmentations will be resampled with nearest neighbor interpolation, while semantic segmentations will be resampled with linear interpolation and then thresholded.
Args:
path (str | UPath): The path to the zarr array to be adjusted. The zarr can be an OME-NGFF multiscale zarr file, or a traditional single scale formatted zarr.
class_label (str): The class label of the array.
voxel_size (tuple): The target voxel size.
shape (tuple): The target shape.
translation (tuple): The translation (i.e. offset) of the array in world units.
Returns:
np.ndarray: The rescaled array.
"""
ds = zarr.open(str(path), mode="r")
if "multiscales" in ds.attrs:
# Handle multiscale zarr files
_image = CellMapImage(
path=path,
target_class=class_label,
target_scale=voxel_size,
target_voxel_shape=shape,
pad=True,
pad_value=0,
)
path = UPath(path) / _image.scale_level
for attr in ds.attrs["multiscales"][0]["datasets"]:
if attr["path"] == _image.scale_level:
for transform in attr["coordinateTransformations"]:
if transform["type"] == "translation":
input_translation = transform["translation"]
elif transform["type"] == "scale":
input_voxel_size = transform["scale"]
break
ds = zarr.open(path.path, mode="r")
elif (
("voxel_size" in ds.attrs)
or ("resolution" in ds.attrs)
or ("scale" in ds.attrs)
) or (("translation" in ds.attrs) or ("offset" in ds.attrs)):
# Handle single scale zarr files
if "voxel_size" in ds.attrs:
input_voxel_size = ds.attrs["voxel_size"]
elif "resolution" in ds.attrs:
input_voxel_size = ds.attrs["resolution"]
elif "scale" in ds.attrs:
input_voxel_size = ds.attrs["scale"]
else:
input_voxel_size = None
if "translation" in ds.attrs:
input_translation = ds.attrs["translation"]
elif "offset" in ds.attrs:
input_translation = ds.attrs["offset"]
else:
input_translation = None
else:
print(f"Could not find voxel size and translation for {path}")
print(
"Assuming voxel size matches target voxel size and will crop to target shape centering the volume."
)
image = ds[:]
# Crop the array if necessary
if any(s1 != s2 for s1, s2 in zip(image.shape, shape)):
return resize_array(image, shape) # type: ignore
return image # type: ignore
# Load the array
image = ds[:]
# Rescale the array if necessary
if input_voxel_size is not None and any(
r1 != r2 for r1, r2 in zip(input_voxel_size, voxel_size)
):
if class_label in INSTANCE_CLASSES:
image = rescale(
image, np.divide(input_voxel_size, voxel_size), order=0, mode="constant"
)
else:
image = rescale(
image,
np.divide(input_voxel_size, voxel_size),
order=1,
mode="constant",
preserve_range=True,
)
image = image > 0.5
if input_translation is not None:
# Calculate the relative offset
adjusted_input_translation = (
np.array(input_translation) // np.array(voxel_size)
) * np.array(voxel_size)
# Positive relative offset is the amount to crop from the start, negative is the amount to pad at the start
relative_offset = (
abs(np.subtract(adjusted_input_translation, translation))
// np.array(voxel_size)
* np.sign(np.subtract(adjusted_input_translation, translation))
)
else:
# TODO: Handle the case where the translation is not provided
relative_offset = np.zeros(len(shape))
# Translate and crop the array if necessary
if any(offset != 0 for offset in relative_offset) or any(
s1 != s2 for s1, s2 in zip(image.shape, shape)
):
print(
f"Translating and cropping {path} to {shape} with offset {relative_offset}"
)
# Make destination array
result = np.zeros(shape, dtype=image.dtype)
# Calculate the slices for the source and destination arrays
input_slices = []
output_slices = []
for i in range(len(shape)):
if relative_offset[i] < 0:
# Crop from the start
input_start = abs(relative_offset[i])
output_start = 0
input_end = min(input_start + shape[i], image.shape[i])
input_length = input_end - input_start
output_end = output_start + input_length
else:
# Pad at the start
input_start = 0
output_start = relative_offset[i]
output_end = min(shape[i], image.shape[i])
input_length = output_end - output_start
input_end = input_length
if input_length <= 0:
print("WARNING: Cropping to proper offset resulted in empty volume.")
print(f"\tInput shape: {image.shape}, Output shape: {shape}")
print(f"\tInput offset: {input_start}, Output offset: {output_start}")
return result
input_slices.append(slice(int(input_start), int(input_end)))
output_slices.append(slice(int(output_start), int(output_end)))
# Copy the data
result[*output_slices] = image[*input_slices]
return result
else:
return image
[docs]
def zip_submission(zarr_path: str | UPath = SUBMISSION_PATH):
"""
(Re-)Zip a submission zarr file.
Args:
zarr_path (str | UPath): The path to the submission zarr file (ending with `<filename>.zarr`). `.zarr` will be replaced with `.zip`.
"""
zarr_path = UPath(zarr_path)
if not zarr_path.exists():
raise FileNotFoundError(f"Submission zarr file not found at {zarr_path}")
zip_path = zarr_path.with_suffix(".zip")
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(zarr_path, followlinks=True):
for file in files:
file_path = os.path.join(root, file)
# Ensure symlink targets are added as files
if os.path.islink(file_path):
file_path = os.readlink(file_path)
# Define the relative path in the zip archive
arcname = os.path.relpath(file_path, zarr_path)
zipf.write(file_path, arcname)
print(f"Zipped {zarr_path} to {zip_path}")
return zip_path
if __name__ == "__main__":
# When called on the commandline, evaluate the submission
# example usage: python evaluate.py submission.zip
argparser = argparse.ArgumentParser()
argparser.add_argument(
"submission_file", help="Path to submission zip file to score"
)
argparser.add_argument(
"result_file",
nargs="?",
help="If provided, store submission results in this file. Else print them to stdout",
)
argparser.add_argument(
"--truth-path", default=TRUTH_PATH, help="Path to zarr containing ground truth"
)
args = argparser.parse_args()
score_submission(args.submission_file, args.result_file, args.truth_path)