import sys
import uuid
import warnings
from collections import OrderedDict, defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from rich.console import Console
from darwin.utils import attempt_decode
console = Console()
try:
import cc3d
import nibabel as nib
from scipy.ndimage import zoom
except ImportError:
import_fail_string = r"""
You must install darwin-py with pip install darwin-py\[medical]
in order to import using nifti format
"""
console.print(import_fail_string)
sys.exit(1)
import numpy as np
from jsonschema import validate
from upolygon import find_contours
import darwin.datatypes as dt
from darwin.importer.formats.nifti_schemas import nifti_import_schema
[docs]
def parse_path(
path: Path,
legacy_remote_file_slot_affine_maps: Dict[Path, Dict[str, Any]] = {},
pixdims_and_primary_planes: Dict[Path, Dict[str, Tuple[List[float], str]]] = {},
) -> Optional[List[dt.AnnotationFile]]:
"""
Parses the given ``nifti`` file and returns a ``List[dt.AnnotationFile]`` with the parsed
information.
Parameters
----------
path : Path
The ``Path`` to the ``nifti`` file.
legacy_remote_file_slot_affine_maps : Optional[Dict[Path, Dict[str, Any]]]
A dictionary of remote file full paths to their slot affine maps
remote_file_pixdims : Optional[Dict[Path, Dict[str, Tuple[List[float], str]]]]
A dictionary of remote file full paths to their pixel dimensions and primary plane
Returns
-------
Optional[List[dt.AnnotationFile]]
Returns ``None`` if the given file is not in ``json`` format, or ``List[dt.AnnotationFile]``
otherwise.
"""
if not isinstance(path, Path):
path = Path(path)
if path.suffix != ".json":
console.print(
"Skipping file: {} (not a json file)".format(path), style="bold yellow"
)
return None
data = attempt_decode(path)
try:
validate(data, schema=nifti_import_schema)
except Exception:
console.print(
"Skipping file: {} (invalid json file, see schema for details)".format(
path
),
style="bold yellow",
)
return None
nifti_annotations = data.get("data")
if nifti_annotations is None or nifti_annotations == []:
console.print(
"Skipping file: {} (no data found)".format(path), style="bold yellow"
)
return None
annotation_files = []
for nifti_annotation in nifti_annotations:
remote_file_path = Path(nifti_annotation["image"])
if not str(remote_file_path).startswith("/"):
remote_file_path = Path("/" + str(remote_file_path))
annotation_file = _parse_nifti(
Path(nifti_annotation["label"]),
Path(nifti_annotation["image"]),
path,
class_map=nifti_annotation.get("class_map"),
mode=nifti_annotation.get("mode", "image"),
slot_names=nifti_annotation.get("slot_names", []),
is_mpr=nifti_annotation.get("is_mpr", False),
remote_file_path=remote_file_path,
legacy_remote_file_slot_affine_maps=legacy_remote_file_slot_affine_maps,
pixdims_and_primary_planes=pixdims_and_primary_planes,
)
annotation_files.append(annotation_file)
return annotation_files
def _parse_nifti(
nifti_path: Path,
filename: Path,
json_path: Path,
class_map: Dict,
mode: str,
slot_names: List[str],
is_mpr: bool,
remote_file_path: Path,
legacy_remote_file_slot_affine_maps: Dict[Path, Dict[str, Any]] = {},
pixdims_and_primary_planes: Dict[Path, Dict[str, Tuple[List[float], str]]] = {},
) -> dt.AnnotationFile:
img = process_nifti(
nib.load(nifti_path),
remote_file_path=remote_file_path,
legacy_remote_file_slot_affine_maps=legacy_remote_file_slot_affine_maps,
)
legacy = remote_file_path in legacy_remote_file_slot_affine_maps
processed_class_map = process_class_map(class_map)
video_annotations = []
if mode == "instances": # For each instance produce a video annotation
for class_name, class_idxs in processed_class_map.items():
if class_name == "background":
continue
class_img = np.isin(img, class_idxs).astype(np.uint8)
cc_img, num_labels = cc3d.connected_components(class_img, return_N=True)
for instance_id in range(1, num_labels):
_video_annotations = get_polygon_video_annotations(
cc_img,
class_idxs=[instance_id],
class_name=class_name,
slot_names=slot_names,
is_mpr=is_mpr,
pixdims_and_primary_planes=pixdims_and_primary_planes,
remote_file_path=remote_file_path,
legacy=legacy,
)
if _video_annotations:
video_annotations += _video_annotations
elif mode == "video": # For each class produce a single video annotation
for class_name, class_idxs in processed_class_map.items():
if class_name == "background":
continue
_video_annotations = get_polygon_video_annotations(
img,
class_idxs=class_idxs,
class_name=class_name,
slot_names=slot_names,
is_mpr=is_mpr,
pixdims_and_primary_planes=pixdims_and_primary_planes,
remote_file_path=remote_file_path,
legacy=legacy,
)
if _video_annotations is None:
continue
video_annotations += _video_annotations
elif mode == "mask":
video_annotations = get_mask_video_annotations(
img,
processed_class_map,
slot_names,
pixdims_and_primary_planes=pixdims_and_primary_planes,
remote_file_path=remote_file_path,
isotropic=legacy,
)
if mode in ["video", "instances"]:
annotation_classes = {
dt.AnnotationClass(class_name, "polygon", "polygon")
for class_name in class_map.values()
}
elif mode == "mask":
annotation_classes = {
dt.AnnotationClass(class_name, "mask", "mask")
for class_name in class_map.values()
}
remote_path = "/" if filename.parent == "." else filename.parent
filename = Path(filename.name)
return dt.AnnotationFile(
path=json_path,
filename=str(filename),
remote_path=str(remote_path),
annotation_classes=annotation_classes,
annotations=video_annotations,
slots=[
dt.Slot(
name=slot_name,
type="dicom",
source_files=[dt.SourceFile(file_name=str(filename), url=None)],
)
for slot_name in slot_names
],
)
[docs]
def get_polygon_video_annotations(
volume: np.ndarray,
class_name: str,
class_idxs: List[int],
slot_names: List[str],
is_mpr: bool,
pixdims_and_primary_planes: Dict[Path, Dict[str, Tuple[List[float], str]]],
remote_file_path: Path,
legacy: bool = False,
) -> Optional[List[dt.VideoAnnotation]]:
if not is_mpr:
if remote_file_path in pixdims_and_primary_planes:
item_pixdims_and_primary_planes = pixdims_and_primary_planes[
remote_file_path
]
if slot_names:
slot_name = slot_names[0]
else:
slot_name = list(item_pixdims_and_primary_planes.keys())[0]
slot_pixims_and_primary_plane = item_pixdims_and_primary_planes[slot_name]
primary_plane = slot_pixims_and_primary_plane[1]
pixdims = slot_pixims_and_primary_plane[0]
else:
pixdims = [1.0, 1.0, 1.0]
primary_plane = "AXIAL"
if primary_plane == "AXIAL":
view_idx = 2
elif primary_plane == "CORONAL":
view_idx = 1
elif primary_plane == "SAGITTAL":
view_idx = 0
return nifti_to_video_polygon_annotation(
volume,
class_name,
class_idxs,
slot_names,
pixdims,
view_idx,
legacy=legacy,
)
elif is_mpr and len(slot_names) == 3:
video_annotations = []
for view_idx, slot_name in enumerate(slot_names):
pixdims = pixdims_and_primary_planes[remote_file_path][slot_name][0]
_video_annotations = nifti_to_video_polygon_annotation(
volume,
class_name,
class_idxs,
[slot_name],
pixdims,
legacy=legacy,
view_idx=view_idx,
)
video_annotations += _video_annotations
return video_annotations
else:
raise Exception("If is_mpr is True, slot_names must be of length 3")
[docs]
def get_mask_video_annotations(
volume: np.ndarray,
processed_class_map: Dict,
slot_names: List[str],
pixdims_and_primary_planes: Dict[Path, Dict[str, Tuple[List[float], str]]],
remote_file_path: Path,
isotropic: bool = False,
) -> Optional[List[dt.VideoAnnotation]]:
"""
The function takes a volume and a class map and returns a list of video annotations
We write a single raster layer for the volume but K mask annotations, where K is the number of classes.
"""
if remote_file_path in pixdims_and_primary_planes:
item_pixdims_and_primary_planess = pixdims_and_primary_planes[remote_file_path]
if slot_names:
slot_name = slot_names[0]
else:
slot_name = list(item_pixdims_and_primary_planess.keys())[0]
slot_pixdims_and_primary_planes = item_pixdims_and_primary_planess[slot_name]
primary_plane = slot_pixdims_and_primary_planes[1]
pixdims = slot_pixdims_and_primary_planes[0]
else:
pixdims = [1.0, 1.0, 1.0]
primary_plane = "AXIAL"
frame_annotations = OrderedDict()
all_mask_annotations = defaultdict(lambda: OrderedDict())
# This is a dictionary of class_names to generated mask_annotation_ids
mask_annotation_ids = {
class_name: str(uuid.uuid4()) for class_name in processed_class_map.keys()
}
# We need to create a new mapping dictionary where the keys are the mask_annotation_ids
# and the values are the new integers which we use in the raster layer
if primary_plane == "AXIAL":
view_idx = 2
elif primary_plane == "CORONAL":
view_idx = 1
elif primary_plane == "SAGITTAL":
view_idx = 0
mask_annotation_ids_mapping = {}
map_from_nifti_idx_to_raster_idx = {0: 0} # 0 is the background class
for i in range(volume.shape[view_idx]):
if view_idx == 2:
slice_mask = volume[:, :, i].astype(np.uint8)
elif view_idx == 1:
slice_mask = volume[:, i, :].astype(np.uint8)
elif view_idx == 0:
slice_mask = volume[i, :, :].astype(np.uint8)
for raster_idx, (class_name, class_idxs) in enumerate(
processed_class_map.items()
):
if class_name == "background":
continue
class_mask = np.isin(slice_mask, class_idxs).astype(np.uint8).copy()
if class_mask.sum() == 0:
continue
all_mask_annotations[class_name][i] = dt.make_mask(
class_name, subs=None, slot_names=slot_names
)
all_mask_annotations[class_name][i].id = mask_annotation_ids[class_name]
mask_annotation_ids_mapping[mask_annotation_ids[class_name]] = (
raster_idx + 1
)
for class_idx in class_idxs:
map_from_nifti_idx_to_raster_idx[class_idx] = raster_idx + 1
# Now that we've created all the mask annotations, we need to create the raster layer
# We only map the mask_annotation_ids which appear in any given frame.
axial_size_after_isotropic_scaling = get_new_axial_size(volume, pixdims)
for i in range(volume.shape[view_idx]):
if view_idx == 2:
slice_mask = volume[:, :, i].astype(np.uint8)
elif view_idx == 1:
slice_mask = volume[:, i, :].astype(np.uint8)
elif view_idx == 0:
slice_mask = volume[i, :, :].astype(np.uint8)
if isotropic:
slice_mask = zoom(
slice_mask,
(
axial_size_after_isotropic_scaling[0] / volume.shape[0],
axial_size_after_isotropic_scaling[1] / volume.shape[1],
),
)
# We need to convert from nifti_idx to raster_idx
slice_mask = np.vectorize(
lambda key: map_from_nifti_idx_to_raster_idx.get(key, 0)
)(slice_mask)
dense_rle = convert_to_dense_rle(slice_mask)
raster_annotation = dt.make_raster_layer(
class_name="__raster_layer__",
mask_annotation_ids_mapping=mask_annotation_ids_mapping,
total_pixels=slice_mask.size,
dense_rle=dense_rle,
slot_names=slot_names,
)
frame_annotations[i] = raster_annotation
all_frame_ids = list(frame_annotations.keys())
if not all_frame_ids:
return None
if len(all_frame_ids) == 1:
segments = [[all_frame_ids[0], all_frame_ids[0] + 1]]
elif len(all_frame_ids) > 1:
segments = [[min(all_frame_ids), max(all_frame_ids)]]
raster_video_annotation = dt.make_video_annotation(
frame_annotations,
keyframes={f_id: True for f_id in all_frame_ids},
segments=segments,
interpolated=False,
slot_names=slot_names,
)
mask_video_annotations = []
for class_name, mask_annotations in all_mask_annotations.items():
mask_video_annotation = dt.make_video_annotation(
mask_annotations,
keyframes={f_id: True for f_id in all_frame_ids},
segments=segments,
interpolated=False,
slot_names=slot_names,
)
mask_video_annotation.id = mask_annotation_ids[class_name]
mask_video_annotations.append(mask_video_annotation)
return [raster_video_annotation] + mask_video_annotations
[docs]
def nifti_to_video_polygon_annotation(
volume: np.ndarray,
class_name: str,
class_idxs: List[int],
slot_names: List[str],
pixdims: List[float],
view_idx: int,
legacy: bool = False,
) -> Optional[List[dt.VideoAnnotation]]:
frame_annotations = OrderedDict()
for i in range(volume.shape[view_idx]):
if view_idx == 2:
slice_mask = volume[:, :, i].astype(np.uint8)
_pixdims = [pixdims[0], pixdims[1]]
elif view_idx == 1:
slice_mask = volume[:, i, :].astype(np.uint8)
_pixdims = [pixdims[0], pixdims[2]]
elif view_idx == 0:
slice_mask = volume[i, :, :].astype(np.uint8)
_pixdims = [pixdims[1], pixdims[2]]
class_mask = np.isin(slice_mask, class_idxs).astype(np.uint8).copy()
if class_mask.sum() == 0:
continue
polygon = mask_to_polygon(
mask=class_mask,
class_name=class_name,
pixdims=_pixdims,
legacy=legacy,
)
if polygon is None:
continue
frame_annotations[i] = polygon
all_frame_ids = list(frame_annotations.keys())
if not all_frame_ids:
return None
if len(all_frame_ids) == 1:
segments = [[all_frame_ids[0], all_frame_ids[0] + 1]]
elif len(all_frame_ids) > 1:
segments = [[min(all_frame_ids), max(all_frame_ids) + 1]]
video_annotation = dt.make_video_annotation(
frame_annotations,
keyframes={f_id: True for f_id in all_frame_ids},
segments=segments,
interpolated=False,
slot_names=slot_names,
)
return [video_annotation]
[docs]
def mask_to_polygon(
mask: np.ndarray, class_name: str, pixdims: List[float], legacy: bool = False
) -> Optional[dt.Annotation]:
def adjust_for_pixdims(x, y, pixdims):
if legacy:
if pixdims[1] > pixdims[0]:
return {"x": y, "y": x * pixdims[1] / pixdims[0]}
elif pixdims[1] < pixdims[0]:
return {"x": y * pixdims[0] / pixdims[1], "y": x}
else:
return {"x": y, "y": x}
else:
return {"x": y * pixdims[0], "y": x * pixdims[1]}
_labels, external_paths, _internal_paths = find_contours(mask)
if len(external_paths) > 1:
paths = []
for external_path in external_paths:
# skip paths with less than 2 points
if len(external_path) // 2 <= 2:
continue
path = [
adjust_for_pixdims(x, y, pixdims)
for x, y in zip(external_path[0::2], external_path[1::2])
]
paths.append(path)
if len(paths) > 1:
polygon = dt.make_polygon(class_name, paths)
elif len(paths) == 1:
polygon = dt.make_polygon(
class_name,
point_paths=paths[0],
)
else:
return None
elif len(external_paths) == 1:
external_path = external_paths[0]
if len(external_path) < 6:
return None
polygon = dt.make_polygon(
class_name,
point_paths=[
adjust_for_pixdims(x, y, pixdims)
for x, y in zip(external_path[0::2], external_path[1::2])
],
)
else:
return None
return polygon
[docs]
def process_class_map(class_map):
"""
This function takes a class_map and returns a dictionary with the class names as keys and
all the corresponding class indexes as values.
"""
processed_class_map = defaultdict(list)
for key, value in class_map.items():
processed_class_map[value].append(int(key))
return processed_class_map
[docs]
def affine_to_spacing(
affine: np.ndarray, r: int = 3, dtype=float, suppress_zeros: bool = True
) -> np.ndarray:
"""
Copied over from monai.data.utils - https://docs.monai.io/en/stable/_modules/monai/data/utils.html
Computing the current spacing from the affine matrix.
Args:
affine: a d x d affine matrix.
r: indexing based on the spatial rank, spacing is computed from `affine[:r, :r]`.
dtype: data type of the output.
suppress_zeros: whether to surpress the zeros with ones.
Returns:
an `r` dimensional vector of spacing.
"""
spacing = np.sqrt(np.sum(affine[:r, :r] * affine[:r, :r], axis=0))
if suppress_zeros:
spacing[spacing == 0] = 1.0
return spacing
[docs]
def process_nifti(
input_data: nib.nifti1.Nifti1Image,
ornt: Optional[List[List[float]]] = [[0.0, -1.0], [1.0, -1.0], [2.0, -1.0]],
remote_file_path: Path = Path("/"),
legacy_remote_file_slot_affine_maps: Dict[Path, Dict[str, Any]] = {},
) -> Tuple[np.ndarray, Tuple[float], str]:
"""
Converts a NifTI object of any orientation to the passed ornt orientation.
The default ornt is LPI.
Files that were uploaded before the `MED_2D_VIEWER` feature are `legacy`. Non-legacy
files are uploaded and re-oriented to the `LPI` orientation. Legacy files
files were treated differently:
- Legacy NifTI files were re-oriented to `LPI`, but their
affine was stored as `RAS`, which is the opposite orientation. However, because
their pixel data is stored in `LPI`, we can treat them the same way as non-legacy
files.
- Legacy DICOM files were not always re-oriented to `LPI`. We therefore use the
affine of the dataset item from `slot_affine_map` to re-orient the NifTI file to
be imported
Args:
input_data: nibabel NifTI object.
ornt: (n,2) orientation array. It defines a transformation to LPI
ornt[N,1] is a flip of axis N of the array, where 1 means no flip and -1 means flip.
ornt[:,0] is the transpose that needs to be done to the implied array, as in arr.transpose(ornt[:,0]).
remote_file_path: Path
The full path of the remote file
legacy_remote_file_slot_affine_maps: Dict[Path, Dict[str, Any]]
A dictionary of remote file full paths to their slot affine maps
Returns:
data_array: pixel array with orientation ornt.
"""
img = correct_nifti_header_if_necessary(input_data)
orig_ax_codes = nib.orientations.aff2axcodes(img.affine)
orig_ornt = nib.orientations.axcodes2ornt(orig_ax_codes)
is_dicom = remote_file_path.suffix.lower() == ".dcm"
if remote_file_path in legacy_remote_file_slot_affine_maps and is_dicom:
slot_affine_map = legacy_remote_file_slot_affine_maps[remote_file_path]
affine = slot_affine_map[next(iter(slot_affine_map))] # Take the 1st slot
ax_codes = nib.orientations.aff2axcodes(affine)
ornt = nib.orientations.axcodes2ornt(ax_codes)
transform = nib.orientations.ornt_transform(orig_ornt, ornt)
reoriented_img = img.as_reoriented(transform)
data_array = reoriented_img.get_fdata()
return data_array
[docs]
def convert_to_dense_rle(raster: np.ndarray) -> List[int]:
dense_rle, prev_val, cnt = [], None, 0
for val in raster.T.flat:
if val == prev_val:
cnt += 1
else:
if prev_val is not None:
dense_rle.extend([int(prev_val), int(cnt)])
prev_val, cnt = val, 1
dense_rle.extend([int(prev_val), int(cnt)])
return dense_rle
[docs]
def get_new_axial_size(
volume: np.ndarray,
pixdims: List[float],
isotropic: bool = False,
) -> Tuple[int, int]:
"""Get the new size of the Axial plane after resizing to isotropic pixel dimensions.
Args:
volume: Input volume.
pixdims: The pixel dimensions / spacings of the volume.
isotropic: bool, default: True
If True, the function will attempt to resize the annotations to isotropic pixel dimensions.
If False, the function will not attempt to resize the annotations to isotropic pixel dimensions.
Returns:
Tuple[int, int]: The new size of the Axial plane.
"""
original_size = volume.shape
if not isotropic:
return original_size[0], original_size[1]
original_spacing = pixdims
min_spacing = min(pixdims[0], pixdims[1])
return (
int(round(original_size[0] * (original_spacing[0] / min_spacing))),
int(round(original_size[1] * (original_spacing[1] / min_spacing))),
)