import numpy as np
import tqdm
from .make_object_map import make_object_map
[docs]def calc_error(data, mask, W, dij_n, I, pixel_map, n0, m0, subpixel=False, verbose=True):
r"""
Parameters
----------
data : ndarray
Input data, of shape (N, M, L).
mask : ndarray
Boolean array of shape (M, L), where True indicates a good
pixel and False a bad pixel.
W : ndarray
Float array of shape (M, L) containing the estimated whitefield.
dij_n : ndarray
Float array of shape (N, 2) containing the object translations
that have been mapped onto the detector's frame of reference.
I : ndarray
Float array of shape (U, V), this is essentially an object map.
pixel_map : ndarray, (2, M, L)
An array containing the pixel mapping
between a detector frame and the object, such that:
.. math::
I^{z_1}_{\phi}[n, i, j]
= W[i, j] I^\infty[&\text{ij}_\text{map}[0, i, j] - \Delta ij[n, 0] + n_0,\\
&\text{ij}_\text{map}[1, i, j] - \Delta ij[n, 1] + m_0]
n0 : float
Slow scan offset to the pixel mapping such that:
.. math::
\text{ij}_\text{map}[0, i, j] - \Delta ij[n, 0] + n_0 \ge -0.5
\quad\text{for all } i,j
m0 : float
Fast scan offset to the pixel mapping such that:
.. math::
\text{ij}_\text{map}[1, i, j] - \Delta ij[n, 1] + m_0 \ge -0.5
\quad\text{for all } i,j
subpixel : bool, optional
If True then use bilinear subpixel interpolation non-integer pixel mappings.
verbose : bool, optional
print what I'm doing.
minimum_overlap : float or None, optional
Default is None. If float then the the object will be set to -1
where the number of data points contributing to that value is less
than "minimum_overlap".
Returns
-------
error_total : float
The global error value, :math:`\varepsilon = \sum_{n,i,j} \varepsilon[n, i, j]`.
error_frame : ndarray
Float array of shape (N,). The average pixel error per detector frame,
:math:`\varepsilon_\text{frame}[n] = \langle \varepsilon[n, i, j] \rangle_{i,j}`.
error_pixel : ndarray
Float array of shape (M, L). The average pixel error per detector pixel,
:math:`\varepsilon_\text{pixel}[i, j] = \langle \varepsilon[n, i, j]\rangle_{n}`.
Notes
-----
The error, per pixel and per frame, is given by:
.. math::
\begin{align}
\varepsilon[n, i, j] = M[i,j] \bigg[ I_\Phi[n, i, j] -
W[i, j] I_0[&\text{ij}_\text{map}[0, i, j] - \Delta ij[n, 0] + n_0,\\
&\text{ij}_\text{map}[1, i, j] - \Delta ij[n, 1] + m_0]\bigg]^2
\end{align}
"""
# mask the pixel mapping
ij = np.array([pixel_map[0], pixel_map[1]])
error_total = 0.
error_residual = np.zeros(data.shape, dtype=np.float32)
error_frame = np.zeros(data.shape[0])
error_pixel = np.zeros(data.shape[1:])
norm = np.zeros(data.shape[1:])
flux_corr = np.zeros(data.shape[0])
#sig = np.std(data, axis=0)
#sig[sig <= 0] = 1
for n in tqdm.trange(data.shape[0], desc='calculating errors'):
if subpixel:
# define the coordinate mapping and round to int
ss = pixel_map[0] - dij_n[n, 0] + n0
fs = pixel_map[1] - dij_n[n, 1] + m0
#
I0 = W * bilinear_interpolation_array(I, ss, fs, fill=-1, invalid=-1)
#I0 = I0[mask]
else :
# define the coordinate mapping and round to int
ss = np.rint((ij[0] - dij_n[n, 0] + n0)).astype(np.int)
fs = np.rint((ij[1] - dij_n[n, 1] + m0)).astype(np.int)
#
#I0 = I[ss, fs] * W[mask]
I0 = I[ss, fs] * W
d = data[n]
m = (I0>0)*(d>0)*mask
# flux correction factor
flux_corr[n] = np.sum(m * I0 * data[n]) / np.sum(m * data[n]**2)
#error_map = m*(I0 - d)**2 / sig[mask]
error_map = m*(I0 - d)**2
error_map[~m] = -1
error_residual[n] = error_map
norm += m*(W - d)**2
norm /= data.shape[0]
norm[norm==0] = 1
# normalise the residual
error_residual = error_residual / norm
# now map the errors to object space
error_reference, n0, m0 = make_object_map(error_residual, mask, np.ones_like(W), dij_n, pixel_map, subpixel=True)
# get rid of the negatives before summing
error_residual[error_residual<0] = 0
error_frame = np.sum(error_residual, axis = (1,2))
error_pixel = np.sum(error_residual, axis = 0)
error_total = np.sum(error_frame)
return error_total, error_frame, error_pixel, error_residual, error_reference, norm, flux_corr
def make_pixel_map_err(data, mask, W, O, pixel_map, n0, m0, dij_n, roi, search_window=20, grid=[20, 20]):
# demand that the data is float32 to avoid excess mem. usage
assert(data.dtype == np.float32)
import time
t0 = time.time()
##################################################################
# OpenCL crap
##################################################################
import os
import pyopencl as cl
## Step #1. Obtain an OpenCL platform.
# with a cpu device
for p in cl.get_platforms():
devices = p.get_devices(cl.device_type.CPU)
if len(devices) > 0:
platform = p
device = devices[0]
break
## Step #3. Create a context for the selected device.
context = cl.Context([device])
queue = cl.CommandQueue(context)
# load and compile the update_pixel_map opencl code
here = os.path.split(os.path.abspath(__file__))[0]
kernelsource = os.path.join(here, 'update_pixel_map.cl')
kernelsource = open(kernelsource).read()
program = cl.Program(context, kernelsource).build()
make_error_map_subpixel = program.make_error_map_subpixel
make_error_map_subpixel.set_scalar_arg_dtypes(
[None, None, None, None, None, None, None, None, None,
np.float32, np.float32,
np.int32, np.int32, np.int32, np.int32,
np.int32, np.int32, np.int32, np.int32,
np.int32, np.int32])
# Get the max work group size for the kernel test on our device
max_comp = device.max_compute_units
max_size = make_error_map_subpixel.get_work_group_info(
cl.kernel_work_group_info.WORK_GROUP_SIZE, device)
#print('maximum workgroup size:', max_size)
#print('maximum compute units :', max_comp)
# allocate local memory and dtype conversion
localmem = cl.LocalMemory(np.dtype(np.float32).itemsize * data.shape[0])
# inputs:
Win = W.astype(np.float32)
pixel_mapin = pixel_map.astype(np.float32)
Oin = O.astype(np.float32)
dij_nin = dij_n.astype(np.float32)
maskin = mask.astype(np.int32)
# outputs:
err_map = np.zeros((grid[0]*grid[1], search_window**2), dtype=np.float32)
pixel_mapout = pixel_map.astype(np.float32)
##################################################################
if type(search_window) is int :
s_ss = search_window
s_fs = search_window
else :
s_ss, s_fs = search_window
ss_min, ss_max = (-(s_ss-1)//2, (s_ss+1)//2)
fs_min, fs_max = (-(s_fs-1)//2, (s_fs+1)//2)
# list the pixels for which to calculate the error grid
ijs = []
for i in np.linspace(roi[0], roi[1]-1, grid[0]):
for j in np.linspace(roi[2], roi[3]-1, grid[1]):
ijs.append([round(i), round(j)])
ijs = np.array(ijs).astype(np.int32)
for i in tqdm.trange(1, desc='calculating pixel map shift errors'):
make_error_map_subpixel(queue, (1, ijs.shape[0]), (1, 1),
cl.SVM(Win),
cl.SVM(data),
localmem,
cl.SVM(err_map),
cl.SVM(Oin),
cl.SVM(pixel_mapout),
cl.SVM(dij_nin),
cl.SVM(maskin),
cl.SVM(ijs),
n0, m0, ijs.shape[0],
data.shape[0], data.shape[1], data.shape[2],
O.shape[0], O.shape[1], ss_min, ss_max, fs_min, fs_max)
queue.finish()
t1 = time.time()
t = t1-t0
res = make_pixel_map_err_report(ijs, err_map, mask, search_window, roi, t)
return ijs, err_map, res
def make_pixel_map_err_report(ijs, err_map, mask, search_window, roi, t):
"""
Should inform the user what the next call to update_pixel_map should be.
We need:
suggested grid,
suggested window size (I guess this is really a measure of uncertainty)
"""
# remove masked pixels
m = mask[ijs[:, 0], ijs[:, 1]]
# report on error land scape:
# find the average distance between starting point and global min
ijs_min = np.argmin(err_map, axis=1)
i0, j0 = search_window//2, search_window//2
isol, jsol = np.unravel_index(ijs_min, (search_window, search_window))
dist = np.sqrt((i0-isol)**2 + (j0-jsol)**2)
errs = np.array([err_map[i, ijs_min[i]] for i in range(err_map.shape[0])])
med_errs = np.median(err_map, axis=1)
with np.errstate(invalid='ignore'):
rat = errs / med_errs
rat_mask = ~np.isnan(rat)
# distance shift / distance pixel
con = []
for i in range(ijs[m].shape[0]):
for j in range(i):
shift_diff = np.sqrt( (isol[m][i] - isol[m][j])**2 + (jsol[m][i]- jsol[m][j])**2)
pixel_dist = np.sqrt( (ijs[m][i][0] - ijs[m][j][0])**2 + (ijs[m][i][1] - ijs[m][j][1])**2)
con.append( shift_diff / pixel_dist)
con = np.array(con)
print('-----------------------------------------------------------------')
print('Report on pixel map error landscape: median +- standard deviation')
print('-----------------------------------------------------------------')
print('pixel map shift amount :',np.median(dist[m]), '+-', np.std(dist[m]))
print('global minimum error :',np.median(errs[m]), '+-', np.std(errs[m]))
print('median error level :',np.median(med_errs[m]), '+-', np.std(med_errs[m]))
print('minimum error / median err :',np.median(rat[rat_mask]), '+-', np.std(rat[rat_mask]))
print('pixel map shift / physical pixel dist :',np.median(con), '+-', np.std(con))
#
# remove outliers
err_thresh = np.median(errs[m]) + 2*np.std(errs[m])
rat_thresh = np.median(rat[rat_mask]) + 2*np.std(rat[rat_mask])
rat[~rat_mask] = 2*rat_thresh
m = mask[ijs[:, 0], ijs[:, 1]] * (errs < err_thresh) * (rat < rat_thresh)
print('suggested error threshold :',err_thresh)
print('suggested ratio threshold :',rat_thresh)
# recalculate
rat = errs[m] / med_errs[m]
con = []
con_ss = []
con_fs = []
for i in range(ijs[m].shape[0]):
for j in range(i):
shift_diff_ss = (isol[m][i] - isol[m][j])**2
shift_diff_fs = (jsol[m][i] - jsol[m][j])**2
pixel_dist = (ijs[m][i][0] - ijs[m][j][0])**2 + (ijs[m][i][1] - ijs[m][j][1])**2
con.append( np.sqrt( (shift_diff_ss + shift_diff_fs) / (pixel_dist + pixel_dist) ))
con_ss.append( np.sqrt( shift_diff_ss / pixel_dist))
con_fs.append( np.sqrt( shift_diff_fs / pixel_dist))
con = np.array(con)
con_ss = np.array(con_ss)
con_fs = np.array(con_fs)
print('-----------------------------------------------------------------')
print('After outlier removal (error threshold and ratio threshold)')
print('-----------------------------------------------------------------')
print('pixel map shift amount :',np.median(dist[m]), '+-', np.std(dist[m]))
print('global minimum error :',np.median(errs[m]), '+-', np.std(errs[m]))
print('median error level :',np.median(med_errs[m]), '+-', np.std(med_errs[m]))
print('minimum error / median err :',np.median(rat), '+-', np.std(rat))
print('pixel map shift / physical pixel dist :',np.median(con), '+-', np.std(con))
dist_ss = np.sqrt((i0-isol)**2)
dist_fs = np.sqrt((j0-jsol)**2)
# choose window size:
sug_search_window = [int(round(4*(np.median(dist_ss[m]) + 2*np.std(dist_ss[m])))),
int(round(4*(np.median(dist_fs[m]) + 2*np.std(dist_fs[m]))))]
# choose sampling grid:
# the tolerable number of pixels between samples is
# the number of pixels before con * pixel dist > search_window // 2
# where search_window is the next smaller search window used for interpolation
sw_next = 4
a = (np.median(con_ss) + 2*np.std(con_ss))
b = (np.median(con_fs) + 2*np.std(con_fs))
grid = [1, 1]
if a != 0 :
pixel_dist_ss = (sw_next // 2) / a
grid[0] = 2*int(round((roi[1]-roi[0])) / pixel_dist_ss)
if b != 0 :
pixel_dist_fs = (sw_next // 2) / b
grid[1] = 2*int(round((roi[3]-roi[2]) / pixel_dist_fs))
# estimated time
sec_per_elem = t / (search_window**2 * ijs.shape[0])
res = {'search_window': sug_search_window, 'grid': grid, 'error_threshold': err_thresh}
print('\n')
print('suggested search window :', res['search_window'])
print('suggested sampling grid :', res['grid'])
print('estimated calculation time (min):', round(sec_per_elem * sug_search_window[0]*sug_search_window[1] * grid[0] * grid[1] / 60.,1))
return res
def bilinear_interpolation_array(array, ss, fs, fill = -1, invalid=-1):
"""
See https://en.wikipedia.org/wiki/Bilinear_interpolation
"""
out = np.zeros(ss.shape)
s0, s1 = np.floor(ss).astype(np.uint32), np.ceil(ss).astype(np.uint32)
f0, f1 = np.floor(fs).astype(np.uint32), np.ceil(fs).astype(np.uint32)
# check out of bounds
m = (ss > 0) * (ss <= (array.shape[0]-1)) * (fs > 0) * (fs <= (array.shape[1]-1))
s0[~m] = 0
s1[~m] = 0
f0[~m] = 0
f1[~m] = 0
# careful with edges
s1[(s1==s0)*(s0==0)] += 1
s0[(s1==s0)*(s0!=0)] -= 1
f1[(f1==f0)*(f0==0)] += 1
f0[(f1==f0)*(f0!=0)] -= 1
# make the weighting function
w00 = (s1-ss)*(f1-fs)
w01 = (s1-ss)*(fs-f0)
w10 = (ss-s0)*(f1-fs)
w11 = (ss-s0)*(fs-f0)
# renormalise for invalid pixels
w00[array[s0,f0]==invalid] = 0.
w01[array[s0,f1]==invalid] = 0.
w10[array[s1,f0]==invalid] = 0.
w11[array[s1,f1]==invalid] = 0.
# if all pixels are invalid then return fill
s = w00+w10+w01+w11
m = (s!=0)*m
out[m] = w00[m] * array[s0[m],f0[m]] \
+ w10[m] * array[s1[m],f0[m]] \
+ w01[m] * array[s0[m],f1[m]] \
+ w11[m] * array[s1[m],f1[m]]
out[m] /= s[m]
out[~m] = fill
return out
def bilinear_interpolation(array, ss, fs, fill = -1, invalid=-1):
"""
See https://en.wikipedia.org/wiki/Bilinear_interpolation
"""
import math
# check out of bounds
if (ss < 0) or (ss> (array.shape[0]-1)) or (fs < 0) or (fs > (array.shape[1]-1)):
return fill
s0, s1 = math.floor(ss), math.ceil(ss)
f0, f1 = math.floor(fs), math.ceil(fs)
# careful with edges
if s1==s0 :
if s0 == 0 :
s1 += 1
else :
s0 -= 1
if f1==f0 :
if f0 == 0 :
f1 += 1
else :
f0 -= 1
# make the weighting function
w = np.zeros((2,2), dtype=float)
a = np.array([[array[s0, f0], array[s0, f1]],
array[s1, f0], array[s1, f1]])
w[0, 0] = (s1-ss)*(f1-fs)
w[0, 1] = (s1-ss)*(fs-f0)
w[1, 0] = (ss-s0)*(f1-fs)
w[1, 1] = (ss-s0)*(fs-f0)
# renormalise for invalid pixels
w[a==invalid] = 0.
s = np.sum(w)
# if all pixels are invalid then return fill
if s == 0 :
return fill
w = w/np.sum(w)
return np.sum( w * a )
def flux_correction(data, W, O, n0, m0, u1, dij_n, mask):
cs = []
for n in tqdm.trange(data.shape[0], desc='calculating errors'):
# define the coordinate mapping and round to int
ss = u1[0] - dij_n[n, 0] + n0
fs = u1[1] - dij_n[n, 1] + m0
#
I0 = W * bilinear_interpolation_array(O, ss, fs, fill=-1, invalid=-1)
#
m = I0>0
c = np.sum(mask * m * I0 * data[n]) / np.sum(mask * m * data[n]**2)
cs.append(c)
return (data.T * np.array(cs)).T.astype(np.float32)