# -*- coding: utf-8 -*-
"""
Functions for processing and interpreting physiological data
"""
import matplotlib.pyplot as plt
import numpy as np
from scipy import interpolate, signal
from peakdet import editor, utils
[docs]@utils.make_operation()
def filter_physio(data, cutoffs, method, *, order=3):
"""
Applies an `order`-order digital `method` Butterworth filter to `data`
Parameters
----------
data : Physio_like
Input physiological data to be filtered
cutoffs : int or list
If `method` is 'lowpass' or 'highpass', an integer specifying the lower
or upper bound of the filter (in Hz). If method is 'bandpass' or
'bandstop', a list specifying the lower and upper bound of the filter
(in Hz).
method : {'lowpass', 'highpass', 'bandpass', 'bandstop'}
The type of filter to apply to `data`
order : int, optional
Order of filter to be applied. Default: 3
Returns
-------
filtered : :class:`peakdet.Physio`
Filtered input `data`
"""
_valid_methods = ['lowpass', 'highpass', 'bandpass', 'bandstop']
data = utils.check_physio(data, ensure_fs=True)
if method not in _valid_methods:
raise ValueError('Provided method {} is not permitted; must be in {}.'
.format(method, _valid_methods))
cutoffs = np.array(cutoffs)
if method in ['lowpass', 'highpass'] and cutoffs.size != 1:
raise ValueError('Cutoffs must be length 1 when using {} filter'
.format(method))
elif method in ['bandpass', 'bandstop'] and cutoffs.size != 2:
raise ValueError('Cutoffs must be length 2 when using {} filter'
.format(method))
nyq_cutoff = cutoffs / (data.fs * 0.5)
if np.any(nyq_cutoff > 1):
raise ValueError('Provided cutoffs {} are outside of the Nyquist '
'frequency for input data with sampling rate {}.'
.format(cutoffs, data.fs))
b, a = signal.butter(int(order), nyq_cutoff, btype=method)
filtered = utils.new_physio_like(data, signal.filtfilt(b, a, data))
return filtered
[docs]@utils.make_operation()
def interpolate_physio(data, target_fs, *, kind='cubic'):
"""
Interpolates `data` to desired sampling rate `target_fs`
Parameters
----------
data : Physio_like
Input physiological data to be interpolated
target_fs : float
Desired sampling rate for `data`
kind : str or int, optional
Type of interpolation to perform. Must be one of available kinds in
:func:`scipy.interpolate.interp1d`. Default: 'cubic'
Returns
-------
interp : :class:`peakdet.Physio`
Interpolated input `data`
"""
data = utils.check_physio(data, ensure_fs=True)
factor = target_fs / data.fs
# generate original and target "time" series
t_orig = np.linspace(0, len(data) / data.fs, len(data))
t_new = np.linspace(0, len(data) / data.fs, int(len(data) * factor))
# interpolate data and generate new Physio object
interp = interpolate.interp1d(t_orig, data, kind=kind)(t_new)
interp = utils.new_physio_like(data, interp, fs=target_fs)
return interp
[docs]@utils.make_operation()
def peakfind_physio(data, *, thresh=0.2, dist=None):
"""
Performs peak and trough detection on `data`
Parameters
----------
data : Physio_like
Input data in which to find peaks
thresh : float [0,1], optional
Relative height threshold a data point must surpass to be classified as
a peak. Default: 0.2
dist : int, optional
Distance in indices that peaks must be separated by in `data`. If None,
this is estimated. Default: None
Returns
-------
peaks : :class:`peakdet.Physio`
Input `data` with detected peaks and troughs
"""
ensure_fs = True if dist is None else False
data = utils.check_physio(data, ensure_fs=ensure_fs, copy=True)
# first pass peak detection to get approximate distance between peaks
cdist = data.fs // 4 if dist is None else dist
thresh = np.squeeze(np.diff(np.percentile(data, [5, 95]))) * thresh
locs, heights = signal.find_peaks(data[:], distance=cdist, height=thresh)
# second, more thorough peak detection
cdist = np.diff(locs).mean() // 2
heights = np.percentile(heights['peak_heights'], 1)
locs, heights = signal.find_peaks(data[:], distance=cdist, height=heights)
data._metadata['peaks'] = locs
# perform trough detection based on detected peaks
data._metadata['troughs'] = utils.check_troughs(data, data.peaks)
return data
@utils.make_operation()
def delete_peaks(data, remove):
"""
Deletes peaks in `remove` from peaks stored in `data`
Parameters
----------
data : Physio_like
remove : array_like
Returns
-------
data : Physio_like
"""
data = utils.check_physio(data, ensure_fs=False, copy=True)
data._metadata['peaks'] = np.setdiff1d(data._metadata['peaks'], remove)
data._metadata['troughs'] = utils.check_troughs(data, data.peaks)
return data
@utils.make_operation()
def reject_peaks(data, remove):
"""
Marks peaks in `remove` as rejected artifacts in `data`
Parameters
----------
data : Physio_like
remove : array_like
Returns
-------
data : Physio_like
"""
data = utils.check_physio(data, ensure_fs=False, copy=True)
data._metadata['reject'] = np.append(data._metadata['reject'], remove)
data._metadata['troughs'] = utils.check_troughs(data, data.peaks)
return data
[docs]def edit_physio(data):
"""
Opens interactive plot with `data` to permit manual editing of time series
Parameters
----------
data : Physio_like
Physiological data to be edited
Returns
-------
edited : :class:`peakdet.Physio`
Input `data` with manual edits
"""
data = utils.check_physio(data, ensure_fs=True)
# no point in manual edits if peaks/troughs aren't defined
if not (len(data.peaks) and len(data.troughs)):
return
# perform manual editing
edits = editor._PhysioEditor(data)
plt.show(block=True)
delete, reject = sorted(edits.deleted), sorted(edits.rejected)
# replay editing on original provided data object
if reject is not None:
data = reject_peaks(data, remove=reject)
if delete is not None:
data = delete_peaks(data, remove=delete)
return data
[docs]def plot_physio(data, *, ax=None):
"""
Plots `data` and associated peaks / troughs
Parameters
----------
data : Physio_like
Physiological data to plot
ax : :class:`matplotlib.axes.Axes`, optional
Axis on which to plot `data`. If None, a new axis is created. Default:
None
Returns
-------
ax : :class:`matplotlib.axes.Axes`
Axis with plotted `data`
"""
# generate x-axis time series
fs = 1 if np.isnan(data.fs) else data.fs
time = np.arange(0, len(data) / fs, 1 / fs)
if ax is None:
fig, ax = plt.subplots(1, 1)
# plot data with peaks + troughs, as appropriate
ax.plot(time, data, 'b',
time[data.peaks], data[data.peaks], '.r',
time[data.troughs], data[data.troughs], '.g')
return ax