ParaDigMa Pipeline Orchestrator Tutorial

This tutorial demonstrates how to use the pipeline orchestrator run_paradigma(), which serves as the main entry point for running ParaDigMa analysis pipelines. The orchestrator coordinates multiple analysis steps and can process different formats of sensor data.

Overview

The run_paradigma() function is called an orchestrator because it coordinates multiple analysis steps depending on the user's input. It can run:

  • Gait analysis: Arm swing quantification from IMU data

  • Tremor analysis: Tremor detection and quantification from gyroscope data

  • Pulse rate estimation: Pulse rate analysis from PPG data

Key Features

  • Multi-pipeline support: Run multiple analyses simultaneously

  • Flexible data input: Works with both prepared and raw sensor data

  • Multiple data formats: Supports Verily, Axivity, Empatica, and custom formats

  • Robust processing: Automatic data preparation and error handling

Data Requirements

The orchestrator accepts either:

  1. Prepared data: Data prepared according to the Data preparation tutorial

  2. Raw data: Automatically processed (note: this feature has a limited scope)

Let’s explore different usage scenarios with examples.

Import required modules

import json
import logging
from pathlib import Path

from paradigma.constants import TimeUnit
from paradigma.load import load_data_files
from paradigma.orchestrator import run_paradigma

1. Single pipeline with prepared data

Let’s start with a simple example using prepared PPG data for pulse rate analysis.

The function load_data_files can load data in any one, or several, of the following formats: ‘parquet’, ‘csv’, ‘pkl’, ‘pickle’, ‘json’, ‘avro’, ‘cwa’. You can also load the data yourself in whatever way you prefer, as long as the result has the format Dict[str, pd.DataFrame]:

{
    'file_1': df_1, 
    'file_2': df_2, 
    ..., 
    'file_n': df_n
}

Alternatively, you can provide:

  • A single DataFrame: Will be processed with key 'df_1'

  • A list of DataFrames: Each will get keys like 'df_1', 'df_2', etc.

This means ParaDigMa can process multiple files in sequence, which is useful when recordings span several files (for example, a week of data) and you want aggregations computed across all of them.
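
For illustration, the following are equivalent ways of passing data to run_paradigma; the file names below are hypothetical:

import pandas as pd

# Hypothetical input files, e.g. one recording per day
df_day_1 = pd.read_parquet('day_1.parquet')
df_day_2 = pd.read_parquet('day_2.parquet')

# 1. A dictionary with your own keys
dfs = {'day_1': df_day_1, 'day_2': df_day_2}

# 2. A list of DataFrames: keys 'df_1', 'df_2', ... are assigned automatically
dfs = [df_day_1, df_day_2]

# 3. A single DataFrame: processed under key 'df_1'
dfs = df_day_1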

path_to_ppg_data = Path('../../example_data/verily/ppg')

dfs_ppg = load_data_files(
    data_path=path_to_ppg_data,
    file_patterns='json'
)

print(f"Loaded {len(dfs_ppg)} PPG files:")
for filename in dfs_ppg.keys():
    df = dfs_ppg[filename]
    print(f"  - {filename}: {len(df)} samples, {len(df.columns)} columns")

print(f"\nFirst 5 rows of {list(dfs_ppg.keys())[0]}:")
dfs_ppg[list(dfs_ppg.keys())[0]].head()

Output Control

When running ParaDigMa, you can control where results are saved and what intermediate results to store:

Output Directory:

  • Default: output_dir defaults to "./output"

  • Custom: Specify your own path like output_dir="./my_results"

  • No storage: Files are only saved if save_intermediate is not empty

Store Intermediate Results:

The save_intermediate parameter accepts a list of strings:

save_intermediate=['preprocessing', 'quantification', 'aggregation']

Valid options are:

  • 'preparation': Save prepared data

  • 'preprocessing': Save preprocessed signals

  • 'classification': Save classification results

  • 'quantification': Save quantified measures

  • 'aggregation': Save aggregated results

If save_intermediate=[] (empty list), no files are saved - results are only returned in memory.

Also, make sure to specify the unit of the time column via the time_input_unit parameter. For all options, please check the API reference.

Logging Control

ParaDigMa uses Python’s standard logging module to provide progress updates and diagnostics. You can control the verbosity level and optionally provide a custom logger for advanced use cases.

Basic Logging Levels:

import logging

# Default - shows progress and important information
run_paradigma(..., logging_level=logging.INFO)

# Detailed - shows additional processing details  
run_paradigma(..., logging_level=logging.DEBUG)

# Quiet - only warnings and errors
run_paradigma(..., logging_level=logging.WARNING)

# Silent - only errors
run_paradigma(..., logging_level=logging.ERROR)

Custom Logger (Advanced):

For full control over logging (custom formatting, multiple handlers, etc.), provide your own logger:

# Create custom logger with your configuration
custom_logger = logging.getLogger('my_analysis')
custom_logger.setLevel(logging.DEBUG)
# Add your handlers, e.g. a console handler with a custom format
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
custom_logger.addHandler(handler)

# Pass it to run_paradigma
run_paradigma(..., custom_logger=custom_logger)

When a custom logger is provided, the logging_level parameter is ignored.

pipeline = 'pulse_rate'

# Example 1: Using default output directory with storage
results_single_pipeline = run_paradigma(
    dfs=dfs_ppg,
    pipelines=pipeline,
    skip_preparation=True,
    time_input_unit=TimeUnit.RELATIVE_S,
    save_intermediate=['quantification', 'aggregation'],  # Files saved to ./output
    logging_level=logging.WARNING  # Only show warnings and errors
)

print(results_single_pipeline['metadata'][pipeline])
print(results_single_pipeline['aggregations'][pipeline])
results_single_pipeline['quantifications'][pipeline].head()

Example: No File Storage

If you only want to work with results in memory without saving any files, use an empty save_intermediate list:

# Example 2: No file storage - results only in memory
results_no_storage = run_paradigma(
    dfs=dfs_ppg,
    pipelines=pipeline,
    skip_preparation=True,
    time_input_unit=TimeUnit.RELATIVE_S,
    save_intermediate=[],  # No files saved
    logging_level=logging.WARNING  # Only show warnings and errors
)

print("Results returned without file storage:")
print(f"  Quantifications: {len(results_no_storage['quantifications'][pipeline])} rows")
print(f"  Aggregations: {results_no_storage['aggregations'][pipeline]}")

Note that run_paradigma currently does not accept accelerometer data as a supplement to the pulse rate pipeline for signal quality analysis. If you want to do these analyses, please check out the Pulse rate analysis tutorial for more info.

2. Multi-pipeline with prepared data

One of the key features of the orchestrator is the ability to run multiple analysis pipelines simultaneously on the same data. This is more efficient than running them separately.

Results Structure

The multi-pipeline orchestrator returns a nested structure that organizes results by pipeline:

{
    'quantifications': {
        'gait': DataFrame,      # Gait segment-level quantifications
        'tremor': DataFrame     # Tremor window-level quantifications
    },
    'aggregations': {
        'gait': {...},         # Aggregated gait metrics
        'tremor': {...}        # Aggregated tremor metrics  
    },
    'metadata': {
        'gait': {...},         # Gait analysis metadata
        'tremor': {...}        # Tremor analysis metadata
    },
    'errors': [...]            # List of errors encountered (empty if successful)
}

The errors list tracks any failures during processing. Each error contains:

  • stage: Where the error occurred (loading, preparation, pipeline_execution, aggregation)

  • error: Error message

  • file: Filename (if file-specific)

  • pipeline: Pipeline name (if pipeline-specific)

Check for errors after processing:

if results['errors']:
    print(f"Warning: {len(results['errors'])} error(s) occurred")
    for error in results['errors']:
        print(f"  - {error['stage']}: {error['error']}")

# Load prepared IMU data
path_to_imu_data = Path('../../example_data/verily/imu')

dfs_imu = load_data_files(
    data_path=path_to_imu_data,
    file_patterns='json'
)

print(f"Loaded {len(dfs_imu)} IMU files:")
for filename in dfs_imu.keys():
    df = dfs_imu[filename]
    print(f"  - {filename}: {len(df)} samples, {len(df.columns)} columns")

print(f"\nFirst 5 rows of {list(dfs_imu.keys())[0]}:")
dfs_imu[list(dfs_imu.keys())[0]].head()

# Run gait and tremor analysis on the prepared data
# Using custom output directory
results_multi_pipeline = run_paradigma(
    output_dir=Path('./output_multi'),
    dfs=dfs_imu,                        # Pre-loaded data
    skip_preparation=True,              # Data is already prepared
    pipelines=['gait', 'tremor'],       # Multiple pipelines (list format)
    watch_side='left',                  # Required for gait analysis
    save_intermediate=['quantification'],  # Store quantifications only
    logging_level=logging.WARNING  # Only show warnings and errors
)

# Explore the results structure
print("Detailed Results Analysis:")

# Gait results
arm_swing_quantified = results_multi_pipeline['quantifications']['gait']
arm_swing_aggregates = results_multi_pipeline['aggregations']['gait']
arm_swing_meta = results_multi_pipeline['metadata']['gait']
print(f"\nArm swing quantification ({len(arm_swing_quantified)} windows):")
print(
    f"   Columns: {list(arm_swing_quantified.columns[:5])}... "
    f"({len(arm_swing_quantified.columns)} total)"
)
print(f"   Files: {arm_swing_quantified['file_key'].unique()}")

print(f"\nArm swing aggregation ({len(arm_swing_aggregates)} time ranges):")
print(f"   Gait segment categories: {list(arm_swing_aggregates.keys())}")
print(f"   Aggregates: {list(arm_swing_aggregates['0_10'].keys())}")
print(f"   Metadata first gait segment: {arm_swing_meta[1]}")

# Tremor results
tremor_quantified = results_multi_pipeline['quantifications']['tremor']
tremor_aggregates = results_multi_pipeline['aggregations']['tremor']
tremor_meta = results_multi_pipeline['metadata']['tremor']
print(f"\nTremor quantification ({len(tremor_quantified)} windows):")
print(
    f"   Columns: {list(tremor_quantified.columns[:5])}... "
    f"({len(tremor_quantified.columns)} total)"
)
print(f"   Files: {tremor_quantified['file_key'].unique()}")

print(f"\nTremor aggregation ({len(tremor_aggregates)} time ranges):")
print(f"   Aggregates: {list(tremor_aggregates.keys())}")
print(f"   Metadata first tremor segment: {tremor_meta}")

3. Raw Data Processing

The orchestrator can also process raw sensor data automatically. This includes data preparation steps such as format standardization, unit conversion, and orientation correction. Note that this feature has been developed and tested on a limited set of example data, and may therefore not work as expected on new data.

Column Mapping for Custom Data Formats

If your raw data uses different column names than ParaDigMa’s standard naming convention, use the column_mapping parameter to map your column names to the expected ones.

Standard ParaDigMa column names:

  • Required for all pipelines:

    • time: Timestamp column

  • For IMU pipelines (gait, tremor):

    • accelerometer_x, accelerometer_y, accelerometer_z: Accelerometer axes

    • gyroscope_x, gyroscope_y, gyroscope_z: Gyroscope axes

  • For PPG pipeline (pulse_rate):

    • ppg: PPG signal

Example mapping:

column_mapping = {
    'timestamp': 'time',                      # Your 'timestamp' column → ParaDigMa 'time' column
    'acceleration_x': 'accelerometer_x',      # Your 'acceleration' columns → ParaDigMa 'accelerometer' columns
    'acceleration_y': 'accelerometer_y',
    'acceleration_z': 'accelerometer_z',
    'rotation_x': 'gyroscope_x',              # Your 'rotation' columns → ParaDigMa 'gyroscope' columns
    'rotation_y': 'gyroscope_y',
    'rotation_z': 'gyroscope_z',
}
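
A mapping like this is passed to run_paradigma together with the other preparation options. Below is a minimal sketch; the data path is hypothetical, and depending on your data you may also need the unit and orientation parameters shown in the next example:

results_custom = run_paradigma(
    data_path=Path('../../example_data/my_custom_device'),  # hypothetical folder
    pipelines='gait',
    watch_side='left',
    skip_preparation=False,                 # let ParaDigMa prepare the raw data
    column_mapping=column_mapping,          # rename columns to ParaDigMa's convention
    time_input_unit=TimeUnit.RELATIVE_S,
)
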
path_to_raw_data = Path('../../example_data/axivity')

device_orientation = ["-x", "-y", "z"]      # Sensor was worn upside-down
pipeline = 'gait'

# Working with raw data - this requires data preparation
# Using custom output directory
results_end_to_end = run_paradigma(
    output_dir=Path('./output_raw'),
    data_path=path_to_raw_data,             # Point to data folder
    skip_preparation=False,                 # ParaDigMa will prepare the data
    pipelines=pipeline,
    watch_side="left",
    time_input_unit=TimeUnit.RELATIVE_S,    # Specify time unit for raw data
    accelerometer_units='g',
    gyroscope_units='deg/s',
    target_frequency=100.0,
    device_orientation=device_orientation,
    save_intermediate=['aggregation'],      # Only save aggregations
    logging_level=logging.WARNING,  # Only show warnings and errors
)

print(
    f"\nMetadata:\n"
    f"{json.dumps(results_end_to_end['metadata'][pipeline][1], indent=2)}"
)
print(
    f"\nAggregations:\n"
    f"{json.dumps(results_end_to_end['aggregations'][pipeline], indent=2)}"
)
print("\nQuantifications (first 5 rows; each row represents a single arm swing):")
results_end_to_end['quantifications'][pipeline].head()

4. Auto-Segmentation for Non-Contiguous Data

When working with sensor data, you may encounter gaps or interruptions in the recording (e.g., battery died, device removed, multiple recording sessions). The orchestrator can automatically detect these gaps and split the data into contiguous segments for processing.

When to Use Auto-Segmentation

Use split_by_gaps=True when:

  • Your data has recording interruptions or gaps

  • You’re getting “Time array is not contiguous” errors

  • You want to process multiple recording sessions in one file

  • Data spans multiple days with breaks

Understanding Data Segments vs Gait Segments

Important distinction:

  • Data segments (data_segment_nr): Contiguous recording chunks separated by temporal gaps

    • Created during data preparation

    • Example: 4 segments if recording had 3 interruptions

  • Gait segments (gait_segment_nr): Detected gait bouts within the data

    • Created during gait pipeline analysis

    • Example: 52 gait bouts detected across all data segments

    • Only applicable to gait analysis

The orchestrator will:

  1. Detect gaps larger than max_gap_seconds (default: 1.5 seconds)

  2. Split data into contiguous data segments

  3. Discard segments shorter than min_segment_seconds (default: 1.5 seconds)

  4. Add a data_segment_nr column to track which recording chunk each data point belongs to

  5. Process each data segment independently through the pipeline

  6. Combine results with gait_segment_nr for detected gait bouts (gait pipeline only)
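
Conceptually, the gap-detection and splitting steps boil down to thresholding the time differences between consecutive samples. The sketch below illustrates this on a toy signal; it is not ParaDigMa’s internal implementation:

import numpy as np
import pandas as pd

# Toy signal: two recording bursts (10 Hz) separated by a ~10-second gap
df = pd.DataFrame({'time': np.concatenate([np.arange(0, 3, 0.1), np.arange(13, 16, 0.1)])})

max_gap_seconds = 1.5
min_segment_seconds = 1.5

# Steps 1, 2 and 4: a new data segment starts wherever the gap to the previous
# sample exceeds the threshold; label each row with its segment number
df['data_segment_nr'] = (df['time'].diff() > max_gap_seconds).cumsum() + 1

# Step 3: discard data segments shorter than the minimum duration
durations = df.groupby('data_segment_nr')['time'].agg(lambda t: t.max() - t.min())
df = df[df['data_segment_nr'].isin(durations[durations >= min_segment_seconds].index)]

print(df['data_segment_nr'].value_counts())  # two segments of 30 samples each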

Example: Gait-up Physilog Data with Gaps

This example uses data from a Gait-up Physilog 4 device with 3 large gaps (up to ~20 minutes). The data is already in Parquet format with standard column names, but timestamps are non-contiguous.

# Load Gait-up Physilog data with non-contiguous timestamps
path_to_physilog_data = Path('../../example_data/gait_up_physilog')

dfs_physilog = load_data_files(
    data_path=path_to_physilog_data,
    file_patterns='parquet'
)

print(f"Loaded {len(dfs_physilog)} Gait-up Physilog file(s):")
for filename, df in dfs_physilog.items():
    print(f"  - {filename}: {len(df)} samples, {len(df.columns)} columns")
    print(f"    Time range: {df['time'].min():.1f}s to {df['time'].max():.1f}s")
    print(f"    Duration: {(df['time'].max() - df['time'].min()):.1f}s")

    # Check for gaps
    time_diffs = df['time'].diff().dropna()
    large_gaps = time_diffs[time_diffs > 1.0]
    if len(large_gaps) > 0:
        print(
            f"Contains {len(large_gaps)} gap(s) > 1s "
            f"(largest: {large_gaps.max():.1f}s)"
        )

    # Check for NaN values (common in real-world data)
    nan_counts = df.isnull().sum()
    if nan_counts.sum() > 0:
        print(f"Contains {nan_counts.sum()} NaN values")

# Clean DataFrames with NaN values (after iteration to avoid SettingWithCopyWarning)
for filename in list(dfs_physilog.keys()):
    df = dfs_physilog[filename]
    df_clean = df.dropna().reset_index(drop=True)
    if len(df_clean) < len(df):
        print(
            f"Dropping {len(df) - len(df_clean)} rows with NaN values "
            f"from file {filename}"
        )
    dfs_physilog[filename] = df_clean

# Example: Processing non-contiguous data with auto-segmentation
# Data already has standard column names and units, but needs segmentation

results_with_segmentation = run_paradigma(
    dfs=dfs_physilog,                     # Pre-loaded data dictionary
    skip_preparation=False,               # Need preparation to add data_segment_nr
    pipelines='gait',
    watch_side="left",
    time_input_unit=TimeUnit.RELATIVE_S,
    # Auto-segmentation parameters
    split_by_gaps=True,                   # Enable automatic segmentation
    max_gap_seconds=1.0,                  # Gaps > 1s create new data segment
    min_segment_seconds=2.0,              # Keep only data segments >= 2s
    save_intermediate=[],                 # No file storage for demo
    logging_level=logging.WARNING,  # Only show warnings and errors
)

gait_results = results_with_segmentation['quantifications']['gait']

print(f"\nTotal arm swings quantified: {len(gait_results)}")
print(f"Number of gait segments: {gait_results['gait_segment_nr'].nunique()}")
if 'data_segment_nr' in gait_results.columns:
    print(f"Number of data segments: {gait_results['data_segment_nr'].nunique()}")
print(f"\nColumns in output: {list(gait_results.columns)}")
gait_results.head()