# Source code for plasmapy.diagnostics.charged_particle_radiography.synthetic_radiography

```
"""
Routines for the analysis of proton radiographs. These routines can be broadly
classified as either creating synthetic radiographs from prescribed fields or
methods of 'inverting' experimentally created radiographs to reconstruct the
original fields (under some set of assumptions).
"""
__all__ = ["Tracker", "synthetic_radiograph"]
import collections
import sys
import warnings
from collections.abc import Iterable
import astropy.constants as const
import astropy.units as u
import numpy as np
from tqdm import tqdm
from plasmapy import particles
from plasmapy.formulary.mathematics import rot_a_to_b
from plasmapy.particles import Particle
from plasmapy.plasma.grids import AbstractGrid
from plasmapy.simulation.particle_integrators import boris_push
def _coerce_to_cartesian_si(pos):
"""
Takes a tuple of `astropy.unit.Quantity` values representing a position
in space in either Cartesian, cylindrical, or spherical coordinates, and
returns a numpy array representing the same point in Cartesian
coordinates and units of meters.
"""
# Auto-detect geometry based on units
geo_units = [x.unit for x in pos]
if geo_units[2].is_equivalent(u.rad):
geometry = "spherical"
elif geo_units[1].is_equivalent(u.rad):
geometry = "cylindrical"
else:
geometry = "cartesian"
# Convert geometrical inputs between coordinates systems
pos_out = np.zeros(3)
if geometry == "cartesian":
x, y, z = pos
pos_out[0] = x.to(u.m).value
pos_out[1] = y.to(u.m).value
pos_out[2] = z.to(u.m).value
elif geometry == "cylindrical":
r, t, z = pos
r = r.to(u.m)
t = t.to(u.rad).value
z = z.to(u.m)
pos_out[0] = (r * np.cos(t)).to(u.m).value
pos_out[1] = (r * np.sin(t)).to(u.m).value
pos_out[2] = z.to(u.m).value
elif geometry == "spherical":
r, t, p = pos
r = r.to(u.m)
t = t.to(u.rad).value
p = p.to(u.rad).value
pos_out[0] = (r * np.sin(t) * np.cos(p)).to(u.m).value
pos_out[1] = (r * np.sin(t) * np.sin(p)).to(u.m).value
pos_out[2] = (r * np.cos(t)).to(u.m).value
return pos_out
[docs]
class Tracker:
r"""
Represents a charged particle radiography experiment with simulated or
calculated E and B fields given at positions defined by a grid of spatial
coordinates. The particle source and detector plane are defined by vectors
from the origin of the grid.
Parameters
----------
grids : `~plasmapy.plasma.grids.AbstractGrid` or subclass thereof, or list
of same.
A Grid object or list of grid objects containing the required
quantities [E_x, E_y, E_z, B_x, B_y, B_z].
If any of these quantities are missing, a warning will be given and that
quantity will be assumed to be zero everywhere.
source : `~astropy.units.Quantity`, shape (3)
A vector pointing from the origin of the grid to the location
of the particle source. This vector will be interpreted as
being in either cartesian, cylindrical, or spherical coordinates
based on its units. Valid geometries are:
* Cartesian (x,y,z) : (meters, meters, meters)
* Cylindrical (r, theta, z) : (meters, radians, meters)
* Spherical (r, theta, phi) : (meters, radians, radians)
In spherical coordinates theta is the polar angle.
detector : `~astropy.units.Quantity`, shape (3)
A vector pointing from the origin of the grid to the center
of the detector plane. The vector from the source point to this
point defines the normal vector of the detector plane. This vector
can also be specified in cartesian, cylindrical, or spherical
coordinates (see the ``source`` keyword).
detector_hdir : `numpy.ndarray`, shape (3), optional
A unit vector (in Cartesian coordinates) defining the horizontal
direction on the detector plane. By default, the horizontal axis in the
detector plane is defined to be perpendicular to both the
source-to-detector vector and the z-axis (unless the source-to-detector
axis is parallel to the z axis, in which case the horizontal axis is
the x-axis).
The detector vertical axis is then defined
to be orthogonal to both the source-to-detector vector and the
detector horizontal axis.
verbose : bool, optional
If true, updates on the status of the program will be printed
into the standard output while running.
"""
def __init__(
self,
grids: AbstractGrid | Iterable[AbstractGrid],
source: u.Quantity[u.m],
detector: u.Quantity[u.m],
detector_hdir=None,
verbose: bool = True,
) -> None:
# self.grid is the grid object
if isinstance(grids, AbstractGrid):
self.grids = [
grids,
]
elif isinstance(grids, collections.abc.Iterable):
self.grids = grids
else:
raise TypeError("Type of argument `grids` not recognized.")
# self.grid_arr is the grid positions in si units. This is created here
# so that it isn't continuously called later
self.grids_arr = [grid.grid.to(u.m).value for grid in self.grids]
self.verbose = verbose
# A list of wire meshes added to the grid with add_wire_mesh
# Particles that would hit these meshes will be removed at runtime
# by _apply_wire_mesh
self.mesh_list = []
# This flag records whether the simulation has been run
self._has_run = False
# ************************************************************************
# Setup the source and detector geometries
# ************************************************************************
self.source = _coerce_to_cartesian_si(source)
self.detector = _coerce_to_cartesian_si(detector)
self._log(f"Source: {self.source} m")
self._log(f"Detector: {self.detector} m")
# Calculate normal vectors (facing towards the grid origin) for both
# the source and detector planes
self.src_n = -self.source / np.linalg.norm(self.source)
self.det_n = -self.detector / np.linalg.norm(self.detector)
# Vector directly from source to detector
self.src_det = self.detector - self.source
# Magnification
self.mag = 1 + (np.linalg.norm(self.detector) / np.linalg.norm(self.source))
self._log(f"Magnification: {self.mag}")
# Check that source-detector vector actually passes through the grid
test = [
grid.vector_intersects(self.source * u.m, self.detector * u.m)
for grid in self.grids
]
if not any(test):
raise ValueError(
"The vector between the source and the detector "
"does not intersect the grid provided!"
)
# Determine the angle above which particles will not hit the grid
# these particles can be ignored until the end of the simulation,
# then immediately advanced to the detector grid with their original
# velocities
self.max_theta_hit_grid = self._max_theta_hit_grid()
# *********************************************************************
# Define the detector plane
# *********************************************************************
# Load or calculate the detector hdir
if detector_hdir is not None:
self.det_hdir = detector_hdir / np.linalg.norm(detector_hdir)
else:
self.det_hdir = self._default_detector_hdir()
# Calculate the detector vdir
ny = np.cross(self.det_hdir, self.det_n)
self.det_vdir = -ny / np.linalg.norm(ny)
# *********************************************************************
# Validate the E and B fields
# *********************************************************************
req_quantities = ["E_x", "E_y", "E_z", "B_x", "B_y", "B_z"]
for grid in self.grids:
grid.require_quantities(req_quantities, replace_with_zeros=True)
for rq in req_quantities:
# Check that there are no infinite values
if not np.isfinite(grid[rq].value).all():
raise ValueError(
f"Input arrays must be finite: {rq} contains "
"either NaN or infinite values."
)
# Check that the max values on the edges of the arrays are
# small relative to the maximum values on that grid
#
# Array must be dimensionless to re-assemble it into an array
# of max values like this
arr = np.abs(grid[rq]).value
edge_max = np.max(
np.array(
[
np.max(a)
for a in (
arr[0, :, :],
arr[-1, :, :],
arr[:, 0, :],
arr[:, -1, :],
arr[:, :, 0],
arr[:, :, -1],
)
]
)
)
if edge_max > 1e-3 * np.max(arr):
unit = grid.recognized_quantities[rq].unit
warnings.warn(
"Fields should go to zero at edges of grid to avoid "
f"non-physical effects, but a value of {edge_max:.2E} {unit} was "
f"found on the edge of the {rq} array. Consider applying a "
"envelope function to force the fields at the edge to go to "
"zero.",
RuntimeWarning,
)
@property
def num_grids(self): # noqa: D102
return len(self.grids)
def _default_detector_hdir(self):
"""
Calculates the default horizontal unit vector for the detector plane
(see __init__ description for details).
"""
# Create unit vectors that define the detector plane
# Define plane horizontal axis
if np.allclose(np.abs(self.det_n), np.array([0, 0, 1])):
nx = np.array([1, 0, 0])
else:
nx = np.cross(np.array([0, 0, 1]), self.det_n)
return nx / np.linalg.norm(nx)
def _max_theta_hit_grid(self):
r"""
Using the grid and the source position, compute the maximum particle
theta that will impact the grid. This value can be used to determine
which particles are worth tracking.
"""
theta = np.zeros([8, self.num_grids])
for i, _grid in enumerate(self.grids): # noqa: B007
ind = 0
for x in (0, -1):
for y in (0, -1):
for z in (0, -1):
# Source to grid corner vector
vec = self.grids_arr[i][x, y, z, :] - self.source
# Calculate angle between vec and the source-to-detector
# axis, which is the central axis of the particle beam
theta[ind, i] = np.arccos(
np.dot(vec, self.src_det)
/ np.linalg.norm(vec)
/ np.linalg.norm(self.src_det)
)
ind += 1
return np.max(theta)
def _log(self, msg) -> None:
if self.verbose:
# We'll need to switch from print() to using logging library
print(msg) # noqa: T201
# Define some constants so they don't get constantly re-evaluated
_c = const.c.si.value
# *************************************************************************
# Create mesh
# *************************************************************************
[docs]
def add_wire_mesh(
self, location, extent, nwires, wire_diameter, mesh_hdir=None, mesh_vdir=None
):
"""
Add a wire mesh grid between the particle source and the object grid
that blocks particles whose paths intersect the wires.
Parameters
----------
location : `~astropy.units.Quantity`, shape (3)
A vector pointing from the origin of the grid to the center of the
mesh grid. This location must be between the source and the
object grid.
This vector will be interpreted as
being in either cartesian, cylindrical, or spherical coordinates
based on its units. Valid geometries are:
* Cartesian (x,y,z) : (meters, meters, meters)
* Cylindrical (r, theta, z) : (meters, radians, meters)
* Spherical (r, theta, phi) : (meters, radians, radians)
In spherical coordinates theta is the polar angle.
extent : Tuple of 1 or 2 `~astropy.units.Quantity`
The size of the mesh grid (in the mesh plane). If one value
is provided, the mesh is circular and the value provided is
interpreted as the diameter. If two values are provided, the
mesh is rectangular and the values are interpreted as the
width and height respectively.
nwires : Tuple of 1 or 2 ints, or a single int
The number of wires in the horizontal and vertical directions. If
only one value is provided, the number in the two directions is
assumed to be equal. Note that a wire will cross the center of the
mesh only when nwires is odd.
wire_diameter : `~astropy.units.Quantity`
The diameter of the wires.
mesh_hdir : `numpy.ndarray`, shape (3), optional
A unit vector (in Cartesian coordinates) defining the horizontal
direction on the mesh plane. Modifying this vector can rotate the
mesh in the plane or tilt the mesh plane relative to the
source-detector axis. By default, ``mesh_hdir`` is set equal to
``detector_hdir`` (see ``detector_hdir`` keyword in ``__init__``).
mesh_vdir : `numpy.ndarray`, shape (3), optional
A unit vector (in Cartesian coordinates) defining the vertical
direction on the mesh plane. Modifying this vector can tilt the
mesh relative to the source-detector axis. By default, ``mesh_vdir``
is defined to be perpendicular to ``mesh_hdir`` and the detector
plane normal (such that the mesh is parallel to the detector plane).
Raises
------
ValueError
Raises a ValueError if the provided mesh location is not
between the source and the object grid.
"""
# Raise an error if the run method has already been called.
self._enforce_order()
location = _coerce_to_cartesian_si(location)
wire_radius = wire_diameter.si.value / 2
if not isinstance(extent, tuple):
extent = (extent,)
if len(extent) == 1:
radius = 0.5 * extent[0].si.value
width = extent[0].si.value
height = extent[0].si.value
elif len(extent) == 2:
radius = None
width = extent[0].si.value
height = extent[1].si.value
else:
raise ValueError(
"extent must be a tuple of 1 or 2 elements, but "
f"{len(extent)} elements were provided."
)
if not isinstance(nwires, tuple):
nwires = (nwires,)
if len(nwires) != 2:
nwires = (nwires[0], nwires[0])
# If no hdir/vdir is specified, calculate a default value
# If one is specified, make sure it is normalized
if mesh_hdir is None:
# Re-calculate the default here, in case the user
# specified a different det_hdir
mesh_hdir = self._default_detector_hdir()
else:
mesh_hdir = mesh_hdir / np.linalg.norm(mesh_hdir)
if mesh_vdir is None:
mesh_vdir = np.cross(mesh_hdir, self.det_n)
mesh_vdir = -mesh_vdir / np.linalg.norm(mesh_vdir)
else:
mesh_vdir = mesh_vdir / np.linalg.norm(mesh_vdir)
# Raise exception if mesh is AFTER the field grid
if np.linalg.norm(location - self.source) > np.linalg.norm(self.source):
raise ValueError(
f"The specified mesh location, {location},"
"is not between the source and the origin."
)
mesh_entry = {
"location": location,
"wire_radius": wire_radius,
"radius": radius,
"width": width,
"height": height,
"nwires": nwires,
"mesh_hdir": mesh_hdir,
"mesh_vdir": mesh_vdir,
}
self.mesh_list.append(mesh_entry)
def _apply_wire_mesh(
self,
location=None,
wire_radius=None,
radius=None,
width=None,
height=None,
nwires=None,
mesh_hdir=None,
mesh_vdir=None,
):
"""
Apply wire meshes that were added to ``self.mesh_list``.
"""
x = self._coast_to_plane(location, mesh_hdir, mesh_vdir)
# Particle positions in 2D on the mesh plane
xloc = np.dot(x - location, mesh_hdir)
yloc = np.dot(x - location, mesh_vdir)
# Create an array in which True indicates that a particle has hit
# a wire and False indicates that it has not
hit = np.zeros(self.nparticles, dtype=bool)
# Mark particles that overlap vertical or horizontal position with
# a wire
h_centers = np.linspace(-width / 2, width / 2, num=nwires[0])
for c in h_centers:
hit |= np.isclose(xloc, c, atol=wire_radius)
v_centers = np.linspace(-height / 2, height / 2, num=nwires[1])
for c in v_centers:
hit |= np.isclose(yloc, c, atol=wire_radius)
# Put back any particles that are outside the mesh boundaries
# First handle the case where the mesh is rectangular
if radius is None:
# Replace particles outside the x-boundary
hit[
np.logical_or(
xloc > np.max(h_centers) + wire_radius,
xloc < np.min(h_centers) - wire_radius,
)
] = False
# Replace particles outside the y-boundary
hit[
np.logical_or(
yloc > np.max(v_centers) + wire_radius,
yloc < np.min(v_centers) - wire_radius,
)
] = False
# Handle the case where the mesh is circular
else:
loc_rad = np.sqrt(xloc**2 + yloc**2)
hit[loc_rad > radius] = False
# In the case of a circular mesh, also create a round wire along
# the outside edge
hit[np.isclose(loc_rad, radius, atol=wire_radius)] = True
# Identify the particles that have hit something, then remove them from
# all of the arrays
keep_these_particles = ~hit
number_kept_particles = keep_these_particles.sum()
nremoved = self.nparticles - number_kept_particles
if self.nparticles - nremoved <= 0:
raise ValueError(
"The specified mesh is blocking all of the particles. "
f"The wire diameter ({2*wire_radius}) may be too large."
)
self.x = self.x[keep_these_particles, :]
self.v = self.v[keep_these_particles, :]
self.theta = self.theta[
keep_these_particles
] # Important to apply here to get correct grid_ind
self.nparticles = number_kept_particles
# *************************************************************************
# Particle creation methods
# *************************************************************************
def _angles_monte_carlo(self, random_seed=None):
"""
Generates angles for each particle randomly such that the flux
per solid angle is uniform.
"""
# Create a probability vector for the theta distribution
# Theta must follow a sine distribution in order for the particle
# flux per solid angle to be uniform.
arg = np.linspace(0, self.max_theta, num=int(1e5))
prob = np.sin(arg)
prob *= 1 / np.sum(prob)
# Create a numpy random number generator instance
rng = np.random.default_rng(seed=random_seed)
# Randomly choose theta's weighted with the sine probabilities
theta = rng.choice(arg, size=self.nparticles, replace=True, p=prob)
# Also generate a uniform phi distribution
phi = rng.uniform(high=2 * np.pi, size=self.nparticles)
return theta, phi
def _angles_uniform(self):
"""
Generates angles for each particle such that their velocities are
uniformly distributed on a grid in theta and phi. This method
requires that `nparticles` be a perfect square. If it is not,
`nparticles` will be set as the largest perfect square smaller
than the provided `nparticles`.
"""
# Calculate the approximate square root
n_per = np.floor(np.sqrt(self.nparticles)).astype(np.int32)
# Set new nparticles to be a perfect square
self.nparticles = n_per**2
# Create an imaginary grid positioned 1 unit from the source
# and spanning max_theta at the corners
extent = np.sin(self.max_theta) / np.sqrt(2)
arr = np.linspace(-extent, extent, num=n_per)
harr, varr = np.meshgrid(arr, arr, indexing="ij")
# calculate the angles from the source for each point in
# the grid.
theta = np.arctan(np.sqrt(harr**2 + varr**2))
phi = np.arctan2(varr, harr)
return theta.flatten(), phi.flatten()
[docs]
@particles.particle_input
def create_particles(
self,
nparticles,
particle_energy,
max_theta=None,
particle: Particle = Particle("p+"), # noqa: B008
distribution="monte-carlo",
random_seed=None,
) -> None:
r"""
Generates the angular distributions about the Z-axis, then
rotates those distributions to align with the source-to-detector axis.
By default, particles are generated over almost the entire pi/2.
However, if the detector is far from the source, many of these
particles will never be observed. The max_theta keyword allows these
extraneous particles to be neglected to focus computational resources
on the particles who will actually hit the detector.
Parameters
----------
nparticles : integer
The number of particles to include in the simulation. The default
is 1e5.
particle_energy : `~astropy.units.Quantity`
The energy of the particle, in units convertible to eV.
All particles are given the same energy.
max_theta : `~astropy.units.Quantity`, optional
The largest velocity vector angle (measured from the
source-to-detector axis) for which particles should be generated.
Decreasing this angle can eliminate particles that would never
reach the detector region of interest. If no value is given, a
guess will be made based on the size of the grid.
Units must be convertible to radians.
particle : ~plasmapy.particles.particle_class.Particle or string representation of same, optional
Representation of the particle species as either a |Particle| object
or a string representation. The default particle is protons.
distribution: str
A keyword which determines how particles will be distributed
in velocity space. Options are:
- 'monte-carlo': velocities will be chosen randomly,
such that the flux per solid angle is uniform.
- 'uniform': velocities will be distributed such that,
left unperturbed,they will form a uniform pattern
on the detection plane. This method
requires that ``nparticles`` be a perfect square. If it is not,
``nparticles`` will be set as the largest perfect square smaller
than the provided ``nparticles``.
Simulations run in the ``'uniform'`` mode will imprint a grid pattern
on the image, but will well-sample the field grid with a
smaller number of particles. The default is ``'monte-carlo'``.
random_seed : int, optional
A random seed to be used when generating random particle
distributions, e.g. with the ``monte-carlo`` distribution.
"""
self._log("Creating Particles")
# Raise an error if the run method has already been called.
self._enforce_order()
# Load inputs
self.nparticles = int(nparticles)
self.particle_energy = particle_energy.to(u.eV).value
self.q = particle.charge.to(u.C).value
self.m = particle.mass.to(u.kg).value
# If max_theta is not specified, make a guess based on the grid size
if max_theta is None:
self.max_theta = np.clip(
1.5 * self.max_theta_hit_grid, 0.01, 0.99 * np.pi / 2
)
else:
self.max_theta = max_theta.to(u.rad).value
# Calculate the velocity corresponding to the particle energy
ER = self.particle_energy * 1.6e-19 / (self.m * self._c**2)
v0 = self._c * np.sqrt(1 - 1 / (ER + 1) ** 2)
if distribution == "monte-carlo":
theta, phi = self._angles_monte_carlo(random_seed=random_seed)
elif distribution == "uniform":
theta, phi = self._angles_uniform()
# Temporarily save theta to later determine which particles
# should be tracked
self.theta = theta
# Construct the velocity distribution around the z-axis
self.v = np.zeros([self.nparticles, 3])
self.v[:, 0] = v0 * np.sin(theta) * np.cos(phi)
self.v[:, 1] = v0 * np.sin(theta) * np.sin(phi)
self.v[:, 2] = v0 * np.cos(theta)
# Calculate the rotation matrix that rotates the z-axis
# onto the source-detector axis
a = np.array([0, 0, 1])
b = self.detector - self.source
rot = rot_a_to_b(a, b)
# Apply rotation matrix to calculated velocity distribution
self.v = np.matmul(self.v, rot)
# Place particles at the source
self.x = np.tile(self.source, (self.nparticles, 1))
[docs]
@particles.particle_input
def load_particles(
self,
x,
v,
particle: Particle = Particle("p+"), # noqa: B008
):
r"""
Load arrays of particle positions and velocities.
Parameters
----------
x : `~astropy.units.Quantity`, shape (N,3)
Positions for N particles
v: `~astropy.units.Quantity`, shape (N,3)
Velocities for N particles
particle : |particle-like|, optional
Representation of the particle species as either a |Particle| object
or a string representation. The default particle is protons.
distribution: str
A keyword which determines how particles will be distributed
in velocity space. Options are:
- 'monte-carlo': velocities will be chosen randomly,
such that the flux per solid angle is uniform.
- 'uniform': velocities will be distributed such that,
left unperturbed,they will form a uniform pattern
on the detection plane.
Simulations run in the ``'uniform'`` mode will imprint a grid pattern
on the image, but will well-sample the field grid with a
smaller number of particles. The default is ``'monte-carlo'``.
"""
# Raise an error if the run method has already been called.
self._enforce_order()
self.q = particle.charge.to(u.C).value
self.m = particle.mass.to(u.kg).value
if x.shape[0] != v.shape[0]:
raise ValueError(
"Provided x and v arrays have inconsistent numbers "
" of particles "
f"({x.shape[0]} and {v.shape[0]} respectively)."
)
else:
self.nparticles = x.shape[0]
self.x = x.to(u.m).value
self.v = v.to(u.m / u.s).value
self.theta = np.arccos(
np.inner(self.v, self.src_n) / np.linalg.norm(self.v, axis=-1)
)
n_wrong_way = np.sum(np.where(self.theta > np.pi / 2, 1, 0))
if n_wrong_way > 1:
warnings.warn(
f"{100*n_wrong_way/self.nparticles:.2f}% of particles "
"initialized are heading away from the grid. Check the "
" orientation of the provided velocity vectors.",
RuntimeWarning,
)
# *************************************************************************
# Run/push loop methods
# *************************************************************************
def _adaptive_dt(self, Ex, Ey, Ez, Bx, By, Bz): # noqa: ARG002
r"""
Calculate the appropriate dt for each grid based on a number of
considerations
including the local grid resolution (ds) and the gyroperiod of the
particles in the current fields.
"""
# If dt was explicitly set, skip the rest of this function
if self.dt.size == 1:
return self.dt
# candidate timesteps includes one per grid (based on the grid resolution)
# plus additional candidates based on the field at each particle
candidates = np.ones([self.nparticles_grid, self.num_grids + 1]) * np.inf
# Compute the timestep indicated by the grid resolution
ds = np.array([grid.grid_resolution.to(u.m).value for grid in self.grids])
gridstep = 0.5 * (ds / self.vmax)
# Wherever a particle is on a grid, include that grid's gridstep
# in the list of candidate timesteps
for i, _grid in enumerate(self.grids): # noqa: B007
candidates[:, i] = np.where(self.on_grid[:, i] > 0, gridstep[i], np.inf)
# If not, compute a number of possible timesteps
# Compute the cyclotron gyroperiod
Bmag = np.max(np.sqrt(Bx**2 + By**2 + Bz**2)).to(u.T).value
# Compute the gyroperiod
if Bmag == 0:
gyroperiod = np.inf
else:
gyroperiod = 2 * np.pi * self.m / (self.q * np.max(Bmag))
candidates[:, self.num_grids] = gyroperiod / 12
# TODO: introduce a minimum timestep based on electric fields too!
# Enforce limits on dt
candidates = np.clip(candidates, self.dt[0], self.dt[1])
# dt is the min of all the candidates for each particle
# a separate dt is returned for each particle
dt = np.min(candidates, axis=-1)
# dt should never actually be infinite, so replace any infinities
# with the largest gridstep
return np.where(dt == np.inf, np.max(gridstep), dt)
def _coast_to_grid(self) -> None:
r"""
Coasts all particles to the timestep when the first particle should
be entering the grid. Doing in this in one step (rather than pushing
the particles through zero fields) saves computation time.
"""
# Distance from the source to the nearest point on any grid
dist = min(
np.min(np.linalg.norm(arr - self.source, axis=3)) for arr in self.grids_arr
)
# Find speed of each particle towards the grid.
vmax = np.dot(self.v, self.src_n)
# Time for each particle to reach the grid
t = dist / vmax
# Coast the particles to the advanced position
self.x = self.x + self.v * t[:, np.newaxis]
def _coast_to_plane(self, center, hdir, vdir, x=None):
"""
Calculates the positions where the current trajectories of each
particle impact a plane, described by the plane's center and
horizontal and vertical unit vectors.
Returns an [nparticles, 3] array of the particle positions in the plane
By default this function does not alter self.x. The optional keyword
x can be used to pass in an output array that will used to hold
the positions in the plane. This can be used to directly update self.x
as follows:
self._coast_to_plane(self.detector, self.det_hdir, self.det_vdir, x
= self.x)
"""
normal = np.cross(hdir, vdir)
# Calculate the time required to evolve each particle into the
# plane
t = np.inner(center[np.newaxis, :] - self.x, normal) / np.inner(self.v, normal)
# Calculate particle positions in the plane
if x is None:
# If no output array is provided, preallocate
x = np.empty_like(self.x)
x[...] = self.x + self.v * t[:, np.newaxis]
# Check that all points are now in the plane
# (Eq. of a plane is nhat*x + d = 0)
plane_eq = np.dot(x - center, normal)
assert np.allclose(plane_eq, 0, atol=1e-6)
return x
def _remove_deflected_particles(self) -> None:
r"""
Removes any particles that have been deflected away from the detector
plane (eg. those that will never hit the grid).
"""
dist_remaining = np.dot(self.x, self.det_n) + np.linalg.norm(self.detector)
v_towards_det = np.dot(self.v, -self.det_n)
# If particles have not yet reached the detector plane and are moving
# away from it, they will never reach the detector.
# So, we can remove them from the arrays
# Find the indices of all particles that we should keep:
# i.e. those still moving towards the detector.
ind = np.logical_not((v_towards_det < 0) & (dist_remaining > 0)).nonzero()[0]
# Drop the other particles
self.x = self.x[ind, :]
self.v = self.v[ind, :]
self.v_init = self.v_init[ind, :]
self.nparticles_grid = self.x.shape[0]
# Store the number of particles deflected
self.fract_deflected = (self.nparticles - ind.size) / self.nparticles
# Warn the user if a large number of particles are being deflected
if self.fract_deflected > 0.05:
warnings.warn(
f"{100*self.fract_deflected:.1f}% particles have been "
"deflected away from the detector plane. The fields "
"provided may be too high to successfully radiograph "
"with this particle energy.",
RuntimeWarning,
)
def _push(self) -> None:
r"""
Advance particles using an implementation of the time-centered
Boris algorithm.
"""
# Get a list of positions (input for interpolator)
pos = self.x[self.grid_ind, :] * u.m
# Update the list of particles on and off the grid
# shape [nparticles, ngrids]
self.on_grid = np.array([grid.on_grid(pos) for grid in self.grids]).T
# entered_grid is zero at the end if a particle has never
# entered any grid
self.entered_grid += np.sum(self.on_grid, axis=-1)
Ex = np.zeros(self.nparticles_grid) * u.V / u.m
Ey = np.zeros(self.nparticles_grid) * u.V / u.m
Ez = np.zeros(self.nparticles_grid) * u.V / u.m
Bx = np.zeros(self.nparticles_grid) * u.T
By = np.zeros(self.nparticles_grid) * u.T
Bz = np.zeros(self.nparticles_grid) * u.T
for grid in self.grids:
# Estimate the E and B fields for each particle
# Note that this interpolation step is BY FAR the slowest part of the push
# loop. Any speed improvements will have to come from here.
if self.field_weighting == "volume averaged":
_Ex, _Ey, _Ez, _Bx, _By, _Bz = grid.volume_averaged_interpolator(
pos,
"E_x",
"E_y",
"E_z",
"B_x",
"B_y",
"B_z",
persistent=True,
)
elif self.field_weighting == "nearest neighbor":
_Ex, _Ey, _Ez, _Bx, _By, _Bz = grid.nearest_neighbor_interpolator(
pos,
"E_x",
"E_y",
"E_z",
"B_x",
"B_y",
"B_z",
persistent=True,
)
# Interpret any NaN values (points off the grid) as zero
# Do this before adding to the totals, because 0 + nan = nan
_Ex = np.nan_to_num(_Ex, nan=0.0 * u.V / u.m)
_Ey = np.nan_to_num(_Ey, nan=0.0 * u.V / u.m)
_Ez = np.nan_to_num(_Ez, nan=0.0 * u.V / u.m)
_Bx = np.nan_to_num(_Bx, nan=0.0 * u.T)
_By = np.nan_to_num(_By, nan=0.0 * u.T)
_Bz = np.nan_to_num(_Bz, nan=0.0 * u.T)
# Add the values interpolated for this grid to the totals
Ex += _Ex
Ey += _Ey
Ez += _Ez
Bx += _Bx
By += _By
Bz += _Bz
# Create arrays of E and B as required by push algorithm
E = np.array(
[Ex.to(u.V / u.m).value, Ey.to(u.V / u.m).value, Ez.to(u.V / u.m).value]
)
E = np.moveaxis(E, 0, -1)
B = np.array([Bx.to(u.T).value, By.to(u.T).value, Bz.to(u.T).value])
B = np.moveaxis(B, 0, -1)
# Calculate the adaptive timestep from the fields currently experienced
# by the particles
# If user sets dt explicitly, that's handled in _adaptive_dt
dt = self._adaptive_dt(Ex, Ey, Ez, Bx, By, Bz)
# TODO: Test v/c and implement relativistic Boris push when required
# vc = np.max(v)/_c
# If dt is not a scalar, make sure it can be multiplied by an
# [nparticles, 3] shape field array
if dt.size > 1:
dt = dt[:, np.newaxis]
x = self.x[self.grid_ind, :]
v = self.v[self.grid_ind, :]
boris_push(x, v, B, E, self.q, self.m, dt)
self.x[self.grid_ind, :] = x
self.v[self.grid_ind, :] = v
@property
def on_any_grid(self):
"""
Binary array for each particle indicating whether it is currently
on ANY grid.
"""
return np.sum(self.on_grid, axis=-1) > 0
def _stop_condition(self) -> bool:
r"""
The stop condition is that most of the particles have entered the grid
and almost all have now left it.
"""
# Count the number of particles who have entered, which is the
# number of non-zero entries in entered_grid
self.num_entered = np.nonzero(self.entered_grid)[0].size
# How many of the particles have entered the grid
self.fract_entered = np.sum(self.num_entered) / self.nparticles_grid
# Of the particles that have entered the grid, how many are currently
# on the grid?
# if/else avoids dividing by zero
if np.sum(self.num_entered) > 0:
# Normalize to the number that have entered a grid
still_on = np.sum(self.on_any_grid) / np.sum(self.num_entered)
else:
still_on = 0.0
if self.fract_entered <= 0.1 or still_on >= 0.001:
return False
# Warn user if < 10% of the particles ended up on the grid
if self.num_entered < 0.1 * self.nparticles:
warnings.warn(
f"Only {100*self.num_entered/self.nparticles:.2f}% of "
"particles entered the field grid: consider "
"decreasing the max_theta to increase this "
"number.",
RuntimeWarning,
)
return True
[docs]
def run(
self,
dt=None,
field_weighting="volume averaged",
):
r"""
Runs a particle-tracing simulation.
Timesteps are adaptively calculated based on the
local grid resolution of the particles and the electric and magnetic
fields they are experiencing. After all particles
have left the grid, they are advanced to the
detector plane where they can be used to construct a synthetic
diagnostic image.
Parameters
----------
dt : `~astropy.units.Quantity`, optional
An explicitly set timestep in units convertible to seconds.
Setting this optional keyword overrules the adaptive time step
capability and forces the use of this timestep throughout. If a tuple
of timesteps is provided, the adaptive timestep will be clamped
between the first and second values.
field_weighting : str
String that selects the field weighting algorithm used to determine
what fields are felt by the particles. Options are:
* 'nearest neighbor': Particles are assigned the fields on
the grid vertex closest to them.
* 'volume averaged' : The fields experienced by a particle are a
volume-average of the eight grid points surrounding them.
The default is 'volume averaged'.
Returns
-------
None
"""
# Load and validate inputs
field_weightings = ["volume averaged", "nearest neighbor"]
if field_weighting in field_weightings:
self.field_weighting = field_weighting
else:
raise ValueError(
f"{field_weighting} is not a valid option for ",
"field_weighting. Valid choices are",
f"{field_weightings}",
)
# By default, set dt as an infinite range (auto dt with no restrictions)
self.dt = np.array([0.0, np.inf]) * u.s if dt is None else dt
self.dt = (self.dt).to(u.s).value
# Check to make sure particles have already been generated
if not hasattr(self, "x"):
raise ValueError(
"Either the create_particles or load_particles method must be "
"called before running the particle tracing algorithm."
)
# If meshes have been added, apply them now
for mesh in self.mesh_list:
self._apply_wire_mesh(**mesh)
# Store a copy of the initial velocity distribution in memory
# This will be used later to calculate the maximum deflection
self.v_init = np.copy(self.v)
# Calculate the maximum velocity
# Used for determining the grid crossing maximum timestep
self.vmax = np.max(np.linalg.norm(self.v, axis=-1))
# Determine which particles should be tracked
# This array holds the indices of all particles that WILL hit the grid
# Only these particles will actually be pushed through the fields
self.grid_ind = np.where(self.theta < self.max_theta_hit_grid)[0]
self.nparticles_grid = len(self.grid_ind)
self.fract_tracked = self.nparticles_grid / self.nparticles
# Create flags for tracking when particles during the simulation
# on_grid -> zero if the particle is off grid, 1
# shape [nparticles, ngrids]
self.on_grid = np.zeros([self.nparticles_grid, self.num_grids])
# Entered grid -> non-zero if particle EVER entered a grid
self.entered_grid = np.zeros([self.nparticles_grid])
# Generate a null distribution of points (the result in the absence of
# any fields) for statistical comparison
self.x0 = self._coast_to_plane(self.detector, self.det_hdir, self.det_vdir)
# Advance the particles to the near the start of the grid
self._coast_to_grid()
# Initialize a "progress bar" (really more of a meter)
# Setting sys.stdout lets this play nicely with regular print()
pbar = tqdm(
initial=0,
total=self.nparticles_grid + 1,
disable=not self.verbose,
desc="Particles on grid",
unit="particles",
bar_format="{l_bar}{bar}{n:.1e}/{total:.1e} {unit}",
file=sys.stdout,
)
# Push the particles until the stop condition is satisfied
# (no more particles on the simulation grid)
while not self._stop_condition():
n_on_grid = np.sum(self.on_any_grid)
pbar.n = n_on_grid
pbar.last_print_n = n_on_grid
pbar.update()
self._push()
pbar.close()
# Remove particles that will never reach the detector
self._remove_deflected_particles()
# Advance the particles to the image plane
self._coast_to_plane(self.detector, self.det_hdir, self.det_vdir, x=self.x)
# Log a summary of the run
self._log("Run completed")
self._log(f"Fraction of particles tracked: {self.fract_tracked:.1%}")
self._log(
"Fraction of tracked particles that entered the grid: "
f"{self.fract_entered*100:.1f}%"
)
self._log(
"Fraction of tracked particles deflected away from the "
"detector plane: "
f"{self.fract_deflected*100}%"
)
# Simulation has not run, because creating new particles changes the simulation
self._has_run = True
@property
def results_dict(self):
r"""
A dictionary containing the results of the simulation.
.. list-table:: Dictionary keys and descriptions.
:width: 100%
:widths: 1 1 4
:header-rows: 1
* - Key
- Type
- Description
* - ``"source"``
- `~numpy.ndarray`
- The source location vector, in meters.
* - ``"detector"``
- `~numpy.ndarray`
- The detector location vector, in meters.
* - ``"mag"``
- `float`
- The system magnification.
* - ``"nparticles"``
- `int`
- Number of particles in the simulation.
* - ``"max_deflection"``
- `~numpy.ndarray`
- The maximum deflection experienced by a particle in the
simulation, in radians.
* - ``"x"``
- `~numpy.ndarray`, [``nparticles``,]
- The x-coordinate location where each particle hit the
detector plane, in meters.
* - ``"y"``
- `~numpy.ndarray`, [``nparticles``,]
- The y-coordinate location where each particle hit the
detector plane, in meters.
* - ``"v"``
- `~numpy.ndarray`, [``nparticles``, 3]
- The velocity of each particle when it hits the detector
plane, in meters per second. The velocity is in a
coordinate system relative to the detector plane. The
components are [normal, horizontal, vertical] relative
to the detector plane coordinates.
* - ``"x0"``
- `~numpy.ndarray`, [``nparticles``,]
- The x-coordinate location where each particle would have
hit the detector plane if the grid fields were zero, in
meters. Useful for calculating the source profile.
* - ``"y0"``
- `~numpy.ndarray`, [``nparticles``,]
- The y-coordinate location where each particle would have
hit the detector plane if the grid fields were zero, in
meters. Useful for calculating the source profile.
* - ``"v0"``
- `~numpy.ndarray`, [``nparticles``, 3]
- The velocity of each particle when it hit the detector
plan if the grid fields were zero, in meters per second.
The velocity is in a coordinate system relative to the
detector plane. The components are [normal, horizontal,
vertical] relative to the detector plane coordinates.
"""
if not self._has_run:
raise RuntimeError(
"The simulation must be run before a results "
"dictionary can be created."
)
# Determine locations of points in the detector plane using unit
# vectors
xloc = np.dot(self.x - self.detector, self.det_hdir)
yloc = np.dot(self.x - self.detector, self.det_vdir)
x0loc = np.dot(self.x0 - self.detector, self.det_hdir)
y0loc = np.dot(self.x0 - self.detector, self.det_vdir)
# Determine the velocity components of each particle in the
# coordinate frame of the detector, eg. the components of v are
# now [normal, det_hdir, det_vdir]
v = np.zeros(self.v.shape)
v[:, 0] = np.dot(self.v, self.det_n)
v[:, 1] = np.dot(self.v, self.det_hdir)
v[:, 2] = np.dot(self.v, self.det_vdir)
v0 = np.zeros(self.v.shape)
v0[:, 0] = np.dot(self.v_init, self.det_n)
v0[:, 1] = np.dot(self.v_init, self.det_hdir)
v0[:, 2] = np.dot(self.v_init, self.det_vdir)
return {
"source": self.source,
"detector": self.detector,
"mag": self.mag,
"nparticles": self.nparticles,
"max_deflection": self.max_deflection.to(u.rad).value,
"x": xloc,
"y": yloc,
"v": v,
"x0": x0loc,
"y0": y0loc,
"v0": v0,
}
[docs]
def save_results(self, path) -> None:
"""
Save the simulations results :attr:`results_dict` to a `numpy`
``.npz`` file format (see `numpy.lib.format`) using `numpy.savez`.
Parameters
----------
path : `str` or `os.path`
Either the filename (string) or an open file (file-like object)
where the data will be saved. If file is a string or a Path,
the ``.npz`` extension will be appended to the filename if
it is not already there.
Notes
-----
Useful for saving the results from a simulation so they can be
loaded at a later time and passed into
`~plasmapy.diagnostics.charged_particle_radiography.synthetic_radiography.synthetic_radiograph`.
"""
np.savez(path, **self.results_dict)
@property
def max_deflection(self):
"""
The maximum deflection experienced by one of the particles, determined
by comparing their initial and final velocity vectors.
This value can be used to determine the charged particle radiography regime
using the dimensionless number defined by Kugland et al. 2012
Returns
-------
max_deflection : float
The maximum deflection in radians
"""
# Normalize the initial and final velocities
v_norm = self.v / np.linalg.norm(self.v, axis=1, keepdims=True)
v_init_norm = self.v_init / np.linalg.norm(self.v_init, axis=1, keepdims=True)
# Compute the dot product
proj = np.sum(v_norm * v_init_norm, axis=1)
# In case of numerical errors, make sure the output is within the domain of
# arccos
proj = np.where(proj > 1, 1, proj)
max_deflection = np.max(np.arccos(proj))
return max_deflection * u.rad
def _enforce_order(self):
r"""
The `Tracker` methods could give strange results if setup methods
are used again after the simulation has run. This method
raises an error if the simulation has already been run.
"""
if self._has_run:
raise RuntimeError(
"Modifying the `Tracker` object after running the "
"simulation is not supported. Create a new `Tracker` "
"object for a new simulation."
)
# *************************************************************************
# Synthetic diagnostic methods (creating output)
# *************************************************************************
[docs]
def synthetic_radiograph( # noqa: C901
obj, size=None, bins=None, ignore_grid: bool = False, optical_density: bool = False
):
r"""
Calculate a "synthetic radiograph" (particle count histogram in the
image plane).
.. |Tracker| replace:: `~plasmapy.diagnostics.charged_particle_radiography.synthetic_radiography.Tracker`
.. |results_dict| replace:: `~plasmapy.diagnostics.charged_particle_radiography.synthetic_radiography.Tracker.results_dict`
Parameters
----------
obj: `dict` or |Tracker|
Either a |Tracker|
object that has been run, or a dictionary equivalent to
|results_dict|.
size : `~astropy.units.Quantity`, shape ``(2, 2)``, optional
The size of the detector array, specified as the minimum
and maximum values included in both the horizontal and vertical
directions in the detector plane coordinates. Shape is
``((hmin, hmax), (vmin, vmax))``. If not specified, the size will be
set to include all particles on the detector. Units must be convertible
to meters.
bins : array of integers, shape ``(2)``
The number of bins in each direction in the format
``(hbins, vbins)``. The default is ``(200, 200)``.
ignore_grid: `bool`
If `True`, returns the intensity in the image plane in the absence
of simulated fields.
optical_density: `bool`
If `True`, return the optical density rather than the intensity
.. math::
OD = -log_{10}(Intensity/I_0)
where :math:`Intensity` is the simulation intensity on the
detector plane and :math:`I_0` is the intensity on the detector
plane in the absence of simulated fields. Default is `False`.
If the :math:`Intensity` histogram contains zeros, then the
corresponding values in :math:`OD` will be `numpy.inf`. When
plotting :math:`OD` the `~numpy.inf` values can be replaced
using ``numpy.nan_to_num(OD, neginf=0, posinf=0)``.
Returns
-------
hax : `~astropy.units.Quantity` array shape ``(hbins,)``
The horizontal axis of the synthetic radiograph in meters.
vax : `~astropy.units.Quantity` array shape ``(vbins, )``
The vertical axis of the synthetic radiograph in meters.
intensity : `~numpy.ndarray`, shape ``(hbins, vbins)``
The number of particles counted in each bin of the histogram.
"""
# condition `obj` input
if isinstance(obj, Tracker):
# results_dict raises an error if the simulation has not been run.
d = obj.results_dict
elif isinstance(obj, dict):
d = obj
else:
raise TypeError(
f"Expected type dict or {Tracker} for argument `obj`, but "
f"got type {type(obj)}."
)
if bins is None:
bins = [200, 200]
# Note that, at the end of the simulation, all particles were moved
# into the image plane.
# If ignore_grid is True, use the predicted positions in the absence of
# simulated fields
if ignore_grid:
xloc = d["x0"]
yloc = d["y0"]
else:
xloc = d["x"]
yloc = d["y"]
if size is None:
# If a detector size is not given, choose a size based on the
# particle positions
w = np.max([np.max(np.abs(xloc)), np.max(np.abs(yloc))])
size = np.array([[-w, w], [-w, w]]) * u.m
elif not isinstance(size, u.Quantity):
raise TypeError(
"Argument `size` must be an astropy.units.Quantity object with "
"units convertible to meters."
)
elif not size.unit.is_equivalent(u.m):
raise ValueError("Argument `size` must have units convertible to meters.")
elif size.shape != (2, 2):
raise ValueError(
f"Argument `size` must have shape (2, 2), but got {size.shape}."
)
# Generate the histogram
intensity, h, v = np.histogram2d(xloc, yloc, range=size.to(u.m).value, bins=bins)
# h, v are the bin edges: compute the centers to produce arrays
# of the right length (then trim off the extra point)
h = ((h + np.roll(h, -1)) / 2)[:-1]
v = ((v + np.roll(v, -1)) / 2)[:-1]
# Throw a warning if < 50% of the particles are included on the
# histogram
percentage = np.sum(intensity) / d["nparticles"]
if percentage < 0.5:
warnings.warn(
f"Only {percentage:.2%} of the particles are shown "
"on this synthetic radiograph. Consider increasing "
"the size to include more.",
RuntimeWarning,
)
if optical_density:
# Generate the null radiograph
x, y, I0 = synthetic_radiograph(obj, size=size, bins=bins, ignore_grid=True)
# Calculate I0 as the mean of the non-zero values in the null
# histogram. Zeros are just outside of the illuminate area.
I0 = np.mean(I0[I0 != 0])
# Calculate the optical_density
# ignore any errors resulting from zero values in intensity
with np.errstate(divide="ignore"):
intensity = -np.log10(intensity / I0)
return h * u.m, v * u.m, intensity
```