import os
import sys
import json
import numpy as np
import mrcfile
from typing import Dict, List, Tuple
import pandas as pd
import matplotlib
# matplotlib.use('Agg') # Use non-interactive backend
import matplotlib.pyplot as plt
import scipy.ndimage as ndi
from skimage import measure, morphology
from skimage import segmentation
import json
# Standard package imports replacing sys.path.append
from .common.parser import arg
from .common import preprocess, distances_generator, outplot
[docs]
def calculate_tubular_length(skeleton_mask, voxel_size=(1.0, 1.0, 1.0), return_individual=False):
"""
Calculate the total length of tubular organelles by summing the distances
between adjacent skeleton voxels.
Parameters
----------
skeleton_mask : np.ndarray
3D binary mask representing the skeleton of the tubular structure.
voxel_size : tuple
Physical size of each voxel (z, y, x) in nm or um.
return_individual : bool
If True, also returns a list of lengths for each individual connected component.
NOTE: This is computationally expensive for large masks as it performs BFS on coordinates.
Returns
-------
float or tuple
Total length if return_individual is False.
(Total length, List of individual lengths) if return_individual is True.
"""
from scipy.spatial import KDTree
# Get coordinates of all skeleton voxels
coords = np.array(np.where(skeleton_mask > 0)).T
if len(coords) < 2:
return 0.0 if not return_individual else (0.0, [])
# Convert voxel coordinates to physical coordinates
phys_coords = coords * np.array(voxel_size)
# Use KDTree to find nearest neighbors and estimate path length
tree = KDTree(phys_coords)
# Find the nearest neighbor for each point (k=2 because k=1 is the point itself)
distances, _ = tree.query(phys_coords, k=2)
# The second column contains the distance to the nearest actual neighbor
nn_distances = distances[:, 1]
# Sum all unique connections. Since each connection is counted twice (A->B and B->A),
# we divide by 2.
total_length = np.sum(nn_distances) / 2.0
if not return_individual:
return total_length
# --- Individual Length Calculation (Computationally Intensive) ---
# To avoid labeling the entire huge array, we perform BFS on the coordinate set
individual_lengths = []
visited = np.zeros(len(coords), dtype=bool)
# Build a local graph using KDTree for connectivity check
# We consider points connected if they are within a small physical distance threshold
max_step_dist = np.max(voxel_size) * 1.5
for i in range(len(coords)):
if visited[i]:
continue
# Start a new component
queue = [i]
visited[i] = True
comp_indices = [i]
head = 0
while head < len(queue):
curr_idx = queue[head]
head += 1
# Find neighbors within threshold
nearby_indices = tree.query_ball_point(phys_coords[curr_idx], r=max_step_dist)
for n_idx in nearby_indices:
if not visited[n_idx]:
visited[n_idx] = True
queue.append(n_idx)
comp_indices.append(n_idx)
# Calculate length for this component
if len(comp_indices) > 1:
comp_phys = phys_coords[comp_indices]
comp_tree = KDTree(comp_phys)
comp_dists, _ = comp_tree.query(comp_phys, k=2)
comp_len = np.sum(comp_dists[:, 1]) / 2.0
individual_lengths.append(comp_len)
else:
individual_lengths.append(0.0)
return total_length, individual_lengths
# print("Successfully imported original modules")
def save_actin_vectors(actin_vectors: List, output_file: str):
"""
Save actin vector data to JSON file
Converts actin vector data to serializable format and saves it.
Args:
actin_vectors (List): Actin vector data list
output_file (str): Output file path
"""
actin_vectors_save = []
for filament in actin_vectors:
filament_save = []
for vector in filament:
if isinstance(vector, np.ndarray):
filament_save.append(vector.tolist())
else:
filament_save.append(list(vector))
actin_vectors_save.append(filament_save)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(actin_vectors_save, f, indent=2)
[docs]
def actin_to_actin_analysis(data_id: str, mask_file: str, actin_file: str, config: Dict) -> Dict:
"""
Calculate distance analysis between actin filaments
This function analyzes spatial relationships between actin filaments, calculates
shortest distances between filamentous structures, and generates corresponding
statistical information and visualization results.
Args:
data_id (str): Dataset identifier for naming output files
mask_file (str): Vesicle mask file path (.mrc format)
actin_file (str): Actin point coordinates file path (.json format)
config (Dict): Configuration parameters dictionary containing:
- voxel_size_xyz (List[float]): Voxel size [x, y, z] in Angstroms
- shift_bias (List[float]): Coordinate offset [x, y, z]
- visualization (bool): Whether to generate visualization charts
- save_intermediate (bool): Whether to save intermediate result files
- output_dir (str, optional): Output directory path
Returns:
Dict: Analysis result dictionary containing:
- data_id (str): Dataset identifier
- total_actin_filaments (int): Total number of actin filaments
- total_distances (int): Total number of calculated distances
- distance_stats (Dict): Distance statistics (mean, std, min/max, etc.)
- status (str): Analysis status ('success' or 'error')
- output_directory (str): Output directory path
- vector_file (str, optional): Vector data file path
- distance_file (str, optional): Distance data file path
- plot_file (str, optional): Visualization chart file path
Raises:
FileNotFoundError: Raised when input files do not exist
ImportError: Raised when required modules are not available
"""
results = {}
# 1. Load data
print(f"Loading data for {data_id}...")
# Load vesicle mask
if not os.path.exists(mask_file):
raise FileNotFoundError(f"Mask file not found: {mask_file}")
with mrcfile.open(mask_file, permissive=True) as mrc:
vesicle_mask = mrc.data
# Load actin points
if not os.path.exists(actin_file):
raise FileNotFoundError(f"JSON file not found: {actin_file}")
with open(actin_file, 'r') as f:
actin_data = json.load(f)
# 2. Get configuration parameters
shift_bias = config.get('shift_bias', [0.0, 0.0, 0.0])
voxel_size_xyz = config.get('voxel_size_xyz', [1.0, 1.0, 1.0])
print(f"Image size: {vesicle_mask.shape}")
print(f"Voxel size: {voxel_size_xyz}")
# 3. Apply offset and calculate distances
print("Calculating actin-to-actin distances...")
imgsize = vesicle_mask.shape
# Try to use original distances_generator functions
if distances_generator is not None:
# Apply offset
if hasattr(distances_generator, 'shift_bias'):
actin_data_shifted = distances_generator.shift_bias(actin_data, shift_bias)
else:
actin_data_shifted = actin_data
print("Warning: shift_bias function not available, using original data")
# Use original functions for calculation
actin_vectors = distances_generator.actin_to_actin_distance(actin_data_shifted, imgsize, voxel_size_xyz)
distances = distances_generator.actin_to_actin_distance2(actin_data_shifted, imgsize, voxel_size_xyz)
print(f"Calculated using original functions: {len(actin_vectors)} filaments, {len(distances)} distances")
else:
raise ImportError("distances_generator not available")
# 4. Create output directory with data_id
base_output_dir = config.get('output_dir', os.path.dirname(mask_file))
data_id_output_dir = os.path.join(base_output_dir, data_id)
os.makedirs(data_id_output_dir, exist_ok=True)
# 5. Process results
results['data_id'] = data_id
results['total_actin_filaments'] = len(actin_vectors)
results['total_distances'] = len(distances)
results['image_shape'] = list(vesicle_mask.shape)
results['output_directory'] = data_id_output_dir
if distances:
results['distance_stats'] = {
'count': len(distances),
'mean': float(np.mean(distances)),
'std': float(np.std(distances)),
'min': float(np.min(distances)),
'max': float(np.max(distances)),
'median': float(np.median(distances)),
'percentile_25': float(np.percentile(distances, 25)),
'percentile_75': float(np.percentile(distances, 75))
}
print(f"Distance statistics: {results['distance_stats']}")
# 6. Save intermediate results
if config.get('save_intermediate', False):
# Save vector data
if actin_vectors:
vector_file = os.path.join(data_id_output_dir, f"{data_id}_actin_vectors.json")
save_actin_vectors(actin_vectors, vector_file)
results['vector_file'] = vector_file
print(f"Saved vectors to: {vector_file}")
# Save distance data
if distances:
distance_file = os.path.join(data_id_output_dir, f"{data_id}_distances.csv")
df = pd.DataFrame({'Distance': distances})
df.to_csv(distance_file, index=False)
results['distance_file'] = distance_file
print(f"Saved distances to: {distance_file}")
# 7. Generate visualization
if config.get('visualization', False) and distances:
print("Generating visualization...")
# Use original outplot.vis_actin_surround_actin function
vector_count = sum(len(actin) for actin in actin_vectors) if actin_vectors else 0
outplot.vis_actin_surround_actin(actin_vectors, data_id, len(actin_vectors), vector_count)
plot_file = os.path.join(data_id_output_dir, f"{data_id}_actin_analysis.png")
plt.show()
plt.savefig(plot_file, dpi=300, bbox_inches='tight')
plt.close()
results['plot_file'] = plot_file
print(f"Visualization saved using original function: {plot_file}")
elif config.get('visualization', False):
print("Warning: No distances to visualize")
results['status'] = 'success'
print(f"Analysis completed successfully for {data_id}")
return results
[docs]
def actin_angle_distance_pair_enhanced_analysis(data_id: str, actin_file: str, vesicle_file: str, config: Dict) -> Dict:
"""
Enhanced Actin angle-distance pair analysis function with custom actinangle2dist implementation
Args:
data_id: Data ID for the analysis
actin_file: Path to actin points file (.json)
vesicle_file: Path to vesicle mask file (.mrc)
config: Configuration parameters dictionary
Returns:
Dict: Dictionary containing analysis results
"""
results = {}
# 1. Load data
print(f"Loading data for enhanced actin angle-distance analysis: {data_id}...")
# Load actin points
if not os.path.exists(actin_file):
raise FileNotFoundError(f"Actin file not found: {actin_file}")
with open(actin_file, 'r') as f:
actin_data = json.load(f)
# Load vesicle mask
if not os.path.exists(vesicle_file):
raise FileNotFoundError(f"Vesicle file not found: {vesicle_file}")
with mrcfile.open(vesicle_file, permissive=True) as mrc:
vesicle_mask = mrc.data
print(f"Actin filaments count: {len(actin_data)}")
print(f"Vesicle mask shape: {vesicle_mask.shape}")
# 2. Get configuration parameters
shift_bias = config.get('shift_bias', [0.0, 0.0, 0.0])
voxel_size_xyz = config.get('voxel_size_xyz', [1.0, 1.0, 1.0])
# 3. Calculate angle-distance pairs using enhanced algorithm
print("Calculating actin angle-distance pairs using enhanced algorithm...")
if distances_generator is not None:
# Apply offset
actin_data_shifted = distances_generator.shift_bias(actin_data, shift_bias)
# Calculate angle-distance pairs using custom function
distances, angles = actinangle2dist_enhanced(actin_data_shifted, vesicle_mask.shape, voxel_size_xyz)
print(f"Calculated using enhanced function: {len(distances)} distance-angle pairs")
else:
raise ImportError("distances_generator not available")
# 4. Create output directory with data_id
base_output_dir = config.get('output_dir', os.path.dirname(actin_file))
data_id_output_dir = os.path.join(base_output_dir, data_id)
os.makedirs(data_id_output_dir, exist_ok=True)
# 5. Process results
results['data_id'] = data_id
results['total_actin_filaments'] = len(actin_data)
results['total_pairs'] = len(distances)
results['output_directory'] = data_id_output_dir
if distances and angles:
results['distance_stats'] = {
'count': len(distances),
'mean': float(np.mean(distances)),
'std': float(np.std(distances)),
'min': float(np.min(distances)),
'max': float(np.max(distances)),
'median': float(np.median(distances)),
'percentile_25': float(np.percentile(distances, 25)),
'percentile_75': float(np.percentile(distances, 75))
}
results['angle_stats'] = {
'count': len(angles),
'mean': float(np.mean(angles)),
'std': float(np.std(angles)),
'min': float(np.min(angles)),
'max': float(np.max(angles)),
'median': float(np.median(angles)),
'percentile_25': float(np.percentile(angles, 25)),
'percentile_75': float(np.percentile(angles, 75))
}
print(f"Distance statistics: {results['distance_stats']}")
print(f"Angle statistics: {results['angle_stats']}")
# 6. Save intermediate results
if config.get('save_intermediate', False):
if distances and angles:
# Save distance-angle pairs
pair_file = os.path.join(data_id_output_dir, f"{data_id}_actin_angle_distance_pairs.csv")
pair_df = pd.DataFrame({'Distances': distances, 'Angles': angles})
pair_df.to_csv(pair_file, index=False)
results['pair_file'] = pair_file
print(f"Saved angle-distance pairs to: {pair_file}")
# 7. Generate visualization
if config.get('visualization', False) and distances and angles:
print("Generating visualization...")
# Check if we have valid data to plot
if len(distances) > 0:
# Use original plotting function
if outplot is not None and hasattr(outplot, 'vis_actindistangle'):
outplot.vis_actindistangle(distances, angles, data_id, len(actin_data), len(distances))
plot_file = os.path.join(data_id_output_dir, f"{data_id}_actin_angle_distance_analysis.png")
plt.show()
plt.savefig(plot_file, dpi=300, bbox_inches='tight')
plt.close()
results['plot_file'] = plot_file
print(f"Visualization saved using original function: {plot_file}")
else:
print(f"Warning: {data_id} only has one or no filaments, cannot generate angle-distance plot")
elif config.get('visualization', False):
print("Warning: No distance-angle pairs to visualize")
results['status'] = 'success'
print(f"Enhanced actin angle-distance analysis completed successfully for {data_id}")
return results
#---------------
# sim