"""
Routines for the analysis of proton radiographs. These routines can be broadly
classified as either creating synthetic radiographs from prescribed fields or
methods of 'inverting' experimentally created radiographs to reconstruct the
original fields (under some set of assumptions).
"""
__all__ = ["Tracker", "synthetic_radiograph"]
import warnings
from collections.abc import Iterable
from pathlib import Path
from typing import Literal
import astropy.constants as const
import astropy.units as u
import h5py
import numpy as np
from plasmapy import particles
from plasmapy.formulary.mathematics import rot_a_to_b
from plasmapy.particles import Particle
from plasmapy.plasma.grids import AbstractGrid
from plasmapy.simulation.particle_tracker.particle_tracker import ParticleTracker
from plasmapy.simulation.particle_tracker.save_routines import SaveOnceOnCompletion
from plasmapy.simulation.particle_tracker.termination_conditions import (
AllParticlesOffGridTerminationCondition,
)
def _coerce_to_cartesian_si(pos):
"""
Takes a tuple of `astropy.unit.Quantity` values representing a position
in space in either Cartesian, cylindrical, or spherical coordinates, and
returns a numpy array representing the same point in Cartesian
coordinates and units of meters.
"""
# Auto-detect geometry based on units
geo_units = [x.unit for x in pos]
if geo_units[2].is_equivalent(u.rad):
geometry = "spherical"
elif geo_units[1].is_equivalent(u.rad):
geometry = "cylindrical"
else:
geometry = "cartesian"
# Convert geometrical inputs between coordinates systems
pos_out = np.zeros(3)
if geometry == "cartesian":
x, y, z = pos
pos_out[0] = x.to(u.m).value
pos_out[1] = y.to(u.m).value
pos_out[2] = z.to(u.m).value
elif geometry == "cylindrical":
r, t, z = pos
r = r.to(u.m)
t = t.to(u.rad).value
z = z.to(u.m)
pos_out[0] = (r * np.cos(t)).to(u.m).value
pos_out[1] = (r * np.sin(t)).to(u.m).value
pos_out[2] = z.to(u.m).value
elif geometry == "spherical":
r, t, p = pos
r = r.to(u.m)
t = t.to(u.rad).value
p = p.to(u.rad).value
pos_out[0] = (r * np.sin(t) * np.cos(p)).to(u.m).value
pos_out[1] = (r * np.sin(t) * np.sin(p)).to(u.m).value
pos_out[2] = (r * np.cos(t)).to(u.m).value
return pos_out
class _SyntheticRadiographySaveRoutine(SaveOnceOnCompletion):
def __init__(
self, output_directory: Path | None = None, output_basename: str = "output"
) -> None:
super().__init__(
output_directory=output_directory, output_basename=output_basename
)
self._quantities = {
"source": (u.m, "attribute"),
"detector": (u.m, "attribute"),
"mag": (u.m, "attribute"),
"max_deflection": (None, "attribute"),
"nparticles": (None, "attribute"),
"x": (u.m, "dataset"),
"y": (u.m, "dataset"),
"v": (u.m / u.s, "dataset"),
"x0": (u.m, "dataset"),
"y0": (u.m, "dataset"),
"v0": (u.m, "dataset"),
}
def save(self) -> None:
result_dictionary = self._particle_tracker.results_dict
if self.output_directory is None:
return
output_file_path = self.output_directory / Path(f"{self.output_basename}.h5")
with h5py.File(output_file_path, "w") as output_file:
for key, (_units, data_type) in self._quantities.items():
match data_type:
case "attribute":
output_file.attrs.create(key, result_dictionary[key])
case "dataset":
output_file.create_dataset(key, data=result_dictionary[key])
[docs]
class Tracker(ParticleTracker):
r"""
Represents a charged particle radiography experiment with simulated or
calculated E and B fields given at positions defined by a grid of spatial
coordinates. The particle source and detector plane are defined by vectors
from the origin of the grid.
Parameters
----------
grids : `~plasmapy.plasma.grids.AbstractGrid` or subclass thereof, or list
of same.
A Grid object or list of grid objects containing the required
quantities [E_x, E_y, E_z, B_x, B_y, B_z].
If any of these quantities are missing, a warning will be given and that
quantity will be assumed to be zero everywhere.
source : `~astropy.units.Quantity`, shape (3)
A vector pointing from the origin of the grid to the location
of the particle source. This vector will be interpreted as
being in either Cartesian, cylindrical, or spherical coordinates
based on its units. Valid geometries are:
* Cartesian (x,y,z) : (meters, meters, meters)
* Cylindrical (r, theta, z) : (meters, radians, meters)
* Spherical (r, theta, phi) : (meters, radians, radians)
In spherical coordinates theta is the polar angle.
detector : `~astropy.units.Quantity`, shape (3)
A vector pointing from the origin of the grid to the center
of the detector plane. The vector from the source point to this
point defines the normal vector of the detector plane. This vector
can also be specified in Cartesian, cylindrical, or spherical
coordinates (see the ``source`` keyword).
dt : `~astropy.units.Quantity`, optional
An explicitly set time step in units convertible to seconds.
Setting this optional keyword overrules the adaptive time step
capability and forces the use of this time step throughout.
dt_range : tuple of shape (2,) of `~astropy.units.Quantity`, optional
If specified, the calculated adaptive time step will be clamped
between the first and second values.
field_weighting : str, default: ``"volume averaged"``
String that selects the field weighting algorithm used to determine
what fields are felt by the particles. Options are:
* ``"nearest neighbor"``:
Particles are assigned the fields on the grid vertex closest to
them.
* ``"volume averaged"``:
The fields experienced by a particle are a volume-average of the
eight grid points surrounding them.
detector_hdir : `numpy.ndarray`, shape (3), optional
A unit vector (in Cartesian coordinates) defining the horizontal
direction on the detector plane. By default, the horizontal axis in the
detector plane is defined to be perpendicular to both the
source-to-detector vector and the z-axis (unless the source-to-detector
axis is parallel to the z axis, in which case the horizontal axis is
the x-axis).
The detector vertical axis is then defined
to be orthogonal to both the source-to-detector vector and the
detector horizontal axis.
output_directory : `~pathlib.Path`, optional
Directory for objects that are saved to disk. If a directory is not
specified then a memory save routine is used.
output_basename : `str`, optional
Optional base name for output files.
fraction_exited_threshold : float, optional
The fraction of particles that must leave the grids to terminate the
simulation. This does not include particles that have never entered
the grids. By default, this is set to ``0.999`` (corresponding to 99.9%).
verbose : bool, optional
If `True`, updates on the status of the program will be printed
into the standard output while running.
"""
def __init__(
self,
grids: AbstractGrid | Iterable[AbstractGrid],
source: u.Quantity[u.m],
detector: u.Quantity[u.m],
dt=None,
dt_range=None,
field_weighting: Literal[
"volume averaged", "nearest neighbor"
] = "volume averaged",
detector_hdir=None,
output_directory: Path | None = None,
output_basename: str = "output",
fraction_exited_threshold: float = 0.999,
verbose: bool = True,
) -> None:
# The synthetic radiography class handles logging, so we can disable logging for the particle tracker
# The particle tracker class ensures that the provided grid argument has the proper type and
# that the necessary grid quantities are created if they are not already specified
save_routine = (
_SyntheticRadiographySaveRoutine(output_directory, output_basename)
if output_directory is not None
else None
)
termination_condition = AllParticlesOffGridTerminationCondition(
fraction_exited_threshold=fraction_exited_threshold
)
super().__init__(
grids,
termination_condition,
save_routine,
dt=dt,
dt_range=dt_range,
field_weighting=field_weighting,
verbose=False,
)
self.verbose = verbose
# A list of wire meshes added to the grid with add_wire_mesh
# Particles that would hit these meshes will be removed at runtime
# by _apply_wire_mesh
self.mesh_list = []
# ************************************************************************
# Setup the source and detector geometries
# ************************************************************************
self.source = _coerce_to_cartesian_si(source)
self.detector = _coerce_to_cartesian_si(detector)
self._log(f"Source: {self.source} m")
self._log(f"Detector: {self.detector} m")
# Calculate normal vectors (facing towards the grid origin) for both
# the source and detector planes
self.src_n = -self.source / np.linalg.norm(self.source)
self.det_n = -self.detector / np.linalg.norm(self.detector)
# Vector directly from source to detector
self.src_det = self.detector - self.source
# Magnification
self.mag = 1 + (np.linalg.norm(self.detector) / np.linalg.norm(self.source))
self._log(f"Magnification: {self.mag}")
# Check that source-detector vector actually passes through the grid
test = [
grid.vector_intersects(self.source * u.m, self.detector * u.m)
for grid in self.grids
]
if not any(test):
raise ValueError(
"The vector between the source and the detector "
"does not intersect the grid provided!"
)
# Determine the angle above which particles will not hit the grid
# these particles can be ignored until the end of the simulation,
# then immediately advanced to the detector grid with their original
# velocities
self.max_theta_hit_grid = self._max_theta_hit_grid()
# *********************************************************************
# Define the detector plane
# *********************************************************************
# Load or calculate the detector hdir
if detector_hdir is not None:
self.det_hdir = detector_hdir / np.linalg.norm(detector_hdir)
else:
self.det_hdir = self._default_detector_hdir()
# Calculate the detector vdir
ny = np.cross(self.det_hdir, self.det_n)
self.det_vdir = -ny / np.linalg.norm(ny)
def _default_detector_hdir(self):
"""
Calculates the default horizontal unit vector for the detector plane
(see __init__ description for details).
"""
# Create unit vectors that define the detector plane
# Define plane horizontal axis
if np.allclose(np.abs(self.det_n), np.array([0, 0, 1])):
nx = np.array([1, 0, 0])
else:
nx = np.cross(np.array([0, 0, 1]), self.det_n)
return nx / np.linalg.norm(nx)
def _max_theta_hit_grid(self):
r"""
Using the grid and the source position, compute the maximum particle
theta that will impact the grid. This value can be used to determine
which particles are worth tracking.
"""
theta = np.zeros([8, self.num_grids])
for i, _grid in enumerate(self.grids):
ind = 0
for x in (0, -1):
for y in (0, -1):
for z in (0, -1):
# Source to grid corner vector
vec = self.grids_arr[i][x, y, z, :] - self.source
# Calculate angle between vec and the source-to-detector
# axis, which is the central axis of the particle beam
theta[ind, i] = np.arccos(
np.dot(vec, self.src_det)
/ np.linalg.norm(vec)
/ np.linalg.norm(self.src_det)
)
ind += 1
return np.max(theta)
def _log(self, msg) -> None:
if self.verbose:
# We'll need to switch from print() to using logging library
print(msg) # noqa: T201
# Define some constants so they don't get constantly re-evaluated
_c = const.c.si.value
# *************************************************************************
# Create mesh
# *************************************************************************
[docs]
def add_wire_mesh(
self, location, extent, nwires, wire_diameter, mesh_hdir=None, mesh_vdir=None
):
"""
Add a wire mesh grid between the particle source and the object grid
that blocks particles whose paths intersect the wires.
Parameters
----------
location : `~astropy.units.Quantity`, shape (3)
A vector pointing from the origin of the grid to the center of the
mesh grid. This location must be between the source and the
object grid.
This vector will be interpreted as
being in either cartesian, cylindrical, or spherical coordinates
based on its units. Valid geometries are:
* Cartesian (x,y,z) : (meters, meters, meters)
* Cylindrical (r, theta, z) : (meters, radians, meters)
* Spherical (r, theta, phi) : (meters, radians, radians)
In spherical coordinates theta is the polar angle.
extent : Tuple of 1 or 2 `~astropy.units.Quantity`
The size of the mesh grid (in the mesh plane). If one value
is provided, the mesh is circular and the value provided is
interpreted as the diameter. If two values are provided, the
mesh is rectangular and the values are interpreted as the
width and height respectively.
nwires : Tuple of 1 or 2 ints, or a single int
The number of wires in the horizontal and vertical directions. If
only one value is provided, the number in the two directions is
assumed to be equal. Note that a wire will cross the center of the
mesh only when nwires is odd.
wire_diameter : `~astropy.units.Quantity`
The diameter of the wires.
mesh_hdir : `numpy.ndarray`, shape (3), optional
A unit vector (in Cartesian coordinates) defining the horizontal
direction on the mesh plane. Modifying this vector can rotate the
mesh in the plane or tilt the mesh plane relative to the
source-detector axis. By default, ``mesh_hdir`` is set equal to
``detector_hdir`` (see ``detector_hdir`` keyword in ``__init__``).
mesh_vdir : `numpy.ndarray`, shape (3), optional
A unit vector (in Cartesian coordinates) defining the vertical
direction on the mesh plane. Modifying this vector can tilt the
mesh relative to the source-detector axis. By default, ``mesh_vdir``
is defined to be perpendicular to ``mesh_hdir`` and the detector
plane normal (such that the mesh is parallel to the detector plane).
Raises
------
ValueError
Raises a ValueError if the provided mesh location is not
between the source and the object grid.
"""
# Raise an error if the run method has already been called.
self._enforce_order()
location = _coerce_to_cartesian_si(location)
wire_radius = wire_diameter.si.value / 2
if not isinstance(extent, tuple):
extent = (extent,)
if len(extent) == 1:
radius = 0.5 * extent[0].si.value
width = extent[0].si.value
height = extent[0].si.value
elif len(extent) == 2:
radius = None
width = extent[0].si.value
height = extent[1].si.value
else:
raise ValueError(
"extent must be a tuple of 1 or 2 elements, but "
f"{len(extent)} elements were provided."
)
if not isinstance(nwires, tuple):
nwires = (nwires,)
if len(nwires) != 2:
nwires = (nwires[0], nwires[0])
# If no hdir/vdir is specified, calculate a default value
# If one is specified, make sure it is normalized
if mesh_hdir is None:
# Re-calculate the default here, in case the user
# specified a different det_hdir
mesh_hdir = self._default_detector_hdir()
else:
mesh_hdir = mesh_hdir / np.linalg.norm(mesh_hdir)
if mesh_vdir is None:
mesh_vdir = np.cross(mesh_hdir, self.det_n)
mesh_vdir = -mesh_vdir / np.linalg.norm(mesh_vdir)
else:
mesh_vdir = mesh_vdir / np.linalg.norm(mesh_vdir)
# Raise exception if mesh is AFTER the field grid
if np.linalg.norm(location - self.source) > np.linalg.norm(self.source):
raise ValueError(
f"The specified mesh location, {location},"
"is not between the source and the origin."
)
mesh_entry = {
"location": location,
"wire_radius": wire_radius,
"radius": radius,
"width": width,
"height": height,
"nwires": nwires,
"mesh_hdir": mesh_hdir,
"mesh_vdir": mesh_vdir,
}
self.mesh_list.append(mesh_entry)
def _apply_wire_mesh(
self,
location=None,
wire_radius=None,
radius=None,
width=None,
height=None,
nwires=None,
mesh_hdir=None,
mesh_vdir=None,
):
"""
Apply wire meshes that were added to ``self.mesh_list``.
"""
x = self._coast_to_plane(location, mesh_hdir, mesh_vdir)
# Particle positions in 2D on the mesh plane
xloc = np.dot(x - location, mesh_hdir)
yloc = np.dot(x - location, mesh_vdir)
# Create an array in which True indicates that a particle has hit
# a wire and False indicates that it has not
hit = np.zeros(self.nparticles, dtype=bool)
# Mark particles that overlap vertical or horizontal position with
# a wire
h_centers = np.linspace(-width / 2, width / 2, num=nwires[0])
for c in h_centers:
hit |= np.isclose(xloc, c, atol=wire_radius)
v_centers = np.linspace(-height / 2, height / 2, num=nwires[1])
for c in v_centers:
hit |= np.isclose(yloc, c, atol=wire_radius)
# Put back any particles that are outside the mesh boundaries
# First handle the case where the mesh is rectangular
if radius is None:
# Replace particles outside the x-boundary
hit[
np.logical_or(
xloc > np.max(h_centers) + wire_radius,
xloc < np.min(h_centers) - wire_radius,
)
] = False
# Replace particles outside the y-boundary
hit[
np.logical_or(
yloc > np.max(v_centers) + wire_radius,
yloc < np.min(v_centers) - wire_radius,
)
] = False
# Handle the case where the mesh is circular
else:
loc_rad = np.sqrt(xloc**2 + yloc**2)
hit[loc_rad > radius] = False
# In the case of a circular mesh, also create a round wire along
# the outside edge
hit[np.isclose(loc_rad, radius, atol=wire_radius)] = True
# Identify the particles that have hit something, then remove them from
# all of the arrays
keep_these_particles = ~hit
number_kept_particles = keep_these_particles.sum()
nremoved = self.nparticles - number_kept_particles
if self.nparticles - nremoved <= 0:
raise ValueError(
"The specified mesh is blocking all of the particles. "
f"The wire diameter ({2*wire_radius}) may be too large."
)
self._stop_particles(~keep_these_particles)
# *************************************************************************
# Particle creation methods
# *************************************************************************
@staticmethod
def _angles_monte_carlo(nparticles, max_theta, random_seed=None):
"""
Generates angles for each particle randomly such that the flux
per solid angle is uniform.
"""
# Create a probability vector for the theta distribution
# Theta must follow a sine distribution in order for the particle
# flux per solid angle to be uniform.
arg = np.linspace(0, max_theta, num=int(1e5))
prob = np.sin(arg)
prob *= 1 / np.sum(prob)
# Create a numpy random number generator instance
rng = np.random.default_rng(seed=random_seed)
# Randomly choose theta's weighted with the sine probabilities
theta = rng.choice(arg, size=nparticles, replace=True, p=prob)
# Also generate a uniform phi distribution
phi = rng.uniform(high=2 * np.pi, size=nparticles)
return theta, phi
@staticmethod
def _angles_uniform(nparticles, max_theta):
"""
Generates angles for each particle such that their velocities are
uniformly distributed on a grid in theta and phi. This method
requires that `nparticles` be a perfect square. If it is not,
`nparticles` will be set as the largest perfect square smaller
than the provided `nparticles`.
"""
# Calculate the approximate square root
n_per = np.floor(np.sqrt(nparticles)).astype(np.int32)
# Create an imaginary grid positioned 1 unit from the source
# and spanning max_theta at the corners
extent = np.sin(max_theta) / np.sqrt(2)
arr = np.linspace(-extent, extent, num=n_per)
harr, varr = np.meshgrid(arr, arr, indexing="ij")
# calculate the angles from the source for each point in
# the grid.
theta = np.arctan(np.sqrt(harr**2 + varr**2))
phi = np.arctan2(varr, harr)
return theta.flatten(), phi.flatten()
[docs]
@particles.particle_input
def create_particles(
self,
nparticles,
particle_energy,
max_theta=None,
particle: Particle = Particle("p+"), # noqa: B008
distribution: Literal["monte-carlo", "uniform"] = "monte-carlo",
random_seed=None,
) -> None:
r"""
Generates the angular distributions about the Z-axis, then
rotates those distributions to align with the source-to-detector axis.
By default, particles are generated over almost the entire pi/2.
However, if the detector is far from the source, many of these
particles will never be observed. The max_theta keyword allows these
extraneous particles to be neglected to focus computational resources
on the particles who will actually hit the detector.
Parameters
----------
nparticles : integer
The number of particles to include in the simulation. The default
is 1e5.
particle_energy : `~astropy.units.Quantity`
The energy of the particle, in units convertible to eV.
All particles are given the same energy.
max_theta : `~astropy.units.Quantity`, optional
The largest velocity vector angle (measured from the
source-to-detector axis) for which particles should be generated.
Decreasing this angle can eliminate particles that would never
reach the detector region of interest. If no value is given, a
guess will be made based on the size of the grid.
Units must be convertible to radians.
particle : ~plasmapy.particles.particle_class.Particle or string representation of same, optional
Representation of the particle species as either a |Particle| object
or a string representation. The default particle is protons.
distribution: str
A keyword which determines how particles will be distributed
in velocity space. Options are:
- 'monte-carlo': velocities will be chosen randomly,
such that the flux per solid angle is uniform.
- 'uniform': velocities will be distributed such that,
left unperturbed,they will form a uniform pattern
on the detection plane. This method
requires that ``nparticles`` be a perfect square. If it is not,
``nparticles`` will be set as the largest perfect square smaller
than the provided ``nparticles``.
Simulations run in the ``'uniform'`` mode will imprint a grid pattern
on the image, but will well-sample the field grid with a
smaller number of particles. The default is ``'monte-carlo'``.
random_seed : int, optional
A random seed to be used when generating random particle
distributions, e.g. with the ``monte-carlo`` distribution.
"""
self._log("Creating Particles")
# Raise an error if the run method has already been called.
self._enforce_order()
# Load inputs
nparticles = int(nparticles)
particle_energy = particle_energy.to(u.eV).value
m = particle.mass.to(u.kg).value
# If max_theta is not specified, make a guess based on the grid size
if max_theta is None:
max_theta = np.clip(1.5 * self.max_theta_hit_grid, 0.01, 0.99 * np.pi / 2)
else:
max_theta = max_theta.to(u.rad).value
# Calculate the velocity corresponding to the particle energy
ER = particle_energy * 1.6e-19 / (m * self._c**2)
v0 = self._c * np.sqrt(1 - 1 / (ER + 1) ** 2)
if distribution == "monte-carlo":
theta, phi = self._angles_monte_carlo(
nparticles, max_theta, random_seed=random_seed
)
elif distribution == "uniform":
theta, phi = self._angles_uniform(nparticles, max_theta)
# Adjust nparticles to reflex what the distribution function returned.
# Some distributions will modify the number of particles to meet the
# necessary criteria of the distribution.
nparticles = theta.shape[0] # TODO: make sure this works
# Construct the velocity distribution around the z-axis
v = np.zeros([nparticles, 3])
v[:, 0] = v0 * np.sin(theta) * np.cos(phi)
v[:, 1] = v0 * np.sin(theta) * np.sin(phi)
v[:, 2] = v0 * np.cos(theta)
# Calculate the rotation matrix that rotates the z-axis
# onto the source-detector axis
a = np.array([0, 0, 1])
b = self.detector - self.source
rot = rot_a_to_b(a, b)
# Apply rotation matrix to calculated velocity distribution
v = np.matmul(v, rot)
# Place particles at the source
x = np.tile(self.source, (nparticles, 1))
# Call the underlying load method to ensure consistency with
# other properties within the ParticleTracker
self.load_particles(x * u.m, v * u.m / u.s, particle=particle)
[docs]
@particles.particle_input
def load_particles(
self,
x,
v,
particle: Particle = Particle("p+"), # noqa: B008
) -> None:
r"""
Load arrays of particle positions and velocities.
Parameters
----------
x : `~astropy.units.Quantity`, shape (N,3)
Positions for N particles
v: `~astropy.units.Quantity`, shape (N,3)
Velocities for N particles
particle : |particle-like|, optional
Representation of the particle species as either a |Particle| object
or a string representation. The default particle is protons.
"""
# Load particles for particle tracker class
super().load_particles(x, v, particle)
# But also calculate geometry-dependent variables
self.theta = np.arccos(
np.inner(self.v, self.src_n) / np.linalg.norm(self.v, axis=-1)
)
n_wrong_way = np.sum(np.where(self.theta > np.pi / 2, 1, 0))
if n_wrong_way > 1:
warnings.warn(
f"{100*n_wrong_way/self.nparticles:.2f}% of particles "
"initialized are heading away from the grid. Check the "
" orientation of the provided velocity vectors.",
RuntimeWarning,
)
# *************************************************************************
# Run/push loop methods
# *************************************************************************
def _coast_to_grid(self) -> None:
r"""
Coasts all particles to the timestep when the first particle should
be entering the grid. Doing in this in one step (rather than pushing
the particles through zero fields) saves computation time.
"""
# Distance from the source to the nearest point on any grid
dist = min(
np.min(np.linalg.norm(arr - self.source, axis=3)) for arr in self.grids_arr
)
tracked_mask = self._tracked_particle_mask
# Find speed of each particle towards the grid.
vmax = np.dot(self.v[tracked_mask], self.src_n)
# Time for each particle to reach the grid
t = dist / vmax
# Coast the particles to the advanced position
self.x[tracked_mask] = (
self.x[tracked_mask] + self.v[tracked_mask] * t[:, np.newaxis]
)
def _coast_to_plane(self, center, hdir, vdir, x=None, mask=None):
"""
Calculates the positions where the current trajectories of each
particle impact a plane, described by the plane's center and
horizontal and vertical unit vectors.
Returns an [nparticles, 3] array of the particle positions in the plane
By default this function does not alter self.x. The optional keyword
x can be used to pass in an output array that will used to hold
the positions in the plane. This can be used to directly update self.x
as follows:
self._coast_to_plane(self.detector, self.det_hdir, self.det_vdir, x
= self.x)
Parameters
----------
center : float
The center of the plane towards which the particles are advanced
hdir : `numpy.ndarray`, shape (3)
A unit vector (in Cartesian coordinates) defining the horizontal
direction of the plane.
vdir : `numpy.ndarray`, shape (3)
A unit vector (in Cartesian coordinates) defining the vertical
direction of the plane.
x : `numpy.ndarray`, shape (nparticles), optional
The array to which the resulting particle positions are stored.
By default, the current position array will be used.
mask : `numpy.ndarray`, shape (nparticles), optional
A boolean mask representing the particles to perform the coasting
operation. By default, only the tracked particles (i.e. those that
are going to hit the grids) will be coasted.
"""
normal = np.cross(hdir, vdir)
if mask is None:
mask = self._tracked_particle_mask
# Calculate the time required to evolve each particle into the
# plane
t = np.inner(center[np.newaxis, :] - self.x[mask], normal) / np.inner(
self.v[mask], normal
)
# Calculate particle positions in the plane
if x is None:
# If no output array is provided, preallocate
x = np.copy(self.x)
x[mask] = self.x[mask] + self.v[mask] * t[:, np.newaxis]
# Check that all points are now in the plane
# (Eq. of a plane is nhat*x + d = 0)
plane_eq = np.dot(x - center, normal)
assert np.allclose(plane_eq[mask], 0, atol=1e-6)
return x
def _remove_deflected_particles(self) -> None:
r"""
Removes any particles that have been deflected away from the detector
plane (eg. those that will never hit the grid).
"""
dist_remaining = np.dot(self.x, self.det_n) + np.linalg.norm(self.detector)
v_towards_det = np.dot(self.v, -self.det_n)
# If particles have not yet reached the detector plane and are moving
# away from it, they will never reach the detector.
# So, we can remove them from the arrays
# Find the indices of all particles that we should keep:
# i.e. those still moving towards the detector.
ind = np.logical_not((v_towards_det < 0) & (dist_remaining > 0)).nonzero()[0]
# Drop the other particles
self._remove_particles((v_towards_det < 0) & (dist_remaining > 0))
# Store the number of particles deflected
self.fract_deflected = (self.nparticles - ind.size) / self.nparticles
# Warn the user if a large number of particles are being deflected
if self.fract_deflected > 0.05:
warnings.warn(
f"{100*self.fract_deflected:.1f}% particles have been "
"deflected away from the detector plane. The fields "
"provided may be too high to successfully radiograph "
"with this particle energy.",
RuntimeWarning,
)
[docs]
def run(self) -> None:
r"""
Runs a particle-tracing simulation.
Timesteps are adaptively calculated based on the
local grid resolution of the particles and the electric and magnetic
fields they are experiencing. After all particles
have left the grid, they are advanced to the
detector plane where they can be used to construct a synthetic
diagnostic image.
Returns
-------
None
"""
self._enforce_particle_creation()
# If meshes have been added, apply them now
for mesh in self.mesh_list:
self._apply_wire_mesh(**mesh)
# Store a copy of the initial velocity distribution in memory
# This will be used later to calculate the maximum deflection
self.v_init = np.copy(self.v)
# These particles will not be pushed through the fields
# but instead will be automatically advanced
# to the detector plane
theta_mask = self.theta >= self.max_theta_hit_grid
# Take the intersection of the theta mask and the tracked particle mask
# This must be done because some particles could have already been stopped
# by _apply_wire_mesh
self.x = self._coast_to_plane(
self.detector,
self.det_hdir,
self.det_vdir,
mask=theta_mask & self._tracked_particle_mask,
)
self._stop_particles(theta_mask)
self.fract_tracked = self.nparticles_tracked / self.nparticles
# Generate a null distribution of points (the result in the absence of
# any fields) for statistical comparison
self.x0 = self._coast_to_plane(self.detector, self.det_hdir, self.det_vdir)
# Advance the tracked particles to the near the start of the grid
self._coast_to_grid()
self.coasted_particles = np.copy(self.x)
super().run()
if self.num_entered < 0.1 * self.nparticles:
warnings.warn(
f"Only {100 * self.num_entered / self.nparticles:.2f}% of "
"particles entered the field grid: consider "
"decreasing the max_theta to increase this "
"number.",
RuntimeWarning,
)
# Remove particles that will never reach the detector
self._remove_deflected_particles()
# Advance the particles to the image plane
self._coast_to_plane(self.detector, self.det_hdir, self.det_vdir, x=self.x)
self.save_routine.save()
# Log a summary of the run
self._log("Run completed")
self._log(f"Fraction of particles tracked: {self.fract_tracked:.1%}")
self._log(
"Fraction of tracked particles that entered the grid: "
f"{self.fract_entered*100:.1f}%"
)
self._log(
"Fraction of tracked particles deflected away from the "
"detector plane: "
f"{self.fract_deflected*100}%"
)
@property
def max_deflection(self):
"""
The maximum deflection experienced by one of the particles, determined
by comparing their initial and final velocity vectors.
This value can be used to determine the charged particle radiography regime
using the dimensionless number defined by Kugland et al. 2012
Returns
-------
max_deflection : float
The maximum deflection in radians.
"""
# Normalize the initial and final velocities
v_norm = self.v / np.linalg.norm(self.v, axis=1, keepdims=True)
v_init_norm = self.v_init / np.linalg.norm(self.v_init, axis=1, keepdims=True)
# Compute the dot product
proj = np.sum(v_norm * v_init_norm, axis=1)
# In case of numerical errors, make sure the output is within the domain of
# arccos
proj = np.where(proj > 1, 1, proj)
max_deflection = np.nanmax(np.arccos(proj))
return max_deflection * u.rad
@property
def results_dict(self):
r"""
A dictionary containing the results of the simulation.
.. list-table:: Dictionary keys and descriptions.
:width: 100%
:widths: 1 1 4
:header-rows: 1
* - Key
- Type
- Description
* - ``"source"``
- `~numpy.ndarray`
- The source location vector, in meters.
* - ``"detector"``
- `~numpy.ndarray`
- The detector location vector, in meters.
* - ``"mag"``
- `float`
- The system magnification.
* - ``"nparticles"``
- `int`
- Number of particles in the simulation.
* - ``"max_deflection"``
- `~numpy.ndarray`
- The maximum deflection experienced by a particle in the
simulation, in radians.
* - ``"x"``
- `~numpy.ndarray`, [``nparticles``,]
- The x-coordinate location where each particle hit the
detector plane, in meters.
* - ``"y"``
- `~numpy.ndarray`, [``nparticles``,]
- The y-coordinate location where each particle hit the
detector plane, in meters.
* - ``"v"``
- `~numpy.ndarray`, [``nparticles``, 3]
- The velocity of each particle when it hits the detector
plane, in meters per second. The velocity is in a
coordinate system relative to the detector plane. The
components are [normal, horizontal, vertical] relative
to the detector plane coordinates.
* - ``"x0"``
- `~numpy.ndarray`, [``nparticles``,]
- The x-coordinate location where each particle would have
hit the detector plane if the grid fields were zero, in
meters. Useful for calculating the source profile.
* - ``"y0"``
- `~numpy.ndarray`, [``nparticles``,]
- The y-coordinate location where each particle would have
hit the detector plane if the grid fields were zero, in
meters. Useful for calculating the source profile.
* - ``"v0"``
- `~numpy.ndarray`, [``nparticles``, 3]
- The velocity of each particle when it hit the detector
plan if the grid fields were zero, in meters per second.
The velocity is in a coordinate system relative to the
detector plane. The components are [normal, horizontal,
vertical] relative to the detector plane coordinates.
"""
if not self._has_run:
raise RuntimeError(
"The simulation must be run before a results "
"dictionary can be created."
)
# Determine locations of points in the detector plane using unit
# vectors
xloc = np.dot(self.x - self.detector, self.det_hdir)
yloc = np.dot(self.x - self.detector, self.det_vdir)
x0loc = np.dot(self.x0 - self.detector, self.det_hdir)
y0loc = np.dot(self.x0 - self.detector, self.det_vdir)
# Determine the velocity components of each particle in the
# coordinate frame of the detector, eg. the components of v are
# now [normal, det_hdir, det_vdir]
v = np.zeros(self.v.shape)
v[:, 0] = np.dot(self.v, self.det_n)
v[:, 1] = np.dot(self.v, self.det_hdir)
v[:, 2] = np.dot(self.v, self.det_vdir)
v0 = np.zeros(self.v.shape)
v0[:, 0] = np.dot(self.v_init, self.det_n)
v0[:, 1] = np.dot(self.v_init, self.det_hdir)
v0[:, 2] = np.dot(self.v_init, self.det_vdir)
return {
"source": self.source,
"detector": self.detector,
"mag": self.mag,
"nparticles": self.nparticles,
"max_deflection": self.max_deflection.to(u.rad).value,
"x": xloc,
"y": yloc,
"v": v,
"x0": x0loc,
"y0": y0loc,
"v0": v0,
}
# *************************************************************************
# Synthetic diagnostic methods (creating output)
# *************************************************************************
[docs]
def synthetic_radiograph(obj, size=None, bins=None, ignore_grid: bool = False): # noqa: C901, PLR0912
r"""
Calculate a "synthetic radiograph" (particle count histogram in the
image plane).
.. |Tracker| replace:: `~plasmapy.diagnostics.charged_particle_radiography.synthetic_radiography.Tracker`
.. |results_dict| replace:: `~plasmapy.diagnostics.charged_particle_radiography.synthetic_radiography.Tracker.results_dict`
Parameters
----------
obj: `dict` or `~pathlib.Path` or |Tracker|
Either a |Tracker|
object that has been run, a dictionary equivalent to
|results_dict|, or path to a saved output file
from a |Tracker| object (HDF5 file).
size : `~astropy.units.Quantity`, shape ``(2, 2)``, optional
The size of the detector array, specified as the minimum
and maximum values included in both the horizontal and vertical
directions in the detector plane coordinates. Shape is
``((hmin, hmax), (vmin, vmax))``. If not specified, the size will be
set to include all particles on the detector. Units must be convertible
to meters.
bins : array of integers, shape ``(2)``
The number of bins in each direction in the format
``(hbins, vbins)``. The default is ``(200, 200)``.
ignore_grid: `bool`
If `True`, returns the intensity in the image plane in the absence
of simulated fields.
Returns
-------
hax : `~astropy.units.Quantity` array shape ``(hbins,)``
The horizontal axis of the synthetic radiograph in meters.
vax : `~astropy.units.Quantity` array shape ``(vbins, )``
The vertical axis of the synthetic radiograph in meters.
intensity : `~numpy.ndarray`, shape ``(hbins, vbins)``
The number of particles counted in each bin of the histogram.
Notes
-----
This function ignores any particles that are stopped or removed before
reaching the detector plane.
"""
# condition `obj` input
if isinstance(obj, Tracker):
# results_dict raises an error if the simulation has not been run.
results_dict = obj.results_dict
elif isinstance(obj, dict):
results_dict = obj
elif isinstance(obj, str | Path):
results_dict = {}
obj = Path(obj)
# Create a dictionary of all of the datasets and attributes in the save file
# Equivalent to |results_dict|
with h5py.File(obj, "r") as f:
for key in f:
results_dict[key] = f[key][...]
for key in f.attrs:
results_dict[key] = f.attrs[key][...]
else:
raise TypeError(
f"Expected type `Path`, `dict` or {Tracker} for argument `obj`, but "
f"got type {type(obj)}."
)
if bins is None:
bins = [200, 200]
# Note that, at the end of the simulation, all particles were moved
# into the image plane.
# If ignore_grid is True, use the predicted positions in the absence of
# simulated fields
if ignore_grid:
xloc = results_dict["x0"]
yloc = results_dict["y0"]
v = results_dict["v0"][:, 0]
else:
xloc = results_dict["x"]
yloc = results_dict["y"]
v = results_dict["v"][:, 0]
if size is None:
# If a detector size is not given, choose a size based on the
# particle positions
w = np.max([np.nanmax(np.abs(xloc)), np.nanmax(np.abs(yloc))])
size = np.array([[-w, w], [-w, w]]) * u.m
elif not isinstance(size, u.Quantity):
raise TypeError(
"Argument `size` must be an astropy.units.Quantity object with "
"units convertible to meters."
)
elif not size.unit.is_equivalent(u.m):
raise ValueError("Argument `size` must have units convertible to meters.")
elif size.shape != (2, 2):
raise ValueError(
f"Argument `size` must have shape (2, 2), but got {size.shape}."
)
# Exclude NaN positions (deleted particles) and velocities
# (stopped particles)
nan_mask = ~np.isnan(xloc) * ~np.isnan(yloc) * ~np.isnan(v)
sanitized_xloc = xloc[nan_mask]
sanitized_yloc = yloc[nan_mask]
# Generate the histogram
intensity, h, v = np.histogram2d(
sanitized_xloc, sanitized_yloc, range=size.to(u.m).value, bins=bins
)
# h, v are the bin edges: compute the centers to produce arrays
# of the right length (then trim off the extra point)
h = ((h + np.roll(h, -1)) / 2)[:-1]
v = ((v + np.roll(v, -1)) / 2)[:-1]
# Throw a warning if < 50% of the particles are included on the
# histogram
percentage = np.sum(intensity) / results_dict["nparticles"]
if percentage < 0.5:
warnings.warn(
f"Only {percentage:.2%} of the particles are shown "
"on this synthetic radiograph. Consider increasing "
"the size to include more.",
RuntimeWarning,
)
return h * u.m, v * u.m, intensity