Source code for brainlit.utils.write

import aicspylibczi
import numpy as np
import zarr
from tqdm import tqdm
import dask.array as da
from ome_zarr.writer import write_image
from ome_zarr.io import parse_url
from typing import List
from pathlib import Path
from joblib import Parallel, delayed
import os
from cloudvolume import CloudVolume
import json
from skimage.measure import block_reduce


def _read_czi_slice(czi, C, Z):
    """Reads a slice of a czi object, handling whether the czi is a mosaic or not.

    Args:
        czi (aicspylibczi.CziFile): czi object
        C (int): channel
        Z (int): z slice

    Returns:
        np.array: array of the image data
    """
    if czi.is_mosaic():
        im_slice = np.squeeze(czi.read_mosaic(C=C, Z=Z, scale_factor=1))
    else:
        im_slice, _ = czi.read_image(C=C, Z=Z)
        im_slice = np.squeeze(im_slice)
    return im_slice
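
# A minimal usage sketch (hypothetical file name): read the first z-plane of
# channel 0 from a czi, mosaic or not.
#
#   czi = aicspylibczi.CziFile("image.czi")
#   first_plane = _read_czi_slice(czi, C=0, Z=0)  # 2D np.ndarray, shape (H, W)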


def _write_zrange_thread(zarr_path, czi_path, channel, zs):
    """Write the z-slices in ``zs`` from one channel of a czi into an existing zarr."""
    # Open a fresh czi handle in each worker rather than sharing one across threads.
    czi = aicspylibczi.CziFile(czi_path)

    zarr_fg = zarr.open(zarr_path)
    for z in zs:
        zarr_fg[z, :, :] = _read_czi_slice(czi, C=channel, Z=z)


def czi_to_zarr(
    czi_path: str, out_dir: str, fg_channel: int = 0, parallel: int = 1
) -> List[str]:
    """Convert a 4D czi image to zarr file(s) in a given directory. A single-channel image produces one zarr; a two-channel image produces two.

    Args:
        czi_path (str): Path to czi image.
        out_dir (str): Path to directory where zarr(s) will be written.
        fg_channel (int): Index of foreground channel.
        parallel (int): Number of cpus to use to write zarr.

    Returns:
        list: Paths to the zarrs that were written.
    """
    zarr_paths = []
    czi = aicspylibczi.CziFile(czi_path)
    slice1 = _read_czi_slice(czi, C=0, Z=0)

    C = czi.get_dims_shape()[0]["C"][1]
    H = slice1.shape[0]
    W = slice1.shape[1]
    Z = czi.get_dims_shape()[0]["Z"][1]

    sz = np.array([Z, H, W], dtype="int")
    chunk_z = 10
    chunk_sz = (chunk_z, 200, 200)

    print(f"Writing {C} zarrs of shape {sz} from czi with dims {czi.get_dims_shape()}")

    fg_path = Path(out_dir) / "fg.zarr"
    zarr_paths.append(str(fg_path))  # return strings, matching the List[str] annotation
    zarr_fg = zarr.open(fg_path, mode="w", shape=sz, chunks=chunk_sz, dtype="uint16")

    if parallel == 1:
        for z in tqdm(np.arange(Z), desc="Saving slices foreground..."):
            zarr_fg[z, :, :] = _read_czi_slice(czi, C=fg_channel, Z=z)
    elif isinstance(parallel, int) and parallel > 1:
        z_blocks = [
            np.arange(i, np.amin([i + chunk_z, sz[0]]))
            for i in range(0, sz[0], chunk_z)
        ]
        Parallel(n_jobs=parallel, backend="threading")(
            delayed(_write_zrange_thread)(fg_path, czi_path, channel=fg_channel, zs=zs)
            for zs in tqdm(z_blocks, desc="Saving slices foreground...")
        )
    else:
        raise ValueError(f"parallel must be a positive integer, not {parallel}")

    for c in range(C):
        if c == fg_channel:
            continue
        bg_path = Path(out_dir) / f"channel_{c}.zarr"
        zarr_paths.append(str(bg_path))
        zarr_bg = zarr.open(
            bg_path, mode="w", shape=sz, chunks=chunk_sz, dtype="uint16"
        )
        if parallel == 1:
            for z in tqdm(np.arange(Z), desc="Saving slices background..."):
                zarr_bg[z, :, :] = _read_czi_slice(czi, C=c, Z=z)
        elif parallel > 1:
            Parallel(n_jobs=parallel, backend="threading")(
                delayed(_write_zrange_thread)(bg_path, czi_path, channel=c, zs=zs)
                for zs in tqdm(z_blocks, desc="Saving slices background...")
            )
    return zarr_paths
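
# A minimal usage sketch (paths and core count are hypothetical): converting a
# two-channel czi with fg_channel=0 would produce out/fg.zarr and
# out/channel_1.zarr, each shaped (Z, H, W) with uint16 voxels.
#
#   zarr_paths = czi_to_zarr(
#       czi_path="brain.czi", out_dir="out", fg_channel=0, parallel=4
#   )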

def zarr_to_omezarr(zarr_path: str, out_path: str, res: list):
    """Convert 3D zarr to ome-zarr.

    Args:
        zarr_path (str): Path to zarr.
        out_path (str): Path of ome-zarr to be created.
        res (list): List of xyz resolution values in nanometers.

    Raises:
        ValueError: If the ome-zarr to be written already exists.
        ValueError: If the input zarr is not a 3D array.
    """
    if os.path.exists(out_path):
        raise ValueError(
            f"{out_path} already exists, please delete the existing file or change the name of the ome-zarr to be created."
        )

    print(f"Converting {zarr_path} to ome-zarr")
    z = zarr.open(zarr_path)
    if len(z.shape) != 3:
        raise ValueError("Conversion only supported for 3D arrays")

    dra = da.from_zarr(zarr_path)
    store = parse_url(out_path, mode="w").store
    root = zarr.group(store=store)
    write_image(image=dra, group=root, axes="zxy")
    _edit_ome_metadata(out_path, res)
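
# Continuing the sketch above (hypothetical paths; res is xyz in nanometers):
#
#   zarr_to_omezarr("out/fg.zarr", "out/fg_ome.zarr", res=[300, 300, 1000])
#
# This delegates pyramid construction to ome_zarr.writer.write_image, then
# folds the voxel resolution into the resulting multiscale metadata.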

def _write_slice_ome(z: int, lvl: int, z_in_path: str, zgr_path: str):
    """Write slice z of the input zarr into level lvl of an ome-zarr group, downsampling by a factor of 2**lvl."""
    z_in = zarr.open(z_in_path)
    zgr = zarr.open_group(zgr_path)
    z_out = zgr[str(lvl)]

    im_slice = np.squeeze(z_in[z, :, :])
    if lvl > 0:
        im_ds = block_reduce(im_slice, block_size=2**lvl)
    else:
        im_ds = im_slice
    z_out[z, :, :] = im_ds


def zarr_to_omezarr_single(zarr_path: str, out_path: str, res: list, parallel: int = 1):
    """Convert 3D zarr to ome-zarr manually (without ome_zarr.writer). Writes five pyramid levels; chunk size in z is 1.

    Args:
        zarr_path (str): Path to zarr.
        out_path (str): Path of ome-zarr to be created.
        res (list): List of xyz resolution values in nanometers.
        parallel (int): Number of cores to use.

    Raises:
        ValueError: If the ome-zarr to be written already exists.
        ValueError: If the input zarr is not a 3D array.
    """
    if os.path.exists(out_path):
        raise ValueError(
            f"{out_path} already exists, please delete the existing file or change the name of the ome-zarr to be created."
        )

    zra = zarr.open(zarr_path)
    sz0 = zra.shape

    if len(sz0) != 3:
        raise ValueError("Conversion only supported for 3D arrays")

    zgr = zarr.group(out_path)

    for lvl in tqdm(range(5), desc="Writing different levels..."):
        # Downsample the first slice to find the in-plane shape of this level.
        im_slice = np.squeeze(zra[0, :, :])
        if lvl > 0:
            im_ds = block_reduce(im_slice, block_size=2**lvl)
        else:
            im_ds = im_slice
        chunk_size = [
            1,
            np.amin((200, im_ds.shape[0])),
            np.amin((200, im_ds.shape[1])),
        ]

        zgr.create(
            str(lvl),
            shape=(sz0[0], im_ds.shape[0], im_ds.shape[1]),
            chunks=chunk_size,
            dtype=zra.dtype,
            dimension_separator="/",
        )

        if parallel == 1:
            for z in tqdm(range(sz0[0]), desc="Writing slices...", leave=False):
                _write_slice_ome(z, lvl, zarr_path, out_path)
        else:
            Parallel(n_jobs=parallel, backend="threading")(
                delayed(_write_slice_ome)(
                    z, lvl, z_in_path=zarr_path, zgr_path=out_path
                )
                for z in tqdm(range(sz0[0]), desc="Saving slices...")
            )

    # Write ome-zarr v0.4 multiscales metadata by hand.
    axes = []
    for dim in ["z", "x", "y"]:
        axes.append({"name": dim, "type": "space", "unit": "micrometer"})

    datasets = []
    for lvl in range(5):
        datasets.append(
            {
                "path": str(lvl),
                "coordinateTransformations": [
                    {
                        "type": "scale",
                        "scale": [
                            res[2] / 1000,
                            res[0] * 2**lvl / 1000,
                            res[1] * 2**lvl / 1000,
                        ],
                    }
                ],
            }
        )

    json_data = {
        "multiscales": [
            {"axes": axes, "datasets": datasets, "name": "/", "version": "0.4"}
        ]
    }

    with open(Path(out_path) / ".zattrs", "w") as f:
        json.dump(json_data, f, indent=4)


def _edit_ome_metadata(out_path: str, res: list):
    """Set ome-zarr axis units to micrometers and scale the dataset transforms by the image resolution."""
    res = np.divide([res[-1], res[0], res[1]], 1000)  # xyz nanometers -> zxy micrometers
    ome_zarr = zarr.open(out_path, "r+")
    metadata_edit = ome_zarr.attrs["multiscales"]
    for i in range(3):
        metadata_edit[0]["axes"][i]["unit"] = "micrometer"
    for i, dataset in enumerate(metadata_edit[0]["datasets"]):
        new_res = list(
            np.multiply(dataset["coordinateTransformations"][0]["scale"], res)
        )
        metadata_edit[0]["datasets"][i]["coordinateTransformations"][0][
            "scale"
        ] = new_res
    ome_zarr.attrs["multiscales"] = metadata_edit


def write_trace_layer(parent_dir: str, res: list):
    """Write precomputed layer (info file) for trace skeletons associated with an ome-zarr file.

    Args:
        parent_dir (str): Path to directory which holds fg_ome.zarr and where the traces layer will be written.
        res (list): List of xyz resolution values in nanometers.
""" if isinstance(parent_dir, str): parent_dir = Path(parent_dir) traces_dir = parent_dir / "traces" z = zarr.open_array(parent_dir / "fg_ome.zarr" / "0") volume_size = [z.shape[1], z.shape[2], z.shape[0]] chunk_size = [z.chunks[1], z.chunks[2], z.chunks[0]] outpath = f"precomputed://file://" + str(traces_dir) info = CloudVolume.create_new_info( num_channels=1, layer_type="segmentation", data_type="uint16", encoding="raw", resolution=res, # Voxel scaling, units are in nanometers voxel_offset=[0, 0, 0], # x,y,z offset in voxels from the origin # Pick a convenient size for your underlying chunk representation # Powers of two are recommended, doesn't need to cover image exactly chunk_size=chunk_size, # units are voxels volume_size=volume_size, # e.g. a cubic millimeter dataset skeletons="skeletons", ) vol = CloudVolume(outpath, info=info, compress=False) vol.commit_info() vol.skeleton.meta.commit_info() # remove vertex type attribute because it is a uint8 and incompatible with neuroglancer info_path = traces_dir / "skeletons/info" with open(info_path) as f: data = json.load(f) for i, attr in enumerate(data["vertex_attributes"]): if attr["id"] == "vertex_types": data["vertex_attributes"].pop(i) break with open(info_path, "w") as f: json.dump(data, f)