Source code for brainlit.utils.upload

import math
from cloudvolume import CloudVolume, Skeleton, storage
from cloudvolume.frontends.precomputed import CloudVolumePrecomputed
import numpy as np
import joblib
from joblib import Parallel, delayed, cpu_count
import joblib
from glob import glob
import argparse
from psutil import virtual_memory
from typing import Optional, Sequence, Union, Tuple, List
import contextlib

import tifffile as tf
from pathlib import Path
from brainlit.utils.Neuron_trace import NeuronTrace
from brainlit.utils.benchmarking_params import (
    brain_offsets,
    vol_offsets,
    scales,
    type_to_date,
)


import time
from tqdm.auto import tqdm
from brainlit.utils.util import (
    tqdm_joblib,
    check_type,
    check_iterable_type,
    check_size,
    check_precomputed,
    check_binary_path,
)


def get_volume_info(
    image_dir: str,
    num_resolutions: int,
    channel: Optional[int] = 0,
    extension: Optional[str] = "tif",
    benchmarking: Optional[bool] = False,
) -> Tuple[List, List, List, Tuple, List]:
    """Get filepaths along the octree-format image directory

    Arguments:
        image_dir: Filepath to HIGHEST LEVEL(lowest res) of octree dir.
        num_resolutions: Number of resolutions for which downsampling has been done.
        channel: Channel number to upload.
        extension: File extension of image files.
    Returns:
        files_ordered: List of file paths, 1st dim contains list for each res.
        paths_bin: List of binary paths, 1st dim contains lists for each res.
        vox_size: List of highest resolution voxel sizes (nm).
        tiff_dims: (x,y,z) voxel dimensions for a single tiff image.
    """
    check_type(image_dir, str)
    check_type(num_resolutions, (int, np.integer))
    check_type(channel, (int, np.integer))
    check_type(extension, str)
    if num_resolutions < 1:
        raise ValueError(f"Number of resolutions should be > 0, not {num_resolutions}")
    check_type(channel, (int, np.integer))
    if channel < 0:
        raise ValueError(f"Channel should be >= 0, not {channel}")
    check_type(extension, str)
    if extension not in ["tif"]:
        raise ValueError(f"{extension} should be 'tif'")

    def RepresentsInt(s):
        try:
            int(s)
            return True
        except ValueError:
            return False

    p = Path(image_dir)

    if benchmarking == True:
        files = [i.parts for i in list(p.glob(f"*.{extension}"))]
    else:
        files = [i.parts for i in p.rglob(f"*.{channel}.{extension}")]

    parent_dirs = len(p.parts)

    files_ordered = [
        [i for i in files if len(i) == j + parent_dirs + 1]
        for j in range(num_resolutions)
    ]
    paths_bin = [
        [[f"{int(j)-1:03b}" for j in k if len(j) == 1 and RepresentsInt(j)] for k in i]
        for i in files_ordered
    ]
    for i, resolution in enumerate(files_ordered):
        for j, filepath in enumerate(resolution):
            files_ordered[i][j] = str(Path(*filepath))

    if benchmarking == True:
        img_size = np.squeeze(tf.imread(str(p / files[0][parent_dirs]))).T.shape
        vox_size = img_size

        # Getting scaling parameters
        f = p.parts[4].split("_")
        image = f[0]
        date = type_to_date[image]
        num = int(f[1])
        scale = scales[date]
        brain_offset = brain_offsets[date]
        vol_offset = vol_offsets[date][num]
        origin = np.add(brain_offset, vol_offset)

    else:

        img_size = np.squeeze(tf.imread(str(p / "default.0.tif"))).T.shape
        transform = open(str(p / "transform.txt"), "r")
        vox_size = [
            float(s[4:].rstrip("\n")) * (0.5 ** (num_resolutions - 1))
            for s in transform.readlines()
            if "s" in s
        ]
        transform = open(str(p / "transform.txt"), "r")
        origin = [
            int(o[4:].rstrip("\n")) / 1000 for o in transform.readlines() if "o" in o
        ]

    return files_ordered, paths_bin, vox_size, img_size, origin


