import astromatic as amt import argparse as ap from astropy.io import fits from concurrent.futures import ProcessPoolExecutor from scipy.optimize import least_squares import numpy as np import matplotlib.pyplot as plt import pathlib as pt import os # # Observation process utilities # # compute residuals from polynomial model and data # model = poly(t,x), x - independent variable and t - polynomial coefficients # res = model - y def polyModelResid(poly_coeffs, x, y): """ Compute residuals from polynomial model and data model = poly(x, poly_coeffs), where poly_coeff = [c0, c1, c2] and model = c0 + c1*x + c2*x*x res = model - y """ xx = np.asarray(x) f = poly_coeffs[0] + poly_coeffs[1] * xx + poly_coeffs[2] * xx * xx return f - y def getFocusCalcPars(**user_kwds): """ """ foc_kwds = { "focus_kwd": "FOCUS", # FITS keyword name with focus value "do_filter": True, # SExtractor's FILTER configuration option "conv_fwhm": 5, # gaussian convolution filter FWHM (in pixels) "conv_dim": [9, 9], # dimension of gaussian convolution filter (in pixels) "flux_frac": 0.7, # SExtractor's PHOT_FLUXFRAC configuration option # SExtractor's commandline options "sex_cmdline_opts": [ "-DETECT_MINAREA 5", "-DETECT_THRESH 3.5", "-DEBLEND_MINCONT 0.5", "-PHOT_AUTOPARAMS 2.5,1.0", ], "filt_flag": 0, # SExtractor's FLAGS parameter value for sutable objects "work_area": None, # Working area ('xc,yc,radius' or 'xmin,ymin,xmax,ymax') # (if the value is None - use of entire image area) "snr_thresh": None, # SExtractor's SNR value threshold to be used for selected objects # (if SNR is None - use of all objects) "flux_rad_thresh": None, # SExtractor's FLUX_RADIUS threshold to be used for selected objects, # (if the value is None - use of all objects) # 2D histogram parameters (fr_step, mag_step) to be computed from 'FLUX_RADIUS-MAG_AUTO' diagram "hist_step": [1, 1], # loss function name (see scipy.optimize.least_squares). if None then use of non-robust fitting "loss_func": "soft_l1", # Result focussing curve image filename (see Matplotlib reference to supported graphics formats) "focus_curve_filename": None, "result_data_filename": None, # Result focussing curve values filename # Saving options for result focussing curve (see NumPy savetxt function) "result_data_save_opts": { "fmt": "%g", "header": " Focussing curve:\n [focus value] [flux radius]", "comments": "#", }, } # user values for key, value in user_kwds.items(): if key in foc_kwds.keys(): foc_kwds[key] = value return foc_kwds def getFocusSequencePars(**user_kwds): """ """ seq_kwds = { "focus_start": None, "focus_stop": None, "focus_step": None, "root_filename": "focus.fits", "start_idx": 1, # start index of files in sequence "acq_per_value": 1, # number of acquisitions per focus value "exp_time": 10, # exposure time in seconds "set_to_best": True, # set focus to the best value } | getFocusCalcPars() # user values for key, value in user_kwds.items(): if key in seq_kwds.keys(): seq_kwds[key] = value return seq_kwds def fitFocusCurve( focus_value, flux_rad, loss_func="soft_l1", result_data_filename=None, focus_curve_filename=None, result_data_save_opts={"fmt": "%g", "header": "Focussing curve", "comments": "#"}, **unused ): # non-robust fit to estimate "errors" nr_res = np.polynomial.polynomial.Polynomial.fit( focus_value, flux_rad, 2 ) # parabola coeffs = nr_res.convert().coef if loss_func is not None: bounds = ( [-np.inf, -np.inf, 0.0], np.inf, ) # parabola with minimum (2nd derivative must be positive) if coeffs[2] < 0: # it seems the non-robust fit failed! coeffs[2] = 0 # so, guess curve is just constant! coeffs[1] = 0 coeffs[0] = np.nanmedian(flux_rad) scale = np.nanmedian(np.abs(flux_rad - coeffs[0])) else: nr_model = nr_res(np.asarray(focus_value)) scale = np.nanmedian(np.abs(flux_rad - nr_model)) # if coeffs[2] < 0: # coeffs[2] = 0 # nr_model = nr_res(np.asarray(focus_value)) # scale = np.abs(np.nanmedian(flux_rad - nr_model)) # robust fitting res = least_squares( polyModelResid, coeffs, bounds=bounds, loss=loss_func, f_scale=scale, args=(focus_value, flux_rad), ) coeffs = res.x best_foc = -coeffs[1] / coeffs[2] / 2.0 else: best_foc = -coeffs[1] / coeffs[2] / 2.0 if result_data_filename is not None: np.savetxt( result_data_filename, np.transpose((focus_value, flux_rad)), **result_data_save_opts ) if focus_curve_filename is not None: focus_start = np.min(focus_value) focus_stop = np.max(focus_value) x = np.linspace(focus_start, focus_stop, 100) y = coeffs[0] + coeffs[1] * x + coeffs[2] * x * x fig = plt.figure() plt.subplot(111, xlabel="Focus value", ylabel="Flux fraction radius") plt.title(label="Focusing curve: best focus {:g}".format(best_foc)) plt.plot(x, y, "r-") plt.plot(focus_value, flux_rad, "b*") fig.savefig(focus_curve_filename) return (best_foc, coeffs, loss_func is not None) def computeFocus( image_filenames, max_wks=None, log_output=None, log_ident="\t", **foc_cal_kwds ): """ Compute focussing curve values from a sequence of files. Input parameters: image_filenames: iterable of FITS filenames max_wks: max workers to be used to compute best focus value log_ouput: file-like (stream) object fo log output. if None - no log log_ident: ident string (symbols before output log messages) Returns: {"focus_value": best focus value (float or None) "coeffs": coefficients of best fit parabola (np.array or None) "is_robust": is robust fit method was used (boolean) "ret_code": return code } "ret_code": 0: OK -1: at least 3 files must be given -2: cannot perform calculations (see log_output) """ ret_val = { "focus_value": None, "coeffs": None, "is_robust": False, "ret_code": 0, } if len(image_filenames) < 3: ret_val["ret_code"] = -1 return ret_val f_kwds = getFocusCalcPars(**foc_cal_kwds) focus_value = [] futures = [] flux_rad = [] if log_output is None: log_output = open(os.devnull, "w") executor = ProcessPoolExecutor(max_workers=max_wks) for fname in image_filenames: print(log_ident, end="") print("read focus value from {} ...\t".format(fname), file=log_output, end="") try: hdr = fits.getheader(fname) # focus_value.append(hdr[f_kwds["focus_kwd"]]) focus_value.append(float(hdr[f_kwds["focus_kwd"]])) except FileNotFoundError: print("FAIL!", file=log_output) print(log_ident, end="") print("WARNING: cannot find file {}! Skip!".format(fname), file=log_output) continue except NameError: print("FAIL!", file=log_output) print(log_ident, end="") print( "WARNING: cannot find FITS keyword {} in file {}! Skip!".format( f_kwds["focus_kwd"], fname ), file=log_output, ) continue except ValueError: print("FAIL!", file=log_output) print(log_ident, end="") print( "WARNING: cannot convert FITS keyword '{}' value '{}' to number! Skip!".format( f_kwds["focus_kwd"], hdr[f_kwds["focus_kwd"]] ), file=log_output, ) continue print("OK", file=log_output) futures.append(executor.submit(amt.compute_mean_flux_radius, fname, **f_kwds)) if len(futures): print("", file=log_output) print(log_ident, file=log_output, end="") print( "waiting for the end of calculations ...\t", file=log_output, end="", flush=True, ) for i in range(len(futures)): flux_rad.append(futures[i].result()) print("OK\n", file=log_output) else: print(log_ident, end="") print("ERROR: It seems there were no valid FITS files! Stop!", file=log_output) ret_val["ret_code"] = -2 return ret_val fit_res = fitFocusCurve(focus_value, flux_rad, **f_kwds) ret_val["focus_value"] = fit_res[0] ret_val["coeffs"] = fit_res[1] ret_val["is_robust"] = fit_res[2] print(log_ident, end="") print("best focus value is {:g}".format(ret_val["focus_value"]), file=log_output) return ret_val def focussingSequence( focus_range, init_seq_func, set_focus_func, get_image_func, max_wks=None, log_output=None, log_ident="\t", **kwds ): """ Focussing sequence focus_range: [start, stop] (7 points) or [start, stop, step] init_seq_func: a function with signature: init_seq_func(seq_kwds), where seq_kwds - sequence control dictionary (see getFocusSequencePars function) set_focus_func: a function of signature: set_focus_func(foc_val) get_image_func: a function of signature: get_image_func(image_filename, exp_time) exp_time in seconds log_ouput: file-like (stream) object fo log output. if None - no log log_ident: ident string (symbols before output log messages) Returns: {"focus_value": best focus value (float or None) "coeffs": coefficients of best fit parabola (np.array or None) "is_robust": is robust fit method was used (boolean) "ret_code": return code } "ret_code": 0: OK -1: invalid focus range -2: cannot perform calculations (see log_output) """ ret_val = { "focus_value": None, "coeffs": None, "is_robust": False, "ret_code": 0, } if log_output is None: log_output = open(os.devnull, "w") if len(focus_range) < 2: ret_val["ret_code"] = -1 return ret_val N = 5 if len(focus_range) > 2: if np.isclose(focus_range[2], 0): print(log_ident, file=log_output, end="") print("Invalid (zero) focus step value!", file=log_output) ret_val["ret_code"] = -1 return ret_val if np.isclose(focus_range[0], focus_range[1]): print(log_ident, file=log_output, end="") print("Invalid focus range parameters!", file=log_output) ret_val["ret_code"] = -1 return ret_val if focus_range[0] < focus_range[1]: if focus_range[2] < 0: print(log_ident, file=log_output, end="") print("Invalid focus range parameters!", file=log_output) ret_val["ret_code"] = -1 return ret_val else: if focus_range[2] > 0: print(log_ident, file=log_output, end="") print("Invalid focus range parameters!", file=log_output) ret_val["ret_code"] = -1 return ret_val N = int((focus_range[1] - focus_range[0]) / focus_range[2] + 1) if N < 3: ret_val["ret_code"] = -1 print(log_ident, file=log_output, end="") print("Number of focus values must be at greater than 2!", file=log_output) return ret_val if focus_range[0] < focus_range[1]: focus_value = np.linspace(focus_range[0], focus_range[1], N) else: focus_value = np.flip(np.linspace(focus_range[1], focus_range[0], N)) seq_kwds = getFocusSequencePars(**kwds) init_seq_func(seq_kwds) if seq_kwds["acq_per_value"] > 1: focus_value = ( np.array([focus_value] * seq_kwds["acq_per_value"]).transpose().flatten() ) rname = pt.Path(seq_kwds["root_filename"]).absolute() # dir = rname.parent # fname = rname.stem # file_ext = rname.suffix # if file_ext == "": # file_ext = ".fit" futures = [] flux_rad = [] executor = ProcessPoolExecutor(max_workers=max_wks) i_foc = seq_kwds["start_idx"] ret_code = 0 for foc_val in focus_value: print(log_ident, file=log_output, end="") print( "set focus value to {:g} ...\t".format(foc_val), file=log_output, end="", flush=True, ) ret_code = set_focus_func(foc_val) if ret_code: ret_val["ret_code"] = ret_code print("FAIL!", file=log_output, flush=True) break else: print("OK", file=log_output, flush=True) # filename = "{}_{:04d}{}".format(fname, i_foc, file_ext) # full_filename = str(pt.Path.joinpath(dir, pt.Path(filename))) full_filename = str(rname.with_stem("{}_{:04d}".format(rname.stem, i_foc))) print(log_ident, file=log_output, end="") print( "get image '{}' ...\t".format(full_filename), file=log_output, end="", flush=True, ) ret_code = get_image_func(full_filename, seq_kwds["exp_time"]) if ret_code: ret_val["ret_code"] = ret_code print("FAIL!", file=log_output, flush=True) break else: print("OK", file=log_output, flush=True) futures.append( executor.submit(amt.compute_mean_flux_radius, full_filename, **seq_kwds) ) i_foc += 1 print("", file=log_output, flush=True) print(log_ident, file=log_output, end="", flush=True) print( "waiting for the end of calculations ...\t", file=log_output, end="", flush=True ) for i in range(len(futures)): flux_rad.append(futures[i].result()) print("OK", file=log_output, flush=True) if ret_code == 0: fit_res = fitFocusCurve(focus_value, flux_rad, **seq_kwds) ret_val["focus_value"] = fit_res[0] ret_val["coeffs"] = fit_res[1] ret_val["is_robust"] = fit_res[2] else: print( "THERE WERE ERRORS DURING FOCUSSING SEQUENCE SO NO BEST FOCUS IS AVAILABLE!!!", file=log_output, end="", flush=True, ) return ret_val # ------ commandline helper functions ----- def getFocusCalcCmdlinePars(parser): """ Add to 'argparse' parser object focus calculation related commandline arguments """ parser.add_argument( "--fkwd", help="FITS keyword name with focus value. Default is 'FOCUS'", default="FOCUS", type=str, ) parser.add_argument( "-p", "--flux-frac", help="SExtractor's PHOT_FLUXFRAC configuration option. " + "Default is 0.7", type=float, default=0.7, ) parser.add_argument( "--hist", help="2D histogram parameters (fr_step, mag_step) to be computed from 'FLUX_RADIUS-MAG_AUTO' diagram. Default is '1,1'", default="1,1", type=str, ) parser.add_argument( "-f", "--conv-fwhm", help="SExtractor's gaussian convolution filter FWHM" + " (in pixels). Default is 5", type=float, default=5.0, ) parser.add_argument( "-d", "--conv-dim", help="SExtractor's gaussian convolution filter dimensions" + " (in pixels: xsize,ysize). Default is '9,9'", default="9,9", type=str, ) parser.add_argument( "--flag", help="SExtractor's FLAGS parameter value for sutable objects. Default is 0.", type=int, default=0, ) parser.add_argument( "--frad-thresh", help="SExtractor's FLUX_RADIUS threshold to be used for selected objects.\ Default is use of all objects", type=float, default=-1, ) parser.add_argument( "-s", "--snr", help="SExtractor's SNR value threshold to be used for selected objects.\ Default is use of all objects", type=float, default=-1, ) parser.add_argument( "-a", "--area", help="Working area ('xc,yc,radius' or 'xmin,ymin,xmax,ymax'). " + "The coordinates must be in FITS pixel notation! " + "Default is use of entire image", type=str, default=None, ) parser.add_argument( "--sex-cmd", help="SExtractor additional commandline arguments as a string " + '(e.g. --sex-cmd "-GAIN 1.1 -SEEING 1.5").', action="append", default=None, ) parser.add_argument( "--loss", help="Loss function name (see SciPy 'least_squres' function) for robust fitting. " + "To use non-robust fitting set the argument to 'linear' or 'none'. Default is 'soft_l1'", default="soft_l1", ) parser.add_argument( "-c", "--focussing-curve", help="filename of focussing curve image", type=str, default=None, ) parser.add_argument( "-o", "--data-curve", help="filename of focussing curve ASCII data file", type=str, default=None, ) return parser def getFocussingSequenceCmdlinePars(parser): """ Add to 'argparse' parser object focus calculation and focussing sequence related commandline arguments """ parser = getFocusCalcCmdlinePars(parser) parser.add_argument( "-r", "--repeat", help="Number of acquisitions per focus value. Default is 1.", default=1, type=int, ) parser.add_argument( "-t", "--exp-time", help="Exposure time in seconds. Default is 10.", type=float, default=10, ) parser.add_argument( "--foc-file", help="Rootname of output focussing images. Default is 'focus.fits'. The algorithm generates files of sequence as " "'rootname_0001.rootname_ext, rootname_0002.rootname_ext ... rootname_NNNN.rootname_ext', where NNNN is an index number of acquisition.", type=str, default="focus.fits", ) parser.add_argument( "--do-not-set", help="Do not set focus to the best value", action="store_true", default=False, ) return parser def parsFocusCalcCmdlinePars(parser_args, parser_msg=[]): if parser_args.sex_cmd is None: parser_args.sex_cmd = [ "-DETECT_MINAREA 5", "-DETECT_THRESH 3.5", "-DEBLEND_MINCONT 0.5", "-PHOT_AUTOPARAMS 2.5,1.0", ] # convolution filter dimension v = parser_args.conv_dim.split(",") if len(v) < 2: parser_msg.append("Invalid convolution filter dimension! Use of default value!") parser_args.conv_dim = [9, 9] else: vv = [0, 0] for i in range(2): try: vv[i] = int(v[i]) except ValueError: parser_msg.append( "Invalid convolution filter dimension! Use of default value!" ) vv = [9, 9] break parser_args.conv_dim = vv # working area if parser_args.area is not None: v = parser_args.area.split(",") if len(v) < 3: parser_msg.append("Invalid working area! Use of default value!") parser_args.area = None else: vv = [] for i, el in enumerate(v): try: vv.append(int(el)) except ValueError: parser_msg.append("Invalid working area! Use of default value!") vv = None break parser_args.area = vv if parser_args.flag < 0: parser_args.flag = 0 if parser_args.snr <= 0: parser_args.snr = None if parser_args.frad_thresh < 0: parser_args.frac_thresh = None v = parser_args.hist.split(",") if len(v) < 2: parser_msg.append("Invalid 2D-histogram parameters! Use of default value!") parser_args.hist = [1, 1] else: vv = [0, 0] for i in range(2): try: vv[i] = float(v[i]) except ValueError: parser_msg.append( "Invalid 2D-histogram parameters! Use of default value!" ) vv = [1, 1] break parser_args.hist = vv kwds = getFocusCalcPars() kwds["focus_kwd"] = parser_args.fkwd kwds["do_filter"] = True kwds["conv_fwhm"] = parser_args.conv_fwhm kwds["conv_dim"] = parser_args.conv_dim kwds["flux_frac"] = parser_args.flux_frac kwds["sex_cmdline_opts"] = parser_args.sex_cmd kwds["filt_flag"] = parser_args.flag kwds["work_area"] = parser_args.area kwds["snr_thresh"] = parser_args.snr kwds["flux_rad_thresh"] = parser_args.frad_thresh kwds["hist_step"] = parser_args.hist kwds["loss_func"] = ( parser_args.loss if parser_args.loss != "linear" or parser_args.loss != "none" else None ) kwds["focus_curve_filename"] = parser_args.focussing_curve kwds["result_data_filename"] = parser_args.data_curve return kwds def parsFocussingSequenceCmdlinePars(parser_args, parser_msg=[]): """ """ foc_kwds = parsFocusCalcCmdlinePars(parser_args, parser_msg) kwds = getFocusSequencePars() | foc_kwds if parser_args.foc_file == "": parser_msg.append("Invalid '--foc_file' argument! Use of defaul value!") kwds["root_filename"] = ( parser_args.foc_file if parser_args.foc_file != "" else "focus.fits" ) if parser_args.repeat <= 0: parser_msg.append("Invalid '--repeat' argument! Use of defaul value!") if parser_args.exp_time <= 0: parser_msg.append("Invalid '--exp_time' argument! Use of defaul value!") kwds["acq_per_value"] = parser_args.repeat if parser_args.repeat > 0 else 1 kwds["exp_time"] = parser_args.exp_time if parser_args.exp_time > 0 else 10 kwds["set_to_best"] = not parser_args.do_not_set return kwds