Initial commit
This commit is contained in:
11
__init__.py
Normal file
11
__init__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from .focus_utils import (
|
||||
getFocusCalcPars,
|
||||
getFocusSequencePars,
|
||||
fitFocusCurve,
|
||||
computeFocus,
|
||||
focussingSequence,
|
||||
getFocusCalcCmdlinePars,
|
||||
getFocussingSequenceCmdlinePars,
|
||||
parsFocusCalcCmdlinePars,
|
||||
parsFocussingSequenceCmdlinePars,
|
||||
)
|
||||
55
best_focus.py
Executable file
55
best_focus.py
Executable file
@@ -0,0 +1,55 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
#
|
||||
# Compute the best focus value from sequence of FITS images
|
||||
#
|
||||
|
||||
|
||||
import OBSUTILS as obsutil
|
||||
import argparse as ap
|
||||
import sys
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = ap.ArgumentParser(prog=sys.argv[1])
|
||||
|
||||
parser.add_argument(
|
||||
"files", help="Acquired FITS image filenames of focussing sequence", nargs="*"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-v", "--verbose", help="Verbose output", action="store_true", default=False
|
||||
)
|
||||
|
||||
parser = obsutil.getFocusCalcCmdlinePars(parser)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
log = []
|
||||
foc_kwds = obsutil.parsFocusCalcCmdlinePars(args, log)
|
||||
|
||||
if args.verbose:
|
||||
if len(log):
|
||||
for log_str in log:
|
||||
print(log_str)
|
||||
|
||||
if args.verbose:
|
||||
print("START FOCUS MEASUREMENT: \n")
|
||||
|
||||
if args.verbose:
|
||||
result = obsutil.computeFocus(args.files, log_output=sys.stdout, **foc_kwds)
|
||||
else:
|
||||
result = obsutil.computeFocus(args.files, log_output=None, **foc_kwds)
|
||||
|
||||
# if args.verbose:
|
||||
# for log_str in result["log"]:
|
||||
# print("\t", log_str)
|
||||
|
||||
if result["ret_code"] != 0:
|
||||
if args.verbose:
|
||||
print("\nFAIL!")
|
||||
sys.exit(result["ret_code"])
|
||||
|
||||
if args.verbose:
|
||||
print("\nALL DONE")
|
||||
|
||||
sys.exit(0)
|
||||
181
focus_sequence_FLI.py
Executable file
181
focus_sequence_FLI.py
Executable file
@@ -0,0 +1,181 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
#
|
||||
# Get sequence of focussing images and
|
||||
# compute the best focus value
|
||||
#
|
||||
# This variant of the script uses Eddy Emelianov's
|
||||
# 'fli_control' executable
|
||||
#
|
||||
|
||||
|
||||
import OBSUTILS as obsutil
|
||||
import argparse as ap
|
||||
import sys
|
||||
import numpy as np
|
||||
import subprocess as sp
|
||||
|
||||
|
||||
# --- FLI-hardware related (ROBOTEL variant)
|
||||
|
||||
|
||||
def set_focus(foc_val):
|
||||
cmd = ["fli_control", "-g", str(foc_val)]
|
||||
ret = sp.run(cmd, stdout=sp.PIPE, stderr=sp.PIPE)
|
||||
return ret.returncode
|
||||
|
||||
|
||||
def get_image(filename, exp_time):
|
||||
cmd = [
|
||||
"fli_control",
|
||||
"-r",
|
||||
"/tmp/10micron.fitsheader",
|
||||
"-x",
|
||||
"{:d}".format(np.round(exp_time * 1000)), # to microseconds
|
||||
filename,
|
||||
]
|
||||
|
||||
ret = sp.run(cmd, stdout=sp.PIPE, stderr=sp.PIPE)
|
||||
return ret.returncode
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = ap.ArgumentParser(
|
||||
prog=sys.argv[1],
|
||||
description="FLI CCD and focuser hardware: focussing sequence implementation. It is assumed that 'fli_control' software is installed in the OS.",
|
||||
)
|
||||
|
||||
parser.add_argument("focus_start", help="focus start value", type=float)
|
||||
parser.add_argument("focus_stop", help="focus stop value", type=float)
|
||||
parser.add_argument(
|
||||
"focus_step", help="focus step value", type=float, nargs="*", default=np.nan
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--guess",
|
||||
help="Use of two-steps focussing strategy. Input focus start and stop values are interpretated"
|
||||
+ "as a guess range while step value is one for precise measurements.",
|
||||
action="store_true",
|
||||
default=False,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-N",
|
||||
"--num",
|
||||
help="Number of points for precise stage measurement. Default is 7. Ignored if no '--guess'.",
|
||||
type=np.uint64,
|
||||
default=7,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-v", "--verbose", help="Verbose output", action="store_true", default=False
|
||||
)
|
||||
|
||||
parser = obsutil.getFocusCalcCmdlinePars(parser)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if (args.num < 3) and (args.guess):
|
||||
if args.verbose:
|
||||
print("Number of focus values must be greater than 2! Abort!")
|
||||
sys.exit(-1)
|
||||
|
||||
focus_start = float(args.focus_start)
|
||||
focus_stop = float(args.focus_stop)
|
||||
focus_step = float(args.focus_step)
|
||||
|
||||
if np.isfinite(focus_step):
|
||||
if np.isclose(focus_step, 0.0):
|
||||
if args.verbose:
|
||||
print("Invalid (zero) focus step value! Abort!")
|
||||
sys.exit(1)
|
||||
|
||||
if np.isclose(focus_start, focus_stop):
|
||||
if args.verbose:
|
||||
print("Invalid focus range parameters (an empty range)! Abort!")
|
||||
sys.exit(1)
|
||||
|
||||
if (focus_stop - focus_start) * focus_step < 0:
|
||||
if args.verbose:
|
||||
print("Invalid focus range parameters! Abort!")
|
||||
sys.exit(1)
|
||||
|
||||
# if (focus_start > focus_stop) and (focus_step > 0):
|
||||
# if args.verbose:
|
||||
# print("Invalid focus range parameters! Abort!")
|
||||
# sys.exit(1)
|
||||
|
||||
# if (focus_start < focus_stop) and (focus_step < 0):
|
||||
# if args.verbose:
|
||||
# print("Invalid focus range parameters! Abort!")
|
||||
# sys.exit(1)
|
||||
else:
|
||||
if args.guess:
|
||||
if args.verbose:
|
||||
print("If '--guess' is set then step value must be given! Abort!")
|
||||
exit(1)
|
||||
|
||||
log = []
|
||||
|
||||
seq_kwds = obsutil.parsFocussingSequenceCmdlinePars(args, log)
|
||||
|
||||
if args.verbose:
|
||||
if len(log):
|
||||
for log_str in log:
|
||||
print(log_str)
|
||||
|
||||
if args.verbose:
|
||||
print("START FOCUSSING SEQUENCE ...")
|
||||
|
||||
if args.guess: # rough focus estimation
|
||||
result = obsutil.focussingSequence(
|
||||
[focus_start, focus_stop],
|
||||
set_focus,
|
||||
get_image,
|
||||
None,
|
||||
sys.stdout,
|
||||
**seq_kwds
|
||||
)
|
||||
|
||||
if result["ret_code"] != 0:
|
||||
print("\n\tERROR: Cannot perform rought focus estimation! Abort!")
|
||||
print("\nFAIL!")
|
||||
sys.exit(result["ret_code"])
|
||||
|
||||
# compute precise range start and stop values
|
||||
r = args.num * focus_step / 2.0
|
||||
focus_start = result["focus_value"] - r
|
||||
focus_stop = result["focus_value"] + r
|
||||
|
||||
if np.isfinite(args.focus_step):
|
||||
result = obsutil.focussingSequence(
|
||||
[focus_start, focus_stop, focus_step],
|
||||
set_focus,
|
||||
get_image,
|
||||
None,
|
||||
sys.stdout,
|
||||
**seq_kwds
|
||||
)
|
||||
else:
|
||||
result = obsutil.focussingSequence(
|
||||
[focus_start, focus_stop],
|
||||
set_focus,
|
||||
get_image,
|
||||
None,
|
||||
sys.stdout,
|
||||
**seq_kwds
|
||||
)
|
||||
|
||||
if args.verbose:
|
||||
print("\tThe best focus value: {:g}".format(result["focus_value"]))
|
||||
|
||||
if not args.do_no_set:
|
||||
if args.verbose:
|
||||
print("\n\tSet focus to the best value ...", end="")
|
||||
set_focus(result["focus_value"])
|
||||
print("\tOK")
|
||||
|
||||
if args.verbose:
|
||||
print("DONE.")
|
||||
|
||||
sys.exit(0)
|
||||
164
focus_sequence_SKEL.py
Executable file
164
focus_sequence_SKEL.py
Executable file
@@ -0,0 +1,164 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
#
|
||||
# Get sequence of focussing images and
|
||||
# compute the best focus value
|
||||
#
|
||||
#
|
||||
|
||||
|
||||
import OBSUTILS as obsutil
|
||||
import argparse as ap
|
||||
import sys
|
||||
import numpy as np
|
||||
import subprocess as sp
|
||||
|
||||
|
||||
# --- FLI-hardware related (ROBOTEL variant)
|
||||
|
||||
|
||||
def set_focus(foc_val):
|
||||
pass
|
||||
|
||||
|
||||
def get_image(filename, exp_time):
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = ap.ArgumentParser(prog=sys.argv[1])
|
||||
|
||||
parser.add_argument("focus_start", help="focus start value", type=float)
|
||||
parser.add_argument("focus_stop", help="focus stop value", type=float)
|
||||
parser.add_argument(
|
||||
"focus_step", help="focus step value", type=float, nargs="*", default=np.nan
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--guess",
|
||||
help="Use of two-steps focussing strategy. Input focus start and stop values are interpretated"
|
||||
+ "as a guess range while step value is one for precise measurements.",
|
||||
action="store_true",
|
||||
default=False,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-N",
|
||||
"--num",
|
||||
help="Number of points for precise stage measurement. Default is 7. Ignored if no '--guess'.",
|
||||
type=np.uint64,
|
||||
default=7,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-v", "--verbose", help="Verbose output", action="store_true", default=False
|
||||
)
|
||||
|
||||
parser = obsutil.getFocusCalcCmdlinePars(parser)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if (args.num < 3) and (args.guess):
|
||||
if args.verbose:
|
||||
print("Number of focus values must be greater than 2! Abort!")
|
||||
sys.exit(-1)
|
||||
|
||||
focus_start = float(args.focus_start)
|
||||
focus_stop = float(args.focus_stop)
|
||||
focus_step = float(args.focus_step)
|
||||
|
||||
if np.isfinite(focus_step):
|
||||
if np.isclose(focus_step, 0.0):
|
||||
if args.verbose:
|
||||
print("Invalid (zero) focus step value! Abort!")
|
||||
sys.exit(1)
|
||||
|
||||
if np.isclose(focus_start, focus_stop):
|
||||
if args.verbose:
|
||||
print("Invalid focus range parameters (an empty range)! Abort!")
|
||||
sys.exit(1)
|
||||
|
||||
if (focus_stop - focus_start) * focus_step < 0:
|
||||
if args.verbose:
|
||||
print("Invalid focus range parameters! Abort!")
|
||||
sys.exit(1)
|
||||
|
||||
# if (focus_start > focus_stop) and (focus_step > 0):
|
||||
# if args.verbose:
|
||||
# print("Invalid focus range parameters! Abort!")
|
||||
# sys.exit(1)
|
||||
|
||||
# if (focus_start < focus_stop) and (focus_step < 0):
|
||||
# if args.verbose:
|
||||
# print("Invalid focus range parameters! Abort!")
|
||||
# sys.exit(1)
|
||||
else:
|
||||
if args.guess:
|
||||
if args.verbose:
|
||||
print("If '--guess' is set then step value must be given! Abort!")
|
||||
exit(1)
|
||||
|
||||
log = []
|
||||
|
||||
seq_kwds = obsutil.parsFocussingSequenceCmdlinePars(args, log)
|
||||
|
||||
if args.verbose:
|
||||
if len(log):
|
||||
for log_str in log:
|
||||
print(log_str)
|
||||
|
||||
if args.verbose:
|
||||
print("START FOCUSSING SEQUENCE ...")
|
||||
|
||||
if args.guess: # rough focus estimation
|
||||
result = obsutil.focussingSequence(
|
||||
[focus_start, focus_stop],
|
||||
set_focus,
|
||||
get_image,
|
||||
None,
|
||||
sys.stdout,
|
||||
**seq_kwds
|
||||
)
|
||||
|
||||
if result["ret_code"] != 0:
|
||||
print("\n\tERROR: Cannot perform rought focus estimation! Abort!")
|
||||
print("\nFAIL!")
|
||||
sys.exit(result["ret_code"])
|
||||
|
||||
# compute precise range start and stop values
|
||||
r = args.num * focus_step / 2.0
|
||||
focus_start = result["focus_value"] - r
|
||||
focus_stop = result["focus_value"] + r
|
||||
|
||||
if np.isfinite(args.focus_step):
|
||||
result = obsutil.focussingSequence(
|
||||
[focus_start, focus_stop, focus_step],
|
||||
set_focus,
|
||||
get_image,
|
||||
None,
|
||||
sys.stdout,
|
||||
**seq_kwds
|
||||
)
|
||||
else:
|
||||
result = obsutil.focussingSequence(
|
||||
[focus_start, focus_stop],
|
||||
set_focus,
|
||||
get_image,
|
||||
None,
|
||||
sys.stdout,
|
||||
**seq_kwds
|
||||
)
|
||||
|
||||
if args.verbose:
|
||||
print("\tThe best focus value: {:g}".format(result["focus_value"]))
|
||||
|
||||
if not args.do_no_set:
|
||||
if args.verbose:
|
||||
print("\n\tSet focus to the best value ...", end="")
|
||||
set_focus(result["focus_value"])
|
||||
print("\tOK")
|
||||
|
||||
if args.verbose:
|
||||
print("DONE.")
|
||||
|
||||
sys.exit(0)
|
||||
705
focus_utils.py
Normal file
705
focus_utils.py
Normal file
@@ -0,0 +1,705 @@
|
||||
import astromatic as amt
|
||||
import argparse as ap
|
||||
from astropy.io import fits
|
||||
from concurrent.futures import ProcessPoolExecutor
|
||||
from scipy.optimize import least_squares
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
import pathlib as pt
|
||||
import os
|
||||
|
||||
|
||||
#
|
||||
# Observation process utilities
|
||||
#
|
||||
|
||||
|
||||
# compute residuals from polynomial model and data
|
||||
# model = poly(t,x), x - independent variable and t - polynomial coefficients
|
||||
# res = model - y
|
||||
def polyModelResid(poly_coeffs, x, y):
|
||||
"""
|
||||
Compute residuals from polynomial model and data
|
||||
model = poly(x, poly_coeffs), where poly_coeff = [c0, c1, c2] and model = c0 + c1*x + c2*x*x
|
||||
res = model - y
|
||||
|
||||
"""
|
||||
|
||||
xx = np.asarray(x)
|
||||
f = poly_coeffs[0] + poly_coeffs[1] * xx + poly_coeffs[2] * xx * xx
|
||||
|
||||
return f - y
|
||||
|
||||
|
||||
def getFocusCalcPars(**user_kwds):
|
||||
""" """
|
||||
|
||||
foc_kwds = {
|
||||
"focus_kwd": "FOCUS", # FITS keyword name with focus value
|
||||
"do_filter": True, # SExtractor's FILTER configuration option
|
||||
"conv_fwhm": 5, # gaussian convolution filter FWHM (in pixels)
|
||||
"conv_dim": [9, 9], # dimension of gaussian convolution filter (in pixels)
|
||||
"flux_frac": 0.7, # SExtractor's PHOT_FLUXFRAC configuration option
|
||||
# SExtractor's commandline options
|
||||
"sex_cmdline_opts": [
|
||||
"-DETECT_MINAREA 5",
|
||||
"-DETECT_THRESH 3.5",
|
||||
"-DEBLEND_MINCONT 0.5",
|
||||
"-PHOT_AUTOPARAMS 2.5,1.0",
|
||||
],
|
||||
"filt_flag": 0, # SExtractor's FLAGS parameter value for sutable objects
|
||||
"work_area": None, # Working area ('xc,yc,radius' or 'xmin,ymin,xmax,ymax')
|
||||
# (if the value is None - use of entire image area)
|
||||
"snr_thresh": None, # SExtractor's SNR value threshold to be used for selected objects
|
||||
# (if SNR is None - use of all objects)
|
||||
"flux_rad_thresh": None, # SExtractor's FLUX_RADIUS threshold to be used for selected objects,
|
||||
# (if the value is None - use of all objects)
|
||||
# 2D histogram parameters (fr_step, mag_step) to be computed from 'FLUX_RADIUS-MAG_AUTO' diagram
|
||||
"hist_step": [1, 1],
|
||||
# loss function name (see scipy.optimize.least_squares). if None then use of non-robust fitting
|
||||
"loss_func": "soft_l1",
|
||||
# Result focussing curve image filename (see Matplotlib reference to supported graphics formats)
|
||||
"focus_curve_filename": None,
|
||||
"result_data_filename": None, # Result focussing curve values filename
|
||||
# Saving options for result focussing curve (see NumPy savetxt function)
|
||||
"result_data_save_opts": {
|
||||
"fmt": "%g",
|
||||
"header": " Focussing curve:\n [focus value] [flux radius]",
|
||||
"comments": "#",
|
||||
},
|
||||
}
|
||||
|
||||
# user values
|
||||
for key, value in user_kwds.items():
|
||||
if key in foc_kwds.keys():
|
||||
foc_kwds[key] = value
|
||||
|
||||
return foc_kwds
|
||||
|
||||
|
||||
def getFocusSequencePars(**user_kwds):
|
||||
""" """
|
||||
|
||||
seq_kwds = {
|
||||
"focus_start": None,
|
||||
"focus_stop": None,
|
||||
"focus_step": None,
|
||||
"root_filename": "focus.fits",
|
||||
"acq_per_value": 1, # number of acquisitions per focus value
|
||||
"exp_time": 10, # exposure time in seconds
|
||||
"set_to_best": True, # set focus to the best value
|
||||
} | getFocusCalcPars()
|
||||
|
||||
# user values
|
||||
for key, value in user_kwds.items():
|
||||
if key in seq_kwds.keys():
|
||||
seq_kwds[key] = value
|
||||
|
||||
return seq_kwds
|
||||
|
||||
|
||||
def fitFocusCurve(
|
||||
focus_value,
|
||||
flux_rad,
|
||||
loss_func="soft_l1",
|
||||
result_data_filename=None,
|
||||
focus_curve_filename=None,
|
||||
result_data_save_opts={"fmt": "%g", "header": "Focussing curve", "comments": "#"},
|
||||
**unused
|
||||
):
|
||||
# non-robust fit to estimate "errors"
|
||||
nr_res = np.polynomial.polynomial.Polynomial.fit(
|
||||
focus_value, flux_rad, 2
|
||||
) # parabola
|
||||
|
||||
coeffs = nr_res.convert().coef
|
||||
|
||||
if loss_func is not None:
|
||||
bounds = (
|
||||
[-np.inf, -np.inf, 0.0],
|
||||
np.inf,
|
||||
) # parabola with minimum (2nd derivative must be positive)
|
||||
|
||||
if coeffs[2] < 0:
|
||||
coeffs[2] = 0
|
||||
|
||||
scale = np.abs(np.nanmedian(flux_rad - nr_res(focus_value)))
|
||||
# robust fitting
|
||||
res = least_squares(
|
||||
polyModelResid,
|
||||
coeffs,
|
||||
bounds=bounds,
|
||||
loss=loss_func,
|
||||
f_scale=scale,
|
||||
args=(focus_value, flux_rad),
|
||||
)
|
||||
coeffs = res.x
|
||||
best_foc = -coeffs[1] / coeffs[2] / 2.0
|
||||
else:
|
||||
best_foc = -coeffs[1] / coeffs[2] / 2.0
|
||||
|
||||
if result_data_filename is not None:
|
||||
np.savetxt(
|
||||
result_data_filename,
|
||||
np.transpose((focus_value, flux_rad)),
|
||||
**result_data_save_opts
|
||||
)
|
||||
|
||||
if focus_curve_filename is not None:
|
||||
focus_start = np.min(focus_value)
|
||||
focus_stop = np.max(focus_value)
|
||||
|
||||
x = np.linspace(focus_start, focus_stop, 100)
|
||||
y = coeffs[0] + coeffs[1] * x + coeffs[2] * x * x
|
||||
fig = plt.figure()
|
||||
plt.subplot(111, xlabel="Focus value", ylabel="Flux fraction radius")
|
||||
plt.title(label="Focusing curve: best focus {:g}".format(best_foc))
|
||||
plt.plot(x, y, "r-")
|
||||
plt.plot(focus_value, flux_rad, "b*")
|
||||
fig.savefig(focus_curve_filename)
|
||||
|
||||
return (best_foc, coeffs, loss_func is not None)
|
||||
|
||||
|
||||
def computeFocus(
|
||||
image_filenames, max_wks=None, log_output=None, log_ident="\t", **foc_cal_kwds
|
||||
):
|
||||
"""
|
||||
Compute focussing curve values from a sequence of files.
|
||||
|
||||
Input parameters:
|
||||
image_filenames: iterable of FITS filenames
|
||||
max_wks: max workers to be used to compute best focus value
|
||||
log_ouput: file-like (stream) object fo log output. if None - no log
|
||||
log_ident: ident string (symbols before output log messages)
|
||||
|
||||
Returns:
|
||||
{"focus_value": best focus value (float or None)
|
||||
"coeffs": coefficients of best fit parabola (np.array or None)
|
||||
"is_robust": is robust fit method was used (boolean)
|
||||
"ret_code": return code
|
||||
}
|
||||
|
||||
"ret_code":
|
||||
0: OK
|
||||
-1: at least 3 files must be given
|
||||
-2: cannot perform calculations (see log_output)
|
||||
|
||||
"""
|
||||
|
||||
ret_val = {
|
||||
"focus_value": None,
|
||||
"coeffs": None,
|
||||
"is_robust": False,
|
||||
"ret_code": 0,
|
||||
}
|
||||
|
||||
if len(image_filenames) < 3:
|
||||
ret_val["ret_code"] = -1
|
||||
return ret_val
|
||||
|
||||
f_kwds = getFocusCalcPars(**foc_cal_kwds)
|
||||
|
||||
focus_value = []
|
||||
futures = []
|
||||
flux_rad = []
|
||||
|
||||
if log_output is None:
|
||||
log_output = open(os.devnull, "w")
|
||||
|
||||
executor = ProcessPoolExecutor(max_workers=max_wks)
|
||||
|
||||
for fname in image_filenames:
|
||||
print(log_ident, end="")
|
||||
print("read focus value from {} ...\t".format(fname), file=log_output, end="")
|
||||
|
||||
try:
|
||||
hdr = fits.getheader(fname)
|
||||
# focus_value.append(hdr[f_kwds["focus_kwd"]])
|
||||
focus_value.append(float(hdr[f_kwds["focus_kwd"]]))
|
||||
except FileNotFoundError:
|
||||
print("FAIL!", file=log_output)
|
||||
print(log_ident, end="")
|
||||
print("WARNING: cannot find file {}! Skip!".format(fname), file=log_output)
|
||||
continue
|
||||
except NameError:
|
||||
print("FAIL!", file=log_output)
|
||||
print(log_ident, end="")
|
||||
print(
|
||||
"WARNING: cannot find FITS keyword {} in file {}! Skip!".format(
|
||||
f_kwds["focus_kwd"], fname
|
||||
),
|
||||
file=log_output,
|
||||
)
|
||||
continue
|
||||
except ValueError:
|
||||
print("FAIL!", file=log_output)
|
||||
print(log_ident, end="")
|
||||
print(
|
||||
"WARNING: cannot convert FITS keyword '{}' value '{}' to number! Skip!".format(
|
||||
f_kwds["focus_kwd"], hdr[f_kwds["focus_kwd"]]
|
||||
),
|
||||
file=log_output,
|
||||
)
|
||||
continue
|
||||
|
||||
print("OK", file=log_output)
|
||||
|
||||
futures.append(executor.submit(amt.compute_mean_flux_radius, fname, **f_kwds))
|
||||
|
||||
if len(futures):
|
||||
print("", file=log_output)
|
||||
print(log_ident, file=log_output, end="")
|
||||
print("waiting for the end of calculations ...\t", file=log_output, end="")
|
||||
|
||||
for i in range(len(futures)):
|
||||
flux_rad.append(futures[i].result())
|
||||
|
||||
print("OK\n", file=log_output)
|
||||
|
||||
else:
|
||||
print(log_ident, end="")
|
||||
print("ERROR: It seems there were no valid FITS files! Stop!", file=log_output)
|
||||
ret_val["ret_code"] = -2
|
||||
return ret_val
|
||||
|
||||
fit_res = fitFocusCurve(focus_value, flux_rad, **f_kwds)
|
||||
|
||||
ret_val["focus_value"] = fit_res[0]
|
||||
ret_val["coeffs"] = fit_res[1]
|
||||
ret_val["is_robust"] = fit_res[2]
|
||||
|
||||
print(log_ident, end="")
|
||||
print("best focus value is {:g}".format(ret_val["focus_value"]), file=log_output)
|
||||
|
||||
return ret_val
|
||||
|
||||
|
||||
def focussingSequence(
|
||||
focus_range,
|
||||
set_focus_func,
|
||||
get_image_func,
|
||||
max_wks=None,
|
||||
log_output=None,
|
||||
log_ident="\t",
|
||||
**kwds
|
||||
):
|
||||
"""
|
||||
Focussing sequence
|
||||
|
||||
focus_range: [start, stop] (7 points)
|
||||
or [start, stop, step]
|
||||
set_focus_func: a function of signature:
|
||||
set_focus_func(foc_val)
|
||||
get_image_func: a function of signature:
|
||||
get_image_func(image_filename, exp_time)
|
||||
exp_time in seconds
|
||||
log_ouput: file-like (stream) object fo log output. if None - no log
|
||||
log_ident: ident string (symbols before output log messages)
|
||||
|
||||
Returns:
|
||||
{"focus_value": best focus value (float or None)
|
||||
"coeffs": coefficients of best fit parabola (np.array or None)
|
||||
"is_robust": is robust fit method was used (boolean)
|
||||
"ret_code": return code
|
||||
}
|
||||
|
||||
"ret_code":
|
||||
0: OK
|
||||
-1: invalid focus range
|
||||
-2: cannot perform calculations (see log_output)
|
||||
"""
|
||||
|
||||
ret_val = {
|
||||
"focus_value": None,
|
||||
"coeffs": None,
|
||||
"is_robust": False,
|
||||
"ret_code": 0,
|
||||
}
|
||||
|
||||
if log_output is None:
|
||||
log_output = open(os.devnull, "w")
|
||||
|
||||
if len(focus_range) < 2:
|
||||
ret_val["ret_code"] = -1
|
||||
return ret_val
|
||||
|
||||
N = 7
|
||||
|
||||
if len(focus_range) > 2:
|
||||
if np.isclose(focus_range[2], 0):
|
||||
print(log_ident, file=log_output, end="")
|
||||
print("Invalid (zero) focus step value!", file=log_output)
|
||||
ret_val["ret_code"] = -1
|
||||
return ret_val
|
||||
|
||||
if np.isclose(focus_range[0], focus_range[1]):
|
||||
print(log_ident, file=log_output, end="")
|
||||
print("Invalid focus range parameters!", file=log_output)
|
||||
ret_val["ret_code"] = -1
|
||||
return ret_val
|
||||
|
||||
if focus_range[0] < focus_range[1]:
|
||||
if focus_range[2] < 0:
|
||||
print(log_ident, file=log_output, end="")
|
||||
print("Invalid focus range parameters!", file=log_output)
|
||||
ret_val["ret_code"] = -1
|
||||
return ret_val
|
||||
else:
|
||||
if focus_range[2] > 0:
|
||||
print(log_ident, file=log_output, end="")
|
||||
print("Invalid focus range parameters!", file=log_output)
|
||||
ret_val["ret_code"] = -1
|
||||
return ret_val
|
||||
|
||||
N = (focus_range[1] - focus_range[0]) / focus_range[2] + 1
|
||||
|
||||
if N < 3:
|
||||
ret_val["ret_code"] = -1
|
||||
print(log_ident, file=log_output, end="")
|
||||
print("Number of focus values must be at greater than 2!", file=log_output)
|
||||
return ret_val
|
||||
|
||||
if focus_range[0] < focus_range[1]:
|
||||
focus_value = np.linspace(focus_range[0], focus_range[1], N)
|
||||
else:
|
||||
focus_value = np.flip(np.linspace(focus_range[1], focus_range[0], N))
|
||||
|
||||
seq_kwds = getFocusSequencePars(**kwds)
|
||||
|
||||
if seq_kwds["acq_per_value"] > 1:
|
||||
focus_value = (
|
||||
np.array([focus_value] * seq_kwds["acq_per_value"]).transpose().flatten()
|
||||
)
|
||||
|
||||
rname = pt.Path(seq_kwds["root_filename"]).absolute()
|
||||
|
||||
dir = rname.parent
|
||||
fname = rname.stem
|
||||
file_ext = rname.suffix
|
||||
if file_ext == "":
|
||||
file_ext = ".fits"
|
||||
|
||||
focus_value = []
|
||||
futures = []
|
||||
flux_rad = []
|
||||
|
||||
executor = ProcessPoolExecutor(max_workers=max_wks)
|
||||
|
||||
i_foc = 0
|
||||
for foc_val in focus_value:
|
||||
print(log_ident, file=log_output, end="")
|
||||
print("set focus value to {:g} ...\t".format(foc_val), file=log_output, end="")
|
||||
|
||||
set_focus_func(foc_val)
|
||||
|
||||
print("OK", file=log_output)
|
||||
|
||||
filename = "{}_{:04d}{}".format(fname, i_foc, file_ext)
|
||||
full_filename = pt.Path.joinpath(dir, pt.Path(filename))
|
||||
|
||||
print(log_ident, file=log_output, end="")
|
||||
print("get image '{}' ...\t".format(full_filename), file=log_output, end="")
|
||||
|
||||
get_image_func(full_filename, seq_kwds["exp_time"])
|
||||
|
||||
print("OK", file=log_output)
|
||||
|
||||
futures.append(executor.submit(amt.compute_mean_flux_radius, fname, **seq_kwds))
|
||||
|
||||
print("", file=log_output)
|
||||
print(log_ident, file=log_output, end="")
|
||||
print("waiting for the end of calculations ...\t", file=log_output, end="")
|
||||
|
||||
for i in range(len(futures)):
|
||||
flux_rad.append(futures[i].result())
|
||||
|
||||
print("OK", file=log_output)
|
||||
|
||||
fit_res = fitFocusCurve(focus_value, flux_rad, **seq_kwds)
|
||||
|
||||
ret_val["focus_value"] = fit_res[0]
|
||||
ret_val["coeffs"] = fit_res[1]
|
||||
ret_val["is_robust"] = fit_res[2]
|
||||
|
||||
return ret_val
|
||||
|
||||
|
||||
# ------ commandline helper functions -----
|
||||
|
||||
|
||||
def getFocusCalcCmdlinePars(parser):
|
||||
"""
|
||||
Add to 'argparse' parser object focus calculation related commandline arguments
|
||||
"""
|
||||
|
||||
parser.add_argument(
|
||||
"--fkwd",
|
||||
help="FITS keyword name with focus value. Default is 'FOCUS'",
|
||||
default="FOCUS",
|
||||
type=str,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-p",
|
||||
"--flux-frac",
|
||||
help="SExtractor's PHOT_FLUXFRAC configuration option. " + "Default is 0.7",
|
||||
type=float,
|
||||
default=0.7,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--hist",
|
||||
help="2D histogram parameters (fr_step, mag_step) to be computed from 'FLUX_RADIUS-MAG_AUTO' diagram. Default is '1,1'",
|
||||
default="1,1",
|
||||
type=str,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-f",
|
||||
"--conv-fwhm",
|
||||
help="SExtractor's gaussian convolution filter FWHM"
|
||||
+ " (in pixels). Default is 5",
|
||||
type=float,
|
||||
default=5.0,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-d",
|
||||
"--conv-dim",
|
||||
help="SExtractor's gaussian convolution filter dimensions"
|
||||
+ " (in pixels: xsize,ysize). Default is '9,9'",
|
||||
default="9,9",
|
||||
type=str,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--flag",
|
||||
help="SExtractor's FLAGS parameter value for sutable objects. Default is 0.",
|
||||
type=int,
|
||||
default=0,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--frad-thresh",
|
||||
help="SExtractor's FLUX_RADIUS threshold to be used for selected objects.\
|
||||
Default is use of all objects",
|
||||
type=float,
|
||||
default=-1,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-s",
|
||||
"--snr",
|
||||
help="SExtractor's SNR value threshold to be used for selected objects.\
|
||||
Default is use of all objects",
|
||||
type=float,
|
||||
default=-1,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-a",
|
||||
"--area",
|
||||
help="Working area ('xc,yc,radius' or 'xmin,ymin,xmax,ymax'). "
|
||||
+ "The coordinates must be in FITS pixel notation! "
|
||||
+ "Default is use of entire image",
|
||||
type=str,
|
||||
default=None,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--sex-cmd",
|
||||
help="SExtractor additional commandline arguments as a string "
|
||||
+ '(e.g. --sex-cmd "-GAIN 1.1 -SEEING 1.5").',
|
||||
action="append",
|
||||
default=None,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--loss",
|
||||
help="Loss function name (see SciPy 'least_squres' function) for robust fitting. "
|
||||
+ "To use non-robust fitting set the argument to 'linear' or 'none'. Default is 'soft_l1'",
|
||||
default="soft_l1",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-c",
|
||||
"--focussing-curve",
|
||||
help="filename of focussing curve image",
|
||||
type=str,
|
||||
default=None,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-o",
|
||||
"--data-curve",
|
||||
help="filename of focussing curve data file",
|
||||
type=str,
|
||||
default=None,
|
||||
)
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def getFocussingSequenceCmdlinePars(parser):
|
||||
"""
|
||||
Add to 'argparse' parser object focus calculation and focussing sequence related commandline arguments
|
||||
"""
|
||||
|
||||
parser = getFocusCalcCmdlinePars(parser)
|
||||
|
||||
parser.add_argument(
|
||||
"-r",
|
||||
"--repeat",
|
||||
help="Number of acquisitions per focus value. Default is 1.",
|
||||
default=1,
|
||||
type=int,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-t",
|
||||
"--exp-time",
|
||||
help="Exposure time in seconds. Default is 10.",
|
||||
type=float,
|
||||
default=10,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--foc-file",
|
||||
help="Rootname of output focussing images. Default is 'focus.fits'.",
|
||||
type=str,
|
||||
default="focus.fits",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--do-not-set",
|
||||
help="Do not set focus to the best value",
|
||||
action="store_true",
|
||||
default=False,
|
||||
)
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def parsFocusCalcCmdlinePars(parser_args, parser_msg=[]):
|
||||
if parser_args.sex_cmd is None:
|
||||
parser_args.sex_cmd = [
|
||||
"-DETECT_MINAREA 5",
|
||||
"-DETECT_THRESH 3.5",
|
||||
"-DEBLEND_MINCONT 0.5",
|
||||
"-PHOT_AUTOPARAMS 2.5,1.0",
|
||||
]
|
||||
|
||||
# convolution filter dimension
|
||||
v = parser_args.conv_dim.split(",")
|
||||
if len(v) < 2:
|
||||
parser_msg.append("Invalid convolution filter dimension! Use of default value!")
|
||||
parser_args.conv_dim = [9, 9]
|
||||
else:
|
||||
vv = [0, 0]
|
||||
for i in range(2):
|
||||
try:
|
||||
vv[i] = int(v[i])
|
||||
except ValueError:
|
||||
parser_msg.append(
|
||||
"Invalid convolution filter dimension! Use of default value!"
|
||||
)
|
||||
vv = [9, 9]
|
||||
break
|
||||
parser_args.conv_dim = vv
|
||||
|
||||
# working area
|
||||
if parser_args.area is not None:
|
||||
v = parser_args.area.split(",")
|
||||
if len(v) < 3:
|
||||
parser_msg.append("Invalid working area! Use of default value!")
|
||||
parser_args.area = None
|
||||
else:
|
||||
vv = []
|
||||
for i, el in enumerate(v):
|
||||
try:
|
||||
vv.append(int(el))
|
||||
except ValueError:
|
||||
parser_msg.append("Invalid working area! Use of default value!")
|
||||
vv = None
|
||||
break
|
||||
|
||||
parser_args.area = vv
|
||||
|
||||
if parser_args.flag < 0:
|
||||
parser_args.flag = 0
|
||||
|
||||
if parser_args.snr <= 0:
|
||||
parser_args.snr = None
|
||||
|
||||
if parser_args.frad_thresh < 0:
|
||||
parser_args.frac_thresh = None
|
||||
|
||||
v = parser_args.hist.split(",")
|
||||
if len(v) < 2:
|
||||
parser_msg.append("Invalid 2D-histogram parameters! Use of default value!")
|
||||
parser_args.hist = [1, 1]
|
||||
else:
|
||||
vv = [0, 0]
|
||||
for i in range(2):
|
||||
try:
|
||||
vv[i] = float(v[i])
|
||||
except ValueError:
|
||||
parser_msg.append(
|
||||
"Invalid 2D-histogram parameters! Use of default value!"
|
||||
)
|
||||
vv = [1, 1]
|
||||
break
|
||||
|
||||
parser_args.hist = vv
|
||||
|
||||
kwds = getFocusCalcPars()
|
||||
|
||||
kwds["focus_kwd"] = parser_args.fkwd
|
||||
kwds["do_filter"] = True
|
||||
kwds["conv_fwhm"] = parser_args.conv_fwhm
|
||||
kwds["conv_dim"] = parser_args.conv_dim
|
||||
kwds["flux_frac"] = parser_args.flux_frac
|
||||
kwds["sex_cmdline_opts"] = parser_args.sex_cmd
|
||||
kwds["filt_flag"] = parser_args.flag
|
||||
kwds["work_area"] = parser_args.area
|
||||
kwds["snr_thresh"] = parser_args.snr
|
||||
kwds["flux_rad_thresh"] = parser_args.frad_thresh
|
||||
kwds["hist_step"] = parser_args.hist
|
||||
kwds["loss_func"] = (
|
||||
parser_args.loss
|
||||
if parser_args.loss != "linear" or parser_args.loss != "none"
|
||||
else None
|
||||
)
|
||||
kwds["focus_curve_filename"] = parser_args.focussing_curve
|
||||
kwds["result_data_filename"] = parser_args.data_curve
|
||||
|
||||
return kwds
|
||||
|
||||
|
||||
def parsFocussingSequenceCmdlinePars(parser_args, parser_msg=[]):
|
||||
""" """
|
||||
|
||||
foc_kwds = parsFocusCalcCmdlinePars(parser_args, parser_msg)
|
||||
|
||||
kwds = getFocusSequencePars() | foc_kwds
|
||||
|
||||
if parser_args.foc_file == "":
|
||||
parser_msg.append("Invalid '--foc_file' argument! Use of defaul value!")
|
||||
|
||||
kwds["root_filename"] = (
|
||||
parser_args.foc_file if parser_args.foc_file != "" else "focus.fits"
|
||||
)
|
||||
|
||||
if parser_args.repeat <= 0:
|
||||
parser_msg.append("Invalid '--repeat' argument! Use of defaul value!")
|
||||
|
||||
if parser_args.exp_time <= 0:
|
||||
parser_msg.append("Invalid '--exp_time' argument! Use of defaul value!")
|
||||
|
||||
kwds["acq_per_value"] = parser_args.repeat if parser_args.repeat > 0 else 1
|
||||
kwds["exp_time"] = parser_args.exp_time if parser_args.exp_time > 0 else 10
|
||||
|
||||
kwds["set_to_best"] = not parser_args.do_not_set
|
||||
|
||||
return kwds
|
||||
Reference in New Issue
Block a user