def create_cloud_volume(
    precomputed_path: str,
    img_size: Sequence[int],
    voxel_size: Sequence[Union[int, float]],
    num_resolutions: int,
    chunk_size: Optional[Sequence[int]] = None,
    parallel: Optional[bool] = False,
    layer_type: Optional[str] = "image",
    dtype: Optional[str] = None,
    commit_info: Optional[bool] = True,
) -> CloudVolumePrecomputed:
    """Create CloudVolume object and info file.

    Handles both image volumes and segmentation volumes from octree structure.

    Arguments:
        precomputed_path: cloudvolume path
        img_size: x, y, z voxel dimensions of tiff images.
        voxel_size: x, y, z dimensions of highest res voxel size (nm).
        num_resolutions: The number of resolutions to upload.
        chunk_size: The size of chunks to use for upload. If None, uses img_size/2.
        parallel: Whether to upload chunks in parallel.
        layer_type: The type of cloudvolume object to create.
        dtype: The data type of the volume. If None, uses default for layer type.
        commit_info: Whether to create an info file at the path, defaults to True.
    Returns:
        vol: Volume designated for upload.
    """
    # defaults
    if chunk_size is None:
        chunk_size = [int(i / 4) for i in img_size]  # /2 took 42 hrs
    if dtype is None:
        if layer_type == "image":
            dtype = "uint16"
        elif layer_type == "segmentation" or layer_type == "annotation":
            dtype = "uint64"
        else:
            raise ValueError(
                f"layer type is {layer_type}, when it should be image or str"
            )

    # check inputs
    check_precomputed(precomputed_path)
    check_size(img_size, allow_float=False)
    check_size(voxel_size)
    check_type(num_resolutions, (int, np.integer))
    if num_resolutions < 1:
        raise ValueError(f"Number of resolutions should be > 0, not {num_resolutions}")
    check_size(chunk_size)
    check_type(parallel, bool)
    check_type(layer_type, str)
    if layer_type not in ["image", "segmentation", "annotation"]:
        raise ValueError(
            f"{layer_type} should be 'image', 'segmentation', or 'annotation'"
        )
    check_type(dtype, str)
    if dtype not in ["uint16", "uint64"]:
        raise ValueError(f"{dtype} should be 'uint16' or 'uint64'")
    check_type(commit_info, bool)

    info = CloudVolume.create_new_info(
        num_channels=1,
        layer_type=layer_type,
        data_type=dtype,  # Channel images might be 'uint8'
        encoding="raw",  # raw, jpeg, compressed_segmentation, fpzip, kempressed
        resolution=voxel_size,  # Voxel scaling, units are in nanometers
        voxel_offset=[0, 0, 0],  # x,y,z offset in voxels from the origin
        chunk_size=chunk_size,  # units are voxels
        volume_size=[i * 2 ** (num_resolutions - 1) for i in img_size],
    )
    vol = CloudVolume(precomputed_path, info=info, parallel=parallel)
    [
        vol.add_scale((2**i, 2**i, 2**i), chunk_size=chunk_size)
        for i in range(num_resolutions)
    ]
    if commit_info:
        vol.commit_info()
    if layer_type == "image" or layer_type == "annotation":
        vols = [
            CloudVolume(precomputed_path, mip=i, parallel=parallel)
            for i in range(num_resolutions - 1, -1, -1)
        ]
    elif layer_type == "segmentation":
        info.update(skeletons="skeletons")

        skel_info = {
            "@type": "neuroglancer_skeletons",
            "transform": [1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0],
            "vertex_attributes": [
                {"id": "radius", "data_type": "float32", "num_components": 1},
                {"id": "vertex_types", "data_type": "float32", "num_components": 1},
                {"id": "vertex_color", "data_type": "float32", "num_components": 4},
            ],
        }
        with storage.SimpleStorage(vol.cloudpath) as stor:
            stor.put_json(str(Path("skeletons") / "info"), skel_info)
        vols = [vol]
    return vols


