Source code for brainlit.utils.session

# Reference: https://github.com/neurodata/mouselight_code/blob/region_growing/src/ngl_pipeline.py

import numpy as np
from cloudvolume import CloudVolume
from cloudvolume.lib import Bbox
from cloudvolume.exceptions import InfoUnavailableError
from brainlit.utils import Neuron_trace
from brainlit.algorithms.generate_fragments import tube_seg
import warnings
import networkx as nx
from typing import Optional, List, Union, Tuple
from brainlit.utils.util import (
    check_type,
    check_precomputed,
    check_iterable_type,
    check_iterable_nonnegative,
)
from collections.abc import Iterable

Bounds = Union[Bbox, Tuple[int, int, int, int, int, int]]


[docs]class NeuroglancerSession: """Utility class which pulls and pushes data. Arguments: url: Precompued path either to a file URI or url URI. Defaults to mouselight brain1. mip: Resolution level to pull and push data at. Defaults to 0, the highest resolution. url_segments: Precomputed path to segmentation data. Optional, default None. fill_missing: Always passes directly into 'CloudVolume()' function to fill missing segent/image values with 0s. use_https: Always passes directly into 'CloudVolume()' function to set use_https to the desired value. Attributes: url: CloudVolumePrecomputedPath to image data. url_segments: CloudVolumePrecomputedPath to segmentation data. Optional, default None. Automatically tries precomputed path url+"_segments" if None. cv (CloudVolumePrecomputed): CloudVolume object for image data. cv_segments (CloudVolumePrecomputed): CloudVolume object for segmentation data. Optional, default None. cv_annotations (CloudVolumePrecomputed): CloudVolume object for segmentation data. Optional, default None. mip: Resolution level. chunk_size: The chunk size of the volume at the specified mip, given as (x, y, z). scales: The resolution of the volume at the specified mip, given as (x, y, z). """ def __init__( self, url: str, # = "s3://open-neurodata/brainlit/brain1" mip: int = 0, url_segments: Optional[str] = None, fill_missing: bool = True, use_https: bool = False, ): check_precomputed(url) check_type(mip, (int, np.integer)) self.url = url self.use_https = use_https self.cv = CloudVolume( url, parallel=False, fill_missing=fill_missing, use_https=self.use_https ) if mip < 0 or mip >= len(self.cv.scales): raise ValueError(f"{mip} should be between 0 and {len(self.cv.scales)}.") self.mip = mip self.fill_missing = fill_missing self.chunk_size = self.cv.scales[self.mip]["chunk_sizes"][0] self.scales = self.cv.scales[self.mip]["resolution"] self.url_segments = url_segments if url_segments is None: try: # default is to add _segments self.cv_segments = CloudVolume( url + "_segments", parallel=False, fill_missing=fill_missing, use_https=self.use_https, ) self.url_segments = url + "_segments" except InfoUnavailableError: warnings.warn( UserWarning( f"Segmentation volume not found at {self.url_segments}, defaulting to None." ) ) self.cv_segments = None else: check_precomputed(url_segments) self.cv_segments = CloudVolume( url_segments, parallel=False, fill_missing=fill_missing, use_https=self.use_https, ) def _get_voxel(self, seg_id: int, v_id: int) -> Tuple[int, int, int]: """Gets coordinates of segment vertex, in voxel space. Arguments: seg_id: The id of the segment to use. v_id: The id of the vertex to use from the given segment. Returns: voxel: The voxel coordinates in (x, y, z) voxel space. """ check_type(seg_id, (int, np.integer)) check_type(v_id, (int, np.integer)) if self.cv_segments is None: raise ValueError("Cannot get voxel without segmentation data") seg = self.cv_segments.skeleton.get(seg_id).vertices if v_id < 0 or v_id >= len(seg): raise ValueError(f"{v_id} should be between 0 and {len(seg)}.") vertex = seg[v_id] voxel = np.round( np.divide(vertex, self.cv_segments.scales[self.mip]["resolution"]) ).astype(int) return voxel
[docs] def set_url_segments(self, seg_url: str): """Sets the url_segments and cv_segments attributes. Arguments: seg_url: CloudvolumePrecomputedPath to segmentation data. """ check_precomputed(seg_url) self.url_segments = seg_url self.cv_segments = CloudVolume( self.url_segments, parallel=False, fill_missing=self.fill_missing, use_https=self.use_https, )
[docs] def get_segments( self, seg_id: int, bbox: Optional[Bounds] = None, rounding: Optional[bool] = True, ) -> nx.Graph: """Get a graph of a segmentation annotation within a bounding box. Arguments: seg_id The segement to pull. bbox: The bounding box object, default None. If None, uses entire volume. rounding: Optional, default True. Whether you want S3 file to be rounded or not. Returns: G: A networkx subgraph from the specified segment and bounding box. """ check_type(seg_id, (int, np.integer)) check_type(rounding, bool) if self.cv_segments is None: raise ValueError("Cannot get segments without segmentation data.") s3_trace = Neuron_trace.NeuronTrace( self.url_segments, seg_id, self.mip, rounding, use_https=self.use_https ) G = s3_trace.get_graph() paths = s3_trace.get_paths() if bbox is not None: if isinstance(bbox, Bbox): bbox = bbox.to_list() check_iterable_type(bbox, (int, np.integer)) check_iterable_nonnegative(bbox) G = s3_trace.get_sub_neuron([bbox[:3], bbox[3:]]) paths = s3_trace.get_sub_neuron_paths([bbox[:3], bbox[3:]]) return [G, paths]
[docs] def create_tubes( self, seg_id: Union[int, float], bbox: Bounds, radius: Optional[int] = None, ): """Creates voxel-wise foreground/background labels associated with a particular neuron trace, within a given bounding box of voxel coordinates. Arguments: seg_id: The id of the .swc file. bbox: The bounding box to draw tubes within. radius: Euclidean distance threshold used to draw tubes, default None = 1 px thick. rounding: Optional, bool, default is True. False if no swc rounding. Returns: labels: A volume within the bounding box, with 1 on tubes and 0 elsewhere. """ if self.cv_segments is None: raise ValueError("Cannot get segments without segmentation data.") check_type(seg_id, int) if radius is not None: check_type(radius, (int, np.integer, float, np.double)) if radius <= 0: raise ValueError("Radius must be positive.") # s3_trace = NeuronTrace(self.url_segments,seg_id,self.mip,rounding) # paths = s3_trace.get_paths(bbox) G_paths = self.get_segments(seg_id, bbox) paths = G_paths[1] if isinstance(bbox, Bbox): bbox = bbox.to_list() check_iterable_type(bbox, (int, np.integer)) check_iterable_nonnegative(bbox) labels = tube_seg.tubes_from_paths( np.subtract(bbox[3:], bbox[:3]), paths, radius ) return labels
[docs] def pull_voxel( self, seg_id: int, v_id: int, radius: int = 1 ) -> Tuple[np.ndarray, Bbox, np.ndarray]: """Pull a subvolume around a specified skeleton vertex with of shape [2r+1, 2r+1, 2r+1], in voxels. Arguments: seg_id: ID of the segment to use, depends on data in s3. v_id: ID of the vertex to use, depends on the segment. radius: Radius of pulled volume around central voxel, in voxels. Optional, default is 1 (3x3 volume is pulled, centered at the vertex). Returns: img: A 2*nx+1 X 2*ny+1 X 2*nz+1 volume. bounds: Bounding box object which contains the bounds of the volume. vox_in_img: List of coordinates which locate the initial point in the subvolume. """ check_type(radius, (int, np.integer)) if radius < 0: raise ValueError(f"{radius} should be nonnegative.") voxel = self._get_voxel(seg_id, v_id) # does type checking for seg_id and v_id bounds = Bbox(voxel, voxel) seed = bounds.to_list() shape = [radius] * 3 bounds = Bbox(np.subtract(seed[:3], shape), np.add(np.add(seed[3:], shape), 1)) img = self.pull_bounds_img(bounds) # img = self.cv.download(bounds, mip=self.mip) vox_in_img = voxel - np.array(bounds.to_list()[:3]) return np.squeeze(np.array(img)), bounds, vox_in_img
[docs] def pull_vertex_list( self, seg_id: int, v_id_list: List[int], buffer: List[int] = [1, 1, 1], expand: bool = False, ) -> Tuple[np.ndarray, Bbox, List[Tuple[int, int, int]]]: """Pull a subvolume containing all listed vertices. Arguments: seg_id: ID of the segment to use, depends on data in s3. v_id_list: list of vertex IDs to use. buffer: Buffer around the bounding box (in voxels). Can be int or list of ints. Default [1, 1, 1], set to [0, 0, 0] if expand is True. expand: Flag whether to expand subvolume to closest set of chunks. Returns: img: The image volume containing all vertices. bounds: Bounding box object which contains the bounds of the volume. vox_in_img_list: List of coordinates which locate the vertices in the volume. """ check_type(seg_id, (int, np.integer)) check_iterable_type(v_id_list, (int, np.integer)) check_type(expand, bool) if expand: buffer = 0 if not isinstance(buffer, Iterable): buffer = [buffer] * 3 check_iterable_type(buffer, (int, np.integer)) check_iterable_nonnegative(buffer) voxel_list = [self._get_voxel(seg_id, i) for i in v_id_list] if len(voxel_list) == 1: # edge case of 1 vertex bounds = Bbox(voxel_list[0] - buffer, voxel_list[0] + buffer + 1) else: voxel_list = np.array(voxel_list) lower = list(np.min(voxel_list, axis=0) - buffer) higher = list(np.max(voxel_list, axis=0) + buffer + 1) bounds = Bbox(lower, higher) if expand: bounds = bounds.expand_to_chunk_size(self.chunk_size) vox_in_img_list = np.array(voxel_list) - bounds.to_list()[:3] img = self.pull_bounds_img(bounds) return img, bounds, vox_in_img_list
[docs] def pull_chunk( self, seg_id: int, v_id: int, radius: int = 0, ) -> Tuple[np.ndarray, Bbox, Tuple[int, int, int]]: """Pull a subvolume around a specified skeleton vertex according to chunk size. Each data set has a specified chunk size, which can be found by calling self.cv.info. Arguments: seg_id: ID of the segment to use, depends on data in s3. v_id: ID of the vertex to use, depends on the segment. radius: Radius of pulled volume around central chunk, in chunks. Optional, default is 0 (single chunk which contains the voxel). Returns: img: A chunk_size[0]*2*nx X chunk_size[1]*2*ny X chunk_size[2]*2*nz volume. bounds: Bounding box object which contains the bounds of the volume. vox_in_img: List of coordinates which locate the initial point in the volume. """ check_type(seg_id, (int, np.integer)) check_type(v_id, (int, np.integer)) check_type(radius, (int, np.integer)) if radius < 0: raise ValueError(f"Radius of {radius} should be nonnegative.") voxel = self._get_voxel(seg_id, v_id) bounds = Bbox(voxel, voxel + 1).expand_to_chunk_size(self.chunk_size) seed = bounds.to_list() shape = [ self.chunk_size[0] * radius, self.chunk_size[1] * radius, self.chunk_size[2] * radius, ] bounds = Bbox(np.subtract(seed[:3], shape), np.add(seed[3:], shape)) img = self.pull_bounds_img(bounds) vox_in_img = voxel - np.array(bounds.to_list()[:3]) return img, bounds, vox_in_img
[docs] def pull_bounds_img(self, bounds: Bounds) -> np.ndarray: """Pull a volume around a specified bounding box. Works on image channels. Arguments: bounds: Bounding box, or tuple containing (x0, y0, z0, x1, y1, z1) bounds. Returns: img: Volume pulled according to the bounding box. """ if isinstance(bounds, Bbox): bounds = bounds.to_list() check_iterable_type(bounds, (int, np.integer)) check_iterable_nonnegative(bounds) img = self.cv.download(Bbox(bounds[:3], bounds[3:]), mip=self.mip) img = np.squeeze(np.array(img)) if len(img.shape) < 3: # in case a dimension with size 1 got squeezed away sz = np.subtract(bounds[3:], bounds[:3]) for i, s in enumerate(sz): if s == 1: img = np.expand_dims(img, axis=i) return img
[docs] def pull_bounds_seg(self, bounds: Bounds) -> np.ndarray: """Pull a volume around a specified bounding box. Works on annotation channels. Arguments: bounds: Bounding box, or tuple containing (x0, y0, z0, x1, y1, z1) bounds. Returns: img: Volume pulled according to the bounding box. """ raise NotImplementedError("Annotation channels not supported.")
# if isinstance(bounds, Bbox): # bounds = bounds.to_list() # check_iterable_type(bounds, (int, np.integer)) # check_iterable_nonnegative(bounds) # if self.cv_annotations is None: # raise ValueError("Cannot pull from undefined annotation layer.") # img = self.cv_annotations[Bbox(bounds[:3], bounds[3:])] # return np.squeeze(np.array(img))
[docs] def push( self, img: np.ndarray, bounds: Bounds, ): """Push a volume to an annotation channel. Arguments: img : Volume to push bounds : Bounding box or tuple containing (x0, y0, z0, x1, y1, z1) bounds. """ raise NotImplementedError("Annotation channels not supported.")
# if not isinstance(img, np.ndarray): # raise TypeError(f"Image should be numpy array..") # if (img == 0).all(): # raise ValueError(f"Should not push an empty volume of all 0.") # if isinstance(bounds, Bbox): # bounds = bounds.to_list() # check_iterable_type(bounds, (int, np.integer)) # check_iterable_nonnegative(bounds) # if self.cv_annotations is None: # raise ValueError("Cannot pull from undefined annotation layer.") # self.cv_annotations[Bbox(bounds[:3], bounds[3:])] = img.astype("uint64")