from curses import intrflush
from typing import List, Optional, Tuple, Union
import numpy as np
import re
import pandas as pd
import networkx as nx
from cloudvolume import CloudVolume, Skeleton
from io import StringIO
import os
from brainlit.utils.util import (
check_type,
check_size,
)
from sklearn.metrics import pairwise_distances_argmin_min
import warnings
[docs]class NeuronTrace:
"""Neuron Trace class to handle neuron traces as swcs and s3 skeletons
Arguments
---------
path : str
Path to either s3 bucket (url) or swc file (filepath).
seg_id : int
If s3 bucket path is provided, the segment number to pull, default None.
mip : int
If s3 bucket path is provided, the resolution to use for scaling, default None.
rounding : bool
If s3 is provided, specifies if it should be rounded, default True
read_offset : bool
If swc is provided, whether offset should be read from file, default False.
fill_missing: bool
Always passes directly into 'CloudVolume()' function to fill missing skeleton values with 0s, default True.
use_https : bool
Always passes directly into 'CloudVolume()' function to set use_https to desired value, default True.
Attributes
----------
path : str
Path to either s3 bucket (url) or swc file (filepath)
input_type : bool
Specifies whether input file is 'swc' or 'skel'
df : :class:`pandas.DataFrame`
Indices, coordinates, and parents of each node
args : tuple
Stores arguments for df - offset, color, cc, branch
seg_id : int
If s3 bucket path is provided, the segment number to pull
mip : None,int
If s3 bucket path is provided, the resolution to use for scaling
Example
----------
>>> swc_path = "./data/data_octree/consensus-swcs/2018-08-01_G-002_consensus.swc"
>>> s3_path = "s3://open-neurodata/brainlit/brain1_segments"
>>> seg_id = 11
>>> mip = 2
>>> swc_trace = NeuronTrace(swc_path)
>>> s3_trace = NeuronTrace(s3_path,seg_id,mip)
"""
def __init__(
self,
path: str,
seg_id: int = None,
mip: int = None,
rounding: bool = True,
read_offset: bool = False,
fill_missing: bool = True,
use_https: bool = False,
):
self.path = path
self.input_type = None
self.df = None
self.args = []
self.seg_id = seg_id
self.mip = mip
self.rounding = rounding
self.fill_missing = fill_missing
self.use_https = use_https
check_type(path, str)
check_type(seg_id, (type(None), int))
check_type(mip, (type(None), int))
check_type(read_offset, bool)
check_type(rounding, bool)
if (seg_id == None and type(mip) == int) or (
type(seg_id) == int and mip == None
):
raise ValueError(
"For 'swc' do not input mip or seg_id, and for 'skel', provide both mip and seg_id"
)
# first check if it is a skel
if seg_id != None and mip != None:
cv = CloudVolume(
path, mip=mip, fill_missing=fill_missing, use_https=use_https
)
skeleton = cv.skeleton.get(seg_id)
if type(skeleton) is Skeleton:
self.input_type = "skel"
# else, check if it is a swc by checking if file exists/extension is .swc
elif os.path.isfile(self.path) and os.path.splitext(path)[-1].lower() == ".swc":
self.input_type = "swc"
# if it is not a swc or skeleton, raise error
if self.input_type != "swc" and self.input_type != "skel":
raise ValueError("Did not input 'swc' filepath or 'skel' url")
# next, convert to a dataframe
if self.input_type == "swc" and read_offset == False:
df, offset, color, cc, branch = self._read_swc(self.path)
args = [offset, color, cc, branch]
self.df = df
self.args = args
elif self.input_type == "swc" and read_offset == True:
df, color, cc, branch = self._read_swc_offset(path)
args = [None, color, cc, branch]
self.df = df
self.args = args
elif self.input_type == "skel":
df = self._read_s3(path, seg_id, mip, rounding)
(self.path, seg_id, mip)
self.df = df
# public methods
[docs] def get_df_arguments(self) -> list:
"""Gets arguments for df - offset, color, cc, branch
Returns
-------
self.args : list
list of arguments for df, if found - offset, color, cc, branch
Example
-------
>>> swc_trace.get_df_arguments()
>>> [[73954.8686, 17489.532566, 34340.365689], [1.0, 1.0, 1.0], nan, nan]
"""
return self.args
[docs] def get_df(self) -> pd.DataFrame:
"""Gets the dataframe providing indices, coordinates, and parents of each node
Returns
-------
self.df : :class:`pandas.DataFrame`
dataframe providing indices, coordinates, and parents of each node
Example
-------
>>> swc_trace.get_df()
>>> sample structure x y z r parent
0 1 0 -52.589700 -1.448032 -1.228827 1.0 -1
1 2 0 -52.290940 -1.448032 -1.228827 1.0 1
2 3 0 -51.992181 -1.143616 -0.240423 1.0 2
3 4 0 -51.095903 -1.143616 -0.240423 1.0 3
4 5 0 -50.797144 -0.839201 -0.240423 1.0 4
... ... ... ... ... ... ... ...
148 149 0 45.702088 14.381594 -7.159252 1.0 148
149 150 0 46.000847 14.686010 -7.159252 1.0 149
150 151 0 46.897125 14.686010 -7.159252 1.0 150
151 152 0 47.494643 15.294842 -7.159252 1.0 151
152 153 6 48.092162 15.294842 -7.159252 1.0 152
53 rows × 7 columns
"""
return self.df
[docs] def get_skel(
self, benchmarking: bool = False, origin: np.ndarray = None
) -> Skeleton:
"""Gets a skeleton version of dataframe, if swc input is provided
Arguments
----------
origin : None, numpy array with shape (3,1) (default = None)
origin of coordinate frame in microns, (default: None assumes (0,0,0) origin)
benchmarking : bool
For swc files, specifies whether swc file is from benchmarking dataset, to obtain skeleton ID
Returns
--------
skel : cloudvolume.Skeleton
Skeleton object of given SWC file
Example
-------
>>> swc_trace.get_skel(benchmarking=True)
>>> Skeleton(segid=, vertices=(shape=153, float32), edges=(shape=152, uint32), radius=(153, float32), vertex_types=(153, uint8), vertex_color=(153, float32), space='physical' transform=[[1.0, 0.0, 0.0, 0.0], [0.0, 1.0, 0.0, 0.0], [0.0, 0.0, 1.0, 0.0]])
"""
check_type(origin, (type(None), np.ndarray))
check_type(benchmarking, bool)
if type(origin) == np.ndarray:
check_size(origin)
if self.input_type == "swc":
skel = self._swc2skeleton(self.path, benchmarking, origin)
return skel
elif self.input_type == "skel":
cv = CloudVolume(
self.path,
mip=self.mip,
fill_missing=self.fill_missing,
use_https=self.use_https,
)
skel = cv.skeleton.get(self.seg_id)
return skel
[docs] def get_df_voxel(
self, spacing: np.array, origin: np.array = np.array([0, 0, 0])
) -> pd.DataFrame:
"""Converts coordinates in pd.DataFrame from spatial units to voxel units
Arguments
----------
spacing : :class:`numpy.array`
Conversion factor (spatial units/voxel). Assumed to be np.array([x,y,z])
origin : :class:`numpy.array`
Origin of the spatial coordinate. Default is (0,0,0). Assumed to be
np.array([x,y,z])
Returns
-------
df_voxel : :class:`pandas.DataFrame`
Indicies, coordinates, and parents of each node in the swc. Coordinates
are in voxel units.
Example
-------
>>> swc_trace.get_df_voxel(spacing=np.asarray([2,2,2]))
>>> sample structure x y z r parent
0 1 0 -26 -1 -1 1.0 -1
1 2 0 -26 -1 -1 1.0 1
2 3 0 -26 -1 0 1.0 2
3 4 0 -26 -1 0 1.0 3
4 5 0 -25 0 0 1.0 4
... ... ... ... ... ... ... ...
148 149 0 23 7 -4 1.0 148
149 150 0 23 7 -4 1.0 149
150 151 0 23 7 -4 1.0 150
151 152 0 24 8 -4 1.0 151
152 153 6 24 8 -4 1.0 152
153 rows × 7 columns
"""
check_type(spacing, np.ndarray)
check_size(spacing)
check_type(origin, np.ndarray)
check_size(origin)
df_voxel = self._df_in_voxel(self.df, spacing, origin)
return df_voxel
[docs] def get_graph(
self, spacing: np.array = None, origin: np.array = None
) -> nx.classes.digraph.DiGraph:
"""Converts dataframe in either spatial or voxel coordinates into a directed graph.
Will convert to voxel coordinates if spacing is specified.
Arguments
----------
spacing : None, :class:`numpy.array` (default = None)
Conversion factor (spatial units/voxel). Assumed to be np.array([x,y,z]).
Provided if graph should convert to voxel coordinates first. Default is None.
origin : None, :class:`numpy.array` (default = None)
Origin of the spatial coordinate, if converting to voxels. Default is None.
Assumed to be np.array([x,y,z])
Returns
-------
G : :class:`networkx.classes.digraph.DiGraph`
Neuron from swc represented as directed graph. Coordinates x,y,z are
node attributes accessed by keys 'x','y','z' respectively.
Example
-------
>>> swc_trace.get_graph()
>>> <networkx.classes.digraph.DiGraph at 0x7f81a83937f0>
"""
check_type(spacing, (type(None), np.ndarray))
if type(spacing) == np.ndarray:
check_size(spacing)
check_type(origin, (type(None), np.ndarray))
if type(origin) == np.ndarray:
check_size(origin)
# if origin isn't specified but spacing is, set origin to np.array([0, 0, 0])
if type(spacing) == np.ndarray and origin is None:
origin = np.array([0, 0, 0])
# voxel conversion option
if type(spacing) == np.ndarray:
df_voxel = self._df_in_voxel(self.df, spacing, origin)
G = self._df_to_graph(df_voxel)
# no voxel conversion option
else:
G = self._df_to_graph(self.df)
return G
[docs] def get_paths(
self, spacing: np.array = None, origin: np.array = None
) -> List[np.array]:
"""Converts dataframe in either spatial or voxel coordinates into a list of paths.
Will convert to voxel coordinates if spacing is specified.
Arguments
----------
spacing : None, :class:`numpy.array` (default = None)
Conversion factor (spatial units/voxel). Assumed to be np.array([x,y,z]).
Provided if graph should convert to voxel coordinates first. Default is None.
origin : None, :class:`numpy.array`
Origin of the spatial coordinate, if converting to voxels. Default is None.
Assumed to be np.array([x,y,z])
Returns
-------
paths : list
List of Nx3 numpy.array. Rows of the array are 3D coordinates in voxel
units. Each array is one path.
Example
-------
>>> swc_trace.get_paths()[0][1:10]
>>> array([[-52, -1, -1],
[-51, -1, 0],
[-51, -1, 0],
[-50, 0, 0],
[-50, 0, 0],
[-49, 0, 0],
[-48, 0, 0],
[-46, 0, 0],
[-46, 0, 0]], dtype=object)
"""
check_type(spacing, (type(None), np.ndarray))
if type(spacing) == np.ndarray:
check_size(spacing)
check_type(origin, (type(None), np.ndarray))
if type(origin) == np.ndarray:
check_size(origin)
# if origin isn't specified but spacing is, set origin to np.array([0, 0, 0])
if type(spacing) == np.ndarray and origin is None:
origin = np.array([0, 0, 0])
# voxel conversion option
if type(spacing) == np.ndarray:
df_voxel = self._df_in_voxel(self.df, spacing, origin)
G = self._df_to_graph(df_voxel)
# no voxel conversion option
else:
G = self._df_to_graph(self.df)
paths = self._graph_to_paths(G)
return paths
[docs] def generate_df_subset(
self,
vox_in_img_list: list,
subneuron_start: int = None,
subneuron_end: int = None,
) -> pd.DataFrame:
"""Read a new subset dataframe in coordinates in img spacing.
Specify specific range of vertices from dataframe if desired
Arguments
----------
vox_in_img_list : list
List of voxels
subneuron_start : None, int (default = None)
Provides start index, if specified, to apply function to a portion of the dataframe
Default is None.
subneuron_end : None, int (default = None)
Provides end index, if specified, to apply function to a portion of the dataframe
Default is None.
Returns
-------
df : :class:`pandas.DataFrame`
Indicies, coordinates (in img spacing) and parents of each node.
Coordinates are in spatial units.
Example
-------
>>> #swc input, subneuron_start and subneuron_end specified
>>> subneuron_start = 5
>>> subneuron_end = 8
>>> #generate vox_in_img_list
>>> my_list = []
>>>for i in range(subneuron_end-subneuron_start):
my_list.append(10)
>>> vox_in_img_list_2 = list([my_list,my_list,my_list])
>>>swc_trace.generate_df_subset(vox_in_img_list_2,subneuron_start,subneuron_end)
>>> sample structure x y z r parent
5 6 0 10 10 10 1.0 5
6 7 0 10 10 10 1.0 6
7 8 0 10 10 10 1.0 7
"""
check_type(vox_in_img_list, list)
check_type(subneuron_start, (type(None), int))
check_type(subneuron_end, (type(None), int))
if (subneuron_start == None and type(subneuron_end) == int) or (
type(subneuron_start) == int and subneuron_end == None
):
raise ValueError(
"Provide both starting and ending vertices to use for the subneuron"
)
# no subneuron range specified
df = self.df
# subneuron range specified
if subneuron_start != None and subneuron_end != None:
subneuron_df = self.df[subneuron_start:subneuron_end]
df = subneuron_df
df_new = self._generate_df_subset(df, vox_in_img_list)
return df_new
[docs] def get_bfs_subgraph(
self,
node_id: int,
depth: int,
df: pd.DataFrame = None,
spacing: np.array = None,
origin: np.array = None,
) -> Tuple[nx.classes.digraph.DiGraph, nx.classes.digraph.DiGraph, List[np.array]]:
"""
Creates a spanning subgraph from a seed node and parent graph using BFS.
Arguments
----------
node_id : int
The id of the node to use as a seed.
If df is not None this become the node index.
depth : int
The max depth for BFS to traven in each direction.
df : None, DataFrame (default = None)
Dataframe storing indices.
In some cases indexing by row number is preferred.
spacing : None, :class:`numpy.array` (default = None)
Conversion factor (spatial units/voxel). Assumed to be np.array([x,y,z]).
Provided if graph should convert to voxel coordinates first. Default is None.
origin : :class:`numpy.array`
Origin of the spatial coordinate, if converting to voxels. Default is None.
Assumed to be np.array([x,y,z])
Returns
-------
G_sub : :class:`networkx.classes.digraph.DiGraph`
Subgraph
tree : DiGraph
The tree returned by BFS.
paths : list
List of Nx3 numpy.array. Rows of the array are 3D coordinates in voxel
units. Each array is one path.
Example
-------
>>> #swc input, specify node_id and depth
>>> swc_trace.get_bfs_subgraph(node_id=11,depth=2)
>>>(<networkx.classes.digraph.DiGraph at 0x7f7f2ce65670>,
<networkx.classes.digraph.DiGraph at 0x7f7f2ce65370>,
array([array([[4727, 4440, 3849],
[4732, 4442, 3850],
[4739, 4455, 3849]]),
array([[4732, 4442, 3850],
[4749, 4439, 3856]])], dtype=object))
"""
check_type(node_id, (list, int))
check_type(depth, int)
check_type(df, (type(None), pd.core.frame.DataFrame))
check_type(spacing, (type(None), np.ndarray))
if type(spacing) == np.ndarray:
check_size(spacing)
check_type(origin, (type(None), np.ndarray))
if type(origin) == np.ndarray:
check_size(origin)
# if origin isn't specified but spacing is, set origin to np.array([0, 0, 0])
if type(spacing) == np.ndarray and origin is None:
origin = np.array([0, 0, 0])
# voxel conversion option
if type(spacing) == np.ndarray:
df_voxel = self._df_in_voxel(self.df, spacing, origin)
G = self._df_to_graph(df_voxel)
# no voxel conversion option
else:
G = self._df_to_graph(self.df)
G_sub, tree = self._get_bfs_subgraph(G, node_id, depth, df)
paths = self._graph_to_paths(G_sub)
return G_sub, tree, paths
[docs] def get_sub_neuron(
self,
bounding_box: Union[tuple, list, None],
spacing: np.array = None,
origin: np.array = None,
) -> nx.classes.digraph.DiGraph:
"""Returns sub-neuron with node coordinates bounded by start and end
Arguments
----------
bounding_box : tuple or list or None
Defines a bounding box around a sub-region around the neuron. Length 2
tuple/list. First element is the coordinate of one corner (inclusive)
and second element is the coordinate of the opposite corner (exclusive).
Both coordinates are numpy.array([x,y,z])in voxel units.
spacing : None, :class:`numpy.array` (default = None)
Conversion factor (spatial units/voxel). Assumed to be np.array([x,y,z]).
Provided if graph should convert to voxel coordinates first. Default is None.
origin : :class:`numpy.array`
Origin of the spatial coordinate, if converting to voxels. Default is None.
Assumed to be np.array([x,y,z])
Returns
-------
G_sub : :class:`networkx.classes.digraph.DiGraph`
Neuron from swc represented as directed graph. Coordinates x,y,z are
node attributes accessed by keys 'x','y','z' respectively.
Example
-------
>>> bounding_box=[[1,2,4],[1,2,3]]
>>> #swc input, no spacing and origin
>>> swc_trace.get_sub_neuron(bounding_box)
>>> <networkx.classes.digraph.DiGraph at 0x7f81a95d1e50>
"""
check_type(bounding_box, (tuple, list))
if len(bounding_box) != 2:
raise ValueError("Bounding box must be length 2")
check_type(spacing, (type(None), np.ndarray))
check_type(spacing, (type(None), np.ndarray))
if type(spacing) == np.ndarray:
check_size(spacing)
check_type(origin, (type(None), np.ndarray))
if type(origin) == np.ndarray:
check_size(origin)
# if origin isn't specified but spacing is, set origin to np.array([0, 0, 0])
if type(spacing) == np.ndarray and origin is None:
origin = np.array([0, 0, 0])
# voxel conversion option
if type(spacing) == np.ndarray:
df_voxel = self._df_in_voxel(self.df, spacing, origin)
G = self._df_to_graph(df_voxel)
# no voxel conversion option
else:
G = self._df_to_graph(self.df)
G_sub = self._get_sub_neuron(G, bounding_box)
return G_sub
[docs] def get_sub_neuron_paths(
self,
bounding_box: Union[tuple, list, None],
spacing: np.array = None,
origin: np.array = None,
) -> List[np.array]:
"""Returns sub-neuron with node coordinates bounded by start and end
Arguments
----------
bounding_box : tuple or list or None
Defines a bounding box around a sub-region around the neuron. Length 2
tuple/list. First element is the coordinate of one corner (inclusive)
and second element is the coordinate of the opposite corner (exclusive).
Both coordinates are numpy.array([x,y,z])in voxel units.
spacing : None, :class:`numpy.array` (default = None)
Conversion factor (spatial units/voxel). Assumed to be np.array([x,y,z]).
Provided if graph should convert to voxel coordinates first. Default is None.
origin : :class:`numpy.array`
Origin of the spatial coordinate, if converting to voxels. Default is None.
Assumed to be np.array([x,y,z])
Returns
-------
paths : list
List of Nx3 numpy.array. Rows of the array are 3D coordinates in voxel
units. Each array is one path.
Example
-------
>>> bounding_box=[[1,2,4],[1,2,3]]
>>> #swc input, no spacing and origin
>>> swc_trace.get_sub_neuron_paths(bounding_box)
>>> array([], dtype=object)
"""
check_type(bounding_box, (tuple, list))
if len(bounding_box) != 2:
raise ValueError("Bounding box must be length 2")
check_type(spacing, (type(None), np.ndarray))
check_type(spacing, (type(None), np.ndarray))
if type(spacing) == np.ndarray:
check_size(spacing)
check_type(origin, (type(None), np.ndarray))
if type(origin) == np.ndarray:
check_size(origin)
# if origin isn't specified but spacing is, set origin to np.array([0, 0, 0])
if type(spacing) == np.ndarray and origin is None:
origin = np.array([0, 0, 0])
# voxel conversion option
if type(spacing) == np.ndarray:
df_voxel = self._df_in_voxel(self.df, spacing, origin)
G = self._df_to_graph(df_voxel)
# no voxel conversion option
else:
G = self._df_to_graph(self.df)
G_sub = self._get_sub_neuron(G, bounding_box)
paths = self._graph_to_paths(G_sub, round=self.rounding)
return paths
[docs] @staticmethod
def ssd(pts1: np.array, pts2: np.array) -> float:
"""Compute significant spatial distance metric between two traces as defined in APP1.
Args:
pts1 (np.array): array containing coordinates of points of trace 1. shape: npoints x ndims
pts2 (np.array): array containing coordinates of points of trace 1. shape: npoints x ndims
Returns:
[float]: significant spatial distance as defined by APP1
Example
-------
>>> pts1 = swc_trace.get_paths()[0][1:10]
>> pts2 = swc_trace.get_paths()[0][11:20]
>>> NeuronTrace.ssd(pts1,pts2)
>>>6.247937554557103
"""
check_type(pts1, np.ndarray)
check_type(pts2, np.ndarray)
_, dists1 = pairwise_distances_argmin_min(pts1, pts2)
dists1 = dists1[dists1 >= 2]
_, dists2 = pairwise_distances_argmin_min(pts2, pts1)
dists2 = dists2[dists2 >= 2]
# If there are is no significant distance between the 2 sets
if len(dists1) == 0 and len(dists2) == 0:
ssd = 0
# Else, calculate the mean
else:
dists = np.concatenate([dists1, dists2])
ssd = np.mean(dists)
return ssd
# private methods
def _read_swc(
self, path: str
) -> Tuple[pd.DataFrame, List[float], List[int], int, int]:
"""
Read a single swc file
Arguments:
path {string} -- path to file
raw {bool} -- whether you are passing the file directly
Returns:
df {pandas dataframe} -- indices, coordinates, and parents of each node
offset {list of floats} -- offset value of fragment
color {list of ints} -- color
cc {int} -- cc value, from file name
branch {int} -- branch number, from file name
"""
# check input
file = open(path, "r")
in_header = True
offset_found = False
header_length = -1
offset = np.nan
color = np.nan
cc = np.nan
branch = np.nan
while in_header:
line = file.readline().split()
if "OFFSET" in line:
offset_found = True
idx = line.index("OFFSET") + 1
offset = [float(line[i]) for i in np.arange(idx, idx + 3)]
elif "COLOR" in line:
idx = line.index("COLOR") + 1
line = line[idx]
line = line.split(",")
color = [float(line[i]) for i in np.arange(len(line))]
elif "NAME" in line:
idx = line.index("NAME") + 1
name = line[idx]
name = re.split(r"_|-|\.", name)
try:
idx = name.index("cc") + 1
cc = int(name[idx])
idx = name.index("branch") + 1
branch = int(name[idx])
except ValueError:
pass
elif line[0] != "#":
in_header = False
header_length += 1
if not offset_found:
warnings.warn("No offset information found in: " + path)
offset = [float(0) for i in range(3)]
# read coordinates
df = pd.read_table(
path,
names=["sample", "structure", "x", "y", "z", "r", "parent"],
skiprows=header_length,
delimiter="\s+",
)
return df, offset, color, cc, branch
def _read_swc_offset(self, path: str) -> Tuple[pd.DataFrame, List[int], int, int]:
df, offset, color, cc, branch = self._read_swc(path)
df["x"] = df["x"] + offset[0]
df["y"] = df["y"] + offset[1]
df["z"] = df["z"] + offset[2]
return df, color, cc, branch
def _read_s3(
self, s3_path: str, seg_id: int, mip: int, rounding: Optional[bool] = True
):
"""Read a s3 bucket path to a skeleton object
into a pandas dataframe.
Parameters
----------
s3_path : str
String representing the path to the s3 bucket
seg_id : int
The segement number to pull
mip : int
The resolution to use for scaling
rounding: bool, Optional
True is default, false if swc shouldn't be rounded
Returns
-------
df : :class:`pandas.DataFrame`
Indicies, coordinates, and parents of each node in the swc.
Coordinates are in spatial units.
"""
# TODO check header length
# check input
cv = CloudVolume(
s3_path, mip=mip, fill_missing=self.fill_missing, use_https=self.use_https
)
skeleton = cv.skeleton.get(seg_id)
swc_string = skeleton.to_swc()
string_io = StringIO(swc_string)
splitted_string = swc_string.split("\n")
in_h = True
h_len = -1
while in_h:
h_len += 1
line = splitted_string[h_len]
if len(line) == 0 or line[0] != "#":
in_h = False
df = pd.read_table(
string_io,
names=["sample", "structure", "x", "y", "z", "r", "parent"],
skiprows=h_len,
sep=" "
# delim_whitespace=True,
)
# round swc files when reading
if rounding == True:
res = cv.scales[mip]["resolution"]
df["x"] = np.round(df["x"] / res[0])
df["y"] = np.round(df["y"] / res[1])
df["z"] = np.round(df["z"] / res[2])
return df
def _generate_df_subset(self, swc_df, vox_in_img_list):
"""Read a new subset of swc dataframe in coordinates in img spacing.
Parameters
----------
swc_df : pd.DataFrame
DataFrame containing information from swc file
vox_in_img_list: list
List of voxels
Returns
-------
df : :class:`pandas.DataFrame`
Indicies, coordinates (in img spacing) and parents of each node in the swc.
Coordinates are in spatial units.
"""
# check input
df_new = swc_df.copy()
df_new["x"], df_new["y"], df_new["z"] = (
vox_in_img_list[:][0],
vox_in_img_list[:][1],
vox_in_img_list[:][2],
)
return df_new
def _space_to_voxel(
self,
spatial_coord: np.array,
spacing: np.array,
origin: np.array = np.array([0, 0, 0]),
) -> np.array:
"""Converts coordinate from spatial units to voxel units.
Parameters
----------
spatial_coord : :class:`numpy.array`
3D coordinate in spatial units. Assumed to be np.array[(x,y,z)]
spacing : :class:`numpy.array`
Conversion factor (spatial units/voxel). Assumed to be np.array([x,y,z])
origin : :class:`numpy.array`
Origin of the spatial coordinate. Default is (0,0,0). Assumed to be
np.array([x,y,z])
Returns
-------
voxel_coord : :class:`numpy.array`
Coordinate in voxel units. Assumed to be np.array([x,y,z])
"""
if np.any(spacing == 0):
raise ValueError(f"Zero detected in spacing: {spacing}")
voxel_coord = np.round(np.divide(spatial_coord - origin, spacing))
voxel_coord = voxel_coord.astype(np.int64)
return voxel_coord
def _df_in_voxel(
self,
df: pd.DataFrame,
spacing: np.array,
origin: np.array = np.array([0, 0, 0]),
) -> pd.DataFrame:
"""Converts coordinates in pd.DataFrame representing swc from spatial units
to voxel units
Parameters
----------
df : :class:`pandas.DataFrame`
Indicies, coordinates, and parents of each node in the swc. Coordinates
are in spatial units.
spacing : :class:`numpy.array`
Conversion factor (spatial units/voxel). Assumed to be np.array([x,y,z])
origin : :class:`numpy.array`
Origin of the spatial coordinate. Default is (0,0,0). Assumed to be
np.array([x,y,z])
Returns
-------
df_voxel : :class:`pandas.DataFrame`
Indicies, coordinates, and parents of each node in the swc. Coordinates
are in voxel units.
"""
x = []
y = []
z = []
df_voxel = df.copy()
for index, row in df_voxel.iterrows():
vox = self._space_to_voxel(row[["x", "y", "z"]].to_numpy(), spacing, origin)
x.append(vox[0])
y.append(vox[1])
z.append(vox[2])
df_voxel["x"] = x
df_voxel["y"] = y
df_voxel["z"] = z
return df_voxel
def _df_to_graph(self, df, round=False):
"""Converts dataframe form of neuron trace into a directed graph
Parameters
----------
df_voxel : :class:`pandas.DataFrame`
Indices, coordinates, and parents of each node in the swc.
round : boolean
Whether coordinates/distances should be rounded to integers.
Returns
-------
G : :class:`networkx.classes.digraph.DiGraph`
Neuron from swc represented as directed graph. Coordinates x,y,z are
node attributes accessed by keys 'x','y','z' respectively.
"""
G = nx.DiGraph()
# add nodes
for index, row in df.iterrows():
id = int(row["sample"])
coord = [row["x"], row["y"], row["z"]]
if round:
coord = [int(c) for c in coord]
G.add_node(id)
G.nodes[id]["x"] = coord[0]
G.nodes[id]["y"] = coord[1]
G.nodes[id]["z"] = coord[2]
# add edges
for index, row in df.iterrows():
child = int(row["sample"])
child_coord = [row["x"], row["y"], row["z"]]
parent = int(row["parent"])
if parent > min(df["parent"]):
parent_row = df[df["sample"] == parent]
parent_coord = [parent_row["x"], parent_row["y"], parent_row["z"]]
dist = np.linalg.norm(np.subtract(child_coord, parent_coord))
if round:
dist = int(dist)
G.add_edge(parent, child, distance=dist)
return G
def _get_sub_neuron(
self, G: nx.classes.digraph.DiGraph, bounding_box: Union[tuple, list, None]
) -> nx.classes.digraph.DiGraph:
"""Returns sub-neuron with node coordinates bounded by start and end
Parameters
----------
G : :class:`networkx.classes.digraph.DiGraph`
Neuron from swc represented as directed graph. Coordinates x,y,z are
node attributes accessed by keys 'x','y','z' respectively.
bounding_box : tuple or list or None
Defines a bounding box around a sub-region around the neuron. Length 2
tuple/list. First element is the coordinate of one corner (inclusive) and second element is the coordinate of the opposite corner (exclusive). Both coordinates are numpy.array([x,y,z])in voxel units.
Returns
-------
G_sub : :class:`networkx.classes.digraph.DiGraph`
Neuron from swc represented as directed graph. Coordinates x,y,z are
node attributes accessed by keys 'x','y','z' respectively.
"""
G_sub = G.copy() # make copy of input G
start = bounding_box[0]
end = bounding_box[1]
# remove nodes that are not neighbors of nodes bounded by start and end
for node in list(G_sub.nodes):
neighbors = list(G_sub.successors(node)) + list(G_sub.predecessors(node))
remove = True
for id in neighbors + [node]:
x = G_sub.nodes[id]["x"]
y = G_sub.nodes[id]["y"]
z = G_sub.nodes[id]["z"]
if x >= start[0] and y >= start[1] and z >= start[2]:
if x < end[0] and y < end[1] and z < end[2]:
remove = False
if remove:
G_sub.remove_node(node)
# set origin to start of bounding box
for id in list(G_sub.nodes):
G_sub.nodes[id]["x"] = G_sub.nodes[id]["x"] - start[0]
G_sub.nodes[id]["y"] = G_sub.nodes[id]["y"] - start[1]
G_sub.nodes[id]["z"] = G_sub.nodes[id]["z"] - start[2]
return G_sub
def _graph_to_paths(self, G, round=False):
"""Converts neuron represented as a directed graph with no cycles into a
list of paths.
Parameters
----------
G : :class:`networkx.classes.digraph.DiGraph`
Neuron from swc represented as directed graph. Coordinates x,y,z are
node attributes accessed by keys 'x','y','z' respectively.
Returns
-------
paths : list
List of Nx3 numpy.array. Rows of the array are 3D coordinates in voxel
units. Each array is one path.
"""
G_cp = G.copy() # make copy of input G
branches = []
while len(G_cp.edges) != 0: # iterate over branches
# get longest branch
longest = nx.algorithms.dag.dag_longest_path(
G_cp, weight="distance"
) # list of nodes on the path
branches.append(longest)
# remove longest branch
for idx, e in enumerate(longest):
if idx < len(longest) - 1:
G_cp.remove_edge(longest[idx], longest[idx + 1])
# convert branches into list of paths
paths = []
for branch in branches:
# get vertices in branch as n by 3 numpy.array; n = length of branches
if round:
dtype = "int"
else:
dtype = "float"
path = np.zeros((len(branch), 3), dtype=dtype)
for idx, node in enumerate(branch):
coord = [G_cp.nodes[node][c] for c in ["x", "y", "z"]]
if round:
coord = [np.int64(c) for c in coord]
path[idx, :] = coord
paths.append(path)
return paths
def _get_bfs_subgraph(
self,
G: nx.classes.digraph.DiGraph,
node_id: int,
depth: int,
df: pd.DataFrame = None,
) -> Tuple[nx.classes.digraph.DiGraph, nx.classes.digraph.DiGraph]:
"""
Creates a spanning subgraph from a seed node and parent graph using BFS.
Parameters
----------
G : :class:`networkx.classes.digraph.DiGraph`
Neuron from swc represented as directed graph.
node_id : int
The id of the node to use as a seed.
If df is not None this become the node index.
depth : int
The max depth for BFS to traven in each direction.
df : None, DataFrame (default = None)
Dataframe storing indices.
In some cases indexing by row number is preferred.
Returns
-------
G_sub : :class:`networkx.classes.digraph.DiGraph`
Subgraph
tree : DiGraph
The tree returned by BFS.
"""
if df is not None:
node_id = int(df.iloc[node_id]["sample"])
G_undir = G.to_undirected()
tree = nx.bfs_tree(G_undir, node_id, depth_limit=depth) # forward BFS
G_sub = nx.subgraph(G, list(tree.nodes))
return G_sub, tree
def _swc2skeleton(
self, swc_file: str, benchmarking: bool = False, origin: np.array = None
) -> Skeleton:
"""Converts swc file into Skeleton object
Arguments:
swc_file {str} -- path to SWC file
Keyword Arguments:
origin {numpy array with shape (3,1)} -- origin of coordinate frame in microns, (default: None assumes (0,0,0) origin)
Returns:
skel {cloudvolume.Skeleton} -- Skeleton object of given SWC file
"""
with open(swc_file, "r") as f:
contents = f.read()
# get every line that starts with a hashtag
comments = [i.split(" ") for i in contents.split("\n") if i.startswith("#")]
offset = np.array([float(j) for i in comments for j in i[2:] if "OFFSET" in i])
color = [float(j) for i in comments for j in i[2].split(",") if "COLOR" in i]
# set alpha to 0.0 so skeleton is opaque
color.append(0.0)
color = np.array(color, dtype="float32")
skel = Skeleton.from_swc(contents)
# physical units
# space can be 'physical' or 'voxel'
skel.space = "physical"
# hard coding parsing the id from the filename
idx = swc_file.find("G")
if benchmarking == True:
idx1 = swc_file.find(
"_", swc_file.find("_") + 1
) # finding second occurence of "_"
idx2 = swc_file.find(".")
skel.id = swc_file[idx1 + 1 : idx2]
else:
skel.id = int(swc_file[idx + 2 : idx + 5])
# hard coding changing data type of vertex_types
skel.extra_attributes[-1]["data_type"] = "float32"
skel.extra_attributes.append(
{"id": "vertex_color", "data_type": "float32", "num_components": 4}
)
# add offset to vertices
# and shift by origin
skel.vertices += offset
if origin is not None:
skel.vertices -= origin
# convert from microns to nanometers
skel.vertices *= 1000
skel.vertex_color = np.zeros((skel.vertices.shape[0], 4), dtype="float32")
skel.vertex_color[:, :] = color
return skel