import copy
import multiprocessing as mp
import numpy as np
from . import iface
from .propg import refocus_stack
__all__ = [
"autofocus",
"autofocus_stack",
]
_cpu_count = mp.cpu_count()
[docs]def autofocus(field, nm, res, ival, roi=None,
metric="average gradient", minimizer="lmfit",
minimizer_kwargs=None, padding=True, num_cpus=1):
"""Numerical autofocusing of a field using the Helmholtz equation.
Parameters
----------
field: 1d or 2d ndarray
Electric field is BG-Corrected, i.e. field = EX/BEx
nm: float
Refractive index of medium.
res: float
Size of wavelength in pixels.
ival: tuple of floats
Approximate interval to search for optimal focus in px.
roi: rectangular region of interest (x1, y1, x2, y2)
Region of interest of `field` for which the metric will be
minimized. If not given, the entire `field` will be used.
metric: str
- "average gradient" : average gradient metric of amplitude
- "rms contrast" : RMS contrast of phase data
- "spectrum" : sum of filtered Fourier coefficients
- "std gradient" : standard deviation of gradient metric of amplitude
- "med gradient" : median gradient metric of amplitude
minimizer: str
- "lmfit" : lmfit-based minimizer
- "legacy" : only use for reproducing old results
minimizer_kwargs: dict
Optional keyword arguments to the `minimizer` function
padding: bool
Perform padding with linear ramp from edge to average
to reduce ringing artifacts.
.. versionchanged:: 0.1.4
improved padding value and padding location
num_cpus: int
Not implemented.
Returns
-------
d, field [, other]:
The focusing distance, the field, and optionally any other
data returned by the minimizer (specify via `minimizer_kwargs`).
Notes
-----
This method uses :class:`nrefocus.RefocusNumpy` for refocusing
of 2D fields. This is because the :func:`nrefocus.refocus_stack`
function uses `async` which appears to not work with e.g.
:mod:`pyfftw`.
"""
fshape = len(field.shape)
if fshape == 1:
# 1D field
rfcls = iface.RefocusNumpy1D
elif fshape == 2:
# 2D field
rfcls = iface.RefocusNumpy
else:
raise AssertionError("Dimension of `field` must be 1 or 2.")
if minimizer_kwargs is None:
minimizer_kwargs = {}
else:
minimizer_kwargs = copy.deepcopy(minimizer_kwargs)
# use a made-up pixel size so we can use the new `Refocus` interface
pixel_size = 1
rf = rfcls(field=field,
wavelength=res*pixel_size,
pixel_size=pixel_size,
medium_index=nm,
distance=0,
kernel="helmholtz",
padding=padding
)
data = rf.autofocus(metric=metric,
minimizer=minimizer,
interval=np.array(ival)*rf.pixel_size,
roi=roi,
minimizer_kwargs=minimizer_kwargs,
ret_grid=False,
ret_field=True,
)
return data
[docs]def autofocus_stack(fieldstack, nm, res, ival, roi=None,
metric="average gradient", minimizer="lmfit",
minimizer_kwargs=None, padding=True, same_dist=False,
num_cpus=_cpu_count, copy=True):
"""Numerical autofocusing of a stack using the Helmholtz equation.
Parameters
----------
fieldstack: 2d or 3d ndarray
Electric field is BG-Corrected, i.e. Field = EX/BEx
nm: float
Refractive index of medium.
res: float
Size of wavelength in pixels.
ival: tuple of floats
Approximate interval to search for optimal focus in px.
roi: rectangular region of interest (x1, y1, x2, y2)
Region of interest of `field` for which the metric will be
minimized. If not given, the entire `field` will be used.
metric: str
see `autofocus_field`.
minimizer: str
- "lmfit" : lmfit-based minimizer
- "legacy" : only use for reproducing old results
minimizer_kwargs: dict
Optional keyword arguments to the `minimizer` function
padding: bool
Perform padding with linear ramp from edge to average
to reduce ringing artifacts.
.. versionchanged:: 0.1.4
improved padding value and padding location
same_dist: bool
Refocus entire sinogram with one distance.
num_cpus: int
Number of CPUs to use
copy: bool
If False, overwrites input array.
Returns
-------
dopt: float or list of float
The focusing distance(s) (only one value if `same_dist`)
field_stack: np.ndarray
The refocused field stack
"""
dopt = list()
m = fieldstack.shape[0]
# setup arguments
stackargs = list()
for s in range(m):
stackargs.append([np.array(fieldstack[s], copy=copy), nm, res, ival,
roi, metric, minimizer, minimizer_kwargs,
padding, 1])
# perform first pass
p = mp.Pool(num_cpus)
result = p.map_async(_autofocus_wrapper, stackargs).get()
p.close()
p.terminate()
p.join()
newstack = np.zeros(fieldstack.shape, dtype=fieldstack.dtype)
for s in range(m):
if isinstance(result[s], list):
dopt.append(result[s][0])
newstack[s] = result[s][1]
# perform second pass if `same_dist` is True
if same_dist:
# find average dopt
davg = np.average(dopt)
newstack = refocus_stack(fieldstack, davg, nm, res,
num_cpus=num_cpus, copy=copy,
padding=padding)
return davg, newstack
else:
return dopt, newstack
def _autofocus_wrapper(args):
"""Calls autofocus with *args. Needed for multiprocessing pool.
"""
return autofocus(*args)