ParaDigMa Pipeline Orchestrator Tutorial
This tutorial demonstrates how to use the pipeline orchestrator run_paradigma(), which serves as the main entry point for running ParaDigMa analysis pipelines. The orchestrator coordinates multiple analysis steps and can process different formats of sensor data.
Overview
The run_paradigma() function is called an orchestrator because it coordinates multiple analysis steps depending on the user input. It can process:
Gait analysis: Arm swing quantification from IMU data
Tremor analysis: Tremor detection and quantification from gyroscope data
Pulse rate estimation: Pulse rate analysis from PPG data
Key Features
Multi-pipeline support: Run multiple analyses simultaneously
Flexible data input: Works with both prepared and raw sensor data
Multiple data formats: Supports Verily, Axivity, Empatica, and custom formats
Robust processing: Automatic data preparation and error handling
Data Requirements
The orchestrator accepts either:
Prepared data: Data prepared according to the Data preparation tutorial
Raw data: Automatically processed (note: this feature has a limited scope)
Let’s explore different usage scenarios with examples.
Import required modules
import json
import logging
from pathlib import Path
from paradigma.constants import TimeUnit
from paradigma.load import load_data_files
from paradigma.orchestrator import run_paradigma
1. Single pipeline with prepared data
Let’s start with a simple example using prepared PPG data for pulse rate analysis.
The function load_data_files attempts to load data in one or more of the following formats:
'parquet', 'csv', 'pkl', 'pickle', 'json', 'avro', 'cwa'. You can also load the data in your preferred
way, but note that the output should be a Dict[str, pd.DataFrame]:
{
'file_1': df_1,
'file_2': df_2,
...,
'file_n': df_n
}
Alternatively, you can provide:
A single DataFrame: Will be processed with key 'df_1'
A list of DataFrames: Each will get keys like 'df_1', 'df_2', etc.
This means ParaDigMa can run multiple files in sequence. This is useful when you have multiple files spanning a week, and you want aggregations to be computed across all files.
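If you prefer to load files yourself, a minimal sketch of building this dictionary with pandas (the folder path and CSV format are purely illustrative) could look like:
import pandas as pd
from pathlib import Path

# Hypothetical example: read every CSV in a folder into the expected Dict[str, pd.DataFrame]
data_dir = Path('./my_sensor_data')  # illustrative path
dfs = {
    file.stem: pd.read_csv(file)
    for file in sorted(data_dir.glob('*.csv'))
}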
path_to_ppg_data = Path('../../example_data/verily/ppg')
dfs_ppg = load_data_files(
data_path=path_to_ppg_data,
file_patterns='json'
)
print(f"Loaded {len(dfs_ppg)} PPG files:")
for filename in dfs_ppg.keys():
df = dfs_ppg[filename]
print(f" - {filename}: {len(df)} samples, {len(df.columns)} columns")
print(f"\nFirst 5 rows of {list(dfs_ppg.keys())[0]}:")
dfs_ppg[list(dfs_ppg.keys())[0]].head()
Output Control
When running ParaDigMa, you can control where results are saved and what intermediate results to store:
Output Directory:
Default: output_dir defaults to "./output"
Custom: Specify your own path, like output_dir="./my_results"
No storage: Files are only saved if save_intermediate is not empty
Store Intermediate Results:
The save_intermediate parameter accepts a list of strings:
save_intermediate=['preprocessing', 'quantification', 'aggregation']
Valid options are:
'preparation': Save prepared data
'preprocessing': Save preprocessed signals
'classification': Save classification results
'quantification': Save quantified measures
'aggregation': Save aggregated results
If save_intermediate=[] (empty list), no files are saved; results are only returned in memory.
Also, make sure to set the correct unit for the time column via time_input_unit. For all options, please check the API reference.
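Putting these options together, a call could look like the following sketch (the ... stands for the data and pipeline arguments shown in the examples below):
# Save quantifications and aggregations to a custom folder,
# with timestamps given in relative seconds
run_paradigma(
    ...,
    output_dir="./my_results",
    save_intermediate=['quantification', 'aggregation'],
    time_input_unit=TimeUnit.RELATIVE_S,
)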
Logging Control
ParaDigMa uses Python’s standard logging module to provide progress updates and diagnostics. You can control the verbosity level and optionally provide a custom logger for advanced use cases.
Basic Logging Levels:
import logging
# Default - shows progress and important information
run_paradigma(..., logging_level=logging.INFO)
# Detailed - shows additional processing details
run_paradigma(..., logging_level=logging.DEBUG)
# Quiet - only warnings and errors
run_paradigma(..., logging_level=logging.WARNING)
# Silent - only errors
run_paradigma(..., logging_level=logging.ERROR)
Custom Logger (Advanced):
For full control over logging (custom formatting, multiple handlers, etc.), provide your own logger:
# Create custom logger with your configuration
custom_logger = logging.getLogger('my_analysis')
custom_logger.setLevel(logging.DEBUG)
custom_logger.addHandler(...) # Add your handlers
# Pass it to run_paradigma
run_paradigma(..., custom_logger=custom_logger)
When a custom logger is provided, the logging_level parameter is ignored.
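For example, a self-contained logger configuration that writes timestamped messages to the console could look like the sketch below (choose your own handlers and format):
# Console logger with a timestamped format
custom_logger = logging.getLogger('my_analysis')
custom_logger.setLevel(logging.DEBUG)

console_handler = logging.StreamHandler()
console_handler.setFormatter(
    logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
)
custom_logger.addHandler(console_handler)

# run_paradigma(..., custom_logger=custom_logger)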
pipeline = 'pulse_rate'
# Example 1: Using default output directory with storage
results_single_pipeline = run_paradigma(
dfs=dfs_ppg,
pipelines=pipeline,
skip_preparation=True,
time_input_unit=TimeUnit.RELATIVE_S,
save_intermediate=['quantification', 'aggregation'], # Files saved to ./output
logging_level=logging.WARNING # Only show warnings and errors
)
print(results_single_pipeline['metadata'][pipeline])
print(results_single_pipeline['aggregations'][pipeline])
results_single_pipeline['quantifications'][pipeline].head()
# Example 2: No file storage - results only in memory
results_no_storage = run_paradigma(
dfs=dfs_ppg,
pipelines=pipeline,
skip_preparation=True,
time_input_unit=TimeUnit.RELATIVE_S,
save_intermediate=[], # No files saved
logging_level=logging.WARNING # Only show warnings and errors
)
print("Results returned without file storage:")
print(f" Quantifications: {len(results_no_storage['quantifications'][pipeline])} rows")
print(f" Aggregations: {results_no_storage['aggregations'][pipeline]}")
No file storage
If you only want to work with results in memory without saving any files, use an empty save_intermediate list, as demonstrated in Example 2 above.
Note that run_paradigma currently does not accept accelerometer data as a supplement to the pulse rate pipeline for signal quality analysis. If you want to perform that analysis, please check out the Pulse rate analysis tutorial for more information.
2. Multi-pipeline with prepared data
One of the key features of the orchestrator is the ability to run multiple analysis pipelines simultaneously on the same data. This is more efficient than running them separately.
Results Structure
The multi-pipeline orchestrator returns a nested structure that organizes results by pipeline:
{
'quantifications': {
'gait': DataFrame, # Gait segment-level quantifications
'tremor': DataFrame # Tremor window-level quantifications
},
'aggregations': {
'gait': {...}, # Aggregated gait metrics
'tremor': {...} # Aggregated tremor metrics
},
'metadata': {
'gait': {...}, # Gait analysis metadata
'tremor': {...} # Tremor analysis metadata
},
'errors': [...] # List of errors encountered (empty if successful)
}
The errors list tracks any failures during processing. Each error contains:
stage: Where the error occurred (loading, preparation, pipeline_execution, aggregation)
error: Error message
file: Filename (if file-specific)
pipeline: Pipeline name (if pipeline-specific)
Check for errors after processing:
if results['errors']:
print(f"Warning: {len(results['errors'])} error(s) occurred")
for error in results['errors']:
print(f" - {error['stage']}: {error['error']}")
# Load prepared IMU data
path_to_imu_data = Path('../../example_data/verily/imu')
dfs_imu = load_data_files(
data_path=path_to_imu_data,
file_patterns='json'
)
print(f"Loaded {len(dfs_imu)} IMU files:")
for filename in dfs_imu.keys():
df = dfs_imu[filename]
print(f" - {filename}: {len(df)} samples, {len(df.columns)} columns")
print(f"\nFirst 5 rows of {list(dfs_imu.keys())[0]}:")
dfs_imu[list(dfs_imu.keys())[0]].head()
# Run gait and tremor analysis on the prepared data
# Using custom output directory
results_multi_pipeline = run_paradigma(
output_dir=Path('./output_multi'),
dfs=dfs_imu, # Pre-loaded data
skip_preparation=True, # Data is already prepared
pipelines=['gait', 'tremor'], # Multiple pipelines (list format)
watch_side='left', # Required for gait analysis
save_intermediate=['quantification'], # Store quantifications only
logging_level=logging.WARNING # Only show warnings and errors
)
# Explore the results structure
print("Detailed Results Analysis:")
# Gait results
arm_swing_quantified = results_multi_pipeline['quantifications']['gait']
arm_swing_aggregates = results_multi_pipeline['aggregations']['gait']
arm_swing_meta = results_multi_pipeline['metadata']['gait']
print(f"\nArm swing quantification ({len(arm_swing_quantified)} windows):")
print(
f" Columns: {list(arm_swing_quantified.columns[:5])}... "
f"({len(arm_swing_quantified.columns)} total)"
)
print(f" Files: {arm_swing_quantified['file_key'].unique()}")
print(f"\nArm swing aggregation ({len(arm_swing_aggregates)} time ranges):")
print(f" Gait segment categories: {list(arm_swing_aggregates.keys())}")
print(f" Aggregates: {list(arm_swing_aggregates['0_10'].keys())}")
print(f" Metadata first gait segment: {arm_swing_meta[1]}")
# Tremor results
tremor_quantified = results_multi_pipeline['quantifications']['tremor']
tremor_aggregates = results_multi_pipeline['aggregations']['tremor']
tremor_meta = results_multi_pipeline['metadata']['tremor']
print(f"\nTremor quantification ({len(tremor_quantified)} windows):")
print(
f" Columns: {list(tremor_quantified.columns[:5])}... "
f"({len(tremor_quantified.columns)} total)"
)
print(f" Files: {tremor_quantified['file_key'].unique()}")
print(f"\nTremor aggregation ({len(tremor_aggregates)} time ranges):")
print(f" Aggregates: {list(tremor_aggregates.keys())}")
print(f" Metadata first tremor segment: {tremor_meta}")
3. Raw Data Processing
The orchestrator can also process raw sensor data automatically, including data preparation steps such as format standardization, unit conversion, and orientation correction. Note that this feature has been developed on a limited set of example data and may therefore not function as expected on new data.
Column Mapping for Custom Data Formats
If your raw data uses different column names than ParaDigMa’s standard naming convention, use the column_mapping parameter to map your column names to the expected ones.
Standard ParaDigMa column names:
Required for all pipelines:
time: Timestamp column
For IMU pipelines (gait, tremor):
accelerometer_x, accelerometer_y, accelerometer_z: Accelerometer axes
gyroscope_x, gyroscope_y, gyroscope_z: Gyroscope axes
For PPG pipeline (pulse_rate):
ppg: PPG signal
Example mapping:
column_mapping = {
'timestamp': 'time', # Your 'timestamp' column → ParaDigMa 'time' column
'acceleration_x': 'accelerometer_x', # Your 'acceleration' columns → ParaDigMa 'accelerometer' columns
'acceleration_y': 'accelerometer_y',
'acceleration_z': 'accelerometer_z',
'rotation_x': 'gyroscope_x', # Your 'rotation' columns → ParaDigMa 'gyroscope' columns
'rotation_y': 'gyroscope_y',
'rotation_z': 'gyroscope_z',
}
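The mapping is then passed to run_paradigma together with the other arguments (a sketch using the run_paradigma(...) shorthand from the logging section above):
# Apply the mapping when processing raw data
run_paradigma(
    ...,
    skip_preparation=False,  # raw data: ParaDigMa prepares it
    column_mapping=column_mapping,
)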
path_to_raw_data = Path('../../example_data/axivity')
device_orientation = ["-x", "-y", "z"] # Sensor was worn upside-down
pipeline = 'gait'
# Working with raw data - this requires data preparation
# Using custom output directory
results_end_to_end = run_paradigma(
output_dir=Path('./output_raw'),
data_path=path_to_raw_data, # Point to data folder
skip_preparation=False, # ParaDigMa will prepare the data
pipelines=pipeline,
watch_side="left",
time_input_unit=TimeUnit.RELATIVE_S, # Specify time unit for raw data
accelerometer_units='g',
gyroscope_units='deg/s',
target_frequency=100.0,
device_orientation=device_orientation,
save_intermediate=['aggregation'], # Only save aggregations
logging_level=logging.WARNING, # Only show warnings and errors
)
print(
f"\nMetadata:\n"
f"{json.dumps(results_end_to_end['metadata'][pipeline][1], indent=2)}"
)
print(
f"\nAggregations:\n"
f"{json.dumps(results_end_to_end['aggregations'][pipeline], indent=2)}"
)
print("\nQuantifications (first 5 rows; each row represents a single arm swing):")
results_end_to_end['quantifications'][pipeline].head()
4. Auto-Segmentation for Non-Contiguous Data
When working with sensor data, you may encounter gaps or interruptions in the recording (e.g., battery died, device removed, multiple recording sessions). The orchestrator can automatically detect these gaps and split the data into contiguous segments for processing.
When to Use Auto-Segmentation
Use split_by_gaps=True when:
Your data has recording interruptions or gaps
You’re getting “Time array is not contiguous” errors
You want to process multiple recording sessions in one file
Data spans multiple days with breaks
Understanding Data Segments vs Gait Segments
Important distinction:
Data segments (data_segment_nr): Contiguous recording chunks separated by temporal gaps
Created during data preparation
Example: 4 segments if the recording had 3 interruptions
Gait segments (gait_segment_nr): Detected gait bouts within the data
Created during gait pipeline analysis
Example: 52 gait bouts detected across all data segments
Only applicable to gait analysis
The orchestrator will:
Detect gaps larger than max_gap_seconds (default: 1.5 seconds)
Split the data into contiguous data segments
Discard segments shorter than min_segment_seconds (default: 1.5 seconds)
Add a data_segment_nr column to track which recording chunk each data point belongs to
Process each data segment independently through the pipeline
Combine results with gait_segment_nr for detected gait bouts (gait pipeline only)
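Conceptually, the gap-splitting step works like the following standalone pandas sketch (an illustration only, not ParaDigMa's internal implementation; the helper name is hypothetical):
import pandas as pd

def assign_data_segments(df: pd.DataFrame, max_gap_seconds: float = 1.5) -> pd.DataFrame:
    # A new data segment starts wherever consecutive timestamps differ by more than the gap threshold
    is_new_segment = df['time'].diff() > max_gap_seconds
    df = df.copy()
    df['data_segment_nr'] = is_new_segment.cumsum() + 1  # 1-based counter (assumption)
    return df

In practice you do not need such a helper: passing split_by_gaps=True lets the orchestrator handle segmentation, as shown in the example below.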
Example: Gait-up Physilog Data with Gaps
This example uses data from a Gait-up Physilog 4 device with 3 large gaps (up to ~20 minutes). The data is already in Parquet format with standard column names, but timestamps are non-contiguous.
# Load Gait-up Physilog data with non-contiguous timestamps
path_to_physilog_data = Path('../../example_data/gait_up_physilog')
dfs_physilog = load_data_files(
data_path=path_to_physilog_data,
file_patterns='parquet'
)
print(f"Loaded {len(dfs_physilog)} Gait-up Physilog file(s):")
for filename, df in dfs_physilog.items():
print(f" - {filename}: {len(df)} samples, {len(df.columns)} columns")
print(f" Time range: {df['time'].min():.1f}s to {df['time'].max():.1f}s")
print(f" Duration: {(df['time'].max() - df['time'].min()):.1f}s")
# Check for gaps
time_diffs = df['time'].diff().dropna()
large_gaps = time_diffs[time_diffs > 1.0]
if len(large_gaps) > 0:
print(
f"Contains {len(large_gaps)} gap(s) > 1s "
f"(largest: {large_gaps.max():.1f}s)"
)
# Check for NaN values (common in real-world data)
nan_counts = df.isnull().sum()
if nan_counts.sum() > 0:
print(f"Contains {nan_counts.sum()} NaN values")
# Clean DataFrames with NaN values (after iteration to avoid SettingWithCopyWarning)
for filename in list(dfs_physilog.keys()):
df = dfs_physilog[filename]
df_clean = df.dropna().reset_index(drop=True)
if len(df_clean) < len(df):
print(
f"Dropping {len(df) - len(df_clean)} rows with NaN values "
f"from file {filename}"
)
dfs_physilog[filename] = df_clean
# Example: Processing non-contiguous data with auto-segmentation
# Data already has standard column names and units, but needs segmentation
results_with_segmentation = run_paradigma(
dfs=dfs_physilog, # Pre-loaded data dictionary
skip_preparation=False, # Need preparation to add data_segment_nr
pipelines='gait',
watch_side="left",
time_input_unit=TimeUnit.RELATIVE_S,
# Auto-segmentation parameters
split_by_gaps=True, # Enable automatic segmentation
max_gap_seconds=1.0, # Gaps > 1s create new data segment
min_segment_seconds=2.0, # Keep only data segments >= 2s
save_intermediate=[], # No file storage for demo
logging_level=logging.WARNING, # Only show warnings and errors
)
gait_results = results_with_segmentation['quantifications']['gait']
print(f"\nTotal arm swings quantified: {len(gait_results)}")
print(f"Number of gait segments: {gait_results['gait_segment_nr'].nunique()}")
if 'data_segment_nr' in gait_results.columns:
print(f"Number of data segments: {gait_results['data_segment_nr'].nunique()}")
print(f"\nColumns in output: {list(gait_results.columns)}")
gait_results.head()