{ "cells": [ { "cell_type": "markdown", "id": "0", "metadata": {}, "source": [ "# ParaDigMa Pipeline Orchestrator Tutorial\n", "\n", "This tutorial demonstrates how to use the **pipeline orchestrator** `run_paradigma()`, which serves as the main entry point for running ParaDigMa analysis pipelines. The orchestrator coordinates multiple analysis steps and can process different formats of sensor data." ] }, { "cell_type": "markdown", "id": "1", "metadata": {}, "source": [ "## Overview\n", "\n", "The `run_paradigma()` function is called an _orchestrator_ because it coordinates multiple analysis steps depending on the user input. It can process:\n", "\n", "- **Gait analysis**: Arm swing quantification from IMU data\n", "- **Tremor analysis**: Tremor detection and quantification from gyroscope data \n", "- **Pulse rate estimation**: Pulse rate analysis from PPG data\n", "\n", "### Key Features\n", "\n", "- **Multi-pipeline support**: Run multiple analyses simultaneously\n", "- **Flexible data input**: Works with both prepared and raw sensor data\n", "- **Multiple data formats**: Supports Verily, Axivity, Empatica, and custom formats\n", "- **Robust processing**: Automatic data preparation and error handling\n", "\n", "### Data Requirements\n", "\n", "The orchestrator accepts either:\n", "1. **Prepared data**: Prepared according to the [Data preparation tutorial](https://biomarkersparkinson.github.io/paradigma/tutorials/data_preparation.html)\n", "2. **Raw data**: Automatically processed (note: this feature has a limited scope)\n", "\n", "Let's explore different usage scenarios with examples." ] }, { "cell_type": "markdown", "id": "2", "metadata": {}, "source": [ "## Import required modules" ] }, { "cell_type": "code", "execution_count": null, "id": "3", "metadata": {}, "outputs": [], "source": [ "import json\n", "import logging\n", "from pathlib import Path\n", "\n", "from paradigma.constants import TimeUnit\n", "from paradigma.load import load_data_files\n", "from paradigma.orchestrator import run_paradigma" ] }, { "cell_type": "markdown", "id": "4", "metadata": {}, "source": [ "## 1. Single pipeline with prepared data\n", "\n", "Let's start with a simple example using prepared PPG data for pulse rate analysis. \n", "\n", "The function `load_data_files` attempts to load data of any or multiple of the following formats: \n", "'parquet', 'csv', 'pkl', 'pickle', 'json', 'avro', 'cwa'. You can load the data in your preferred \n", "ways, but note that the output should be of format `Dict[str, pd.DataFrame]`:\n", "```python\n", "{\n", " 'file_1': df_1, \n", " 'file_2': df_2, \n", " ..., \n", " 'file_n': df_n\n", "}\n", "```\n", "\n", "Alternatively, you can provide:\n", "- A **single DataFrame**: Will be processed with key `'df_1'`\n", "- A **list of DataFrames**: Each will get keys like `'df_1'`, `'df_2'`, etc.\n", "\n", "This means ParaDigMa can run multiple files in sequence. This is useful when you have multiple files\n", "spanning a week, and you want aggregations to be computed across all files." 
] }, { "cell_type": "code", "execution_count": null, "id": "5", "metadata": {}, "outputs": [], "source": [ "path_to_ppg_data = Path('../../example_data/verily/ppg')\n", "\n", "dfs_ppg = load_data_files(\n", " data_path=path_to_ppg_data,\n", " file_patterns='json'\n", ")\n", "\n", "print(f\"Loaded {len(dfs_ppg)} PPG files:\")\n", "for filename in dfs_ppg.keys():\n", " df = dfs_ppg[filename]\n", " print(f\" - {filename}: {len(df)} samples, {len(df.columns)} columns\")\n", "\n", "print(f\"\\nFirst 5 rows of {list(dfs_ppg.keys())[0]}:\")\n", "dfs_ppg[list(dfs_ppg.keys())[0]].head()" ] }, { "cell_type": "markdown", "id": "6", "metadata": {}, "source": [ "### Output Control\n", "\n", "When running ParaDigMa, you can control where results are saved and what intermediate results to store:\n", "\n", "**Output Directory:**\n", "- Default: `output_dir` defaults to `\"./output\"` \n", "- Custom: Specify your own path like `output_dir=\"./my_results\"`\n", "- No storage: Files are only saved if `save_intermediate` is not empty\n", "\n", "**Store Intermediate Results:**\n", "\n", "The `save_intermediate` parameter accepts a list of strings:\n", "```python\n", "save_intermediate=['preprocessing', 'quantification', 'aggregation']\n", "```\n", "\n", "Valid options are:\n", "- `'preparation'`: Save prepared data\n", "- `'preprocessing'`: Save preprocessed signals\n", "- `'classification'`: Save classification results\n", "- `'quantification'`: Save quantified measures\n", "- `'aggregation'`: Save aggregated results\n", "\n", "If `save_intermediate=[]` (empty list), **no files are saved** - results are only returned in memory.\n", "\n", "Also, set the correct units of the `time` column. For all options, please check [the API reference](https://biomarkersparkinson.github.io/paradigma/autoapi/paradigma/constants/index.html#paradigma.constants.TimeUnit)." ] }, { "cell_type": "markdown", "id": "7", "metadata": {}, "source": [ "### Logging Control\n", "\n", "ParaDigMa uses Python's standard `logging` module to provide progress updates and diagnostics. You can control the verbosity level and optionally provide a custom logger for advanced use cases.\n", "\n", "**Basic Logging Levels:**\n", "\n", "```python\n", "import logging\n", "\n", "# Default - shows progress and important information\n", "run_paradigma(..., logging_level=logging.INFO)\n", "\n", "# Detailed - shows additional processing details \n", "run_paradigma(..., logging_level=logging.DEBUG)\n", "\n", "# Quiet - only warnings and errors\n", "run_paradigma(..., logging_level=logging.WARNING)\n", "\n", "# Silent - only errors\n", "run_paradigma(..., logging_level=logging.ERROR)\n", "```\n", "\n", "**Custom Logger (Advanced):**\n", "\n", "For full control over logging (custom formatting, multiple handlers, etc.), provide your own logger:\n", "\n", "```python\n", "# Create custom logger with your configuration\n", "custom_logger = logging.getLogger('my_analysis')\n", "custom_logger.setLevel(logging.DEBUG)\n", "custom_logger.addHandler(...) # Add your handlers\n", "\n", "# Pass it to run_paradigma\n", "run_paradigma(..., custom_logger=custom_logger)\n", "```\n", "\n", "When a custom logger is provided, the `logging_level` parameter is ignored." 
] }, { "cell_type": "code", "execution_count": null, "id": "8", "metadata": {}, "outputs": [], "source": [ "pipeline = 'pulse_rate'\n", "\n", "# Example 1: Using default output directory with storage\n", "results_single_pipeline = run_paradigma(\n", " dfs=dfs_ppg,\n", " pipelines=pipeline,\n", " skip_preparation=True,\n", " time_input_unit=TimeUnit.RELATIVE_S,\n", " save_intermediate=['quantification', 'aggregation'], # Files saved to ./output\n", " logging_level=logging.WARNING # Only show warnings and errors\n", ")\n", "\n", "print(results_single_pipeline['metadata'][pipeline])\n", "print(results_single_pipeline['aggregations'][pipeline])\n", "results_single_pipeline['quantifications'][pipeline].head()" ] }, { "cell_type": "code", "execution_count": null, "id": "9", "metadata": {}, "outputs": [], "source": [ "# Example 2: No file storage - results only in memory\n", "results_no_storage = run_paradigma(\n", " dfs=dfs_ppg,\n", " pipelines=pipeline,\n", " skip_preparation=True,\n", " time_input_unit=TimeUnit.RELATIVE_S,\n", " save_intermediate=[], # No files saved\n", " logging_level=logging.WARNING # Only show warnings and errors\n", ")\n", "\n", "print(\"Results returned without file storage:\")\n", "print(f\" Quantifications: {len(results_no_storage['quantifications'][pipeline])} rows\")\n", "print(f\" Aggregations: {results_no_storage['aggregations'][pipeline]}\")" ] }, { "cell_type": "markdown", "id": "10", "metadata": {}, "source": [ "### Example: No File Storage\n", "\n", "If you only want to work with results in memory without saving any files, use an empty `save_intermediate` list:" ] }, { "cell_type": "markdown", "id": "11", "metadata": {}, "source": [ "Note that `run_paradigma` currently does not accept accelerometer data as a supplement to the pulse\n", "rate pipeline for signal quality analysis. If you want to do these analyses, please check out the\n", "[Pulse rate analysis](https://biomarkersparkinson.github.io/paradigma/tutorials/_static/pulse_rate_analysis.html)\n", "tutorial for more info." ] }, { "cell_type": "markdown", "id": "12", "metadata": {}, "source": [ "## 2. Multi-pipeline with prepared data\n", "\n", "One of the key features of the orchestrator is the ability to run multiple analysis pipelines simultaneously on the same data. This is more efficient than running them separately." ] }, { "cell_type": "markdown", "id": "13", "metadata": {}, "source": [ "### Results Structure\n", "\n", "The multi-pipeline orchestrator returns a nested structure that organizes results by pipeline:\n", "\n", "```python\n", "{\n", " 'quantifications': {\n", " 'gait': DataFrame, # Gait segment-level quantifications\n", " 'tremor': DataFrame # Tremor window-level quantifications\n", " },\n", " 'aggregations': {\n", " 'gait': {...}, # Aggregated gait metrics\n", " 'tremor': {...} # Aggregated tremor metrics \n", " },\n", " 'metadata': {\n", " 'gait': {...}, # Gait analysis metadata\n", " 'tremor': {...} # Tremor analysis metadata\n", " },\n", " 'errors': [...] # List of errors encountered (empty if successful)\n", "}\n", "```\n", "\n", "The `errors` list tracks any failures during processing. 
Each error contains:\n", "- `stage`: Where the error occurred (loading, preparation, pipeline_execution, aggregation)\n", "- `error`: Error message\n", "- `file`: Filename (if file-specific)\n", "- `pipeline`: Pipeline name (if pipeline-specific)\n", "\n", "Check for errors after processing:\n", "```python\n", "if results['errors']:\n", " print(f\"Warning: {len(results['errors'])} error(s) occurred\")\n", " for error in results['errors']:\n", " print(f\" - {error['stage']}: {error['error']}\")\n", "```" ] }, { "cell_type": "code", "execution_count": null, "id": "14", "metadata": {}, "outputs": [], "source": [ "# Load prepared IMU data\n", "path_to_imu_data = Path('../../example_data/verily/imu')\n", "\n", "dfs_imu = load_data_files(\n", " data_path=path_to_imu_data,\n", " file_patterns='json'\n", ")\n", "\n", "print(f\"Loaded {len(dfs_imu)} IMU files:\")\n", "for filename in dfs_imu.keys():\n", " df = dfs_imu[filename]\n", " print(f\" - {filename}: {len(df)} samples, {len(df.columns)} columns\")\n", "\n", "print(f\"\\nFirst 5 rows of {list(dfs_imu.keys())[0]}:\")\n", "dfs_imu[list(dfs_imu.keys())[0]].head()" ] }, { "cell_type": "code", "execution_count": null, "id": "15", "metadata": {}, "outputs": [], "source": [ "# Run gait and tremor analysis on the prepared data\n", "# Using custom output directory\n", "results_multi_pipeline = run_paradigma(\n", " output_dir=Path('./output_multi'),\n", " dfs=dfs_imu, # Pre-loaded data\n", " skip_preparation=True, # Data is already prepared\n", " pipelines=['gait', 'tremor'], # Multiple pipelines (list format)\n", " watch_side='left', # Required for gait analysis\n", " save_intermediate=['quantification'], # Store quantifications only\n", " logging_level=logging.WARNING # Only show warnings and errors\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "16", "metadata": {}, "outputs": [], "source": [ "# Explore the results structure\n", "print(\"Detailed Results Analysis:\")\n", "\n", "# Gait results\n", "arm_swing_quantified = results_multi_pipeline['quantifications']['gait']\n", "arm_swing_aggregates = results_multi_pipeline['aggregations']['gait']\n", "arm_swing_meta = results_multi_pipeline['metadata']['gait']\n", "print(f\"\\nArm swing quantification ({len(arm_swing_quantified)} windows):\")\n", "print(\n", " f\" Columns: {list(arm_swing_quantified.columns[:5])}... \"\n", " f\"({len(arm_swing_quantified.columns)} total)\"\n", ")\n", "print(f\" Files: {arm_swing_quantified['file_key'].unique()}\")\n", "\n", "print(f\"\\nArm swing aggregation ({len(arm_swing_aggregates)} time ranges):\")\n", "print(f\" Gait segment categories: {list(arm_swing_aggregates.keys())}\")\n", "print(f\" Aggregates: {list(arm_swing_aggregates['0_10'].keys())}\")\n", "print(f\" Metadata first gait segment: {arm_swing_meta[1]}\")\n", "\n", "# Tremor results\n", "tremor_quantified = results_multi_pipeline['quantifications']['tremor']\n", "tremor_aggregates = results_multi_pipeline['aggregations']['tremor']\n", "tremor_meta = results_multi_pipeline['metadata']['tremor']\n", "print(f\"\\nTremor quantification ({len(tremor_quantified)} windows):\")\n", "print(\n", " f\" Columns: {list(tremor_quantified.columns[:5])}... 
\"\n", " f\"({len(tremor_quantified.columns)} total)\"\n", ")\n", "print(f\" Files: {tremor_quantified['file_key'].unique()}\")\n", "\n", "print(f\"\\nTremor aggregation ({len(tremor_aggregates)} time ranges):\")\n", "print(f\" Aggregates: {list(tremor_aggregates.keys())}\")\n", "print(f\" Metadata first tremor segment: {tremor_meta}\")" ] }, { "cell_type": "markdown", "id": "17", "metadata": {}, "source": [ "## 3. Raw Data Processing\n", "\n", "The orchestrator can also process raw sensor data automatically. This includes data preparation steps like format standardization, unit conversion, and orientation correction. Note that this feature has been developed on limited data examples, and therefore may not function as expected on newly observed data.\n", "\n", "### Column Mapping for Custom Data Formats\n", "\n", "If your raw data uses different column names than ParaDigMa's standard naming convention, use the `column_mapping` parameter to map your column names to the expected ones.\n", "\n", "**Standard ParaDigMa column names:**\n", "- **Required for all pipelines:**\n", " - `time`: Timestamp column\n", " \n", "- **For IMU pipelines (gait, tremor):**\n", " - `accelerometer_x`, `accelerometer_y`, `accelerometer_z`: Accelerometer axes\n", " - `gyroscope_x`, `gyroscope_y`, `gyroscope_z`: Gyroscope axes\n", " \n", "- **For PPG pipeline (pulse_rate):**\n", " - `ppg`: PPG signal\n", "\n", "**Example mapping:**\n", "```python\n", "column_mapping = {\n", " 'timestamp': 'time', # Your 'timestamp' column → ParaDigMa 'time' column\n", " 'acceleration_x': 'accelerometer_x', # Your 'acceleration' columns → ParaDigMa 'accelerometer' columns'\n", " 'acceleration_y': 'accelerometer_y',\n", " 'acceleration_z': 'accelerometer_z',\n", " 'rotation_x': 'gyroscope_x', # Your 'rotation' columns → ParaDigMa 'gyroscope' columns\n", " 'rotation_y': 'gyroscope_y',\n", " 'rotation_z': 'gyroscope_z',\n", "}\n", "```" ] }, { "cell_type": "code", "execution_count": null, "id": "18", "metadata": {}, "outputs": [], "source": [ "path_to_raw_data = Path('../../example_data/axivity')\n", "\n", "device_orientation = [\"-x\", \"-y\", \"z\"] # Sensor was worn upside-down\n", "pipeline = 'gait'\n", "\n", "# Working with raw data - this requires data preparation\n", "# Using custom output directory\n", "results_end_to_end = run_paradigma(\n", " output_dir=Path('./output_raw'),\n", " data_path=path_to_raw_data, # Point to data folder\n", " skip_preparation=False, # ParaDigMa will prepare the data\n", " pipelines=pipeline,\n", " watch_side=\"left\",\n", " time_input_unit=TimeUnit.RELATIVE_S, # Specify time unit for raw data\n", " accelerometer_units='g',\n", " gyroscope_units='deg/s',\n", " target_frequency=100.0,\n", " device_orientation=device_orientation,\n", " save_intermediate=['aggregation'], # Only save aggregations\n", " logging_level=logging.WARNING, # Only show warnings and errors\n", ")\n", "\n", "print(\n", " f\"\\nMetadata:\\n\"\n", " f\"{json.dumps(results_end_to_end['metadata'][pipeline][1], indent=2)}\"\n", ")\n", "print(\n", " f\"\\nAggregations:\\n\"\n", " f\"{json.dumps(results_end_to_end['aggregations'][pipeline], indent=2)}\"\n", ")\n", "print(\"\\nQuantifications (first 5 rows; each row represents a single arm swing):\")\n", "results_end_to_end['quantifications'][pipeline].head()" ] }, { "cell_type": "markdown", "id": "19", "metadata": {}, "source": [ "## 4. 
Auto-Segmentation for Non-Contiguous Data\n", "\n", "When working with sensor data, you may encounter gaps or interruptions in the recording (e.g., battery died, device removed, multiple recording sessions). The orchestrator can automatically detect these gaps and split the data into contiguous segments for processing.\n", "\n", "### When to Use Auto-Segmentation\n", "\n", "Use `split_by_gaps=True` when:\n", "- Your data has recording interruptions or gaps\n", "- You're getting \"Time array is not contiguous\" errors\n", "- You want to process multiple recording sessions in one file\n", "- Data spans multiple days with breaks\n", "\n", "### Understanding Data Segments vs Gait Segments\n", "\n", "Important distinction:\n", "- **Data segments (`data_segment_nr`)**: Contiguous recording chunks separated by temporal gaps\n", " - Created during data preparation\n", " - Example: 4 segments if recording had 3 interruptions\n", " \n", "- **Gait segments (`gait_segment_nr`)**: Detected gait bouts within the data\n", " - Created during gait pipeline analysis\n", " - Example: 52 gait bouts detected across all data segments\n", " - Only applicable to gait analysis\n", "\n", "The orchestrator will:\n", "1. Detect gaps larger than `max_gap_seconds` (default: 1.5 seconds)\n", "2. Split data into contiguous data segments\n", "3. Discard segments shorter than `min_segment_seconds` (default: 1.5 seconds)\n", "4. Add a `data_segment_nr` column to track which recording chunk each data point belongs to\n", "5. Process each data segment independently through the pipeline\n", "6. Combine results with `gait_segment_nr` for detected gait bouts (gait pipeline only)\n", "\n", "### Example: Gait-up Physilog Data with Gaps\n", "\n", "This example uses data from a Gait-up Physilog 4 device with 3 large gaps (up to ~20 minutes). The data is already in Parquet format with standard column names, but timestamps are non-contiguous." 
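, "\n", "\n", "The sketch below illustrates the idea behind gap-based splitting on made-up timestamps; it is for intuition only and is not ParaDigMa's internal implementation (the thresholds mirror the parameters above, but the data is hypothetical):\n", "\n", "```python\n", "import pandas as pd\n", "\n", "# Toy, non-contiguous timestamps in seconds (illustrative only)\n", "df = pd.DataFrame({'time': [0.0, 0.5, 1.0, 1.5, 2.0, 60.0, 60.5, 61.0, 200.0]})\n", "\n", "max_gap_seconds = 1.5\n", "min_segment_seconds = 1.5\n", "\n", "# A new data segment starts wherever the time step exceeds the gap threshold\n", "df['data_segment_nr'] = (df['time'].diff() > max_gap_seconds).cumsum()\n", "\n", "# Keep only data segments that span at least the minimum duration\n", "durations = df.groupby('data_segment_nr')['time'].agg(lambda s: s.max() - s.min())\n", "df = df[df['data_segment_nr'].isin(durations[durations >= min_segment_seconds].index)]\n", "\n", "print(df)\n", "```\n", "\n", "With `split_by_gaps=True`, ParaDigMa performs this kind of segmentation for you during data preparation, so you do not need to split the data manually."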
] }, { "cell_type": "code", "execution_count": null, "id": "20", "metadata": {}, "outputs": [], "source": [ "# Load Gait-up Physilog data with non-contiguous timestamps\n", "path_to_physilog_data = Path('../../example_data/gait_up_physilog')\n", "\n", "dfs_physilog = load_data_files(\n", " data_path=path_to_physilog_data,\n", " file_patterns='parquet'\n", ")\n", "\n", "print(f\"Loaded {len(dfs_physilog)} Gait-up Physilog file(s):\")\n", "for filename, df in dfs_physilog.items():\n", " print(f\" - {filename}: {len(df)} samples, {len(df.columns)} columns\")\n", " print(f\" Time range: {df['time'].min():.1f}s to {df['time'].max():.1f}s\")\n", " print(f\" Duration: {(df['time'].max() - df['time'].min()):.1f}s\")\n", "\n", " # Check for gaps\n", " time_diffs = df['time'].diff().dropna()\n", " large_gaps = time_diffs[time_diffs > 1.0]\n", " if len(large_gaps) > 0:\n", " print(\n", " f\"Contains {len(large_gaps)} gap(s) > 1s \"\n", " f\"(largest: {large_gaps.max():.1f}s)\"\n", " )\n", "\n", " # Check for NaN values (common in real-world data)\n", " nan_counts = df.isnull().sum()\n", " if nan_counts.sum() > 0:\n", " print(f\"Contains {nan_counts.sum()} NaN values\")\n", "\n", "# Clean DataFrames with NaN values (after iteration to avoid SettingWithCopyWarning)\n", "for filename in list(dfs_physilog.keys()):\n", " df = dfs_physilog[filename]\n", " df_clean = df.dropna().reset_index(drop=True)\n", " if len(df_clean) < len(df):\n", " print(\n", " f\"Dropping {len(df) - len(df_clean)} rows with NaN values \"\n", " f\"from file {filename}\"\n", " )\n", " dfs_physilog[filename] = df_clean" ] }, { "cell_type": "code", "execution_count": null, "id": "21", "metadata": {}, "outputs": [], "source": [ "# Example: Processing non-contiguous data with auto-segmentation\n", "# Data already has standard column names and units, but needs segmentation\n", "\n", "results_with_segmentation = run_paradigma(\n", " dfs=dfs_physilog, # Pre-loaded data dictionary\n", " skip_preparation=False, # Need preparation to add data_segment_nr\n", " pipelines='gait',\n", " watch_side=\"left\",\n", " time_input_unit=TimeUnit.RELATIVE_S,\n", " # Auto-segmentation parameters\n", " split_by_gaps=True, # Enable automatic segmentation\n", " max_gap_seconds=1.0, # Gaps > 1s create new data segment\n", " min_segment_seconds=2.0, # Keep only data segments >= 2s\n", " save_intermediate=[], # No file storage for demo\n", " logging_level=logging.WARNING, # Only show warnings and errors\n", ")\n", "\n", "gait_results = results_with_segmentation['quantifications']['gait']\n", "\n", "print(f\"\\nTotal arm swings quantified: {len(gait_results)}\")\n", "print(f\"Number of gait segments: {gait_results['gait_segment_nr'].nunique()}\")\n", "if 'data_segment_nr' in gait_results.columns:\n", " print(f\"Number of data segments: {gait_results['data_segment_nr'].nunique()}\")\n", "print(f\"\\nColumns in output: {list(gait_results.columns)}\")\n", "gait_results.head()" ] } ], "metadata": {}, "nbformat": 4, "nbformat_minor": 5 }