Device specific data loading
This tutorial demonstrates how to load sensor data of the following devices into memory:
Axivity AX3 / AX6
Empatica EmbracePlus
Note that Paradigma requires further data preparation as outlined in the data preparation tutorial.
Axivity
Axivity sensor data (AX3 & AX6) are stored in .CWA format, which requires some preparation to be processable. In this tutorial, we showcase how to transform .CWA files into a workable format in Python using openmovement. More information on the openmovement package can be found on the Open Movement GitHub page.
Make sure to install the openmovement package from its master branch, as this branch contains the code needed to prepare .CWA data. For example, using pip:
pip install git+https://github.com/digitalinteraction/openmovement-python.git@master
Or, when using Poetry, add the following line to the list of dependencies in pyproject.toml:
openmovement = { git = "https://github.com/digitalinteraction/openmovement-python.git", branch = "master" }
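Before running the loading code, it can help to confirm that the package is visible to the active Python environment. The snippet below is a minimal sketch using only the standard library; the helper name is_installed is hypothetical and not part of openmovement or Paradigma.

```python
from importlib.util import find_spec


def is_installed(module_name: str) -> bool:
    """Return True if a top-level module can be located without importing it."""
    return find_spec(module_name) is not None


# Hypothetical helper for illustration; checks the active environment only
if not is_installed("openmovement"):
    print("openmovement not found; install it from the master branch first.")
```

This only checks that the module can be found, not which branch it was installed from.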
from pathlib import Path
from pprint import pprint
import pandas as pd
from openmovement.load import CwaData
# Load data
path_to_input_data = Path('../../example_data/axivity/')
test_data_filename = 'test_data.CWA'
prepared_data_filename = 'test_data.parquet'
# Note: Set include_gyro to False when using AX3 devices without gyroscope,
# or when gyroscope data is not needed
with CwaData(
    filename=path_to_input_data / test_data_filename,
    include_gyro=True,
    include_temperature=False
) as cwa_data:
    print("Data format info:")
    pprint(cwa_data.data_format)
    df = cwa_data.get_samples()  # Load all samples into a DataFrame
# Set time to start at 0 seconds
df['time_dt'] = df['time'].copy()
df['time'] = (df['time'] - df['time'].iloc[0]).dt.total_seconds()
df.head()
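Once time starts at 0 seconds, a quick sanity check is to estimate the sampling frequency from the spacing between samples. The following is a minimal sketch on synthetic values, not part of the openmovement API; the function name estimate_sampling_frequency is an assumption made for illustration.

```python
from statistics import median


def estimate_sampling_frequency(time_s):
    """Estimate the sampling frequency (Hz) from the median sample interval."""
    intervals = [b - a for a, b in zip(time_s, time_s[1:])]
    if not intervals:
        raise ValueError("Need at least two samples.")
    return 1.0 / median(intervals)


# Synthetic example: 5 samples at 100 Hz, starting at 0 seconds
example_time = [i / 100 for i in range(5)]
estimated_hz = estimate_sampling_frequency(example_time)
print(f"Estimated sampling frequency: {estimated_hz:.1f} Hz")
```

With the DataFrame loaded above, the same function could be called as estimate_sampling_frequency(df['time'].tolist()). The median is used rather than the mean so that occasional gaps in the recording do not skew the estimate.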
Empatica EmbracePlus
Empatica EmbracePlus sensor data is stored in Apache Avro (.avro) format. In short, Empatica automatically writes sensor data to cloud storage every 30 minutes, using the naming convention [participant_id]_[timestamp].avro. In this tutorial we show how to read and prepare a single .avro file.
For more detailed documentation on reading and writing .avro files in Python, see the official Apache Avro documentation.
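Because each recording is split over many files, it can be useful to split file names into their two parts and order the files before reading them. The sketch below follows the [participant_id]_[timestamp].avro convention described above. The file names are hypothetical, and treating the timestamp as an integer is an assumption that should be checked against your own files.

```python
from pathlib import Path


def parse_empatica_filename(path):
    """Split '[participant_id]_[timestamp].avro' into its two parts."""
    stem = Path(path).stem
    # Split on the last underscore, so participant IDs may contain underscores
    participant_id, sep, timestamp = stem.rpartition("_")
    if not sep or not participant_id or not timestamp:
        raise ValueError(f"Unexpected file name: {path}")
    return participant_id, timestamp


# Hypothetical file names, used for illustration only
files = ["P01_1700001800.avro", "P01_1700000000.avro"]
# Assumption: the timestamp part is an integer, so files sort chronologically
files_sorted = sorted(files, key=lambda f: int(parse_empatica_filename(f)[1]))
print(files_sorted)
```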
import json
from pathlib import Path
import pandas as pd
from avro.datafile import DataFileReader
from avro.io import DatumReader
path_to_input_data = Path('../../example_data/empatica/')
empatica_data_filename = 'test_data.avro'
# Read the Avro file; the reader must be used while the file is still open
with open(path_to_input_data / empatica_data_filename, "rb") as f:
    reader = DataFileReader(f, DatumReader())
    schema = json.loads(reader.meta.get("avro.schema").decode("utf-8"))
    empatica_data = next(reader)
accel_data = empatica_data['rawData']['accelerometer']
# The example data does not contain gyroscope data, but if it did,
# you could access it like this:
# gyro_data = empatica_data['rawData']['gyroscope']
# To convert accelerometer and gyroscope data into physical units, we need to
# check the Avro schema version. The conversion yields accelerometer values in
# g (1 g ≈ 9.81 m/s²) and gyroscope values in degrees per second (deg/s). More
# info on units and conversion can be found in the schema object using: print(schema).
avro_version = (
    empatica_data["schemaVersion"]["major"],
    empatica_data["schemaVersion"]["minor"],
    empatica_data["schemaVersion"]["patch"],
)
# Due to changes in the Avro schema, conversion differs for versions
# before and after 6.5.0
if avro_version < (6, 5, 0):
    physical_range = (
        accel_data["imuParams"]["physicalMax"]
        - accel_data["imuParams"]["physicalMin"]
    )
    digital_range = (
        accel_data["imuParams"]["digitalMax"]
        - accel_data["imuParams"]["digitalMin"]
    )
    accel_x = [val * physical_range / digital_range for val in accel_data["x"]]
    accel_y = [val * physical_range / digital_range for val in accel_data["y"]]
    accel_z = [val * physical_range / digital_range for val in accel_data["z"]]
else:
    conversion_factor = accel_data["imuParams"]["conversionFactor"]
    accel_x = [val * conversion_factor for val in accel_data["x"]]
    accel_y = [val * conversion_factor for val in accel_data["y"]]
    accel_z = [val * conversion_factor for val in accel_data["z"]]
sampling_frequency = accel_data['samplingFrequency']
nrows = len(accel_x)
t_start = accel_data['timestampStart']
# Timestamps are in microseconds; convert the relative time to seconds
t_array = [t_start + i * (1e6 / sampling_frequency) for i in range(nrows)]
t_from_0_array = [(x - t_array[0]) / 1e6 for x in t_array]
df = pd.DataFrame({
    'time': t_from_0_array,
    'time_dt': pd.to_datetime(t_array, unit='us'),
    'accel_x': accel_x,
    'accel_y': accel_y,
    'accel_z': accel_z,
})
print(
    f"Data loaded from Avro file with {nrows} rows sampled "
    f"at {sampling_frequency} Hz."
)
print(f"Start time: {pd.to_datetime(t_start, unit='us')}")
df.head()
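The version-dependent conversion above repeats the same arithmetic for each axis. As a minimal sketch, it can be wrapped in one helper that takes the raw values, the imuParams dictionary and the schema version tuple. The function name to_physical_units and the parameter values below are hypothetical and chosen for illustration only; they are not taken from a real recording or from the Empatica tooling.

```python
def to_physical_units(raw_values, imu_params, schema_version):
    """Convert raw digital IMU values to physical units.

    Schema versions before 6.5.0 provide physical and digital ranges;
    later versions provide a single conversion factor.
    """
    # Tuples compare element by element, so (6, 4, 9) < (6, 5, 0) is True
    if schema_version < (6, 5, 0):
        physical_range = imu_params["physicalMax"] - imu_params["physicalMin"]
        digital_range = imu_params["digitalMax"] - imu_params["digitalMin"]
        factor = physical_range / digital_range
    else:
        factor = imu_params["conversionFactor"]
    return [value * factor for value in raw_values]


# Synthetic parameters for illustration only; not from a real recording
old_params = {
    "physicalMin": -16, "physicalMax": 16,
    "digitalMin": -2048, "digitalMax": 2048,
}
new_params = {"conversionFactor": 0.0078125}
raw_x = [0, 128, -256]

converted_old = to_physical_units(raw_x, old_params, (6, 4, 0))
converted_new = to_physical_units(raw_x, new_params, (6, 5, 0))
print(converted_old, converted_new)
```

With the data loaded above, the accelerometer x-axis could be converted with to_physical_units(accel_data["x"], accel_data["imuParams"], avro_version). If a file does contain gyroscope data, the same call may work for it, assuming the gyroscope entry carries an imuParams dictionary with the same keys; check print(schema) to confirm.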