Source code for plasmapy.analysis.time_series.conditional_averaging

"""
Functionality to calculate the conditional average and conditional
variance of a time series.
"""

__all__ = ["ConditionalEvents"]


import astropy.units as u
import numpy as np
from scipy.signal import find_peaks


[docs] class ConditionalEvents: """ Calculate conditional average, conditional variance, peaks, arrival times and waiting times of events of a time series. Parameters ---------- signal : 1D |array_like| Signal to be analyzed. time : 1D |array_like| Corresponding time values for ``signal``. lower_threshold : `float` or `~astropy.units.Quantity` Lower threshold for event detection. upper_threshold : `float` or `~astropy.units.Quantity`, default: `None` Upper threshold for event detection. reference_signal : 1D |array_like|, default: `None` Reference signal. If `None`, ``signal`` is the reference signal. length_of_return : `float`, default: `None` Desired length of returned data. If `None`, estimated as ``len(signal) / len(number_of_events) * time_step``. distance : `float`, default: ``0`` Minimum distance between peaks, in units of time. remove_non_max_peaks : `bool`, default: `False` Remove events where peak is not the largest value inside window. Raises ------ `ValueError`: If length of ``signal`` and ``time`` are not equal. If length of ``reference_signal`` and ``time`` are not equal (when ``reference_signal`` is provided). If ``length_of_return`` is greater than the length of the time span. If ``length_of_return`` is negative. If ``upper_threshold`` is less than or equal to ``lower_threshold``. `~astropy.units.UnitsError`: If astropy units of ``signal``/``reference_signal`` and either ``lower_threshold`` or ``upper_threshold`` do not match. If the units of ``time``, ``length_of_return`` and ``distance`` do not match. Notes ----- The method, in its simplest form, works by finding peaks in a signal that fulfill a certain size threshold. Equally sized excerpts of the signal around every peak are then cut out and averaged. This yields the average shape of the events that fulfill the condition. A detailed analysis of the conditional averaging method is presented in the Master thesis by :cite:t:`nilsen:2023`. Examples -------- .. autolink-skip:: section >>> from plasmapy.analysis.time_series.conditional_averaging import ( ... ConditionalEvents, ... ) >>> cond_events = ConditionalEvents( ... signal=[1, 2, 1, 1, 2, 1], ... time=[1, 2, 3, 4, 5, 6], ... lower_threshold=1.5, ... ) >>> cond_events.time array([-1.0, 0.0, 1.0]) >>> cond_events.average array([1., 2., 1.]) >>> cond_events.variance array([1., 1., 1.]) >>> cond_events.peaks array([2, 2]) >>> cond_events.waiting_times array([3]) >>> cond_events.arrival_times array([2, 5]) >>> cond_events.number_of_events 2 """ def __init__( self, signal, time, lower_threshold, *, upper_threshold=None, reference_signal=None, length_of_return=None, distance=0, remove_non_max_peaks: bool = False, ) -> None: self._check_for_value_errors( distance, signal, time, reference_signal, length_of_return, upper_threshold, lower_threshold, ) if reference_signal is not None: self._check_units_consistency( [reference_signal, lower_threshold, upper_threshold] ) else: self._check_units_consistency([signal, lower_threshold, upper_threshold]) self._check_units_consistency([time, length_of_return, distance]) self._astropy_signal_unit = None self._astropy_time_unit = None signal, self._astropy_signal_unit = self._separate_unit_from_variable(signal) time, self._astropy_time_unit = self._separate_unit_from_variable(time) lower_threshold, _ = self._separate_unit_from_variable(lower_threshold) upper_threshold, _ = self._separate_unit_from_variable(upper_threshold) reference_signal, _ = self._separate_unit_from_variable(reference_signal) length_of_return, _ = self._separate_unit_from_variable(length_of_return) distance, _ = self._separate_unit_from_variable(distance) self._reference_signal_provided = True if reference_signal is None: self._reference_signal_provided = False reference_signal = signal.copy() signal = self._ensure_numpy_array(signal) time = self._ensure_numpy_array(time) reference_signal = self._ensure_numpy_array(reference_signal) time_step = np.diff(time).sum() / (len(time) - 1) peak_locations, _ = find_peaks( reference_signal, height=[lower_threshold, upper_threshold], distance=int(distance / time_step) + 1, ) conditional_events_indices = self._separate_events( reference_signal, lower_threshold, upper_threshold ) peak_indices = self._choose_largest_peak_per_event( reference_signal, conditional_events_indices, peak_locations, ) if length_of_return is None: length_of_return = len(signal) / len(conditional_events_indices) * time_step self._return_time = ( np.arange( -int(length_of_return / (time_step * 2)), int(length_of_return / (time_step * 2)) + 1, ) * time_step ) conditional_events = self._calculate_all_events(signal, peak_indices) if remove_non_max_peaks: if self._reference_signal_provided: conditional_events_reference_signal = self._calculate_all_events( reference_signal, peak_indices ) conditional_events, peak_indices = self._check_if_largest_value_is_peak( conditional_events, peak_indices, conditional_events_reference_signal, ) else: conditional_events, peak_indices = self._check_if_largest_value_is_peak( conditional_events, peak_indices, None ) self._conditional_average = np.mean(conditional_events, axis=0) self._conditional_variance = self._calculate_conditional_variance( conditional_events ) self._peaks = signal[peak_indices] self._number_of_events = len(self._peaks) self._arrival_times = time[peak_indices] self._waiting_times = np.diff(self._arrival_times) if self._astropy_signal_unit is not None: self._peaks *= self._astropy_signal_unit self._conditional_average *= self._astropy_signal_unit if self._astropy_time_unit is not None: self._return_time *= self._astropy_time_unit self._arrival_times *= self._astropy_time_unit self._waiting_times *= self._astropy_time_unit @property def time(self): """ Time values corresponding to the analysis window. Returns ------- time : 1D |array_like| Time values representing the analysis window. """ return self._return_time @property def average(self): """ Conditional average over events. Returns ------- average : 1D |array_like| Array representing the conditional average over events. """ return self._conditional_average @property def variance(self): """ Conditional variance over events. Returns ------- variance : 1D |array_like| Array representing the conditional variance over events. """ return self._conditional_variance @property def peaks(self): """ Peak values of conditional events. Returns ------- peaks : 1D |array_like| Peak values of conditional events. """ return self._peaks @property def waiting_times(self): """ Waiting times between consecutive peaks. Returns ------- waiting_times : 1D |array_like| Waiting times between consecutive peaks of conditional events. """ return self._waiting_times @property def arrival_times(self): """ Arrival times corresponding to the conditional events. Returns ------- arrival_times : 1D |array_like| Arrival times the conditional events. """ return self._arrival_times @property def number_of_events(self): """ Total number of conditional events. Returns ------- number_of_events : int Total number of conditional events. """ return self._number_of_events def _check_for_value_errors( self, distance, signal, time, reference_signal, length_of_return, upper_threshold, lower_threshold, ): if distance < 0: raise ValueError("The distance parameter can't be negative") if len(signal) != len(time): raise ValueError("Length of signal and time must be equal") if reference_signal is not None and len(reference_signal) != len(time): raise ValueError("Length of reference_signal and time must be equal") if length_of_return is not None: if length_of_return > time[-1] - time[0]: raise ValueError( "Choose length_of_return shorter or euqal to time length" ) if length_of_return < 0: raise ValueError("The length_of_return parameter must be bigger than 0") if upper_threshold and upper_threshold <= lower_threshold: raise ValueError( "The upper_threshold is higher than the lower_threshold, no events will be found" ) def _separate_unit_from_variable(self, variable): if isinstance(variable, u.Quantity): return variable.value, variable.unit return variable, None def _check_units_consistency(self, variables): # check whether all variables have an astropy unit first_variable_has_unit = isinstance(variables[0], u.Quantity) for variable in variables: if variable is not None and first_variable_has_unit != isinstance( variable, u.Quantity ): raise u.UnitsError(f"Units do not match: {variable} and {variables[0]}") # check whether all variables have same astropy unit if first_variable_has_unit: first_unit = variables[0].unit for variable in variables: if variable is not None and first_unit != variable.unit: raise u.UnitsError( f"Units do not match: {variable} and {variables[0]}" ) def _ensure_numpy_array(self, variable): if not isinstance(variable, np.ndarray): variable = np.array(variable) return variable def _separate_events(self, reference_signal, lower_threshold, upper_threshold): places = np.where(reference_signal > lower_threshold)[0] if upper_threshold: higher = np.where(reference_signal < upper_threshold)[0] places = np.intersect1d(places, higher) distance_between_places = np.diff(places) _split = np.where(distance_between_places != 1)[0] return np.split(places, _split + 1) def _choose_largest_peak_per_event( self, reference_signal, conditional_events_indices, peak_indices, ): for event in conditional_events_indices: peaks_in_event = np.isin(peak_indices, event) if peaks_in_event.sum() > 1: peak_ind = peak_indices[peaks_in_event] highest_local_peak = reference_signal[peak_ind].argmax() not_highest_local_peaks = np.delete(peak_ind, highest_local_peak) peak_indices = np.delete( peak_indices, np.isin(peak_indices, not_highest_local_peaks) ) return peak_indices def _calculate_all_events(self, signal, peak_indices): t_half_len = int((len(self._return_time) - 1) / 2) conditional_events = np.zeros([len(peak_indices), len(self._return_time)]) for i, global_peak_loc in enumerate(peak_indices): low_ind = int(max(0, global_peak_loc - t_half_len)) high_ind = int(min(len(signal), global_peak_loc + t_half_len + 1)) single_event = signal[low_ind:high_ind] if low_ind == 0: single_event = np.append( np.zeros(-global_peak_loc + t_half_len), single_event ) if high_ind == len(signal): single_event = np.append( single_event, np.zeros(global_peak_loc + t_half_len + 1 - len(signal)), ) conditional_events[i, :] = single_event return conditional_events def _check_if_largest_value_is_peak( self, conditional_events, peak_indices, conditional_events_reference_signal ): def is_middle_value_highest(sequence): middle_index = len(sequence) // 2 return np.max(sequence[middle_index]) == np.max(sequence) checked_conditional_events = [] checked_peak_indices = [] if self._reference_signal_provided: for event, peak, reference_event in zip( conditional_events, peak_indices, conditional_events_reference_signal, strict=False, ): if is_middle_value_highest(reference_event): checked_conditional_events.append(event) checked_peak_indices.append(peak) else: for event, peak in zip(conditional_events, peak_indices, strict=False): if is_middle_value_highest(event): checked_conditional_events.append(event) checked_peak_indices.append(peak) return np.array(checked_conditional_events), np.array(checked_peak_indices) def _calculate_conditional_variance(self, conditional_events): return self._conditional_average**2 / np.mean(conditional_events**2, axis=0)