[docs]def get_data_ranges( bin_path: List[List[str]], chunk_size: Tuple[int, int, int] ) -> Tuple[List[int], List[int], List[int]]: """Get ranges (x,y,z) for chunks to be stitched together in volume Arguments: bin_path: Binary paths to files. chunk_size: The size of chunk to get ranges over. Returns: x_range: x-coord int bounds. y_range: y-coord int bounds. z_range: z-coord int bounds. """ for b in bin_path: check_binary_path(b) check_size(chunk_size) x_curr, y_curr, z_curr = 0, 0, 0 tree_level = len(bin_path) print(bin_path) for idx, i in enumerate(bin_path): print(i) scale_factor = 2 ** (tree_level - idx - 1) x_curr += int(i[2]) * chunk_size[0] * scale_factor y_curr += int(i[1]) * chunk_size[1] * scale_factor # flip z axis so chunks go anterior to posterior z_curr += int(i[0]) * chunk_size[2] * scale_factor x_range = [x_curr, x_curr + chunk_size[0]] y_range = [y_curr, y_curr + chunk_size[1]] z_range = [z_curr, z_curr + chunk_size[2]] return x_range, y_range, z_range
def process(file_path: str, bin_path: List[str], vol: CloudVolumePrecomputed): """The parallelizable method to upload data. Loads the image into memory, and pushes it to specific ranges in the CloudVolume. Arguments: file_path: Path to the image file. bin_path: Binary path to the image file. vol: CloudVolume object to upload. """ check_type(file_path, str) check_binary_path(bin_path) check_type(vol, CloudVolumePrecomputed) array = tf.imread(file_path).T ranges = get_data_ranges(bin_path, vol.scales[-1]["size"]) vol[ ranges[0][0] : ranges[0][1], ranges[1][0] : ranges[1][1], ranges[2][0] : ranges[2][1], ] = array return def upload_volumes( input_path: str, precomputed_path: str, num_mips: int, parallel: bool = False, chosen: int = -1, benchmarking: Optional[bool] = False, continue_upload: Optional[Tuple[int, int]] = (0, 0), ): """Uploads image data from local to a precomputed path. Specify num_mips for additional resolutions. If `chosen` is used, an info file will not be generated. Arguments: input_path: The filepath to the root directory of the octree image data. precomputed_path: CloudVolume precomputed path or url. num_mips: The number of resolutions to upload. parallel: Whether to upload in parallel. Default is False. chosen: If not -1, uploads only that specific mip. Default is -1. benchmarking: For scaling purposes, true if uploading benchmarking data. Default is False. continue_upload: Used to continue an upload. Default (0, 0). The tuple (layer_idx, iter) containing layer index and iter to start from. """ check_type(input_path, str) check_precomputed(precomputed_path) check_type(num_mips, (int, np.integer)) if num_mips < 1: raise ValueError(f"Number of resolutions should be > 0, not {num_mips}") check_type(parallel, bool) check_type(chosen, int) check_type(benchmarking, bool) check_iterable_type(continue_upload, int) if chosen < -1 or chosen >= num_mips: raise ValueError(f"{chosen} should be -1, or between 0 and {num_mips-1}") if chosen != -1: commit_info = False else: commit_info = True if benchmarking == True: (files_ordered, bin_paths, vox_size, img_size, _) = get_volume_info( input_path, num_mips, benchmarking=True ) vols = create_cloud_volume( precomputed_path, img_size, vox_size, num_mips, chunk_size=[int(i) for i in img_size], parallel=parallel, layer_type="image", commit_info=commit_info, ) else: (files_ordered, bin_paths, vox_size, img_size, _) = get_volume_info( input_path, num_mips, ) vols = create_cloud_volume( precomputed_path, img_size, vox_size, num_mips, parallel=parallel, layer_type="image", commit_info=commit_info, ) num_procs = min( math.floor( virtual_memory().total / (img_size[0] * img_size[1] * img_size[2] * 8) ), cpu_count(), ) # skip already uploaded layers vols2 = vols[continue_upload[0] :] files_ordered2 = files_ordered[continue_upload[0] :] bin_paths2 = bin_paths[continue_upload[0] :] # skip already uploaded files on current layer files_ordered2[0] = files_ordered2[0][continue_upload[1] :] bin_paths2[0] = bin_paths2[0][continue_upload[1] :] start = time.time() if chosen == -1: for mip, vol in enumerate(vols2): try: with tqdm_joblib( tqdm( desc=f"Creating precomputed volume at layer index {mip+continue_upload[0]}", total=len(files_ordered2[mip]), ) ) as progress_bar: Parallel(num_procs, timeout=1800)( delayed(process)( f, b, vols2[mip], ) for f, b in zip( files_ordered2[mip], bin_paths2[mip], ) ) print( f"\nFinished layer index {mip+continue_upload[0]}, took {time.time()-start} seconds" ) start = time.time() except Exception as e: print(e) print( f"timed out on a chunk on layer index {mip+continue_upload[0]}. moving on to the next step of pipeline" ) else: try: with tqdm_joblib( tqdm( desc=f"Creating precomputed volume at mip {chosen}", total=len(files_ordered[chosen][continue_upload[1] :]), ) ) as progress_bar: Parallel(num_procs, timeout=1800, verbose=0)( delayed(process)( f, b, vols[chosen], ) for f, b in zip( files_ordered[chosen][continue_upload[1] :], bin_paths[chosen][continue_upload[1] :], ) ) print(f"\nFinished layer index {chosen}, took {time.time()-start} seconds") except Exception as e: print(e) print(f"timed out on a chunk on layer index {chosen}.") def create_skel_segids( swc_dir: str, origin: Sequence[Union[int, float]], benchmarking: Optional[bool] = False, ) -> Tuple[Skeleton, List[int]]: """Create skeletons to be uploaded as precomputed format Arguments: swc_dir: Path to consensus swc files. origin: x,y,z coordinate of coordinate frame in space in mircons. benchmarking: Optional, scales swc benchmarking data. Returns: skeletons: .swc skeletons to be pushed to bucket. segids: List of ints for each swc's label. """ check_type(swc_dir, str) check_size(origin) check_type(benchmarking, bool) p = Path(swc_dir) files = [str(i) for i in p.glob("*.swc")] if len(files) == 0: raise FileNotFoundError(f"No .swc files found in {swc_dir}.") skeletons = [] segids = [] for i in tqdm(files, desc="converting swcs to neuroglancer format..."): swc_trace = NeuronTrace(path=i) skel = swc_trace.get_skel(benchmarking, origin=np.asarray(origin)) skeletons.append(skel) segids.append(skeletons[-1].id) return skeletons, segids def upload_segments( input_path: str, precomputed_path: str, num_mips: int, benchmarking: Optional[bool] = False, ) -> None: """Uploads segmentation data from local to precomputed path. Arguments: input_path: The filepath to the root directory of the octree data with consensus-swcs folder. precomputed_path: CloudVolume precomputed path or url. num_mips: The number of resolutions to upload (for info file). benchmarking: Optional, scales swc benchmarking data. """ check_type(input_path, str) check_precomputed(precomputed_path) check_type(num_mips, (int, np.integer)) if num_mips < 1: raise ValueError(f"Number of resolutions should be > 0, not {num_mips}") if benchmarking == True: # Getting swc scaling parameters f = Path(input_path).parts[4].split("_") image = f[0] date = type_to_date[image] scale = scales[date] (_, _, vox_size, img_size, origin) = get_volume_info( input_path, num_mips, benchmarking=True, ) chunk_size = [int(i) for i in img_size] else: (_, _, vox_size, img_size, origin) = get_volume_info( input_path, num_mips, ) chunk_size = None vols = create_cloud_volume( precomputed_path, img_size, vox_size, num_mips, layer_type="segmentation", chunk_size=chunk_size, ) swc_dir = Path(input_path) / "consensus-swcs" segments, segids = create_skel_segids(str(swc_dir), origin, benchmarking) for skel in segments: if benchmarking == True: skel.vertices /= scale # Dividing vertices by scale factor vols[0].skeleton.upload(skel) def upload_annotations(input_path: str, precomputed_path: str, num_mips: int) -> None: """Uploads empty annotation volume.""" (_, _, vox_size, img_size, origin) = get_volume_info( input_path, num_mips, ) create_cloud_volume( precomputed_path, img_size, vox_size, num_mips, layer_type="annotation", ) if __name__ == "__main__": parser = argparse.ArgumentParser( description="Convert local volume into precomputed volume on S3." ) parser.add_argument( "input_path", help="Path to directory containing stitched tiles named sequentially.", ) parser.add_argument( "precomputed_path", help="Path to location where precomputed volume should be stored. Example: s3://<bucket>/<experiment>/<channel>", ) parser.add_argument( "layer_type", help="Layer type to upload. One of ['image', 'segmentation']", ) parser.add_argument( "--extension", help="Extension of stitched files. default is tif", default="tif", type=str, ) parser.add_argument( "--channel", help="Channel to upload to", default=0, type=int, ) parser.add_argument( "--num_resolutions", help="Number of resoltions for which downsampling has been done. Default: 7", default=2, type=int, ) parser.add_argument( "--chosen_res", help="Specified resolution to upload. 0 is highest. Default uploads all", default=-1, type=int, ) args = parser.parse_args() if args.layer_type == "image": upload_volumes( args.input_path, args.precomputed_path, args.num_resolutions, chosen=args.chosen_res, ) elif args.layer_type == "segmentation": upload_segments( args.input_path, args.precomputed_path, args.num_resolutions, ) else: upload_segments( args.input_path, args.precomputed_path + "_segments", args.num_resolutions, ) upload_volumes( args.input_path, args.precomputed_path, args.num_resolutions, chosen=args.chosen_res, )