import matplotlib.pyplot as plt
import numpy as np
import scipy.ndimage as ndi
from skimage import measure
import pandas as pd
from scipy.ndimage import center_of_mass
from scipy.spatial.distance import cdist
from scipy.spatial import distance_matrix
import os
import pickle
import copy
import math
from scipy.spatial import cKDTree
[docs]
def actinangle2dist_enhanced(filament_coord_extend_dict, imgsize, voxel_size_xyz):
"""
Enhanced actin angle to distance calculation function
Calculates shortest distances and corresponding angle relationships between
actin filaments.
Args:
filament_coord_extend_dict (Dict): Actin filament coordinate dictionary
imgsize (Tuple): Image dimensions
voxel_size_xyz (List[float]): Voxel size
Returns:
Tuple[List[float], List[float]]: (distance list, angle list)
Note:
- Angle range: 0-180 degrees
- Distance units consistent with input coordinate units
- Automatically handles boundary cases and outliers
"""
filament_coord_extend_lst = []
filamentnames = list(filament_coord_extend_dict.keys())
for idx in range(len(filamentnames)):
curfilamentcoordslst = [np.array(coord) for coord in filament_coord_extend_dict[f'{filamentnames[idx]}']]
filament_coord_extend_lst.append(curfilamentcoordslst)
filament_coord_lst = filament_coord_extend_lst
filaments_vect_lst = [] # vector for every point in every filament
for single_filament_coord in filament_coord_lst:
filament_vectlst = []
for jj, point in enumerate(single_filament_coord):
if jj < 1:
tempvect = single_filament_coord[jj + 1] - point
else:
tempvect = point - single_filament_coord[jj - 1]
filament_vectlst.append(tempvect)
filaments_vect_lst.append(filament_vectlst)
filaments_min_d_lst = []
filaments_angle_lst = []
if len(filament_coord_extend_lst) < 2:
return filaments_min_d_lst, filaments_angle_lst
for i, curfilaments_coords in enumerate(filament_coord_extend_lst):
curfilaments_vect = filaments_vect_lst[i]
rest_filament_coord_extend_lst = copy.deepcopy(filament_coord_extend_lst)
rest_filament_vect_extend_lst = copy.deepcopy(filaments_vect_lst)
_ = rest_filament_coord_extend_lst.pop(i)
_ = rest_filament_vect_extend_lst.pop(i)
for ll, point in enumerate(curfilaments_coords):
vect1 = curfilaments_vect[ll]
min_dist = float('inf')
cur_angle = float('inf')
for kk, filaments_coords in enumerate(rest_filament_coord_extend_lst):
try:
point_reshaped = np.array([point])
filaments_coords_array = np.array(filaments_coords)
distss = cdist(point_reshaped, filaments_coords_array, metric='euclidean')
min_d = float(np.min(distss))
# Find the index of minimum distance
flat_idx = np.argmin(distss)
min_d_idx = flat_idx % distss.shape[1]
if min_d < min_dist:
temp_d = min_d
vect2 = rest_filament_vect_extend_lst[kk][min_d_idx]
if np.array_equal(vect1, vect2):
continue
try:
dot_result = np.dot(vect1, vect2)
if np.isscalar(dot_result):
vector_dot_product = float(dot_result)
else:
vector_dot_product = float(dot_result.item())
norm1 = float(np.linalg.norm(vect1))
norm2 = float(np.linalg.norm(vect2))
norm_product = norm1 * norm2
except:
continue
if norm_product == 0:
continue
cosine_value = vector_dot_product / norm_product
cosine_value = max(-1.0, min(1.0, cosine_value)) # Clip to valid range
import math
arccos_val = math.acos(cosine_value)
angle = math.degrees(arccos_val)
if angle >= 180:
temp_angle = 0.0
elif angle > 90:
temp_angle = 180.0 - angle
else:
temp_angle = angle
if not math.isnan(temp_angle) and 0 <= temp_angle <= 180:
min_dist = temp_d
cur_angle = temp_angle
except Exception as e:
continue
if min_dist != float('inf') and cur_angle != float('inf'):
filaments_min_d_lst.append(min_dist)
filaments_angle_lst.append(cur_angle)
return filaments_min_d_lst, filaments_angle_lst
# ------
[docs]
def load_partition_coords_from_xvg(xvg_file):
"""
Load partition coordinate data from XVG file
"""
partition_coords = {}
radial_positions = {}
# Read all data first
data_lines = []
with open(xvg_file, 'r') as f:
for line in f:
if not line.startswith('#') and not line.startswith('@'):
parts = line.strip().split()
if len(parts) >= 4:
data_lines.append(parts)
if not data_lines:
return {}, {}
# Get max partition ID
max_partition_id = max(int(parts[3]) for parts in data_lines)
# Process data
for parts in data_lines:
z, y, x, partition_id = float(parts[0]), float(parts[1]), float(parts[2]), int(parts[3])
coord = [z, y, x]
if partition_id not in partition_coords:
partition_coords[partition_id] = []
partition_coords[partition_id].append(coord)
# Normalized radial position
normalized_position = (partition_id - 0.5) / max_partition_id
radial_positions[(int(z), int(y), int(x))] = normalized_position
# Convert to numpy arrays
for partition_id in partition_coords:
partition_coords[partition_id] = np.array(partition_coords[partition_id])
return partition_coords, radial_positions
[docs]
def calculate_rdf_from_xvg(organelle_mask, partition_coords, radial_positions, bins=8):
"""
Calculate RDF directly from XVG partition data and organelle mask
Parameters
----------
organelle_mask : numpy.ndarray
3D organelle mask array
partition_coords : dict
Dictionary with partition coordinates from XVG
radial_positions : dict
Dictionary mapping coordinates to radial positions
bins : int, default=8
Number of radial bins
Returns
-------
dict
Dictionary containing RDF results
"""
# Extract organelle coordinates
organelle_binary = organelle_mask > 0.5
organelle_coords = np.column_stack(np.where(organelle_binary))
print(f'Total organelles found: {len(organelle_coords)}')
if len(organelle_coords) == 0:
print("No organelles found")
return None
# Use KDTree for fast nearest neighbor search
# Ensure partition_coords_array is a 2D numpy array of shape (n_points, 3)
coords_list = list(radial_positions.keys())
if len(coords_list) > 0 and isinstance(coords_list[0], tuple):
partition_coords_array = np.array(coords_list)
else:
# Fallback if keys are already arrays or lists
partition_coords_array = np.asarray(coords_list)
partition_rpos_array = np.array(list(radial_positions.values()))
print(f'Building KDTree for {len(partition_coords_array)} partition points...')
if partition_coords_array.ndim != 2:
raise ValueError(f"partition_coords_array has wrong dimensions: {partition_coords_array.shape}")
tree = cKDTree(partition_coords_array)
# Query nearest neighbors for all organelles at once
dists, idxs = tree.query(organelle_coords, distance_upper_bound=10.0)
# Assign radial positions to organelles within valid distance
organelle_radial_positions = []
for i, dist in enumerate(dists):
if np.isfinite(dist): # Within valid range
organelle_radial_positions.append(partition_rpos_array[idxs[i]])
# Skip organelles outside the valid distance range
print(f'Organelles assigned to partitions: {len(organelle_radial_positions)} / {len(organelle_coords)}')
if len(organelle_radial_positions) == 0:
print("No organelles could be assigned to partitions")
return None
# Calculate histogram of organelle radial positions
organelle_layer_counts, edges = np.histogram(organelle_radial_positions, bins=bins, range=(0, 1))
# Calculate cytoplasm volume distribution (all partition coordinates)
all_radial_positions = partition_rpos_array
cyto_layer_counts, _ = np.histogram(all_radial_positions, bins=bins, range=(0, 1))
print(f'Organelle layer counts: {organelle_layer_counts}')
print(f'Cytoplasm layer counts: {cyto_layer_counts}')
# Calculate normalized RDF
cyto_volume = len(all_radial_positions)
organelle_volume = len(organelle_radial_positions)
average_density = organelle_volume / cyto_volume
g_average = np.zeros(len(organelle_layer_counts))
radii = np.zeros(len(organelle_layer_counts))
for i in range(len(g_average)):
if cyto_layer_counts[i] > 0 and average_density > 0:
# RDF = (local density) / (average density)
local_density = organelle_layer_counts[i] / cyto_layer_counts[i]
g_average[i] = local_density / average_density
else:
g_average[i] = 0
# Radial position (center of bin)
radii[i] = (edges[i] + edges[i+1]) / 2
print(f'RDF g(r): {g_average}')
print(f'Radial positions: {radii}')
# Prepare results
rdf_results = {
'dataset': 'sxt_analysis',
'radii': radii.tolist(),
'rdf': g_average.tolist(),
'organelle_volume': organelle_volume,
'cyto_volume': cyto_volume,
'average_density': average_density,
'organelle_layer_counts': organelle_layer_counts.tolist(),
'cyto_layer_counts': cyto_layer_counts.tolist(),
'assignment_efficiency': len(organelle_radial_positions) / len(organelle_coords)
}
return rdf_results
def calculate_distribution_similarity(rdf_1, rdf_2):
"""
Calculate distribution similarity between two organelle distributions using Pearson correlation.
Parameters
----------
rdf_1 : array-like
Radial Distribution Function values from the first image/modality.
rdf_2 : array-like
Radial Distribution Function values from the second image/modality.
Returns
-------
dict
Dictionary containing 'pearson_coefficient' and 'p_value'.
"""
from scipy.stats import pearsonr
if len(rdf_1) != len(rdf_2):
raise ValueError("RDF arrays must have the same length for comparison.")
corr, p_value = pearsonr(rdf_1, rdf_2)
return {
'pearson_coefficient': corr,
'p_value': p_value,
'is_significant': p_value < 0.05
}
[docs]
def save_radial_rdf_results(rdf_results, output_dir, dataid):
"""
Save radial RDF results to files
Parameters
----------
rdf_results : dict
RDF results
output_dir : str
Output directory
dataid : str
Data identifier
"""
if rdf_results is None:
print("No RDF results to save")
return
os.makedirs(output_dir, exist_ok=True)
# Save as pickle
pickle_path = os.path.join(output_dir, f'{dataid}_radial_rdf_results.pkl')
with open(pickle_path, 'wb') as f:
pickle.dump(rdf_results, f)
print(f"Radial RDF results saved to: {pickle_path}")
# Save as CSV
csv_data = []
for i, (r, g) in enumerate(zip(rdf_results['radii'], rdf_results['rdf'])):
csv_data.append({
'radial_position': r,
'rdf_value': g,
'organelle_count': rdf_results['organelle_layer_counts'][i],
'cyto_count': rdf_results['cyto_layer_counts'][i]
})
if csv_data:
df = pd.DataFrame(csv_data)
csv_path = os.path.join(output_dir, f'{dataid}_radial_rdf_data.csv')
df.to_csv(csv_path, index=False)
print(f"Radial RDF data saved to: {csv_path}")
# %%
#----------
# wfm isg cluster
def volume_to_radius(volume):
return (3 * volume / (4 * np.pi)) ** (1 / 3)
def draw_sphere(img, center, radius):
# 'center' is a tuple (x, y, z)
# 'img' is the 3D image
z, y, x = center
radius = int(volume_to_radius(radius))
rr, cc = disk((y, x), radius)
rr[rr >= img.shape[1]] = img.shape[1] - 1
cc[cc >= img.shape[2]] = img.shape[2] - 1
img[z, rr, cc] = 255
# if 0 <= x < new_image_shape[0] and 0 <= y < new_image_shape[1] and 0 <= z < new_image_shape[2]:
# draw_sphere(img_imaris[time_point - 1], coords, img_volume)
[docs]
def reconstruct_4d_mask(data, new_image_shape, rescalex, rescaley):
"""
Reconstructs a 4D binary mask array with a temporal dimension from 3D spatial point data.
This function generates a 4D (time, X, Y, Z) mask array based on vesicle/particle coordinates across multiple time points.
The coordinates from the input DataFrame are mapped and rescaled to fit into the target image shape, and each particle is visualized as a small cubic region in the 3D volume per time frame.
Parameters
----------
data : pandas.DataFrame
DataFrame containing at least the columns 'Time', 'Position X', 'Position Y', 'Position Z'.
Each row should represent a single particle/vesicle at a given time and 3D position.
new_image_shape : tuple of int
The target image dimensions as (X, Y, Z), defining the desired output resolution for the spatial axes.
rescalex : float
Manual rescaling factor applied to the X axis, used to match physical/image boundary correspondence.
rescaley : float
Manual rescaling factor applied to the Y axis, used to match physical/image boundary correspondence.
Returns
-------
img : numpy.ndarray
A 4D numpy array of shape (T, X, Y, Z) where each particle/vesicle is assigned a unique label (id) at each time point T in the binary mask.
ves_coords : list
List of lists of original (X, Y, Z) coordinates for the vesicles at each time point, ordered by frame.
"""
time_range = (data['Time'].min(), data['Time'].max())
time_values = data['Time'].unique()
# Creating a 4D array for the image with an additional time dimension
# The shape is (number of time points, X, Y, Z)
# New image dimensions and resolution
# new_image_shape = (187, 302, 43)
# new_image_shape = (round(x_range[1]-x_range[0]), round(y_range[1]-y_range[0]), round(z_range[1]-z_range[0]))
resolution = (0.1030, 0.1030, 0.3006) # in micrometers
# resolution = (1, 1, 1)
# resolution = (9.7099, 9.7099, 3.3267) # 1 / voxel number
ves_size = 0.2 # in micrometers
img_imaris_shape = (len(time_values),) + new_image_shape
img_imaris = np.zeros(img_imaris_shape, dtype=np.uint8)
x_range = (data['Position X'].min(), data['Position X'].max())
y_range = (data['Position Y'].min(), data['Position Y'].max())
z_range = (data['Position Z'].min(), data['Position Z'].max())
ves_coords = []
# Populating the 4D image
for time_point in time_values:
# Extract data for the current time point
time_data = data[data['Time'] == time_point]
# Calculating the scale for each axis based on the new resolution
# TODO: the 2 rescale parameters are manully fitted but should be availale from image size / boundary distance
x_scale_new = new_image_shape[0] / ((x_range[1] - x_range[0])*rescalex)
y_scale_new = new_image_shape[1] / ((y_range[1] - y_range[0])*rescaley)
z_scale_new = new_image_shape[2] / ((z_range[1] - z_range[0]))
centered_image = np.zeros(new_image_shape, dtype=np.uint8)
x_center = time_data['Position X'].mean()
y_center = time_data['Position Y'].mean()
z_center = time_data['Position Z'].mean()
x_offset = (new_image_shape[0] // 2) - int((x_center - x_range[0]) * x_scale_new)
y_offset = (new_image_shape[1] // 2) - int((y_center - y_range[0]) * y_scale_new)
z_offset = (new_image_shape[2] // 2) - int((z_center - z_range[0]) * z_scale_new)
id = 1
ves_coords_ti = []
for _, row in time_data.iterrows():
x_new = int((row['Position X'] - x_range[0]) * x_scale_new) + x_offset
y_new = int((row['Position Y'] - y_range[0]) * y_scale_new) + y_offset
z_new = int((row['Position Z'] - z_range[0]) * z_scale_new) + z_offset
if 0 <= x_new < new_image_shape[0] and 0 <= y_new < new_image_shape[1] and 0 <= z_new < new_image_shape[2]:
# TODO: the voxel size is not average compared to its real size, avg voxel size 4-6
# this for visulize
centered_image[x_new-2:x_new+2, y_new-2:y_new+2, z_new-2:z_new+2] = id
# centered_image[x_new-1:x_new+1, y_new-1:y_new+1, z_new] = id
id += 1
ves_coords_ti += [(row['Position X'], row['Position Y'], row['Position Z'])]
ves_coords += [ves_coords_ti]
img_imaris[time_point - 1] = centered_image
return img_imaris, ves_coords
# Function to find instances and calculate their center of mass
def find_instances_and_centers(img_fi, vesicle_pixel_cutoff):
im_vesicle_mask = np.where(np.less(img_fi, vesicle_pixel_cutoff), 0, 1)
im_vesicle_label, n_vesicle_label = ndi.measurements.label(im_vesicle_mask)
centers = [center_of_mass(img_fi, im_vesicle_label, i) for i in range(n_vesicle_label)]
return im_vesicle_label, centers
# Function to identify cluster
def find_clusters_and_centers(fname, fi, raw_img, img_fi, ves_coords_list, vesicle_pixel_cutoff, visulize=1, outdir = None):
im_vesicle_label, _ = find_instances_and_centers(img_fi, vesicle_pixel_cutoff)
ves_volume_list = list(map(lambda x: np.array(x.area), measure.regionprops(im_vesicle_label)))
# real_x = 57.1583 / 555 # μm
# real_y = 55.3046 / 537 # μm
# real_z = 11.122 / 37 # μm
real_x = 19.2588 / 187 # μm
real_y = 31.1024 / 302 # μm
real_z = 12.9256 / 43 # μm
cluster_index = []
# for i in range(len(ves_volume_list)):
# ves_coords = np.where(np.equal(im_vesicle_label, i+1))
# ves_coords_x = ves_coords[1].max() - ves_coords[1].min()
# ves_coords_y = ves_coords[2].max() - ves_coords[2].min()
# ves_coords_z = ves_coords[0].max() - ves_coords[0].min()
# # 3D distance may not be very accurate here
# # dist = np.sqrt((ves_coords_x)**2 + (ves_coords_y)**2 + (ves_coords_z)**2)
# dist = np.sqrt((ves_coords_x*real_x)**2 + (ves_coords_y*real_y)**2 + (ves_coords_z*real_z)**2)
# print(dist)
# if dist > 0.6 and 5 < ves_volume_list[i] < 10000 and ves_coords[2].max() <= 180:
# cluster_index += [i+1]
# change this to center distance instead of cluster size
dist_matrix = distance_matrix(np.array(ves_coords_list), np.array(ves_coords_list))
# distance threshold between any two vesicles
threshold_distance = 1 # 0.2
# Initialize labels for each particle
labels = np.full(len(dist_matrix), -1) # Start with -1 to denote unlabelled
# Label counter
current_label = 1
for i in range(len(dist_matrix)):
if labels[i] == -1:
if np.any(dist_matrix[i, :] < threshold_distance):
labels[i] = current_label
for j in range(len(dist_matrix)):
if dist_matrix[i, j] < threshold_distance and labels[j] == -1:
labels[j] = current_label
current_label += 1
# number each cluster from 1
cluster_dict = {}
i = 1
# the label starts form 1, just for the index of each identity
# the label for ves_coords_list is not the same as the one for im_vesicle_label
for label in np.unique(labels):
if len(np.where(labels == label)[0].tolist()) > 1:
cluster_dict[i] = np.where(labels == label)[0].tolist()
i += 1
# number each cluster from 1
im_ves_cluster = np.zeros_like(img_fi, dtype=np.int64)
for i, v in enumerate(cluster_dict):
for k in cluster_dict[v]:
im_ves_cluster = np.where(np.equal(img_fi, k+1), i+1, im_ves_cluster)
cluster_centers = []
for i in range(len(cluster_dict)):
if np.any(np.isnan(im_ves_cluster)) or np.any(np.isinf(im_ves_cluster)):
print("Input data contains NaN or Inf.")
unique_labels = np.unique(im_ves_cluster)
label = i + 1
if label not in unique_labels:
print(f"Label {label} does not exist in the image.")
com = center_of_mass(im_ves_cluster, labels=im_ves_cluster, index=label)
if np.isnan(com).any():
print(f"Calculated center of mass is NaN: {com}")
else:
# print(f"Center of mass coordinates: {com}")
cluster_centers += [com]
# The returned center might be nan
# cluster_centers = [center_of_mass(im_ves_cluster, im_ves_cluster, i+1) for i in range(len(cluster_dict))]
if visulize:
if outdir is None:
outdir = "./data/results"
os.makedirs(outdir, exist_ok=True)
fig = plt.figure(figsize=(20, 4))
ax = fig.add_subplot(1, 4, 1)
plt.title('raw image')
plt.imshow(np.max(raw_img, axis=0))
ax = fig.add_subplot(1, 4, 2)
plt.title('image reconstruction')
plt.imshow(np.max(img_fi, axis=0))
ax = fig.add_subplot(1, 4, 3)
plt.title('seg vesicle')
plt.imshow(np.max(im_vesicle_label, axis=0))
ax = fig.add_subplot(1, 4, 4)
plt.title('seg cluster')
plt.imshow(np.max(im_ves_cluster, axis=0))
plt.savefig(os.path.join(outdir, '{}_fi{}.png'.format(fname, fi)))
plt.close()
print('frame_{}: n_vesicle: {}; n_cluster: {}; clustered_ves: {}'.format(fi,
len(ves_coords_list),
len(cluster_centers),
len([i for x in cluster_dict for i in cluster_dict[x]]) ))
return im_vesicle_label, im_ves_cluster, cluster_dict, cluster_centers
# Function to match instances across frames using their center positions, this is simplified here
def match_instances_across_frames(centers_list, dthreshold):
'''
input:
centers_list = [
[(1, 1), (5, 5)],
[(2, 2), (5, 4)],
[(3, 3), (6, 4)]]
dthreshold = 2
output:
[[0, 0, 0], [1, 1, 1]]
'''
# Initialize the list to store consistent instances across all frames in the window
track_instances = []
# Start with instances in the first frame
for i, center1 in enumerate(centers_list[0]):
match_found = True
match_indices = [i]
# Compare with instances in subsequent frames
for centers in centers_list[1:]:
if not len(centers):
continue
distances = cdist([center1], centers)[0]
match_index = np.argmin(distances)
center1 = centers[match_index]
match_indices.append(match_index)
# Check distance threshold (optional, for more accurate matching)
# Threshold for considering a match (can be adjusted)
if distances[match_index] > dthreshold:
match_found = False
break
# If a match is found in all frames, add it to the consistent instances
if match_found:
track_instances.append(match_indices)
# return cluster idx, instead of vesicle ids
return track_instances
def process_tracking_windows(data):
''' bug: the returned frames are not continuous '''
final_cluster = {}
for triplet in data[0]:
final_cluster[tuple(triplet)] = [f'frame_{i}: {v}' for i, v in enumerate(triplet)]
for i, window in enumerate(data[1:], start=1):
# window is the list of tracking instances (triplet)
if not len(window):
continue
# if len(triplet) < 3:
# continue
# check the number before adding new elements
# last_frame =
last_elements = [k[-2:] for k in final_cluster]
# print(len(window))
# print(final_cluster)
for t in window:
triplet = tuple(t)
two_elements = (t[0], t[1])
matching_keys = [(k, int(final_cluster[k][-1].split(':')[0].split('_')[1])) for k in final_cluster if k[-2:] == two_elements]
if matching_keys:
matched_key = matching_keys[0][0]
matched_key_frame = matching_keys[0][1]
if matched_key_frame == i+1:
new_key = matched_key + (t[2],)
final_cluster[new_key] = final_cluster.pop(matched_key)
final_cluster[new_key].append(f'frame_{i+2}: {t[2]}')
else:
# new_key = (t[2])
# final_cluster[new_key].append(f'frame_{i+2}: {t[2]}')
# print(i+1, matched_key_frame)
pass
else:
if two_elements in last_elements:
print(f'Possible merge detected in frame {i} for {triplet}')
else:
final_cluster[triplet] = [f'frame_{i+k}: {v}' for k, v in enumerate(triplet)]
return final_cluster
[docs]
def analyze_vesicle_clusters(name, img_ch2, img_imaris, ves_coords, outdir = None, config_params=None):
"""
Main vesicle clustering analysis function
Parameters:
-----------
name : str
Sample name
img_ch2 : numpy.ndarray
Original fluorescence image data
img_imaris : numpy.ndarray
Reconstructed Imaris image data
ves_coords : list
List of vesicle coordinates for each time point
config_params : dict, optional
Configuration parameters dictionary
Returns:
--------
results : dict
Analysis results dictionary containing processed image data and clustering information
"""
# Default configuration parameters
default_config = {
'window_size': 5,
'vesicle_pixel_cutoff': 1,
'dthreshold': 5,
'visualize': True
}
if config_params:
default_config.update(config_params)
window_size = default_config['window_size']
vesicle_pixel_cutoff = default_config['vesicle_pixel_cutoff']
dthreshold = default_config['dthreshold']
visualize = default_config['visualize']
outdir = f'{outdir}/imgs'
print(f"Processing {name}, image shape: {img_imaris.shape}")
proc_image = {}
# image segmentation for all frames
for fi in range(len(img_imaris)):
im_ves_fi, im_ves_cluster_fi, cluster_dict, cluster_centers = find_clusters_and_centers(
name, fi, img_ch2[fi],
img_imaris[fi],
ves_coords[fi],
vesicle_pixel_cutoff,
visulize=visualize,
outdir=outdir
)
proc_image[fi] = [im_ves_fi, im_ves_cluster_fi, cluster_dict, cluster_centers]
# Process all frames using a sliding window
tracking_cluster = []
for i in range(len(img_imaris) - window_size + 1):
centers_list = [proc_image[fi][3] for fi in range(i, i + window_size)]
track_instances = match_instances_across_frames(centers_list, dthreshold)
if not len(track_instances):
continue
tracking_cluster.append(np.array(track_instances) + 1)
final_cluster = process_tracking_windows(tracking_cluster[:-1])
print('Detected number of cluster: {}'.format(len(final_cluster)))
# check the clustered vesicle size
cluster_size = []
for i, k in enumerate(final_cluster):
clus_size = 0
for v in final_cluster[k]:
try:
fi, clu_idx = int(v.split(':')[0].split('_')[1]), int(v.split(':')[1])
clus_size += np.sum(np.where(proc_image[fi][1] == clu_idx, 1, 0))
except KeyError:
print(v)
cluster_size.append(clus_size)
final_cluster[k].append(clus_size)
# Construct return results
results = {
'sample_name': name,
'processed_images': proc_image,
'final_clusters': final_cluster,
'cluster_sizes': cluster_size,
'tracking_data': tracking_cluster,
'config_used': default_config
}
return results
#--------------------------
# sim
[docs]
def analyze_organelle_aggregation(centers_list, dthreshold=0.6):
"""
Analyze organelle aggregation in time-lapse images.
Aggregation is defined as the set of organelles that remain within a distance
less than a certain threshold across a number of continuous frames.
Parameters
----------
centers_list : list of np.ndarray
List of arrays containing organelle center coordinates for each frame.
dthreshold : float
Distance threshold (in micrometers) to define an aggregation.
Returns
-------
dict
Dictionary mapping aggregation IDs to their tracking history and volume.
"""
from scipy.spatial.distance import cdist
# Simple clustering per frame based on distance threshold
clusters_per_frame = []
for centers in centers_list:
if len(centers) == 0:
clusters_per_frame.append([])
continue
dist_matrix = cdist(centers, centers)
labels = np.full(len(centers), -1)
current_label = 0
for i in range(len(centers)):
if labels[i] == -1:
cluster_members = [j for j in range(len(centers)) if dist_matrix[i, j] < dthreshold]
if len(cluster_members) > 1: # Only consider groups as aggregates
for m in cluster_members:
labels[m] = current_label
current_label += 1
clusters_per_frame.append(labels)
# Track aggregates across frames (simplified matching)
# In a full implementation, this would use more robust tracking logic
return {"clusters": clusters_per_frame}
[docs]
def analyze_docked_granules(img_isg, mem_mask, isg_data, show_visualization=False, distance_threshold=None):
"""
Analyze the localization relationship between ISG granules and plasma membrane using matrix data.
Parameters:
img_isg: numpy ndarray, ISG segmentation 3D matrix
mem_mask: numpy ndarray, PM mask 3D matrix
isg_data: numpy ndarray, granule center coordinates and volume information (CSV loading result)
show_visualization: bool, whether to display visualization images of partial slices
distance_threshold: float, optional. If provided, use this fixed distance (in pixels/nm)
to define docked pool instead of dynamic vesicle radius. Matches manuscript requirement for fixed thresholds (e.g., 300 nm).
Returns:
dict, containing total granule count, docked granule count and ratio
"""
isg_volume = isg_data[1:, 19]
centers = isg_data[1:, 4:7]
ves_label = np.zeros_like(mem_mask, dtype=np.uint8)
for i, center in enumerate(centers):
ves_label[int(center[2])-3:int(center[2])+3,
int(center[1])-3:int(center[1])+3,
int(center[0])-3:int(center[0])+3] = 1
# Crop PM and corresponding regions
nonzero_coords = np.where(mem_mask != 0)
if len(nonzero_coords[0]) == 0:
return {'total_granules': 0, 'docked_granules': 0, 'docked_ratio': 0}
x0 = (nonzero_coords[2].min() // 100) * 100
y0 = (nonzero_coords[1].min() // 100) * 100
x1 = (nonzero_coords[2].max() // 100) * 100 + 100
y1 = (nonzero_coords[1].max() // 100) * 100 + 100
mem_mask_upd = mem_mask[:, y0:y1, x0:x1]
ves_label = ves_label[:, y0:y1, x0:x1]
img_isg = img_isg[:, y0:y1, x0:x1]
pm_mask = np.where(mem_mask == 0, 1, 0)
centers_upd = []
isg_volume_upd = []
for i, center in enumerate(centers):
valid_x = (center[0] >= x0) and (center[0] < x1)
valid_y = (center[1] >= y0) and (center[1] < y1)
valid_z = 0 <= center[2] < mem_mask.shape[0]
if valid_x and valid_y and valid_z:
try:
if pm_mask[int(center[2]), int(center[1]), int(center[0])] == 0:
centers_upd.append([center[0]-x0, center[1]-y0, center[2]])
isg_volume_upd.append(isg_volume[i])
except IndexError:
continue
pm_mask_bin = np.where(mem_mask_upd == 0, 1, 0)
dis_pm_mask = ndi.distance_transform_edt(1 - pm_mask_bin)
granule_PM = []
docked_cutoff = []
for i, center in enumerate(centers_upd):
idx = [round(c) for c in center]
try:
dist = dis_pm_mask[idx[2], idx[1], idx[0]]
except IndexError:
continue
granule_PM.append(dist)
# Use fixed threshold if provided, otherwise use dynamic radius
if distance_threshold is not None:
docked_cutoff.append(distance_threshold)
else:
ves_radius = ((3 * isg_volume_upd[i]) / (4 * math.pi)) ** (1/3)
docked_cutoff.append(ves_radius)
n_docked = len([x for i,x in enumerate(granule_PM) if x < docked_cutoff[i]])
total_granules = len(granule_PM)
docked_ratio = n_docked / total_granules if total_granules > 0 else 0
if show_visualization and total_granules > 0:
docked_indices = [i for i, x in enumerate(granule_PM) if x < docked_cutoff[i]]
vis_mask = np.zeros_like(mem_mask_upd, dtype=np.uint8)
for i in docked_indices:
center = centers_upd[i]
vis_mask[int(center[2])-3:int(center[2])+3,
int(center[1])-3:int(center[1])+3,
int(center[0])-3:int(center[0])+3] = 1
sel_z = min(10, mem_mask_upd.shape[0]-1)
fig = plt.figure(figsize=(15, 5))
ax1 = fig.add_subplot(1, 3, 1)
ax1.imshow(ves_label[sel_z], cmap='gray')
ax1.set_title(f'All Granules (Total: {total_granules})')
ax1.axis('off')
ax2 = fig.add_subplot(1, 3, 2)
ax2.imshow(mem_mask_upd[sel_z], cmap='gray')
ax2.set_title('Plasma Membrane Region')
ax2.axis('off')
ax3 = fig.add_subplot(1, 3, 3)
ax3.imshow(vis_mask[sel_z], cmap='hot')
ax3.set_title(f'Docked Granules (Count: {n_docked})')
ax3.axis('off')
plt.tight_layout()
# plt.show()
return {
'total_granules': total_granules,
'docked_granules': n_docked,
'docked_ratio': docked_ratio
}
[docs]
def calculate_distributional_similarity(rdf1, rdf2, alpha=0.05):
"""
Calculate Pearson correlation between two RDF distributions to measure distributional similarity.
This function compares organelle distributions between two cell images from different
modalities (e.g., SIM vs SXT) using Pearson correlation coefficient.
Parameters
----------
rdf1 : np.ndarray
RDF values from modality 1 (e.g., SIM)
rdf2 : np.ndarray
RDF values from modality 2 (e.g., SXT)
alpha : float, optional
Significance level for p-value. Default: 0.05
Returns
-------
dict
Dictionary containing:
- 'pearson_r' (float): Pearson correlation coefficient (-1 to 1)
- 'p_value' (float): Statistical significance
- 'significant' (bool): Whether correlation is significant (p < alpha)
Example
-------
>>> import numpy as np
>>> from ipa.analysis import calculate_distributional_similarity
>>> sim_rdf = np.array([0.8, 1.0, 1.2, 1.1, 0.9, 1.0, 1.3, 1.5])
>>> sxt_rdf = np.array([0.9, 1.1, 1.1, 1.0, 1.0, 1.1, 1.2, 1.4])
>>> result = calculate_distributional_similarity(sim_rdf, sxt_rdf)
>>> print(f"Pearson r: {result['pearson_r']:.2f}")
>>> print(f"P-value: {result['p_value']:.4f}")
>>> print(f"Significant: {result['significant']}")
"""
from scipy import stats
if len(rdf1) != len(rdf2):
raise ValueError("RDF arrays must have same length")
if len(rdf1) < 3:
raise ValueError("Need at least 3 data points for correlation")
# Calculate Pearson correlation
r, p = stats.pearsonr(rdf1, rdf2)
return {
'pearson_r': r,
'p_value': p,
'significant': p < alpha
}
[docs]
def analyze_anchored_organelle_angles(filament_coords_dict, pm_mask,
distance_threshold_nm=120,
voxel_size_nm=(1, 1, 1)):
"""
Analyze angles of anchored tubular organelles (e.g., F-actin) relative to PM.
Anchored organelles are defined by their distance to the PM. For such organelles,
the angle is calculated between pairs of nearest tubular organelles anchored to the PM.
Parameters
----------
filament_coords_dict : dict
Dictionary of filament coordinates {filament_id: [[x,y,z], ...]}
pm_mask : np.ndarray
Plasma membrane binary mask
distance_threshold_nm : float, optional
Distance threshold (nm) to define anchored organelles. Default: 120
voxel_size_nm : tuple, optional
Voxel dimensions (nm). Default: (1, 1, 1)
Returns
-------
dict
Dictionary containing:
- 'anchored_count' (int): Number of anchored filaments
- 'angles' (list): List of angles between anchored filament pairs (degrees)
- 'mean_angle' (float): Mean angle
- 'std_angle' (float): Standard deviation of angles
Example
-------
>>> import numpy as np
>>> from ipa.analysis import analyze_anchored_organelle_angles
>>> # Create sample filament data
>>> filaments = {
... 'filament_1': [[10, 20, 30], [11, 21, 31], [12, 22, 32]],
... 'filament_2': [[15, 25, 35], [16, 26, 36], [17, 27, 37]]
... }
>>> pm_mask = create_pm_mask() # Your PM mask
>>> results = analyze_anchored_organelle_angles(
... filaments, pm_mask,
... distance_threshold_nm=120,
... voxel_size_nm=(31.3, 31.3, 90.9) # SIM voxel size
... )
>>> print(f"Anchored filaments: {results['anchored_count']}")
>>> print(f"Mean angle: {results['mean_angle']:.2f} degrees")
"""
from scipy.spatial import cKDTree
# Get PM coordinates
pm_coords = np.array(np.where(pm_mask > 0)).T
if len(pm_coords) == 0:
raise ValueError("PM mask is empty")
# Convert voxel size to numpy array
voxel_size = np.array(voxel_size_nm)
# Build KD-tree for PM
pm_tree = cKDTree(pm_coords)
# Find anchored filaments
anchored_filaments = []
anchored_vectors = []
for filament_id, coords in filament_coords_dict.items():
coords_array = np.array(coords)
# Convert to physical coordinates if needed
if np.max(voxel_size) > 10: # Likely in voxels, convert to nm
coords_nm = coords_array * voxel_size
else:
coords_nm = coords_array
# Check if any point is within threshold of PM
distances, _ = pm_tree.query(coords_nm)
min_dist = np.min(distances)
if min_dist <= distance_threshold_nm:
# This filament is anchored
anchored_filaments.append(filament_id)
# Calculate filament direction vector (from first to last point)
if len(coords_nm) >= 2:
direction = coords_nm[-1] - coords_nm[0]
norm = np.linalg.norm(direction)
if norm > 0:
direction = direction / norm
anchored_vectors.append(direction)
# Calculate angles between anchored filament pairs
angles = []
if len(anchored_vectors) >= 2:
for i in range(len(anchored_vectors)):
for j in range(i + 1, len(anchored_vectors)):
vec1 = anchored_vectors[i]
vec2 = anchored_vectors[j]
# Calculate angle using dot product
cos_angle = np.dot(vec1, vec2)
cos_angle = np.clip(cos_angle, -1.0, 1.0) # Clip for numerical stability
angle_rad = np.arccos(cos_angle)
angle_deg = np.degrees(angle_rad)
# Convert to 0-90 degree range (acute angle)
if angle_deg > 90:
angle_deg = 180 - angle_deg
angles.append(angle_deg)
# Calculate statistics
mean_angle = np.mean(angles) if len(angles) > 0 else 0.0
std_angle = np.std(angles) if len(angles) > 0 else 0.0
return {
'anchored_count': len(anchored_filaments),
'angles': angles,
'mean_angle': mean_angle,
'std_angle': std_angle
}