Source code for nrefocus._autofocus

import multiprocessing as mp
import numpy as np

from . import metrics
from . import pad
from ._propagate import fft_propagate, refocus_stack


__all__ = [
    "autofocus",
    "autofocus_stack",
    "minimize_metric",
]


_cpu_count = mp.cpu_count()


[docs]def autofocus(field, nm, res, ival, roi=None, metric="average gradient", padding=True, ret_d=False, ret_grad=False, num_cpus=1): """Numerical autofocusing of a field using the Helmholtz equation. Parameters ---------- field: 1d or 2d ndarray Electric field is BG-Corrected, i.e. field = EX/BEx nm: float Refractive index of medium. res: float Size of wavelength in pixels. ival: tuple of floats Approximate interval to search for optimal focus in px. roi: rectangular region of interest (x1, y1, x2, y2) Region of interest of `field` for which the metric will be minimized. If not given, the entire `field` will be used. metric: str - "average gradient" : average gradient metric of amplitude - "rms contrast" : RMS contrast of phase data - "spectrum" : sum of filtered Fourier coefficients padding: bool Perform padding with linear ramp from edge to average to reduce ringing artifacts. .. versionchanged:: 0.1.4 improved padding value and padding location ret_d: bool Return the autofocusing distance in pixels. Defaults to False. ret_grad: bool Return the computed gradients as a list. num_cpus: int Not implemented. Returns ------- field, [d, [grad]] The focused field and optionally, the optimal focusing distance and the computed gradients. """ if metric == "average gradient": def metric_func(x): return metrics.average_gradient(np.abs(x)) elif metric == "rms contrast": def metric_func(x): return -metrics.contrast_rms(np.angle(x)) elif metric == "spectrum": def metric_func(x): return metrics.spectral(np.abs(x), res) else: raise ValueError("No such metric: {}".format(metric)) field, d, grad = minimize_metric(field, metric_func, nm, res, ival, roi=roi, padding=padding) ret_list = [field] if ret_d: ret_list += [d] if ret_grad: ret_list += [grad] if len(ret_list) == 1: return ret_list[0] else: return tuple(ret_list)
[docs]def autofocus_stack(fieldstack, nm, res, ival, roi=None, metric="average gradient", padding=True, same_dist=False, ret_ds=False, ret_grads=False, num_cpus=_cpu_count, copy=True): """Numerical autofocusing of a stack using the Helmholtz equation. Parameters ---------- fieldstack: 2d or 3d ndarray Electric field is BG-Corrected, i.e. Field = EX/BEx nm: float Refractive index of medium. res: float Size of wavelength in pixels. ival: tuple of floats Approximate interval to search for optimal focus in px. metric: str see `autofocus_field`. padding: bool Perform padding with linear ramp from edge to average to reduce ringing artifacts. .. versionchanged:: 0.1.4 improved padding value and padding location same_dist: bool Refocus entire sinogram with one distance. ret_ds: bool Return the autofocusing distances in pixels. Defaults to False. If sam_dist is True, still returns autofocusing distances of first pass. The used refocusing distance is the average. ret_grads: bool Return the computed gradients as a list. num_cpus: int Number of CPUs to use copy: bool If False, overwrites input array. Returns ------- The focused field (and the refocussing distance + data if d is None) """ dopt = list() grad = list() M = fieldstack.shape[0] # setup arguments stackargs = list() for s in range(M): stackargs.append([np.array(fieldstack[s], copy=copy), nm, res, ival, roi, metric, padding, True, True, 1]) # perform first pass p = mp.Pool(num_cpus) result = p.map_async(_autofocus_wrapper, stackargs).get() p.close() p.terminate() p.join() # result = [] # for arg in stackargs: # result += _autofocus_wrapper(arg) newstack = np.zeros(fieldstack.shape, dtype=fieldstack.dtype) for s in range(M): field, ds, gs = result[s] dopt.append(ds) grad.append(gs) newstack[s] = field # perform second pass if `same_dist` is True if same_dist: # find average dopt davg = np.average(dopt) newstack = refocus_stack(fieldstack, davg, nm, res, num_cpus=num_cpus, copy=copy, padding=padding) ret_list = [newstack] if ret_ds: ret_list += [dopt] if ret_grads: ret_list += [grad] if len(ret_list) == 1: return ret_list[0] else: return tuple(ret_list)
def minimize_metric(field, metric_func, nm, res, ival, roi=None, coarse_acc=1, fine_acc=.005, return_gradient=True, padding=True): """Find the focus by minimizing the `metric` of an image Parameters ---------- field : 2d array electric field metric_func : callable some metric to be minimized ival : tuple of floats (minimum, maximum) of interval to search in pixels nm : float RI of medium res : float wavelength in pixels roi : rectangular region of interest (x1, y1, x2, y2) Region of interest of `field` for which the metric will be minimized. If not given, the entire `field` will be used. coarse_acc : float accuracy for determination of global minimum in pixels fine_acc : float accuracy for fine localization percentage of gradient change return_gradient: return x and y values of computed gradient padding : bool perform padding with linear ramp from edge to average to reduce ringing artifacts. .. versionchanged:: 0.1.4 improved padding value and padding location """ if roi is not None: assert len(roi) == len(field.shape) * \ 2, "ROI must match field dimension" initshape = field.shape Fshape = len(initshape) propfunc = fft_propagate if roi is None: if Fshape == 2: roi = (0, 0, field.shape[0], field.shape[1]) else: roi = (0, field.shape[0]) roi = 1*np.array(roi) if padding: # Pad with correct complex number field = pad.pad_add(field) if ival[0] > ival[1]: ival = (ival[1], ival[0]) # set coarse interval # coarse_acc = int(np.ceil(ival[1]-ival[0]))/100 N = int(100 / coarse_acc) zc = np.linspace(ival[0], ival[1], N, endpoint=True) # compute fft of field fftfield = np.fft.fftn(field) # fftplan = fftw3.Plan(fftfield.copy(), None, nthreads = _ncores, # direction="backward", flags=_fftwflags) # initiate gradient vector gradc = np.zeros(zc.shape) for i in range(len(zc)): d = zc[i] # fsp = propfunc(fftfield, d, nm, res, fftplan=fftplan) fsp = propfunc(fftfield, d, nm, res) if Fshape == 2: gradc[i] = metric_func(fsp[roi[0]:roi[2], roi[1]:roi[3]]) else: gradc[i] = metric_func(fsp[roi[0]:roi[1]]) minid = np.argmin(gradc) if minid == 0: zc -= zc[1] - zc[0] minid += 1 if minid == len(zc) - 1: zc += zc[1] - zc[0] minid -= 1 zf = 1*zc gradf = 1 * gradc numfine = 10 mingrad = gradc[minid] while True: gradf = np.zeros(numfine) ival = (zf[minid - 1], zf[minid + 1]) zf = np.linspace(ival[0], ival[1], numfine) for i in range(len(zf)): d = zf[i] fsp = propfunc(fftfield, d, nm, res) if Fshape == 2: gradf[i] = metric_func(fsp[roi[0]:roi[2], roi[1]:roi[3]]) else: gradf[i] = metric_func(fsp[roi[0]:roi[1]]) minid = np.argmin(gradf) if minid == 0: zf -= zf[1] - zf[0] minid += 1 if minid == len(zf) - 1: zf += zf[1] - zf[0] minid -= 1 if abs(mingrad - gradf[minid]) / 100 < fine_acc: break minid = np.argmin(gradf) fsp = propfunc(fftfield, zf[minid], nm, res) if padding: fsp = pad.pad_rem(fsp) if return_gradient: return fsp, zf[minid], [(zc, gradc), (zf, gradf)] return fsp, zf[minid] def _autofocus_wrapper(args): """Calls autofocus with *args. Needed for multiprocessing pool. """ return autofocus(*args)