Source code for cellmap_segmentation_challenge.evaluate

import argparse
import json
import zipfile
import numpy as np
from skimage.measure import label as relabel
from scipy.optimize import linear_sum_assignment
from scipy.spatial.distance import dice  # , jaccard
from sklearn.metrics import (
    jaccard_score,
    accuracy_score,
)
from skimage.metrics import hausdorff_distance

import zarr
import os
from upath import UPath
from tqdm import tqdm


INSTANCE_CLASSES = [
    "nuc",
    "vim",
    "ves",
    "endo",
    "lyso",
    "ld",
    "perox",
    "mito",
    "np",
    "mt",
    "cell",
    "instance",
]
HAUSDORFF_DISTANCE_MAX = np.inf

TRUTH_PATH = UPath("data/ground_truth.zarr").path


def unzip_file(zip_path):
    """
    Unzip a zip file to a specified directory.

    Args:
        zip_path (str): The path to the zip file.

    Example usage:
        unzip_file('submission.zip')
    """
    extract_path = UPath(zip_path).parent
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(extract_path)
        saved_path = [
            UPath(file).path
            for file in zip_ref.namelist()
            if UPath(file).suffix == ".zarr"
        ][0]
    print(f"Unzipped {zip_path} to {extract_path / saved_path}")

    return extract_path / saved_path


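# A hypothetical sketch of producing a zip that unzip_file() can consume
# (directory and file names are placeholders, not part of the original
# module). unzip_file() locates the extracted store by looking for an archive
# member whose suffix is '.zarr', so the top-level .zarr directory entry is
# written explicitly before the files it contains.
def _example_zip_submission(zarr_dir="submission.zarr", zip_path="submission.zip"):
    zarr_dir = UPath(zarr_dir)
    with zipfile.ZipFile(zip_path, "w") as zf:
        # Directory entry 'submission.zarr/' is what unzip_file() searches for
        zf.write(zarr_dir, arcname=zarr_dir.name)
        for path in zarr_dir.rglob("*"):
            zf.write(path, arcname=str(path.relative_to(zarr_dir.parent)))
    return unzip_file(zip_path)

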
def save_numpy_class_labels_to_zarr(
    save_path, test_volume_name, label_names, labels, overwrite=False
):
    """
    Save a single 3D numpy array of class labels to a Zarr-2 file with the required structure.

    Args:
        save_path (str): The path to save the Zarr-2 file (ending with <filename>.zarr).
        test_volume_name (str): The name of the test volume.
        label_names (list): The names of the labels.
        labels (np.ndarray): A 3D numpy array of class labels.

    Example usage:
        # Generate random class labels, with 0 as background
        labels = np.random.randint(0, 4, (128, 128, 128))
        save_numpy_class_labels_to_zarr('submission.zarr', 'test_volume', ['label1', 'label2', 'label3'], labels)
    """
    # Create a Zarr-2 file
    if not UPath(save_path).exists():
        os.makedirs(UPath(save_path).parent, exist_ok=True)
    print(f"Saving to {save_path}")
    store = zarr.DirectoryStore(save_path)
    zarr_group = zarr.group(store)

    # Save the test volume group
    zarr_group.create_group(test_volume_name, overwrite=overwrite)

    # Save the labels
    for i, label_name in enumerate(label_names):
        print(f"Saving {label_name}")
        zarr_group[test_volume_name].create_dataset(
            label_name,
            data=(labels == i + 1),
            chunks=(64, 64, 64),
            # compressor=zarr.Blosc(cname='zstd', clevel=3, shuffle=2),
        )

    print("Done saving")


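# A small illustrative round trip (names and sizes are hypothetical): the
# function above splits a single class-ID volume into one binary array per
# class, mapping class ID i + 1 to label_names[i], with 0 treated as
# background.
def _example_save_class_labels():
    labels = np.random.randint(0, 4, (128, 128, 128))
    save_numpy_class_labels_to_zarr(
        "submission.zarr", "test_volume", ["label1", "label2", "label3"], labels
    )
    # The stored array for 'label1' is the binary mask of class ID 1
    stored = zarr.open("submission.zarr")["test_volume"]["label1"][:]
    assert np.array_equal(stored, labels == 1)

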
def save_numpy_class_arrays_to_zarr(
    save_path, test_volume_name, label_names, labels, overwrite=False
):
    """
    Save a list of 3D numpy arrays of binary or instance labels to a Zarr-2 file with the required structure.

    Args:
        save_path (str): The path to save the Zarr-2 file (ending with <filename>.zarr).
        test_volume_name (str): The name of the test volume.
        label_names (list): A list of label names corresponding to the list of 3D numpy arrays.
        labels (list): A list of 3D numpy arrays of binary labels.

    Example usage:
        label_names = ['label1', 'label2', 'label3']
        # Generate random binary volumes for each label
        labels = [np.random.randint(0, 2, (128, 128, 128)) for _ in range(len(label_names))]
        save_numpy_class_arrays_to_zarr('submission.zarr', 'test_volume', label_names, labels)
    """
    # Create a Zarr-2 file
    if not UPath(save_path).exists():
        os.makedirs(UPath(save_path).parent, exist_ok=True)
    print(f"Saving to {save_path}")
    store = zarr.DirectoryStore(save_path)
    zarr_group = zarr.group(store)

    # Save the test volume group
    zarr_group.create_group(test_volume_name, overwrite=overwrite)

    # Save the labels
    for i, label_name in enumerate(label_names):
        print(f"Saving {label_name}")
        zarr_group[test_volume_name].create_dataset(
            label_name,
            data=labels[i],
            chunks=(64, 64, 64),
            # compressor=zarr.Blosc(cname='zstd', clevel=3, shuffle=2),
        )

    print("Done saving")


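# An illustrative counterpart for instance labels (names and sizes are
# hypothetical; 'mito' and 'ld' are taken from INSTANCE_CLASSES above). Unlike
# the class-ID variant, each array is stored verbatim, so instance IDs are
# preserved.
def _example_save_instance_arrays():
    label_names = ["mito", "ld"]
    labels = [np.random.randint(0, 5, (128, 128, 128)) for _ in label_names]
    save_numpy_class_arrays_to_zarr(
        "submission.zarr", "test_volume", label_names, labels
    )

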
def score_instance(
    pred_label, truth_label, hausdorff_distance_max=HAUSDORFF_DISTANCE_MAX
) -> dict[str, float]:
    """
    Score a single instance label volume against the ground truth instance label volume.

    Args:
        pred_label (np.ndarray): The predicted instance label volume.
        truth_label (np.ndarray): The ground truth instance label volume.

    Returns:
        dict: A dictionary of scores for the instance label volume.

    Example usage:
        scores = score_instance(pred_label, truth_label)
    """
    print("Scoring instance segmentation...")
    # Relabel the predicted instance labels to be consistent with the ground truth instance labels
    pred_label = relabel(pred_label, connectivity=len(pred_label.shape))

    # Construct the cost matrix for Hungarian matching
    pred_ids = np.unique(pred_label)
    truth_ids = np.unique(truth_label)
    cost_matrix = np.zeros((len(truth_ids), len(pred_ids)))
    bar = tqdm(pred_ids, desc="Computing cost matrix", leave=True)
    for j, pred_id in enumerate(bar):
        if pred_id == 0:
            # Don't score the background
            continue
        pred_mask = pred_label == pred_id
        these_truth_ids = np.unique(truth_label[pred_mask])
        truth_indices = [
            np.argmax(truth_ids == truth_id) for truth_id in these_truth_ids
        ]
        for i, truth_id in zip(truth_indices, these_truth_ids):
            if truth_id == 0:
                # Don't score the background
                continue
            truth_mask = truth_label == truth_id
            cost_matrix[i, j] = jaccard_score(truth_mask.flatten(), pred_mask.flatten())

    # Match the predicted instances to the ground truth instances
    row_inds, col_inds = linear_sum_assignment(cost_matrix, maximize=True)

    # Construct the volume for the matched instances
    matched_pred_label = np.zeros_like(pred_label)
    for i, j in tqdm(zip(col_inds, row_inds), desc="Relabeling matched instances"):
        if pred_ids[i] == 0 or truth_ids[j] == 0:
            # Don't score the background
            continue
        pred_mask = pred_label == pred_ids[i]
        matched_pred_label[pred_mask] = truth_ids[j]

    hausdorff_distances = []
    for truth_id in tqdm(truth_ids, desc="Computing Hausdorff distances"):
        if truth_id == 0:
            # Don't score the background
            continue
        h_dist = hausdorff_distance(
            truth_label == truth_id, matched_pred_label == truth_id
        )
        h_dist = min(h_dist, hausdorff_distance_max)
        hausdorff_distances.append(h_dist)

    # Compute the scores
    accuracy = accuracy_score(truth_label.flatten(), matched_pred_label.flatten())
    hausdorff_dist = np.mean(hausdorff_distances)
    # Normalize the Hausdorff distance to [0, 1]; the base of 32 is arbitrarily
    # chosen to give a reasonable range
    normalized_hausdorff_dist = 32 ** (-hausdorff_dist)
    combined_score = (accuracy * normalized_hausdorff_dist) ** 0.5
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Hausdorff Distance: {hausdorff_dist:.4f}")
    print(f"Normalized Hausdorff Distance: {normalized_hausdorff_dist:.4f}")
    print(f"Combined Score: {combined_score:.4f}")
    return {
        "accuracy": accuracy,
        "hausdorff_distance": hausdorff_dist,
        "normalized_hausdorff_distance": normalized_hausdorff_dist,
        "combined_score": combined_score,
    }


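# A toy sanity check for score_instance (shapes and IDs are illustrative). The
# prediction matches the ground truth exactly but uses different instance IDs;
# Hungarian matching reassigns them, so accuracy is 1.0, the mean Hausdorff
# distance is 0.0, and the combined score is (1.0 * 32 ** -0.0) ** 0.5 == 1.0.
def _example_score_instance():
    truth = np.zeros((32, 32, 32), dtype=int)
    truth[2:8, 2:8, 2:8] = 1
    truth[20:28, 20:28, 20:28] = 2
    pred = np.zeros_like(truth)
    pred[2:8, 2:8, 2:8] = 7
    pred[20:28, 20:28, 20:28] = 3
    return score_instance(pred, truth)

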
def score_semantic(pred_label, truth_label) -> dict[str, float]:
    """
    Score a single semantic label volume against the ground truth semantic label volume.

    Args:
        pred_label (np.ndarray): The predicted semantic label volume.
        truth_label (np.ndarray): The ground truth semantic label volume.

    Returns:
        dict: A dictionary of scores for the semantic label volume.

    Example usage:
        scores = score_semantic(pred_label, truth_label)
    """
    print("Scoring semantic segmentation...")
    # Flatten the label volumes and convert to binary
    pred_label = (pred_label > 0.0).flatten()
    truth_label = (truth_label > 0.0).flatten()

    # Compute the scores
    scores = {
        "iou": jaccard_score(truth_label, pred_label),
        "dice_score": 1 - dice(truth_label, pred_label),
    }
    print(f"IoU: {scores['iou']:.4f}")
    print(f"Dice Score: {scores['dice_score']:.4f}")
    return scores


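# A hand-checkable sketch for score_semantic (toy arrays, not challenge data):
# with one overlapping voxel, one prediction-only voxel, and one truth-only
# voxel, IoU = 1 / 3 and Dice = 2 * 1 / (2 + 2) = 0.5.
def _example_score_semantic():
    truth = np.array([1, 1, 0, 0])
    pred = np.array([0, 1, 1, 0])
    return score_semantic(pred, truth)

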
def score_label(
    pred_label_path, truth_path=TRUTH_PATH, instance_classes=INSTANCE_CLASSES
) -> dict[str, float]:
    """
    Score a single label volume against the ground truth label volume.

    Args:
        pred_label_path (str): The path to the predicted label volume.

    Returns:
        dict: A dictionary of scores for the label volume.

    Example usage:
        scores = score_label('pred.zarr/test_volume/label1')
    """
    print(f"Scoring {pred_label_path}...")
    # Load the predicted and ground truth label volumes
    label_name = UPath(pred_label_path).name
    volume_name = UPath(pred_label_path).parent.name
    pred_label = zarr.open(pred_label_path)[:]
    truth_label = zarr.open((UPath(truth_path) / volume_name / label_name).path)[:]
    mask_path = UPath(truth_path) / volume_name / f"{label_name}_mask"
    if mask_path.exists():
        # Mask out uncertain regions resulting from low-res ground truth annotations
        print(f"Masking {label_name} with {mask_path}...")
        mask = zarr.open(mask_path.path)[:]
        pred_label = pred_label * mask
        truth_label = truth_label * mask

    # Check that the label volumes have the same shape
    assert (
        pred_label.shape == truth_label.shape
    ), "The predicted and ground truth label volumes must have the same shape."

    # Compute the scores
    if label_name in instance_classes:
        return score_instance(pred_label, truth_label)
    else:
        return score_semantic(pred_label, truth_label)


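# A hypothetical invocation (paths are placeholders): 'mito' is listed in
# INSTANCE_CLASSES, so this dispatches to score_instance; any label not in
# that list falls through to score_semantic instead.
def _example_score_label():
    return score_label("submission.zarr/test_volume/mito")

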
def score_volume(
    pred_volume_path, truth_path=TRUTH_PATH, instance_classes=INSTANCE_CLASSES
) -> dict[str, dict[str, float]]:
    """
    Score a single volume against the ground truth volume.

    Args:
        pred_volume_path (str): The path to the predicted volume.

    Returns:
        dict: A dictionary of scores for the volume.

    Example usage:
        scores = score_volume('pred.zarr/test_volume')
    """
    print(f"Scoring {pred_volume_path}...")
    pred_volume_path = UPath(pred_volume_path)

    # Find labels to score
    pred_labels = [a for a in zarr.open(pred_volume_path.path).array_keys()]
    volume_name = pred_volume_path.name
    truth_labels = [
        a for a in zarr.open((UPath(truth_path) / volume_name).path).array_keys()
    ]
    labels = list(set(pred_labels) & set(truth_labels))

    # Score each label
    scores = {
        label: score_label(
            os.path.join(pred_volume_path, label),
            truth_path=truth_path,
            instance_classes=instance_classes,
        )
        for label in labels
    }
    scores["num_voxels"] = int(
        np.prod(zarr.open((pred_volume_path / labels[0]).path).shape)
    )
    return scores


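# A hypothetical per-volume sketch (path is a placeholder). Besides one score
# dict per label shared between prediction and ground truth, the result
# carries a 'num_voxels' entry, which score_submission below uses to weight
# label scores across volumes.
def _example_score_volume():
    scores = score_volume("submission.zarr/test_volume")
    num_voxels = scores.pop("num_voxels")
    for label, label_scores in scores.items():
        print(label, label_scores, f"({num_voxels} voxels)")

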
def score_submission(
    submission_path,
    result_file=None,
    truth_path=TRUTH_PATH,
    instance_classes=INSTANCE_CLASSES,
) -> dict[str, dict[str, dict[str, float]]]:
    """
    Score a submission against the ground truth data.

    Args:
        submission_path (str): The path to the zipped submission Zarr-2 file.
        result_file (str): The path to save the scores.

    Returns:
        dict: A dictionary of scores for the submission.

    Example usage:
        scores = score_submission('submission.zip')

    The results json is a dictionary with the following structure:
    {
        "volume" (the name of the ground truth volume): {
            "label" (the name of the predicted class): {
                (For semantic segmentation)
                "iou": (the intersection over union score),
                "dice_score": (the dice score),
                OR
                (For instance segmentation)
                "accuracy": (the accuracy score),
                "hausdorff_distance": (the hausdorff distance),
                "normalized_hausdorff_distance": (the normalized hausdorff distance),
                "combined_score": (the geometric mean of the accuracy and normalized hausdorff distance),
            }
            "num_voxels": (the number of voxels in the ground truth volume),
        }
        "label_scores": {
            (the name of the predicted class): {
                (For semantic segmentation)
                "iou": (the mean intersection over union score),
                "dice_score": (the mean dice score),
                OR
                (For instance segmentation)
                "accuracy": (the mean accuracy score),
                "hausdorff_distance": (the mean hausdorff distance),
                "combined_score": (the mean geometric mean of the accuracy and hausdorff distance),
            }
        }
        "overall_score": (the geometric mean of the overall instance and semantic scores),
    }
    """
    print(f"Scoring {submission_path}...")
    # Unzip the submission
    submission_path = unzip_file(submission_path)

    # Find volumes to score
    print(f"Scoring volumes in {submission_path}...")
    pred_volumes = [d.name for d in UPath(submission_path).glob("*") if d.is_dir()]
    print(f"Volumes: {pred_volumes}")
    print(f"Truth path: {truth_path}")
    truth_volumes = [d.name for d in UPath(truth_path).glob("*") if d.is_dir()]
    print(f"Truth volumes: {truth_volumes}")

    volumes = list(set(pred_volumes) & set(truth_volumes))
    if len(volumes) == 0:
        raise ValueError(
            "No volumes found to score. Make sure the submission is formatted correctly."
        )
    print(f"Scoring volumes: {volumes}")

    # Score each volume
    scores = {
        volume: score_volume(
            submission_path / volume,
            truth_path=truth_path,
            instance_classes=instance_classes,
        )
        for volume in volumes
    }

    # Combine label scores across volumes, normalizing by the number of voxels
    print("Combining label scores...")
    label_scores = {}
    for volume in volumes:
        for label in scores[volume]:
            if label == "num_voxels":
                continue
            elif label in instance_classes:
                if label not in label_scores:
                    label_scores[label] = {
                        "accuracy": 0,
                        "hausdorff_distance": 0,
                        "normalized_hausdorff_distance": 0,
                        "combined_score": 0,
                    }
                label_scores[label]["accuracy"] += (
                    scores[volume][label]["accuracy"] / scores[volume]["num_voxels"]
                )
                label_scores[label]["hausdorff_distance"] += (
                    scores[volume][label]["hausdorff_distance"]
                    / scores[volume]["num_voxels"]
                )
                label_scores[label]["normalized_hausdorff_distance"] += (
                    scores[volume][label]["normalized_hausdorff_distance"]
                    / scores[volume]["num_voxels"]
                )
                label_scores[label]["combined_score"] += (
                    scores[volume][label]["combined_score"]
                    / scores[volume]["num_voxels"]
                )
            else:
                if label not in label_scores:
                    label_scores[label] = {"iou": 0, "dice_score": 0}
                label_scores[label]["iou"] += (
                    scores[volume][label]["iou"] / scores[volume]["num_voxels"]
                )
                label_scores[label]["dice_score"] += (
                    scores[volume][label]["dice_score"] / scores[volume]["num_voxels"]
                )
    scores["label_scores"] = label_scores

    # Compute the overall score
    print("Computing overall scores...")
    overall_instance_scores = []
    overall_semantic_scores = []
    for label in label_scores:
        if label in instance_classes:
            overall_instance_scores += [label_scores[label]["combined_score"]]
        else:
            overall_semantic_scores += [label_scores[label]["dice_score"]]
    scores["overall_instance_score"] = np.mean(overall_instance_scores)
    scores["overall_semantic_score"] = np.mean(overall_semantic_scores)
    scores["overall_score"] = (
        scores["overall_instance_score"] * scores["overall_semantic_score"]
    ) ** 0.5  # geometric mean

    # Save the scores
    if result_file:
        print(f"Saving scores to {result_file}...")
        with open(result_file, "w") as f:
            json.dump(scores, f)
    else:
        return scores


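# An end-to-end sketch (file names are placeholders): with a result_file the
# scores are written to disk as JSON and nothing is returned; without one,
# the scores dictionary is returned instead.
def _example_score_submission():
    score_submission("submission.zip", result_file="results.json")
    scores = score_submission("submission.zip")
    print(f"Overall score: {scores['overall_score']:.4f}")

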
if __name__ == "__main__":
    # When called on the command line, evaluate the submission
    # example usage: python evaluate.py submission.zip
    argparser = argparse.ArgumentParser()
    argparser.add_argument(
        "submission_file", help="Path to submission zip file to score"
    )
    argparser.add_argument(
        "result_file",
        nargs="?",
        help="If provided, store submission results in this file. Else print them to stdout",
    )
    argparser.add_argument(
        "--truth-path", default=TRUTH_PATH, help="Path to zarr containing ground truth"
    )
    args = argparser.parse_args()

    scores = score_submission(args.submission_file, args.result_file, args.truth_path)
    if args.result_file is None:
        # score_submission only returns the scores when no result_file is
        # given, so print them to stdout as the help text describes
        print(json.dumps(scores, indent=4))