Files
OBSUTILS/focus_utils.py
2024-10-27 10:34:10 +03:00

763 lines
23 KiB
Python

import astromatic as amt
import argparse as ap
from astropy.io import fits
from concurrent.futures import ProcessPoolExecutor
from scipy.optimize import least_squares
import numpy as np
import matplotlib.pyplot as plt
import pathlib as pt
import os
#
# Observation process utilities
#
# compute residuals from polynomial model and data
# model = poly(t,x), x - independent variable and t - polynomial coefficients
# res = model - y
def polyModelResid(poly_coeffs, x, y):
"""
Compute residuals from polynomial model and data
model = poly(x, poly_coeffs), where poly_coeff = [c0, c1, c2] and model = c0 + c1*x + c2*x*x
res = model - y
"""
xx = np.asarray(x)
f = poly_coeffs[0] + poly_coeffs[1] * xx + poly_coeffs[2] * xx * xx
return f - y
def getFocusCalcPars(**user_kwds):
""" """
foc_kwds = {
"focus_kwd": "FOCUS", # FITS keyword name with focus value
"do_filter": True, # SExtractor's FILTER configuration option
"conv_fwhm": 5, # gaussian convolution filter FWHM (in pixels)
"conv_dim": [9, 9], # dimension of gaussian convolution filter (in pixels)
"flux_frac": 0.7, # SExtractor's PHOT_FLUXFRAC configuration option
# SExtractor's commandline options
"sex_cmdline_opts": [
"-DETECT_MINAREA 5",
"-DETECT_THRESH 3.5",
"-DEBLEND_MINCONT 0.5",
"-PHOT_AUTOPARAMS 2.5,1.0",
],
"filt_flag": 0, # SExtractor's FLAGS parameter value for sutable objects
"work_area": None, # Working area ('xc,yc,radius' or 'xmin,ymin,xmax,ymax')
# (if the value is None - use of entire image area)
"snr_thresh": None, # SExtractor's SNR value threshold to be used for selected objects
# (if SNR is None - use of all objects)
"flux_rad_thresh": None, # SExtractor's FLUX_RADIUS threshold to be used for selected objects,
# (if the value is None - use of all objects)
# 2D histogram parameters (fr_step, mag_step) to be computed from 'FLUX_RADIUS-MAG_AUTO' diagram
"hist_step": [1, 1],
# loss function name (see scipy.optimize.least_squares). if None then use of non-robust fitting
"loss_func": "soft_l1",
# Result focussing curve image filename (see Matplotlib reference to supported graphics formats)
"focus_curve_filename": None,
"result_data_filename": None, # Result focussing curve values filename
# Saving options for result focussing curve (see NumPy savetxt function)
"result_data_save_opts": {
"fmt": "%g",
"header": " Focussing curve:\n [focus value] [flux radius]",
"comments": "#",
},
}
# user values
for key, value in user_kwds.items():
if key in foc_kwds.keys():
foc_kwds[key] = value
return foc_kwds
def getFocusSequencePars(**user_kwds):
""" """
seq_kwds = {
"focus_start": None,
"focus_stop": None,
"focus_step": None,
"root_filename": "focus.fits",
"start_idx": 1, # start index of files in sequence
"acq_per_value": 1, # number of acquisitions per focus value
"exp_time": 10, # exposure time in seconds
"set_to_best": True, # set focus to the best value
} | getFocusCalcPars()
# user values
for key, value in user_kwds.items():
if key in seq_kwds.keys():
seq_kwds[key] = value
return seq_kwds
def fitFocusCurve(
focus_value,
flux_rad,
loss_func="soft_l1",
result_data_filename=None,
focus_curve_filename=None,
result_data_save_opts={"fmt": "%g", "header": "Focussing curve", "comments": "#"},
**unused
):
# non-robust fit to estimate "errors"
nr_res = np.polynomial.polynomial.Polynomial.fit(
focus_value, flux_rad, 2
) # parabola
coeffs = nr_res.convert().coef
if loss_func is not None:
bounds = (
[-np.inf, -np.inf, 0.0],
np.inf,
) # parabola with minimum (2nd derivative must be positive)
if coeffs[2] < 0: # it seems the non-robust fit failed!
coeffs[2] = 0 # so, guess curve is just constant!
coeffs[1] = 0
coeffs[0] = np.nanmedian(flux_rad)
scale = np.nanmedian(np.abs(flux_rad - coeffs[0]))
else:
nr_model = nr_res(np.asarray(focus_value))
scale = np.nanmedian(np.abs(flux_rad - nr_model))
# if coeffs[2] < 0:
# coeffs[2] = 0
# nr_model = nr_res(np.asarray(focus_value))
# scale = np.abs(np.nanmedian(flux_rad - nr_model))
# robust fitting
res = least_squares(
polyModelResid,
coeffs,
bounds=bounds,
loss=loss_func,
f_scale=scale,
args=(focus_value, flux_rad),
)
coeffs = res.x
best_foc = -coeffs[1] / coeffs[2] / 2.0
else:
best_foc = -coeffs[1] / coeffs[2] / 2.0
if result_data_filename is not None:
np.savetxt(
result_data_filename,
np.transpose((focus_value, flux_rad)),
**result_data_save_opts
)
if focus_curve_filename is not None:
focus_start = np.min(focus_value)
focus_stop = np.max(focus_value)
x = np.linspace(focus_start, focus_stop, 100)
y = coeffs[0] + coeffs[1] * x + coeffs[2] * x * x
fig = plt.figure()
plt.subplot(111, xlabel="Focus value", ylabel="Flux fraction radius")
plt.title(label="Focusing curve: best focus {:g}".format(best_foc))
plt.plot(x, y, "r-")
plt.plot(focus_value, flux_rad, "b*")
fig.savefig(focus_curve_filename)
return (best_foc, coeffs, loss_func is not None)
def computeFocus(
image_filenames, max_wks=None, log_output=None, log_ident="\t", **foc_cal_kwds
):
"""
Compute focussing curve values from a sequence of files.
Input parameters:
image_filenames: iterable of FITS filenames
max_wks: max workers to be used to compute best focus value
log_ouput: file-like (stream) object fo log output. if None - no log
log_ident: ident string (symbols before output log messages)
Returns:
{"focus_value": best focus value (float or None)
"coeffs": coefficients of best fit parabola (np.array or None)
"is_robust": is robust fit method was used (boolean)
"ret_code": return code
}
"ret_code":
0: OK
-1: at least 3 files must be given
-2: cannot perform calculations (see log_output)
"""
ret_val = {
"focus_value": None,
"coeffs": None,
"is_robust": False,
"ret_code": 0,
}
if len(image_filenames) < 3:
ret_val["ret_code"] = -1
return ret_val
f_kwds = getFocusCalcPars(**foc_cal_kwds)
focus_value = []
futures = []
flux_rad = []
if log_output is None:
log_output = open(os.devnull, "w")
executor = ProcessPoolExecutor(max_workers=max_wks)
for fname in image_filenames:
print(log_ident, end="")
print("read focus value from {} ...\t".format(fname), file=log_output, end="")
try:
hdr = fits.getheader(fname)
# focus_value.append(hdr[f_kwds["focus_kwd"]])
focus_value.append(float(hdr[f_kwds["focus_kwd"]]))
except FileNotFoundError:
print("FAIL!", file=log_output)
print(log_ident, end="")
print("WARNING: cannot find file {}! Skip!".format(fname), file=log_output)
continue
except NameError:
print("FAIL!", file=log_output)
print(log_ident, end="")
print(
"WARNING: cannot find FITS keyword {} in file {}! Skip!".format(
f_kwds["focus_kwd"], fname
),
file=log_output,
)
continue
except ValueError:
print("FAIL!", file=log_output)
print(log_ident, end="")
print(
"WARNING: cannot convert FITS keyword '{}' value '{}' to number! Skip!".format(
f_kwds["focus_kwd"], hdr[f_kwds["focus_kwd"]]
),
file=log_output,
)
continue
print("OK", file=log_output)
futures.append(executor.submit(amt.compute_mean_flux_radius, fname, **f_kwds))
if len(futures):
print("", file=log_output)
print(log_ident, file=log_output, end="")
print(
"waiting for the end of calculations ...\t",
file=log_output,
end="",
flush=True,
)
for i in range(len(futures)):
flux_rad.append(futures[i].result())
print("OK\n", file=log_output)
else:
print(log_ident, end="")
print("ERROR: It seems there were no valid FITS files! Stop!", file=log_output)
ret_val["ret_code"] = -2
return ret_val
fit_res = fitFocusCurve(focus_value, flux_rad, **f_kwds)
ret_val["focus_value"] = fit_res[0]
ret_val["coeffs"] = fit_res[1]
ret_val["is_robust"] = fit_res[2]
print(log_ident, end="")
print("best focus value is {:g}".format(ret_val["focus_value"]), file=log_output)
return ret_val
def focussingSequence(
focus_range,
init_seq_func,
set_focus_func,
get_image_func,
max_wks=None,
log_output=None,
log_ident="\t",
**kwds
):
"""
Focussing sequence
focus_range: [start, stop] (7 points)
or [start, stop, step]
init_seq_func: a function with signature:
init_seq_func(seq_kwds),
where seq_kwds - sequence control dictionary (see getFocusSequencePars function)
set_focus_func: a function of signature:
set_focus_func(foc_val)
get_image_func: a function of signature:
get_image_func(image_filename, exp_time)
exp_time in seconds
log_ouput: file-like (stream) object fo log output. if None - no log
log_ident: ident string (symbols before output log messages)
Returns:
{"focus_value": best focus value (float or None)
"coeffs": coefficients of best fit parabola (np.array or None)
"is_robust": is robust fit method was used (boolean)
"ret_code": return code
}
"ret_code":
0: OK
-1: invalid focus range
-2: cannot perform calculations (see log_output)
"""
ret_val = {
"focus_value": None,
"coeffs": None,
"is_robust": False,
"ret_code": 0,
}
if log_output is None:
log_output = open(os.devnull, "w")
if len(focus_range) < 2:
ret_val["ret_code"] = -1
return ret_val
N = 5
if len(focus_range) > 2:
if np.isclose(focus_range[2], 0):
print(log_ident, file=log_output, end="")
print("Invalid (zero) focus step value!", file=log_output)
ret_val["ret_code"] = -1
return ret_val
if np.isclose(focus_range[0], focus_range[1]):
print(log_ident, file=log_output, end="")
print("Invalid focus range parameters!", file=log_output)
ret_val["ret_code"] = -1
return ret_val
if focus_range[0] < focus_range[1]:
if focus_range[2] < 0:
print(log_ident, file=log_output, end="")
print("Invalid focus range parameters!", file=log_output)
ret_val["ret_code"] = -1
return ret_val
else:
if focus_range[2] > 0:
print(log_ident, file=log_output, end="")
print("Invalid focus range parameters!", file=log_output)
ret_val["ret_code"] = -1
return ret_val
N = int((focus_range[1] - focus_range[0]) / focus_range[2] + 1)
if N < 3:
ret_val["ret_code"] = -1
print(log_ident, file=log_output, end="")
print("Number of focus values must be at greater than 2!", file=log_output)
return ret_val
if focus_range[0] < focus_range[1]:
focus_value = np.linspace(focus_range[0], focus_range[1], N)
else:
focus_value = np.flip(np.linspace(focus_range[1], focus_range[0], N))
seq_kwds = getFocusSequencePars(**kwds)
init_seq_func(seq_kwds)
if seq_kwds["acq_per_value"] > 1:
focus_value = (
np.array([focus_value] * seq_kwds["acq_per_value"]).transpose().flatten()
)
rname = pt.Path(seq_kwds["root_filename"]).absolute()
# dir = rname.parent
# fname = rname.stem
# file_ext = rname.suffix
# if file_ext == "":
# file_ext = ".fit"
futures = []
flux_rad = []
executor = ProcessPoolExecutor(max_workers=max_wks)
i_foc = seq_kwds["start_idx"]
ret_code = 0
for foc_val in focus_value:
print(log_ident, file=log_output, end="")
print(
"set focus value to {:g} ...\t".format(foc_val),
file=log_output,
end="",
flush=True,
)
ret_code = set_focus_func(foc_val)
if ret_code:
ret_val["ret_code"] = ret_code
print("FAIL!", file=log_output, flush=True)
break
else:
print("OK", file=log_output, flush=True)
# filename = "{}_{:04d}{}".format(fname, i_foc, file_ext)
# full_filename = str(pt.Path.joinpath(dir, pt.Path(filename)))
full_filename = str(rname.with_stem("{}_{:04d}".format(rname.stem, i_foc)))
print(log_ident, file=log_output, end="")
print(
"get image '{}' ...\t".format(full_filename),
file=log_output,
end="",
flush=True,
)
ret_code = get_image_func(full_filename, seq_kwds["exp_time"])
if ret_code:
ret_val["ret_code"] = ret_code
print("FAIL!", file=log_output, flush=True)
break
else:
print("OK", file=log_output, flush=True)
futures.append(
executor.submit(amt.compute_mean_flux_radius, full_filename, **seq_kwds)
)
i_foc += 1
print("", file=log_output, flush=True)
print(log_ident, file=log_output, end="", flush=True)
print(
"waiting for the end of calculations ...\t", file=log_output, end="", flush=True
)
for i in range(len(futures)):
flux_rad.append(futures[i].result())
print("OK", file=log_output, flush=True)
if ret_code == 0:
fit_res = fitFocusCurve(focus_value, flux_rad, **seq_kwds)
ret_val["focus_value"] = fit_res[0]
ret_val["coeffs"] = fit_res[1]
ret_val["is_robust"] = fit_res[2]
else:
print(
"THERE WERE ERRORS DURING FOCUSSING SEQUENCE SO NO BEST FOCUS IS AVAILABLE!!!",
file=log_output,
end="",
flush=True,
)
return ret_val
# ------ commandline helper functions -----
def getFocusCalcCmdlinePars(parser):
"""
Add to 'argparse' parser object focus calculation related commandline arguments
"""
parser.add_argument(
"--fkwd",
help="FITS keyword name with focus value. Default is 'FOCUS'",
default="FOCUS",
type=str,
)
parser.add_argument(
"-p",
"--flux-frac",
help="SExtractor's PHOT_FLUXFRAC configuration option. " + "Default is 0.7",
type=float,
default=0.7,
)
parser.add_argument(
"--hist",
help="2D histogram parameters (fr_step, mag_step) to be computed from 'FLUX_RADIUS-MAG_AUTO' diagram. Default is '1,1'",
default="1,1",
type=str,
)
parser.add_argument(
"-f",
"--conv-fwhm",
help="SExtractor's gaussian convolution filter FWHM"
+ " (in pixels). Default is 5",
type=float,
default=5.0,
)
parser.add_argument(
"-d",
"--conv-dim",
help="SExtractor's gaussian convolution filter dimensions"
+ " (in pixels: xsize,ysize). Default is '9,9'",
default="9,9",
type=str,
)
parser.add_argument(
"--flag",
help="SExtractor's FLAGS parameter value for sutable objects. Default is 0.",
type=int,
default=0,
)
parser.add_argument(
"--frad-thresh",
help="SExtractor's FLUX_RADIUS threshold to be used for selected objects.\
Default is use of all objects",
type=float,
default=-1,
)
parser.add_argument(
"-s",
"--snr",
help="SExtractor's SNR value threshold to be used for selected objects.\
Default is use of all objects",
type=float,
default=-1,
)
parser.add_argument(
"-a",
"--area",
help="Working area ('xc,yc,radius' or 'xmin,ymin,xmax,ymax'). "
+ "The coordinates must be in FITS pixel notation! "
+ "Default is use of entire image",
type=str,
default=None,
)
parser.add_argument(
"--sex-cmd",
help="SExtractor additional commandline arguments as a string "
+ '(e.g. --sex-cmd "-GAIN 1.1 -SEEING 1.5").',
action="append",
default=None,
)
parser.add_argument(
"--loss",
help="Loss function name (see SciPy 'least_squres' function) for robust fitting. "
+ "To use non-robust fitting set the argument to 'linear' or 'none'. Default is 'soft_l1'",
default="soft_l1",
)
parser.add_argument(
"-c",
"--focussing-curve",
help="filename of focussing curve image",
type=str,
default=None,
)
parser.add_argument(
"-o",
"--data-curve",
help="filename of focussing curve ASCII data file",
type=str,
default=None,
)
return parser
def getFocussingSequenceCmdlinePars(parser):
"""
Add to 'argparse' parser object focus calculation and focussing sequence related commandline arguments
"""
parser = getFocusCalcCmdlinePars(parser)
parser.add_argument(
"-r",
"--repeat",
help="Number of acquisitions per focus value. Default is 1.",
default=1,
type=int,
)
parser.add_argument(
"-t",
"--exp-time",
help="Exposure time in seconds. Default is 10.",
type=float,
default=10,
)
parser.add_argument(
"--foc-file",
help="Rootname of output focussing images. Default is 'focus.fits'. The algorithm generates files of sequence as "
"'rootname_0001.rootname_ext, rootname_0002.rootname_ext ... rootname_NNNN.rootname_ext', where NNNN is an index number of acquisition.",
type=str,
default="focus.fits",
)
parser.add_argument(
"--do-not-set",
help="Do not set focus to the best value",
action="store_true",
default=False,
)
return parser
def parsFocusCalcCmdlinePars(parser_args, parser_msg=[]):
if parser_args.sex_cmd is None:
parser_args.sex_cmd = [
"-DETECT_MINAREA 5",
"-DETECT_THRESH 3.5",
"-DEBLEND_MINCONT 0.5",
"-PHOT_AUTOPARAMS 2.5,1.0",
]
# convolution filter dimension
v = parser_args.conv_dim.split(",")
if len(v) < 2:
parser_msg.append("Invalid convolution filter dimension! Use of default value!")
parser_args.conv_dim = [9, 9]
else:
vv = [0, 0]
for i in range(2):
try:
vv[i] = int(v[i])
except ValueError:
parser_msg.append(
"Invalid convolution filter dimension! Use of default value!"
)
vv = [9, 9]
break
parser_args.conv_dim = vv
# working area
if parser_args.area is not None:
v = parser_args.area.split(",")
if len(v) < 3:
parser_msg.append("Invalid working area! Use of default value!")
parser_args.area = None
else:
vv = []
for i, el in enumerate(v):
try:
vv.append(int(el))
except ValueError:
parser_msg.append("Invalid working area! Use of default value!")
vv = None
break
parser_args.area = vv
if parser_args.flag < 0:
parser_args.flag = 0
if parser_args.snr <= 0:
parser_args.snr = None
if parser_args.frad_thresh < 0:
parser_args.frac_thresh = None
v = parser_args.hist.split(",")
if len(v) < 2:
parser_msg.append("Invalid 2D-histogram parameters! Use of default value!")
parser_args.hist = [1, 1]
else:
vv = [0, 0]
for i in range(2):
try:
vv[i] = float(v[i])
except ValueError:
parser_msg.append(
"Invalid 2D-histogram parameters! Use of default value!"
)
vv = [1, 1]
break
parser_args.hist = vv
kwds = getFocusCalcPars()
kwds["focus_kwd"] = parser_args.fkwd
kwds["do_filter"] = True
kwds["conv_fwhm"] = parser_args.conv_fwhm
kwds["conv_dim"] = parser_args.conv_dim
kwds["flux_frac"] = parser_args.flux_frac
kwds["sex_cmdline_opts"] = parser_args.sex_cmd
kwds["filt_flag"] = parser_args.flag
kwds["work_area"] = parser_args.area
kwds["snr_thresh"] = parser_args.snr
kwds["flux_rad_thresh"] = parser_args.frad_thresh
kwds["hist_step"] = parser_args.hist
kwds["loss_func"] = (
parser_args.loss
if parser_args.loss != "linear" or parser_args.loss != "none"
else None
)
kwds["focus_curve_filename"] = parser_args.focussing_curve
kwds["result_data_filename"] = parser_args.data_curve
return kwds
def parsFocussingSequenceCmdlinePars(parser_args, parser_msg=[]):
""" """
foc_kwds = parsFocusCalcCmdlinePars(parser_args, parser_msg)
kwds = getFocusSequencePars() | foc_kwds
if parser_args.foc_file == "":
parser_msg.append("Invalid '--foc_file' argument! Use of defaul value!")
kwds["root_filename"] = (
parser_args.foc_file if parser_args.foc_file != "" else "focus.fits"
)
if parser_args.repeat <= 0:
parser_msg.append("Invalid '--repeat' argument! Use of defaul value!")
if parser_args.exp_time <= 0:
parser_msg.append("Invalid '--exp_time' argument! Use of defaul value!")
kwds["acq_per_value"] = parser_args.repeat if parser_args.repeat > 0 else 1
kwds["exp_time"] = parser_args.exp_time if parser_args.exp_time > 0 else 10
kwds["set_to_best"] = not parser_args.do_not_set
return kwds