Source code for nrefocus.propg

import multiprocessing as mp
import numpy as np

from . import iface


__all__ = ["refocus", "refocus_stack"]


_cpu_count = mp.cpu_count()


[docs]def refocus(field, d, nm, res, method="helmholtz", padding=True): """Refocus a 1D or 2D field Parameters ---------- field : 1d or 2d array 1D or 2D background corrected electric field (Ex/BEx) d : float Distance to be propagated in pixels (negative for backwards) nm : float Refractive index of medium res : float Wavelenth in pixels method : str Defines the method of propagation; one of - "helmholtz" : the optical transfer function `exp(idkₘ(M-1))` - "fresnel" : paraxial approximation `exp(idk²/kₘ)` padding : bool perform padding with linear ramp from edge to average to reduce ringing artifacts. .. versionadded:: 0.1.4 Returns ------- Electric field at `d`. Notes ----- This method uses :class:`nrefocus.RefocusNumpy` for refocusing of 2D fields. This is because the :func:`nrefocus.refocus_stack` function uses `async` which appears to not work with e.g. :mod:`pyfftw`. """ fshape = len(field.shape) if fshape == 1: # 1D field rfcls = iface.RefocusNumpy1D elif fshape == 2: # 2D field rfcls = iface.RefocusNumpy else: raise AssertionError("Dimension of `field` must be 1 or 2.") # use a made-up pixel size so we can use the new `Refocus` interface pixel_size = 1e-6 rf = rfcls(field=field, wavelength=res*pixel_size, pixel_size=pixel_size, medium_index=nm, distance=0, kernel=method, padding=padding ) refoc = rf.propagate(distance=d*pixel_size) return refoc
[docs]def refocus_stack(fieldstack, d, nm, res, method="helmholtz", num_cpus=_cpu_count, copy=True, padding=True): """Refocus a stack of 1D or 2D fields Parameters ---------- fieldstack : 2d or 3d array Stack of 1D or 2D background corrected electric fields (Ex/BEx). The first axis iterates through the individual fields. d : float Distance to be propagated in pixels (negative for backwards) nm : float Refractive index of medium res : float Wavelenth in pixels method : str Defines the method of propagation; one of - "helmholtz" : the optical transfer function `exp(idkₘ(M-1))` - "fresnel" : paraxial approximation `exp(idk²/kₘ)` num_cpus : int Defines the number of CPUs to be used for refocusing. copy : bool If False, overwrites input stack. padding : bool Perform padding with linear ramp from edge to average to reduce ringing artifacts. .. versionadded:: 0.1.4 Returns ------- Electric field stack at `d`. """ func = refocus names = func.__code__.co_varnames[:func.__code__.co_argcount] loc = locals() vardict = dict() for name in names: if name in loc.keys(): vardict[name] = loc[name] # default keyword arguments func_def = func.__defaults__[::-1] vardict["padding"] = padding M = fieldstack.shape[0] stackargs = list() # Create individual arglists for all fields for m in range(M): kwarg = vardict.copy() kwarg["field"] = fieldstack[m] # now we turn the kwarg into an arglist args = list() for i, a in enumerate(names[::-1]): # first set default if i < len(func_def): val = func_def[i] if a in kwarg: val = kwarg[a] args.append(val) stackargs.append(args[::-1]) p = mp.Pool(num_cpus) result = p.map_async(_refocus_wrapper, stackargs).get() p.close() p.terminate() p.join() if copy: data = np.zeros(fieldstack.shape, dtype=result[0].dtype) else: data = fieldstack for m in range(M): data[m] = result[m] return data
def _refocus_wrapper(args): """Just calls autofocus with *args. Needed for multiprocessing pool. """ return refocus(*args)