import multiprocessing as mp
import numpy as np
from . import iface
from . import metrics
from .minimizers import minimize_legacy
from .propg import refocus_stack
__all__ = [
"autofocus",
"autofocus_stack",
]
_cpu_count = mp.cpu_count()
[docs]def autofocus(field, nm, res, ival, roi=None,
metric="average gradient", padding=True,
ret_d=False, ret_grad=False, num_cpus=1):
"""Numerical autofocusing of a field using the Helmholtz equation.
Parameters
----------
field: 1d or 2d ndarray
Electric field is BG-Corrected, i.e. field = EX/BEx
nm: float
Refractive index of medium.
res: float
Size of wavelength in pixels.
ival: tuple of floats
Approximate interval to search for optimal focus in px.
roi: rectangular region of interest (x1, y1, x2, y2)
Region of interest of `field` for which the metric will be
minimized. If not given, the entire `field` will be used.
metric: str
- "average gradient" : average gradient metric of amplitude
- "rms contrast" : RMS contrast of phase data
- "spectrum" : sum of filtered Fourier coefficients
padding: bool
Perform padding with linear ramp from edge to average
to reduce ringing artifacts.
.. versionchanged:: 0.1.4
improved padding value and padding location
ret_d: bool
Return the autofocusing distance in pixels. Defaults to False.
ret_grad: bool
Return the computed gradients as a list.
num_cpus: int
Not implemented.
Returns
-------
field, [d, [grad]]
The focused field and optionally, the optimal focusing distance and
the computed gradients.
"""
fshape = len(field.shape)
if fshape == 1:
# 1D field
rfcls = iface.RefocusNumpy1D
elif fshape == 2:
# 2D field
rfcls = iface.RefocusNumpy
else:
raise AssertionError("Dimension of `field` must be 1 or 2.")
metric_func = metrics.METRICS[metric]
# use a made-up pixel size so we can use the new `Refocus` interface
pixel_size = 1e-6
rf = rfcls(field=field,
wavelength=res*pixel_size,
pixel_size=pixel_size,
medium_index=nm,
distance=0,
kernel="helmholtz",
padding=padding
)
field, d, grad = minimize_legacy(rf=rf,
metric_func=metric_func,
interval=ival,
roi=roi,
padding=padding)
ret_list = [field]
if ret_d:
ret_list += [d]
if ret_grad:
ret_list += [grad]
if len(ret_list) == 1:
return ret_list[0]
else:
return tuple(ret_list)
[docs]def autofocus_stack(fieldstack, nm, res, ival, roi=None,
metric="average gradient", padding=True,
same_dist=False, ret_ds=False, ret_grads=False,
num_cpus=_cpu_count, copy=True):
"""Numerical autofocusing of a stack using the Helmholtz equation.
Parameters
----------
fieldstack: 2d or 3d ndarray
Electric field is BG-Corrected, i.e. Field = EX/BEx
nm: float
Refractive index of medium.
res: float
Size of wavelength in pixels.
ival: tuple of floats
Approximate interval to search for optimal focus in px.
roi: rectangular region of interest (x1, y1, x2, y2)
Region of interest of `field` for which the metric will be
minimized. If not given, the entire `field` will be used.
metric: str
see `autofocus_field`.
padding: bool
Perform padding with linear ramp from edge to average
to reduce ringing artifacts.
.. versionchanged:: 0.1.4
improved padding value and padding location
same_dist: bool
Refocus entire sinogram with one distance.
ret_ds: bool
Return the autofocusing distances in pixels. Defaults to False.
If sam_dist is True, still returns autofocusing distances
of first pass. The used refocusing distance is the
average.
ret_grads: bool
Return the computed gradients as a list.
num_cpus: int
Number of CPUs to use
copy: bool
If False, overwrites input array.
Returns
-------
The focused field (and the refocussing distance + data if d is None)
"""
dopt = list()
grad = list()
M = fieldstack.shape[0]
# setup arguments
stackargs = list()
for s in range(M):
stackargs.append([np.array(fieldstack[s], copy=copy), nm, res, ival,
roi, metric, padding, True, True, 1])
# perform first pass
p = mp.Pool(num_cpus)
result = p.map_async(_autofocus_wrapper, stackargs).get()
p.close()
p.terminate()
p.join()
# result = []
# for arg in stackargs:
# result += _autofocus_wrapper(arg)
newstack = np.zeros(fieldstack.shape, dtype=fieldstack.dtype)
for s in range(M):
field, ds, gs = result[s]
dopt.append(ds)
grad.append(gs)
newstack[s] = field
# perform second pass if `same_dist` is True
if same_dist:
# find average dopt
davg = np.average(dopt)
newstack = refocus_stack(fieldstack, davg, nm, res,
num_cpus=num_cpus, copy=copy,
padding=padding)
ret_list = [newstack]
if ret_ds:
ret_list += [dopt]
if ret_grads:
ret_list += [grad]
if len(ret_list) == 1:
return ret_list[0]
else:
return tuple(ret_list)
def _autofocus_wrapper(args):
"""Calls autofocus with *args. Needed for multiprocessing pool.
"""
return autofocus(*args)