Source code for plasmapy.simulation.particle_tracker

"""
Module containing the definition for the general particle tracker.
"""

__all__ = [
    "AbstractSaveRoutine",
    "AbstractTerminationCondition",
    "DoNotSaveSaveRoutine",
    "IntervalSaveRoutine",
    "NoParticlesOnGridsTerminationCondition",
    "ParticleTracker",
    "TimeElapsedTerminationCondition",
]

import collections
import sys
import warnings
from abc import ABC, abstractmethod
from collections.abc import Iterable
from pathlib import Path
from typing import Optional

import astropy.units as u
import h5py
import numpy as np
from numpy.typing import NDArray
from tqdm import tqdm

from plasmapy.particles import Particle, particle_input
from plasmapy.plasma.grids import AbstractGrid
from plasmapy.plasma.plasma_base import BasePlasma
from plasmapy.simulation.particle_integrators import boris_push


[docs] class AbstractTerminationCondition(ABC): """Abstract base class containing the necessary methods for a ParticleTracker termination condition.""" @property def tracker(self) -> Optional["ParticleTracker"]: """Return the `ParticleTracker` object for this termination condition.""" return self._particle_tracker @tracker.setter def tracker(self, particle_tracker: "ParticleTracker") -> None: self._particle_tracker: ParticleTracker = particle_tracker @property @abstractmethod def require_synchronized_dt(self) -> bool: """Return if this termination condition requires a synchronized time step.""" ... @property @abstractmethod def progress_description(self) -> str: """Return a small string describing the relevant quantity shown on the meter.""" ... @property @abstractmethod def units_string(self) -> str: """Return a string representation of the units of `total`.""" ... @property @abstractmethod def is_finished(self) -> bool: """Return `True` if the simulation has finished.""" ... @property @abstractmethod def progress(self) -> float: """Return a number representing the progress of the simulation (compared to total). This number represents the numerator of the completion percentage. """ ... @property @abstractmethod def total(self) -> float: """Return a number representing the total number of steps in a simulation. This number represents the denominator of the completion percentage. """ ...
[docs] class TimeElapsedTerminationCondition(AbstractTerminationCondition): """Termination condition corresponding to the elapsed time of a ParticleTracker.""" def __init__(self, termination_time: u.Quantity) -> None: self.termination_time: float = termination_time.to(u.s).value @property def require_synchronized_dt(self) -> bool: """The elapsed time termination condition requires a synchronized step to stop all particles at the same time. """ return True @property def progress_description(self) -> str: """The time elapsed termination condition depends on elapsed time, therefore the relevant quantity is time remaining. """ return "Time remaining" @property def units_string(self) -> str: """The units for the time elapsed condition have the units of seconds.""" return "seconds" @property def is_finished(self) -> bool: """Conclude the simulation if all particles have been tracked over the specified termination time.""" return bool(float(self.tracker.time) >= self.termination_time) @property def progress(self) -> float: """Return the current time step of the simulation.""" return float(self.tracker.time) @property def total(self) -> float: """Return the total amount of time over which the particles are tracked.""" return self.termination_time
[docs] class NoParticlesOnGridsTerminationCondition(AbstractTerminationCondition): """Termination condition corresponding to stopping the simulation when all particles have exited the grid.""" def __init__(self) -> None: pass @property def require_synchronized_dt(self) -> bool: """The no field termination condition does not require a synchronized time step.""" return False @property def progress_description(self) -> str: """The progress meter is described in terms of the fraction of particles still on the grid.""" return "Number of particles still on grid" @property def units_string(self) -> str: """The progress meter is described in terms of the fraction of particles still on the grid.""" return "Particles" @property def is_finished(self) -> bool: """The simulation is finished when no more particles are on any grids.""" is_not_on_grid = self.tracker.on_any_grid == False # noqa: E712 return is_not_on_grid.all() and self.tracker.iteration_number > 0 @property def progress(self) -> float: """The progress of the simulation is measured by how many particles are no longer on a grid.""" is_not_on_grid: NDArray[np.bool_] = self.tracker.on_grid == 0 return float(is_not_on_grid.sum()) @property def total(self) -> float: """The progress of the simulation is measured against the total number of particles in the simulation.""" return float(self.tracker.nparticles)
[docs] class AbstractSaveRoutine(ABC): """Abstract base class containing the necessary methods for a |ParticleTracker| save routine. The save routine class is responsible for defining the conditions and hooks for saving. Parameters ---------- output_directory : `~pathlib.Path`, optional Output for objects that are saved to disk. If a directory is not specified then a memory save routine is used. Notes ----- After every push, the `post_push_hook` method is called with the respective `~plasmapy.simulation.particle_tracker.ParticleTracker` object passed as a parameter. Then, the hook calls `save_now` to determine whether or not the simulation state should be saved. """ def __init__(self, output_directory: Path | None = None) -> None: self.output_directory = output_directory self.x_all = [] self.v_all = [] self._particle_tracker: ParticleTracker | None = None @property def tracker(self) -> Optional["ParticleTracker"]: """Return the `ParticleTracker` object for this stop condition.""" return self._particle_tracker @tracker.setter def tracker(self, particle_tracker: "ParticleTracker") -> None: self._particle_tracker = particle_tracker @property @abstractmethod def require_synchronized_dt(self) -> bool: """Return if this save routine requires a synchronized time step.""" ... @property @abstractmethod def save_now(self) -> bool: """Determine if to save on the current push step.""" ...
[docs] def save(self) -> None: """Save the current state of the simulation to disk or memory based on whether the output directory was set.""" if self.output_directory is not None: self._save_to_disk() else: self._save_to_memory()
def _save_to_disk(self) -> None: """Save a hdf5 file containing simulation positions and velocities.""" path = self.output_directory / f"{self.tracker.iteration_number}.hdf5" with h5py.File(path, "w") as output_file: output_file["x"] = self.tracker.x output_file["v"] = self.tracker.v def _save_to_memory(self) -> None: """Append simulation positions and velocities to save routine object.""" self.x_all.append(np.copy(self._particle_tracker.x)) self.v_all.append(np.copy(self._particle_tracker.v))
[docs] def post_push_hook(self, force_save=False) -> None: """Function called after a push step. This function is responsible for handling two steps of save routine, namely: - Deciding to save on the current time step - How the simulation data is saved (i.e. to disk or memory) """ if self.save_now or force_save: self.save()
[docs] class DoNotSaveSaveRoutine(AbstractSaveRoutine): """The default save routine for the |ParticleTracker| class. This save routine is a placeholder and will not save the state of the particle tracker. """ def __init__(self) -> None: super().__init__() @property def require_synchronized_dt(self) -> bool: """The do not save save routine does not require a synchronized time step.""" return False @property def save_now(self) -> bool: """The do not save save routine will never save by definition.""" return False
[docs] class IntervalSaveRoutine(AbstractSaveRoutine): """Abstract class describing a save routine that saves every given interval.""" def __init__(self, interval: u.Quantity, **kwargs) -> None: super().__init__(**kwargs) self.t_all: list[float] = [] self.save_interval: float = interval.to(u.s).value self.time_of_last_save: float = 0 @property def require_synchronized_dt(self) -> bool: """Save output only makes sense for synchronized time steps.""" return True @property def save_now(self) -> bool: """Save at every interval given in instantiation.""" return bool(self.tracker.time - self.time_of_last_save >= self.save_interval)
[docs] def save(self) -> None: """Save the current state of the simulation. Sets the time of last save attribute and log the timestamp. """ super().save() self.time_of_last_save = self.tracker.time self.t_all.append(self.tracker.time)
[docs] def results(self) -> tuple[u.Quantity, u.Quantity, u.Quantity]: """Return the results of the simulation. The quantities returned are the times, positions, and velocities, respectively. """ return ( np.asarray(self.t_all) * u.s, np.asarray(self.x_all) * u.m, np.asarray(self.v_all) * u.m / u.s, )
[docs] class ParticleTracker: r"""A particle tracker for particles in electric and magnetic fields without inter-particle interactions. Particles are instantiated and pushed through a grid of provided E and B fields using the Boris push algorithm. These fields are specified as part of a grid which are then interpolated to determine the local field acting on each particle. The time step used in the push routine can be specified, or an adaptive time step will be determined based off the gyroradius of the particle. Some save routines involve time stamping the location and velocities of particles at fixed intervals. In order for this data to be coherent, it is required that all particles follow the same time step. This is referred to as a synchronized time step. If no time step is specified and the provided save routine does not require a synchronized time step, then an adaptive time step is calculated independently for each particle. The simulation will push particles through the provided fields until a condition is met. This termination condition is provided as an instance of the `~plasmapy.simulation.particle_tracker.AbstractTerminationCondition` class as arguments to the simulation constructor. The results of a simulation can be exported by specifying an instance of the `~plasmapy.simulation.particle_tracker.AbstractSaveRoutine` class to the ``run`` method. Parameters ---------- grids : An instance of `~plasmapy.plasma.grids.AbstractGrid` A Grid object or list of grid objects containing the required quantities. The list of required quantities varies depending on other keywords. termination_condition : `~plasmapy.simulation.particle_tracker.AbstractTerminationCondition` An instance of `~plasmapy.simulation.particle_tracker.AbstractTerminationCondition` which determines when the simulation has finished. See `~plasmapy.simulation.particle_tracker.AbstractTerminationCondition` for more details. save_routine : `~plasmapy.simulation.particle_tracker.AbstractSaveRoutine`, optional An instance of `~plasmapy.simulation.particle_tracker.AbstractSaveRoutine` which determines which time steps of the simulation to save. The default is `~plasmapy.simulation.particle_tracker.DoNotSaveSaveRoutine`. See `~plasmapy.simulation.particle_tracker.AbstractSaveRoutine` for more details. dt : `~astropy.units.Quantity`, optional An explicitly set time step in units convertible to seconds. Setting this optional keyword overrules the adaptive time step capability and forces the use of this time step throughout. dt_range : tuple of shape (2,) of `~astropy.units.Quantity`, optional If specified, the calculated adaptive time step will be clamped between the first and second values. field_weighting : str String that selects the field weighting algorithm used to determine what fields are felt by the particles. Options are: * 'nearest neighbor': Particles are assigned the fields on the grid vertex closest to them. * 'volume averaged' : The fields experienced by a particle are a volume-average of the eight grid points surrounding them. The default is 'volume averaged'. req_quantities : `list` of `str`, default : `None` A list of quantity keys required to be specified on the Grid object. The base particle pushing simulation requires the quantities [E_x, E_y, E_z, B_x, B_y, B_z]. This keyword is for specifying quantities in addition to these six. If any required quantities are missing, those quantities will be assumed to be zero everywhere. A warning will be raised if any of the additional required quantities are missing and are set to zero. verbose : bool, optional If true, updates on the status of the program will be printed into the standard output while running. The default is True. Notes ----- We adopt the convention of ``NaN`` values to represent various states of a particle. If the particle's position and velocity are not ``NaN``, the particle is being tracked and evolved. If the particle's position is not ``NaN``, but the velocity is ``NaN``, the particle has been stopped (i.e. it is still in the simulation but is no longer evolved.) If both the particle's position and velocity are set to ``NaN``, then the particle has been removed from the simulation. """ def __init__( self, grids: AbstractGrid | Iterable[AbstractGrid], termination_condition: AbstractTerminationCondition | None = None, save_routine: AbstractSaveRoutine | None = None, dt=None, dt_range=None, field_weighting="volume averaged", req_quantities=None, verbose=True, ) -> None: # self.grid is the grid object if isinstance(grids, AbstractGrid): self.grids = [ grids, ] elif isinstance(grids, collections.abc.Iterable): self.grids = grids else: self.grids = None # Errors for unsupported grid types are raised in the validate constructor inputs method # Instantiate the "do not save" save routine if no save routine was specified if save_routine is None: save_routine = DoNotSaveSaveRoutine() # Validate inputs to the run function self._validate_constructor_inputs( grids, termination_condition, save_routine, field_weighting ) self._set_time_step_attributes(dt, termination_condition, save_routine) if dt_range is not None and not self._is_adaptive_time_step: raise ValueError( "Specifying a time step range is only possible for an adaptive time step." ) self.verbose = verbose # This flag records whether the simulation has been run self._has_run = False # Raise a ValueError if a synchronized dt is required by termination condition or save routine but one is # not given. This is only the case if an array with differing entries is specified for dt if self._require_synchronized_time and not self._is_synchronized_time_step: raise ValueError( "Please specify a synchronized time step to use the simulation with this configuration!" ) self._preprocess_grids(req_quantities) # self.grid_arr is the grid positions in si units. This is created here # so that it isn't continuously called later self.grids_arr = [grid.grid.to(u.m).value for grid in self.grids] self.dt = dt.to(u.s).value if dt is not None else None dt_range = [0, np.inf] * u.s if dt_range is None else dt_range self.dt_range = dt_range.to(u.s).value # Update the `tracker` attribute so that the stop condition & save routine can be used termination_condition.tracker = self save_routine.tracker = self self.termination_condition = termination_condition self.save_routine = save_routine def _set_time_step_attributes( self, dt, termination_condition, save_routine ) -> None: """Determines whether the simulation will follow a synchronized or adaptive time step.""" self._require_synchronized_time = ( termination_condition.require_synchronized_dt or (save_routine is not None and save_routine.require_synchronized_dt) ) if isinstance(dt, u.Quantity): if isinstance(dt.value, np.ndarray): # If an array is specified for the time step, a synchronized time step is implied if all # the entries are equal self._is_synchronized_time_step = bool( np.all(dt.value[0] == dt.value[:]) ) else: self._is_synchronized_time_step = True self._is_adaptive_time_step = False elif dt is None: self._is_synchronized_time_step = self._require_synchronized_time self._is_adaptive_time_step = True if self._is_adaptive_time_step: # Initialize default values for time steps per gyroperiod and Courant parameter self.setup_adaptive_time_step()
[docs] def setup_adaptive_time_step( self, time_steps_per_gyroperiod: int | None = 12, Courant_parameter: float | None = 0.5, ) -> None: """Set parameters for the adaptive time step candidates. Parameters ---------- time_steps_per_gyroperiod : int, optional The minimum ratio of the particle gyroperiod to the timestep. Higher numbers correspond to higher temporal resolution. The default is twelve. Courant_parameter : float, optional The Courant parameter is the minimum ratio of the timestep to the grid crossing time, grid cell length / particle velocity. Lower Courant numbers correspond to higher temporal resolution. Notes ----- Two candidates are calculated for the adaptive time step: a time step based on the gyroradius of the particle and a time step based on the resolution of the grid. The candidate associated with the gyroradius of the particle takes a ``time_steps_per_gyroperiod`` parameter that specifies how many times the orbit of a gyrating particles will be subdivided. The other candidate, associated with the spatial resolution of the grid object, calculates a time step using the time it would take the fastest particle to cross some fraction of a grid cell length. This fraction is the Courant number. """ if not self._is_adaptive_time_step: raise ValueError( "The setup adaptive time step method only applies to adaptive time steps!" ) self._steps_per_gyroperiod = time_steps_per_gyroperiod self._Courant_parameter = Courant_parameter
def _validate_constructor_inputs( self, grids, termination_condition, save_routine, field_weighting: str ) -> None: """ Ensure the specified termination condition and save routine are actually a termination routine class and save routine, respectively. This function also sets the `_is_synchronized_time_step` and `_is_adaptive_time_step` attributes. """ if isinstance(grids, BasePlasma): raise TypeError( "It appears you may be trying to access an older version of the ParticleTracker class." "This class has been deprecated." "Please revert to PlasmaPy version 2023.5.1 to use this version of ParticleTracker." ) # The constructor did not recognize the provided grid object elif self.grids is None: raise TypeError("Type of argument `grids` not recognized.") if not isinstance(termination_condition, AbstractTerminationCondition): raise TypeError("Please specify a valid termination condition.") if not isinstance(save_routine, AbstractSaveRoutine): raise TypeError("Please specify a valid save routine.") # Load and validate inputs field_weightings = ["volume averaged", "nearest neighbor"] if field_weighting in field_weightings: self.field_weighting = field_weighting else: raise ValueError( f"{field_weighting} is not a valid option for ", "field_weighting. Valid choices are", f"{field_weightings}", ) def _preprocess_grids(self, additional_required_quantities) -> None: """Add required quantities to grid objects. Grids lacking the required quantities will be filled with zeros. """ # Some quantities are necessary for the particle tracker to function regardless of other configurations required_quantities = {"E_x", "E_y", "E_z", "B_x", "B_y", "B_z"} for grid in self.grids: # Require the field quantities - do not warn if they are absent # and are replaced with zeros grid.require_quantities( required_quantities, replace_with_zeros=True, warn_on_replace_with_zeros=False, ) if additional_required_quantities is not None: # Require the additional quantities - in this case, do warn # if they are set to zeros grid.require_quantities( additional_required_quantities, replace_with_zeros=True ) if additional_required_quantities is not None: # Add additional required quantities based off simulation configuration required_quantities.update(additional_required_quantities) for grid in self.grids: for rq in required_quantities: # Check that there are no infinite values if not np.isfinite(grid[rq].value).all(): raise ValueError( f"Input arrays must be finite: {rq} contains " "either NaN or infinite values." ) # Check that the max values on the edges of the arrays are # small relative to the maximum values on that grid # # Array must be dimensionless to re-assemble it into an array # of max values like this arr = np.abs(grid[rq]).value edge_max = np.max( np.array( [ np.max(a) for a in ( arr[0, :, :], arr[-1, :, :], arr[:, 0, :], arr[:, -1, :], arr[:, :, 0], arr[:, :, -1], ) ] ) ) if edge_max > 1e-3 * np.max(arr): unit = grid.recognized_quantities[rq].unit warnings.warn( "Quantities should go to zero at edges of grid to avoid " f"non-physical effects, but a value of {edge_max:.2E} {unit} was " f"found on the edge of the {rq} array. Consider applying a " "envelope function to force the quantities at the edge to go to " "zero.", RuntimeWarning, ) @property def num_grids(self) -> int: """The number of grids specified at instantiation.""" return len(self.grids) def _log(self, msg) -> None: if self.verbose: print(msg) # noqa: T201
[docs] @particle_input def load_particles( self, x, v, particle: Particle = Particle("p+"), # noqa: B008 ) -> None: r""" Load arrays of particle positions and velocities. Parameters ---------- x : `~astropy.units.Quantity`, shape (N,3) Positions for N particles v : `~astropy.units.Quantity`, shape (N,3) Velocities for N particles particle : |particle-like|, optional Representation of the particle species as either a |Particle| object or a string representation. The default particle is protons. """ # Raise an error if the run method has already been called. self._enforce_order() self.q = particle.charge.to(u.C).value self.m = particle.mass.to(u.kg).value if x.shape[0] != v.shape[0]: raise ValueError( "Provided x and v arrays have inconsistent numbers " " of particles " f"({x.shape[0]} and {v.shape[0]} respectively)." ) else: self.nparticles: int = x.shape[0] self.x = x.to(u.m).value self.v = v.to(u.m / u.s).value
[docs] def run(self) -> None: r""" Runs a particle-tracing simulation. Time steps are adaptively calculated based on the local grid resolution of the particles and the electric and magnetic fields they are experiencing. Returns ------- None """ self._enforce_particle_creation() # Keep track of how many push steps have occurred for trajectory tracing self.iteration_number = 0 self.time: NDArray[np.float64] | float = ( np.zeros((self.nparticles, 1)) if not self.is_synchronized_time_step else 0 ) # Create flags for tracking when particles during the simulation # on_grid -> zero if the particle is off grid, 1 # shape [nparticles, ngrids] self.on_grid: NDArray[np.bool_] = np.zeros( [self.nparticles, self.num_grids] ).astype(np.bool_) # Entered grid -> non-zero if particle EVER entered a grid self.entered_grid: NDArray[np.bool_] = np.zeros([self.nparticles]).astype( np.bool_ ) # Initialize a "progress bar" (really more of a meter) # Setting sys.stdout lets this play nicely with regular print() pbar = tqdm( initial=0, total=self.termination_condition.total, disable=not self.verbose, desc=self.termination_condition.progress_description, unit=self.termination_condition.units_string, bar_format="{l_bar}{bar}{n:.1e}/{total:.1e} {unit}", file=sys.stdout, ) # Push the particles until the termination condition is satisfied is_finished = False while not (is_finished or self.nparticles_tracked == 0): is_finished = self.termination_condition.is_finished progress = min( self.termination_condition.progress, self.termination_condition.total ) pbar.n = progress pbar.last_print_n = progress pbar.update(0) self._push() if self.save_routine is not None: self.save_routine.post_push_hook() if self.save_routine is not None: self.save_routine.post_push_hook(force_save=True) pbar.close() # Log a summary of the run self._log("Run completed") # Simulation has not run, because creating new particles changes the simulation self._has_run = True
def _stop_particles(self, particles_to_stop_mask) -> None: """Stop tracking the particles specified by the stop mask. This is represented by setting the particle's velocity to NaN. """ if len(particles_to_stop_mask) != self.x.shape[0]: raise ValueError( f"Expected mask of size {self.x.shape[0]}, got {len(particles_to_stop_mask)}" ) self.v[particles_to_stop_mask] = np.nan def _remove_particles(self, particles_to_remove_mask) -> None: """Remove the specified particles from the simulation. For the sake of keeping consistent array lengths, the position and velocities of the removed particles are set to NaN. """ if len(particles_to_remove_mask) != self.x.shape[0]: raise ValueError( f"Expected mask of size {self.x.shape[0]}, got {len(particles_to_remove_mask)}" ) self.x[particles_to_remove_mask] = np.nan self.v[particles_to_remove_mask] = np.nan # ************************************************************************* # Run/push loop methods # ************************************************************************* def _adaptive_dt(self, Ex, Ey, Ez, Bx, By, Bz) -> NDArray[np.float64] | float: # noqa: ARG002 r""" Calculate the appropriate dt for each grid based on a number of considerations including the local grid resolution (ds) and the gyroperiod of the particles in the current fields. """ # candidate time steps includes one per grid (based on the grid resolution) # plus additional candidates based on the field at each particle candidates = np.ones([self.nparticles, self.num_grids + 1]) * np.inf # Compute the time step indicated by the grid resolution ds = np.array([grid.grid_resolution.to(u.m).value for grid in self.grids]) gridstep = self._Courant_parameter * (ds / self.vmax) # Wherever a particle is on a grid, include that grid's grid step # in the list of candidate time steps for i, _grid in enumerate(self.grids): # noqa: B007 candidates[:, i] = np.where(self.on_grid[:, i] > 0, gridstep[i], np.inf) # If not, compute a number of possible time steps # Compute the cyclotron gyroperiod Bmag = np.max(np.sqrt(Bx**2 + By**2 + Bz**2)).to(u.T).value # Compute the gyroperiod if Bmag == 0: gyroperiod = np.inf else: gyroperiod = 2 * np.pi * self.m / (self.q * np.max(Bmag)) # Subdivide the gyroperiod into a provided number of steps # Use the result as the candidate associated with gyration in B field candidates[:, self.num_grids] = gyroperiod / self._steps_per_gyroperiod # TODO: introduce a minimum time step based on electric fields too! # Enforce limits on dt candidates = np.clip(candidates, self.dt_range[0], self.dt_range[1]) if not self._is_synchronized_time_step: # dt is the min of all the candidates for each particle # a separate dt is returned for each particle dt = np.min(candidates, axis=-1) # dt should never actually be infinite, so replace any infinities # with the largest gridstep dt[dt == np.inf] = np.max(gridstep) else: # a single value for dt is returned # this is the time step used for all particles dt = np.min(candidates) return dt def _push(self) -> None: r""" Advance particles using an implementation of the time-centered Boris algorithm. """ # Get a list of positions (input for interpolator) tracked_mask = self._tracked_particle_mask() self.iteration_number += 1 # nparticles_tracked may fluctuate with stopping # all particles are therefore used regardless of if they reach the grid pos_all = self.x pos_tracked = pos_all[tracked_mask] vel_all = self.v vel_tracked = vel_all[tracked_mask] # Update the list of particles on and off the grid # shape [nparticles, ngrids] self.on_grid = ( np.array([grid.on_grid(pos_all * u.m) for grid in self.grids]) .astype(np.bool_) .T ) # entered_grid is zero at the end if a particle has never # entered any grid self.entered_grid += np.sum(self.on_grid, axis=-1).astype(np.bool_) Ex = np.zeros(self.nparticles_tracked) * u.V / u.m Ey = np.zeros(self.nparticles_tracked) * u.V / u.m Ez = np.zeros(self.nparticles_tracked) * u.V / u.m Bx = np.zeros(self.nparticles_tracked) * u.T By = np.zeros(self.nparticles_tracked) * u.T Bz = np.zeros(self.nparticles_tracked) * u.T for grid in self.grids: # Estimate the E and B fields for each particle # Note that this interpolation step is BY FAR the slowest part of the push # loop. Any speed improvements will have to come from here. if self.field_weighting == "volume averaged": _Ex, _Ey, _Ez, _Bx, _By, _Bz = grid.volume_averaged_interpolator( pos_tracked * u.m, "E_x", "E_y", "E_z", "B_x", "B_y", "B_z", persistent=True, ) elif self.field_weighting == "nearest neighbor": _Ex, _Ey, _Ez, _Bx, _By, _Bz = grid.nearest_neighbor_interpolator( pos_tracked * u.m, "E_x", "E_y", "E_z", "B_x", "B_y", "B_z", persistent=True, ) # Interpret any NaN values (points off the grid) as zero # Do this before adding to the totals, because 0 + nan = nan _Ex = np.nan_to_num(_Ex, nan=0.0 * u.V / u.m) _Ey = np.nan_to_num(_Ey, nan=0.0 * u.V / u.m) _Ez = np.nan_to_num(_Ez, nan=0.0 * u.V / u.m) _Bx = np.nan_to_num(_Bx, nan=0.0 * u.T) _By = np.nan_to_num(_By, nan=0.0 * u.T) _Bz = np.nan_to_num(_Bz, nan=0.0 * u.T) # Add the values interpolated for this grid to the totals Ex += _Ex Ey += _Ey Ez += _Ez Bx += _Bx By += _By Bz += _Bz # Create arrays of E and B as required by push algorithm E = np.array( [Ex.to(u.V / u.m).value, Ey.to(u.V / u.m).value, Ez.to(u.V / u.m).value] ) E = np.moveaxis(E, 0, -1) B = np.array([Bx.to(u.T).value, By.to(u.T).value, Bz.to(u.T).value]) B = np.moveaxis(B, 0, -1) # Calculate the adaptive time step from the fields currently experienced # by the particles # If user sets dt explicitly, that's handled in _adaptive_dt if self._is_adaptive_time_step: dt = self._adaptive_dt(Ex, Ey, Ez, Bx, By, Bz) else: dt = self.dt # TODO: Test v/c and implement relativistic Boris push when required # vc = np.max(v)/_c # Make sure the time step can be multiplied by a [nparticles, 3] shape field array if isinstance(dt, np.ndarray) and dt.size > 1: dt = dt[tracked_mask, np.newaxis] # Increment the tracked particles' time by dt self.time[tracked_mask] += dt else: self.time += dt self.x[tracked_mask], self.v[tracked_mask] = boris_push( pos_tracked, vel_tracked, B, E, self.q, self.m, dt, inplace=False ) self.dt = dt @property def on_any_grid(self) -> NDArray[np.bool_]: """ Binary array for each particle indicating whether it is currently on ANY grid. """ return np.sum(self.on_grid, axis=-1) > 0 @property def vmax(self) -> float: """The maximum velocity of any particle in the simulation. This quantity is used for determining the grid crossing maximum time step. """ tracked_mask = self._tracked_particle_mask() return float(np.max(np.linalg.norm(self.v[tracked_mask], axis=-1))) def _tracked_particle_mask(self) -> NDArray[np.bool_]: """ Calculates a boolean mask corresponding to particles that have not been stopped or removed. """ return ~np.logical_or(np.isnan(self.x[:, 0]), np.isnan(self.v[:, 0])) @property def nparticles_tracked(self) -> int: """Return the number of particles that don't have NaN position or velocity.""" return int(self._tracked_particle_mask().sum()) @property def is_adaptive_time_step(self) -> bool: """Return whether the simulation is calculating an adaptive time step or using the user-provided time step.""" return self._is_adaptive_time_step @property def is_synchronized_time_step(self) -> bool: """Return if the simulation is applying the same time step across all particles.""" return self._is_synchronized_time_step def _enforce_particle_creation(self) -> None: """Ensure the array position array `x` has been populated.""" # Check to make sure particles have already been generated if not hasattr(self, "x"): raise ValueError( "Either the create_particles or load_particles method must be " "called before running the particle tracing algorithm." ) def _enforce_order(self) -> None: r""" The `Tracker` methods could give strange results if setup methods are used again after the simulation has run. This method raises an error if the simulation has already been run. """ if self._has_run: raise RuntimeError( "Modifying the `Tracker` object after running the " "simulation is not supported. Create a new `Tracker` " "object for a new simulation." )