Source code for calibration

# calibration/__init__.py --- class handling the conversion from ADU to phisical units and viceversa
#
# Copyright (C) 2018 Stefano Sartor - stefano.sartor@inaf.it

from collections import namedtuple
import logging as log
from pathlib import Path

import pandas as pd
import numpy as np

from config import Config
from striptease import (
    StripConnection,
    normalize_polarimeter_name,
    get_lna_num,
)

__all__ = [
    "CalibrationCurve",
    "physical_units_to_adu",
    "adu_to_physical_units",
    "CalibrationTables",
]

#: Linear calibration curve (from physical units to ADU) for an
#: housekeeping parameter.
#:
#: .. py:attribute:: slope
#:
#:    Floating-point value for the slope of the calibration curve
#:
#: .. py:attribute:: intercept
#:
#:    Floating-point value for the additive offset of the calibration curve
#:
#: .. py:attribute:: mul
#:
#:    Integer numerator of the slope of the calibration curve
#:
#: .. py:attribute:: div
#:
#:    Integer denominator of the slope of the calibration curve
#:
#: .. py:attribute:: add
#:
#:    Integer value for the additive offset of the calibration curve
#:
CalibrationCurve = namedtuple(
    "CalibrationCurve", ["slope", "intercept", "mul", "div", "add"]
)


def physical_units_to_adu(
    physical_value, calibration_curve: CalibrationCurve, step=1.0
):
    """Convert physical units to an ADU value, using a linear interpolation

    See also :meth:`.adu_to_physical_units`.

    Args:

        physical_value (float): the value to be converted; the measurement units
            depend on the kind of calibration curve used

        calibration_curve (CalibrationCurve): the calibration curve to use for
            the conversion from physical units to ADUs

        step (float): if provided, the physical value will be multiplied by
            this factor before the conversion. The default value is 1.0.

    Return:

        The physical value converted to ADU units, expressed as a
        positive integer number.

    """

    adu = physical_value * step * calibration_curve.slope + calibration_curve.intercept
    if adu < 0:
        adu = 0
    return int(adu + 0.5)


def adu_to_physical_units(adu_value, calibration_curve: CalibrationCurve):
    """Convert physical units to an ADU value, using a linear interpolation

    See also :meth:`.physical_units_to_adu`.

    Args:

        adu_value (int): the value to be converted

        calibration_curve (CalibrationCurve): the calibration curve to use for
            the conversion from physical units to ADUs

    Return:

        The physical value corresponding to the ADUs in `adu_value`.
        The measurement unit used in the result depends on the kind of
        calibration curve provided in `calibration_curve`.

    """
    return (adu_value - calibration_curve.intercept) / calibration_curve.slope


def read_board_xlsx(path):
    log.debug(f"Reading Excel file {path}")
    board = []
    excel_file_data = pd.read_excel(path, header=None, sheet_name=None)
    for cur_sheet in excel_file_data:
        cur_sheet_dict = {}
        pol = excel_file_data[cur_sheet].transpose()
        line_count = 0
        current_item = np.nan
        current_fit = np.nan
        for r in pol:
            row = pol[r]
            if line_count <= 1:
                line_count += 1
                continue
            elif type(row[0]) == str and row[0].strip() == "ITEM":
                line_count += 1
                continue
            else:
                if type(row[0]) == str:
                    current_item = row[0].replace("\n", " ")
                if type(row[1]) == str:
                    current_fit = row[1].replace("\n", " ")
                if cur_sheet_dict.get(current_item) is None:
                    cur_sheet_dict[current_item] = {}
                if cur_sheet_dict[current_item].get(current_fit) is None:
                    cur_sheet_dict[current_item][current_fit] = {}
                cur_sheet_dict[current_item][current_fit][row[2]] = CalibrationCurve(
                    slope=float(row[3]),
                    intercept=float(row[4]),
                    mul=int(row[5]),
                    div=int(row[6]),
                    add=int(row[7]),
                )
            line_count += 1
        board.append(cur_sheet_dict)
    return board


def pol_name_to_dict_key(name: str):
    board_name = name[0]
    idx = int(name[1])

    return (board_name, idx)


[docs]class CalibrationTables(object): """Calibration tables used to convert HK ADUs into physical units and back This class loads the calibration tables for housekeeping parameters from a set of Excel files. You can pass a :class:`striptease.Config` class to this class. Not doing so is equivalent to the following code:: from calibration import CalibrationTables from config import Config from striptease import StripConnection with StripConnection() as conn: conf = Config() conf.load(conn) cal_tables = CalibrationTables(conf) """ def __init__(self, config=None): if not config: with StripConnection() as conn: config = Config() config.load(conn) self.conf = config # Each of these dictionaries associates a key in the form # (BOARD, POL_IDX) (like "R", 0) with a list of # CalibrationCurve objects self.calibration_curves = { "vdrain": {}, "idrain": {}, "vgate": {}, "vphsw": {}, "iphsw": {}, } base_path = Path(__file__).parent.parent / "data" for cur_board in self.conf.boards: board_name = cur_board["name"] board_id = cur_board["id"] filename = base_path / self.conf.get_board_bias_file(board_id) try: board_conf = read_board_xlsx(filename) except Exception as exc: log.warning( f"No suitable bias file for board {board_name} found: {exc}" ) continue for polidx, pol in enumerate(board_conf): key = f"{board_name.upper()}{polidx}" self.calibration_curves["vdrain"][key] = pol["DRAIN"]["SET VOLTAGE"] self.calibration_curves["idrain"][key] = pol["DRAIN"]["SET CURRENT"] self.calibration_curves["vgate"][key] = pol["GATE"]["SET VOLTAGE"] self.calibration_curves["vphsw"][key] = pol["PIN DIODES"]["SET VOLTAGE"] self.calibration_curves["iphsw"][key] = pol["PIN DIODES"]["SET CURRENT"]
[docs] def get_calibration_curve(self, polarimeter, hk, component): """Return a :class:`CalibrationCurve` object for an housekeeping parameter Args: polarimeter (str): the name of the polarimeter, e.g., `I4` or `W3` hk (str): one of the following strings: - ``vdrain``: drain voltage - ``idrain``: drain current - ``vgate``: gate voltage - ``vphsw``: voltage pin for a phase switch - ``iphsw``: current pin for a phase switch component (str): name of the component within the polarimeter. For LNAs, you can use whatever string works with :meth:`striptease.get_lna_num`. For phase switches, you must pass an integer number in the range 0…3. Return: A :class:`.CalibrationCurve` object. """ hk_key = hk.lower() polarimeter_id = normalize_polarimeter_name(polarimeter) if hk_key in ("vdrain", "idrain", "vgate"): component_id = get_lna_num(component) else: component_id = component return self.calibration_curves[hk_key][polarimeter_id][component_id]
[docs] def adu_to_physical_units(self, polarimeter, hk, component, value): """Convert ADUs into physical units for an housekeeping parameter. The meaning of the parameters `polarimeter`, `hk`, and `component` is the same as in :meth:`.get_calibration_curve`. """ calibration_curve = self.get_calibration_curve(polarimeter, hk, component) return adu_to_physical_units(value, calibration_curve=calibration_curve)
[docs] def physical_units_to_adu(self, polarimeter, hk, component, value): """Convert physical units into ADUs for an housekeeping parameter. The meaning of the parameters `polarimeter`, `hk`, and `component` is the same as in :meth:`.get_calibration_curve`. """ calibration_curve = self.get_calibration_curve(polarimeter, hk, component) return physical_units_to_adu(value, calibration_curve=calibration_curve)