Source code for plasmapy.analysis.swept_langmuir.helpers

"""Helper functions for analyzing swept Langmuir traces."""

__all__ = ["check_sweep", "merge_voltage_clusters", "sort_sweep_arrays"]

import numbers
import warnings
from typing import Literal

import astropy.units as u
import numpy as np

from plasmapy.utils.exceptions import PlasmaPyWarning


[docs] def check_sweep( # noqa: C901, PLR0912 voltage: np.ndarray, current: np.ndarray, strip_units: bool = True, # noqa: FBT001, FBT002 allow_unsorted: bool = False, # noqa: FBT001, FBT002 ) -> tuple[np.ndarray, np.ndarray]: """ Check that the voltage and current arrays are properly formatted for analysis by `plasmapy.analysis.swept_langmuir`. Parameters ---------- voltage: `numpy.ndarray` 1D `numpy.ndarray` representing the voltage of the swept Langmuir trace. Voltage should be monotonically increasing. *No units are assumed or checked, but values should be in volts.* current: `numpy.ndarray` 1D `numpy.ndarray` representing the current of the swept Langmuir trace. Values should start from a negative ion-saturation current and increase to a positive electron-saturation current. *No units are assumed or checked, but values should be in amperes.* strip_units: `bool`, default: `True` If `True`, then the units on ``voltage`` and/or ``current`` will be stripped if either are passed in as an Astropy `~astropy.units.Quantity`. allow_unsorted: `bool`, default: `False` If `True`, then the supplied ``voltage`` array must be monotonically increasing. Returns ------- voltage : `numpy.ndarray` Input argument ``voltage`` after it goes through all of its checks and conditioning. current : `numpy.ndarray` Input argument ``current`` after it goes through all of its checks and conditioning. Raises ------ `TypeError` If either the ``voltage`` or ``current`` arrays are not instances of a `numpy.ndarray`. `ValueError`: If either the ``voltage`` or ``current`` arrays are not 1D. `ValueError` If the ``voltage`` array is not monotonically increasing. `ValueError` If the ``current`` array never crosses zero (i.e. has no floating potential). `ValueError` If the ``current`` array does not start form a negative ion-saturation current and increases to a positive electron-saturation current. `ValueError` If either the ``voltage`` or ``current`` array does not have a `numpy.dtype` of either `numpy.integer` or `numpy.floating`. """ # -- examine voltage array -- # check type if isinstance(voltage, np.ndarray): pass elif isinstance(voltage, list | tuple): voltage = np.array(voltage) else: raise TypeError( f"Expected 1D numpy array for voltage, but got {type(voltage)}.", ) # check array structure if not ( np.issubdtype(voltage.dtype, np.floating) or np.issubdtype(voltage.dtype, np.integer) ): raise ValueError( f"Expected 1D numpy array of floats or integers for voltage, but" f" got an array with dtype '{voltage.dtype}'.", ) elif voltage.ndim != 1: raise ValueError( f"Expected 1D numpy array for voltage, but got array with " f"{voltage.ndim} dimensions.", ) elif not np.all(np.diff(voltage) >= 0) and not allow_unsorted: raise ValueError("The voltage array is not monotonically increasing.") if isinstance(voltage, u.Quantity) and strip_units: voltage = voltage.value # -- examine current array -- # check type if isinstance(current, np.ndarray): pass elif isinstance(current, list | tuple): current = np.array(current) else: raise TypeError( f"Expected 1D numpy array for current, but got {type(current)}.", ) # check array structure if not ( np.issubdtype(current.dtype, np.floating) or np.issubdtype(current.dtype, np.integer) ): raise ValueError( f"Expected 1D numpy array of floats or integers for current, but" f" got an array with dtype '{current.dtype}'.", ) elif current.ndim != 1: raise ValueError( f"Expected 1D numpy array for current, but got array with " f"{current.ndim} dimensions.", ) elif current.min() > 0.0 or current.max() < 0: raise ValueError( "Invalid swept Langmuir trace, the current never crosses zero " "'current = 0'.", ) elif current[0] > 0.0 or current[-1] < 0.0: raise ValueError( "The current array needs to start from a negative ion-saturation " "current to a positive electron-saturation current.", ) if voltage.size != current.size: raise ValueError( f"Incompatible arrays, 'voltage' size {voltage.size} must be the same" f" as the 'current' size {current.size}.", ) if isinstance(current, u.Quantity) and strip_units: current = current.value return voltage, current
[docs] def sort_sweep_arrays( voltage: np.ndarray, current: np.ndarray, voltage_order: Literal["ascending", "descending"] = "ascending", ) -> tuple[np.ndarray, np.ndarray]: """ Sort the swept langmuir ``voltage`` and ``current`` traces to ensure the ``voltage`` array is either monotonically increasing or decreasing. Parameters ---------- voltage: `numpy.ndarray` 1D `numpy.ndarray` representing the voltage of the swept Langmuir trace. *No units are assumed or checked, but values should be in volts.* current: `numpy.ndarray` 1D `numpy.ndarray` representing the current of the swept Langmuir trace. Values should start from a negative ion-saturation current and increase to a positive electron-saturation current. *No units are assumed or checked, but values should be in amperes.* voltage_order: `str` Either ``'ascending'`` or ``'descending'`` to indicate how the ``voltage`` array should be sorted. (DEFAULT: ``'ascending'``) Returns ------- voltage : `numpy.ndarray` Sorted ``voltage`` array. current : `numpy.ndarray` Matched ``current`` array to the sorted ``voltage`` array. """ if not isinstance(voltage_order, str): raise TypeError( "Expected 'voltage_order' to be a string equal to 'ascending' " f"or 'descending', but got type {type(voltage_order)}.", ) elif voltage_order not in ["ascending", "descending"]: raise ValueError( "Expected 'voltage_order' to be a string equal to 'ascending' " f"or 'descending', but got '{voltage_order}'.", ) voltage, current = check_sweep( voltage, current, strip_units=True, allow_unsorted=True, ) # determine order voltage_diff = np.diff(voltage) if np.all(voltage_diff >= 0): _order = "ascending" elif np.all(voltage_diff <= 0): _order = "descending" else: _order = None if _order == voltage_order: # already ordered return voltage, current # perform sorting if _order is None: index_sort = np.argsort(voltage) if voltage_order == "descending": index_sort = index_sort[::-1] voltage = voltage[index_sort] current = current[index_sort] else: voltage = voltage[::-1] current = current[::-1] return voltage, current
def _is_voltage_regularly_spaced( voltage_diff: np.ndarray, mask_zero_diff: np.ndarray, ) -> bool: """ Determine if the voltage difference array ``voltage_diff`` is regularly spaced; that is the differences are all equal or some integer multiple of the smallest difference. """ is_regular_grid = False if np.count_nonzero(mask_zero_diff) > 0: # is_regular_grid = False return False elif np.allclose(voltage_diff, voltage_diff[0]): # grid is already regularly spaced is_regular_grid = True elif np.allclose( np.rint(voltage_diff / np.min(voltage_diff)) - voltage_diff / np.min(voltage_diff), 0, ): # grid has a common dV, but at times jumps N * dV times min_dV = np.min(voltage_diff) ndV = np.rint(voltage_diff / min_dV) is_regular_grid = not np.any(ndV > 10) return is_regular_grid def _merge_voltage_clusters__zero_diff_neighbors( voltage: np.ndarray, current: np.ndarray, ) -> tuple[np.ndarray, np.ndarray]: """ Take the ``voltage`` and ``current`` arrays associated with a swept langmuir trace and average together clusters of identical voltage values. """ # generate boolean mask for zero diff locations voltage_diff = np.diff(voltage) mask_zero_diff = np.isclose(voltage_diff, 0.0) mask_zero_diff = np.append(mask_zero_diff, [mask_zero_diff[-1]]) # initialize new voltage and current arrays new_voltage = np.full(voltage.shape, np.nan, dtype=voltage.dtype) new_current = np.full(current.shape, np.nan, dtype=current.dtype) mask = np.logical_not(mask_zero_diff) new_voltage[mask] = voltage[mask] new_current[mask] = current[mask] # merge zero diff clusters while np.any(mask_zero_diff): leading_cluster_volt = voltage[mask_zero_diff][0] index = np.where(new_voltage == leading_cluster_volt)[0] if len(index) == 0: index = np.where(mask_zero_diff)[0] index = index[0] volt_mask = np.isclose(voltage, leading_cluster_volt) new_voltage[index] = leading_cluster_volt new_current[index] = np.average(current[volt_mask]) mask_zero_diff[volt_mask] = False # remove nan entries mask = np.logical_not(np.isnan(new_voltage)) new_voltage = new_voltage[mask] new_current = new_current[mask] return new_voltage, new_current def _merge_voltage_clusters__within_dv( # noqa: C901 voltage: np.ndarray, current: np.ndarray, voltage_step_size: float, ) -> tuple[np.ndarray, np.ndarray]: """ Take the ``voltage`` and ``current`` arrays associated with a swept langmuir trace and average together clusters of voltage values within step size ``voltage_step_size``. """ # initialize new voltage and current arrays new_voltage = voltage.copy() new_current = current.copy() voltage_diff = np.diff(voltage) # generate global cluster mask mask1 = voltage_diff < voltage_step_size mask2 = np.isclose(voltage_diff, voltage_step_size) cluster_mask = np.logical_or(mask1, mask2) # determine cluster locations indices = np.where(np.diff(cluster_mask))[0] if cluster_mask[0]: # 1st element in voltage is in a cluster indices = np.append([0], indices) indices[1] = indices[1] + 1 else: indices[0:2] = indices[0:2] + 1 if indices.size > 1: indices[2:] = indices[2:] + 1 if cluster_mask[-1] and indices[-1] != voltage.size - 1: # last element in voltage is in a cluster indices = np.append(indices, [voltage.size - 1]) newshape = (int(indices.size / 2), 2) indices = np.reshape(indices, newshape) # merge clusters and update new_voltage and new_current for ii in range(indices.shape[0]): start_index = indices[ii][0] stop_index = indices[ii][1] + 1 new_voltage[start_index:stop_index] = np.nan new_current[start_index:stop_index] = np.nan sub_voltage = voltage[start_index:stop_index] sub_current = current[start_index:stop_index] v_range = sub_voltage[-1] - sub_voltage[0] nbins = int(np.rint(v_range / voltage_step_size)) if not np.isclose(nbins - (v_range / voltage_step_size), 0): nbins = int( np.floor((sub_voltage[-1] - sub_voltage[0]) / voltage_step_size), ) if nbins == 0: nbins = 1 if nbins == 1: new_voltage[start_index] = np.average(sub_voltage) new_current[start_index] = np.average(sub_current) continue range_array = np.linspace(sub_voltage[0], sub_voltage[-1], nbins + 1) for jj in range(nbins): start_voltage = range_array[jj] stop_voltage = range_array[jj + 1] mask1 = sub_voltage >= start_voltage if jj == nbins - 1: mask2 = sub_voltage <= stop_voltage else: mask2 = sub_voltage < stop_voltage mask = np.logical_and(mask1, mask2) if np.count_nonzero(mask) > 0: new_voltage[start_index + jj] = np.average(sub_voltage[mask]) new_current[start_index + jj] = np.average(sub_current[mask]) # filter out NaN values nan_mask = np.logical_not(np.isnan(new_voltage)) new_voltage = new_voltage[nan_mask] new_current = new_current[nan_mask] return new_voltage, new_current
[docs] def merge_voltage_clusters( # noqa: C901, PLR0912 voltage: np.ndarray, current: np.ndarray, voltage_step_size: float | None = None, filter_nan: bool = False, # noqa: FBT001, FBT002 ) -> tuple[np.ndarray, np.ndarray]: r""" Search the ``voltage`` array for closely spaced voltage clusters based on the ``voltage_step_size`` parameter and merge those clusters, and associated ``current`` values, into a single point. This function is intended to merge together identical, or close, voltage points to remove small step sizes in the swept langmuir trace before differentiation. Parameters ---------- voltage: `numpy.ndarray` 1D `numpy.ndarray` representing the voltage of the swept Langmuir trace. Voltage needs to be monotonically ascending. *No units are assumed or checked, but values should be in volts.* current: `numpy.ndarray` 1D `numpy.ndarray` representing the current of the swept Langmuir trace. Values should start from a negative ion-saturation current and increase to a positive electron-saturation current. *No units are assumed or checked, but values should be in amperes.* voltage_step_size: `float` | `None`, default: `None` A non-zero, positive step size for the ``voltage`` array cluster identification. A value of ``0`` will merge only duplicate voltage values. A value of `None` will default to 95% of the average step size of the ``voltage`` array (only counting duplicate voltages once). filter_nan: `bool`, default: `False` Set `True` to automatically filter `~numpy.nan` values from the ``voltage`` and ``current`` arrays. Returns ------- voltage : `numpy.ndarray` The new ``voltage`` array. current : `numpy.ndarray` The new ``current`` array. Notes ----- An identified voltage cluster can span a voltage range larger than ``voltage_step_size`` and still have every voltage step being smaller than ``voltage_step_size``. In this scenario, the voltage cluster will be divided up into :math:`N`-sections for averaging, where :math:`N` is given by .. math:: N = \texttt{floor}\left( \frac{V_{cluster,max} - V_{cluster,min}}{\texttt{voltage_step_size}} \right) """ # condition voltage_step_size if voltage_step_size is not None and not isinstance( voltage_step_size, numbers.Real, ): raise TypeError( "Expected 'voltage_step_size' to be a float or None, got type " f"{type(voltage_step_size)}.", ) elif isinstance(voltage_step_size, numbers.Integral): voltage_step_size = float(voltage_step_size) try: voltage, current = check_sweep(voltage, current) except ValueError as err: # check if the ValueError was a result of voltage containing nan # values if not np.any(np.isnan(voltage)): # voltage array has no nan values raise ValueError(*err.args) from err if not filter_nan: raise ValueError( "The voltage array contains NaN values. If you want NaN " "values to be automatically filtered, then set argument " "filter_nan=True.", ) from err # filter (voltage) NaN values and check sweep again nan_mask = np.logical_not(np.isnan(voltage)) voltage = voltage[nan_mask] current = current[nan_mask] voltage, current = check_sweep(voltage, current) # filter (current) NaN values if not np.any(np.isnan(current)): # voltage array has no nan values pass elif filter_nan: # mask out current NaN values nan_mask = np.logical_not(np.isnan(current)) voltage = voltage[nan_mask] current = current[nan_mask] else: raise ValueError( "The current array contains NaN values. If you want NaN " "values to be automatically filtered, then set argument " "filter_nan=True.", ) # check if grid is regularly spaced voltage_diff = np.diff(voltage) mask_zero_diff = np.isclose(voltage_diff, 0.0) is_regular_grid = _is_voltage_regularly_spaced(voltage_diff, mask_zero_diff) # return if voltage is already regularly spaced and no voltage merging is # requested if is_regular_grid and voltage_step_size is None: warnings.warn( "The supplied ``voltage`` array is already regularly spaced. If " "you want to re-bin the arrays to a different voltage_step_size, " "then use something like numpy.interp. No merging performed.", PlasmaPyWarning, stacklevel=2, ) return voltage.copy(), current.copy() # condition voltage_step_size ... Round 2 if voltage_step_size is None: voltage_step_size = 0.95 * np.abs( np.average(voltage_diff[np.logical_not(mask_zero_diff)]), ) elif voltage_step_size == 0: voltage_step_size = 0.0 elif voltage_step_size < 0: voltage_step_size = -voltage_step_size if np.all(voltage_step_size <= np.abs(voltage_diff)) and voltage_step_size != 0: warnings.warn( f"The supplied voltage_step_size ({voltage_step_size}) is smaller than " f"any of the voltage steps in the voltage array. Supply a step size " f"greater than the smallest voltage step ({np.min(voltage_diff)}). " f"No merging performed.", PlasmaPyWarning, stacklevel=2, ) return voltage.copy(), current.copy() # now merge clusters if voltage_step_size == 0: new_voltage, new_current = _merge_voltage_clusters__zero_diff_neighbors( voltage, current, ) else: new_voltage, new_current = _merge_voltage_clusters__within_dv( voltage, current, voltage_step_size, ) return new_voltage, new_current