Device specific data loading

This tutorial demonstrates how to load sensor data of the following devices into memory:

  • Axivity AX3 / AX6

  • Empatica EmbracePlus

Note that Paradigma requires further data preparation as outlined in the data preparation tutorial.

Axivity

Axivity sensor data (AX3 & AX6) are stored in .CWA format, which requires some preparation to be processable. In this tutorial, we showcase how to transform .CWA files into a workable format in Python using openmovement. More information on the openmovement package can be found on the Open Movement GitHub page.

For the openmovement package, make sure to install the master branch, as this branch contains the valid code for preparing .CWA data. This can for example be done using pip by running:

pip install git+https://github.com/digitalinteraction/openmovement-python.git@master

Or, when using Poetry, add the following line to the list of dependencies in pyproject.toml:

openmovement = { git = "https://github.com/digitalinteraction/openmovement-python.git", branch = "master" }
from pathlib import Path
from pprint import pprint

import pandas as pd
from openmovement.load import CwaData

# Load data
path_to_input_data = Path('../../example_data/axivity/')
test_data_filename = 'test_data.CWA'
prepared_data_filename = 'test_data.parquet'

# Note: Set include_gyro to False when using AX3 devices without gyroscope,
# or when gyroscope data is not needed
with CwaData(
    filename=path_to_input_data / test_data_filename,
    include_gyro=True,
    include_temperature=False
) as cwa_data:
    print("Data format info:")
    pprint(cwa_data.data_format)

    df = cwa_data.get_samples()  # Load all samples into a DataFrame

# Set time to start at 0 seconds
df['time_dt'] = df['time'].copy()
df['time'] = (df['time'] - df['time'].iloc[0]).dt.total_seconds()

df.head()

Empatica EmbracePlus

Empatica EmbracePlus sensor data is stored in Apache Avro (.avro) format. In short, Empatica automatically writes sensor data every 30 minutes to a cloud storage with the naming convention [participant_id]_[timestamp].avro. In this tutorial we will show how to read and prepare a single .avro file.

For more detailed documentation on using this data format in Python, consider reading the official Apache Avro documentation. Extensive documentation is available on how to read and write .avro files in Python here.

import json
from pathlib import Path

from avro.datafile import DataFileReader
from avro.io import DatumReader

path_to_input_data = Path('../../example_data/empatica/')
empatica_data_filename = 'test_data.avro'

## Read Avro file
# reader = DataFileReader(
#     open(path_to_empatica_data / empatica_data_filename, "rb"),
#     DatumReader()
# )
with open(path_to_input_data / empatica_data_filename, "rb") as f:
    reader = DataFileReader(f, DatumReader())

    schema = json.loads(reader.meta.get("avro.schema").decode("utf-8"))
    empatica_data = next(reader)

accel_data = empatica_data['rawData']['accelerometer']

# The example data does not contain gyroscope data, but if it did,
# you could access it like this:
# gyro_data = empatica_data['rawData']['gyroscope']

# To convert accelerometer and gyroscope data into the correct format, we need to
# check the Avro schema version. This converts accelerometer into g (9.81 m/s²) units,
# and gyroscope into degrees per second (rad/s). More info on units and conversion
# can be found in the schema object using: print(schema).

avro_version = (
    (empatica_data["schemaVersion"]["major"]),
    (empatica_data["schemaVersion"]["minor"]),
    (empatica_data["schemaVersion"]["patch"]),
)

# Due to changes in the Avro schema, conversion differs for versions
# before and after 6.5.0
if avro_version < (6, 5, 0):
    physical_range = (
        accel_data["imuParams"]["physicalMax"]
        - accel_data["imuParams"]["physicalMin"]
    )
    digital_range = (
        accel_data["imuParams"]["digitalMax"]
        - accel_data["imuParams"]["digitalMin"]
    )
    accel_x = [val * physical_range / digital_range for val in accel_data["x"]]
    accel_y = [val * physical_range / digital_range for val in accel_data["y"]]
    accel_z = [val * physical_range / digital_range for val in accel_data["z"]]
else:
    conversion_factor = accel_data["imuParams"]["conversionFactor"]
    accel_x = [val * conversion_factor for val in accel_data["x"]]
    accel_y = [val * conversion_factor for val in accel_data["y"]]
    accel_z = [val * conversion_factor for val in accel_data["z"]]

sampling_frequency = accel_data['samplingFrequency']
nrows = len(accel_x)

t_start = accel_data['timestampStart']
t_array = [t_start + i * (1e6 /sampling_frequency) for i in range(nrows)]
t_from_0_array = ([(x - t_array[0]) / 1e6 for x in t_array])

df = pd.DataFrame({
    'time': t_from_0_array,
    'time_dt': pd.to_datetime(t_array, unit='us'),
    'accel_x': accel_x,
    'accel_y': accel_y,
    'accel_z': accel_z,
})

print(
    f"Data loaded from Avro file with {nrows} rows sampled "
    f"at {sampling_frequency} Hz."
)
print(f"Start time: {pd.to_datetime(t_start, unit='us')}")

df.head()