# -*- encoding: utf-8 -*-
from io import BytesIO
from pathlib import Path
from typing import List, Set
# Compression libraries
import bz2
import gzip
import lzma
import pyzstd
from astropy.time import Time
import csv
import h5py
import numpy as np
from dataclasses import dataclass
from scipy.interpolate import interp1d
from .biases import BiasConfiguration
# Any MJD value smaller than this will be considered invalid. We must
# perform this kind of checks because the electronics has the nasty
# habit of registering a few samples here and there with dates that
# are way back in the past.
EARLIEST_ACCEPTABLE_MJD = Time("2018-01-01").mjd
VALID_GROUPS = ["BIAS", "DAQ"]
VALID_SUBGROUPS = ["POL", "BOARD"]
VALID_DETECTORS = ["Q1", "Q2", "U1", "U2"]
VALID_DATA_TYPES = ["PWR", "DEM"]
HDF5_GZIP_FILE_SUFFIXES = [".gz", ".gzip"]
HDF5_BZIP2_FILE_SUFFIXES = [".bz2", ".bzip2"]
HDF5_ZSTD_FILE_SUFFIXES = [".zst", ".zstd"]
HDF5_XZ_FILE_SUFFIXES = [".xz", ".lzma"]
HDF5_RAW_FILE_SUFFIXES = [".h5", ".hdf5"]
HDF5_FILE_SUFFIXES = (
HDF5_GZIP_FILE_SUFFIXES
+ HDF5_BZIP2_FILE_SUFFIXES
+ HDF5_ZSTD_FILE_SUFFIXES
+ HDF5_XZ_FILE_SUFFIXES
+ HDF5_RAW_FILE_SUFFIXES
)
class HDF5ReadError(Exception):
"""Raised when a HDF5 file does not matches the specifications"""
pass
#: Information about a tag loaded from a HDF5 file
#:
#: Fields are:
#:
#: - ``id``: unique integer number
#:
#: - ``mjd_start``: start time of the tag (MJD)
#:
#: - ``mjd_end``: stop time of the tag (MJD)
#:
#: - ``name``: string containing the name of the tag
#:
#: - ``start_comment``: comment put at the start
#:
#: - ``end_comment``: comment put at the end
[docs]@dataclass
class Tag:
id: int
mjd_start: float
mjd_end: float
name: str
start_comment: str
end_comment: str
def check_group_and_subgroup(group, subgroup):
if not group.upper() in VALID_GROUPS:
valid_choices = ", ".join(['"' + x + '"' for x in VALID_GROUPS])
raise ValueError(f"Group {group.upper()} must be one of {valid_choices}")
if not subgroup.upper() in VALID_SUBGROUPS:
valid_choices = ", ".join(['"' + x + '"' for x in VALID_SUBGROUPS])
raise ValueError(f"Subgroup {subgroup.upper()} must be one of {valid_choices}")
return True
def hk_list_file_name(group, subgroup):
return (
Path(__file__).parent.parent
/ "data"
/ "hk_pars_{}_{}.csv".format(subgroup.upper(), group.upper())
)
[docs]class HkDescriptionList:
"""Result of a call to get_hk_descriptions
This class acts like a dictionary that associates the name of an
housekeeping parameter with a description. It provides a nice
textual representation when printed on the screen::
l = get_hk_descriptions("BIAS", "POL")
# Print the description of one parameter
if "VG4A_SET" in l:
print(l["VG4A_SET"])
# Print all the descriptions in a nicely-formatted table
print(l)
"""
def __init__(self, group, subgroup, hklist):
self.group = subgroup
self.subgroup = group
self.hklist = hklist
def __contains__(self, k):
return self.hklist.__contains__(k)
def __iter__(self, k):
return self.hklist.__iter__(k)
def __len__(self):
return self.hklist.__len__()
def __getitem__(self, key):
return self.hklist.__getitem__(key)
def __str__(self):
result = f"Parameters for {self.group}/{self.subgroup}\n\n"
result += "{:15s}{}\n".format("HK name", "Description")
table_body = ""
linewidth = 0
for key in sorted(self.hklist.keys()):
cur_line = f"{key:15s}{self.hklist[key]}\n"
if len(cur_line) - 1 > linewidth:
linewidth = len(cur_line) - 1
table_body += cur_line
return result + ("-" * linewidth) + "\n" + table_body
def get_group_subgroup(parameter):
"""
Gets the group and subgroup names of a given parameter
Args:
parameter (str): The HK parameter name
Returns:
group, subgroup (str): the strings of the group and subgroup of the parameter
"""
for g in VALID_GROUPS:
for s in VALID_SUBGROUPS:
par_fname = hk_list_file_name(g, s)
with par_fname.open(mode="r") as csv_file:
csv_reader = csv.DictReader(csv_file)
for row in csv_reader:
if parameter == row["HK_PAR"]:
return g, s
print("Parameter %s does not exist" % parameter)
return None, None
[docs]def get_hk_descriptions(group, subgroup):
"""Reads the list of housekeeping parameters with their own description.
Args:
group (str): The subgroup. It must either be ``BIAS``
or ``DAQ``.
subgroup (str): The group to load. It can either be ``POL_XY`` or
``BOARD_X``, with `X` being the module letter, and `Y`
the number of the polarimeter.
Returns:
A dictionary containing the association between the name
of the housekeeping parameter and its description.
Examples::
list = get_hk_descriptions("DAQ", "POL_G0")
"""
check_group_and_subgroup(group, subgroup)
par_fname = hk_list_file_name(group, subgroup)
hklist = {}
with par_fname.open(mode="r") as csv_file:
csv_reader = csv.DictReader(csv_file)
for row in csv_reader:
hklist[row["HK_PAR"]] = row["Description"]
return HkDescriptionList(group, subgroup, hklist)
def scan_board_names(group_names: List[str]) -> Set[str]:
"""Scan a list of group names and return the set of boards in it.
Example::
>>> group_names(["BOARD_G", "COMMANDS", "LOG", "POL_G0", "POL_G6"])
set("G")
"""
result = set() # type: Set[str]
for curname in group_names:
if (len(curname) == 7) and (curname[0:6] == "BOARD_"):
result.add(curname[6].upper())
return result
def scan_polarimeter_names(group_names: List[str]) -> Set[str]:
"""Scan a list of group names and return the set of polarimeters in it.
Example::
>>> group_names(["BOARD_G", "COMMANDS", "LOG", "POL_G0", "POL_G6"])
set("G0", "G6")
"""
result = set() # type: Set[str]
for curname in group_names:
if (len(curname) == 6) and (curname[0:4] == "POL_"):
result.add(curname[4:6].upper())
return result
def extract_mean_from_time_range(times, values, time_range=None):
"""Calculate a mean value for a timeline
Both "times" and "values" must be lists of values with the same
length. The parameter `time_range` can either be `None` or a
2-element tuple specifying the range of times to consider.
"""
assert len(times) == len(values)
if time_range:
mjd_times = times.mjd
mask = (mjd_times >= time_range[0]) & (mjd_times <= time_range[1])
times = times[mask]
values = values[mask]
if len(values) > 3:
average = np.mean(values)
else:
# Too few samples in the interval, interpolate
# between the two extrema
interpfn = interp1d(times, values, kind="previous")
average = interpfn(time_range)
else:
average = np.mean(values)
return average
def find_first_and_last_samples_in_hdf5(hdf5_file):
"""Search the minimum and maximum MJD time in the HDF5 (slow!)
Return the MJD of the first and last samples recorded in the HDF5
file ``hdf5_file`` (which must have been already opened), as well
as a Boolean telling if the MJD was read from file or computed
by scanning the whole file. (In the latter case, the user might
want to save it back in the file to avoid recomputing it again.)
If the HDF5 contains the ``FIRST_SAMPLE`` and ``LAST_SAMPLE``, the
function returns these values; otherwise, it scans all the datasets
and find the minimum. The latter options is quite slow (it takes
several seconds for file), but it is needed because the data server
currently (2021/11/18) has a bug that prevents valid values from
being written into HDF5 files.
"""
min_mjd = hdf5_file.attrs.get("FIRST_SAMPLE", -1)
if min_mjd < EARLIEST_ACCEPTABLE_MJD:
min_mjd = -1
max_mjd = hdf5_file.attrs.get("LAST_SAMPLE", -1)
if max_mjd < EARLIEST_ACCEPTABLE_MJD:
max_mjd = -1
def find_extrema(name, obj):
nonlocal min_mjd, max_mjd
if isinstance(obj, h5py.Dataset) and len(obj) > 0:
if (obj.dtype.names is not None) and ("m_jd" in obj.dtype.names):
mjd = obj["m_jd"]
# Filter out bad values
mjd = mjd[mjd >= EARLIEST_ACCEPTABLE_MJD]
if len(mjd) == 0:
return
cur_min = mjd[0]
cur_max = mjd[-1]
if (min_mjd < 0) or (cur_min < min_mjd):
min_mjd = cur_min
if (max_mjd < 0) or (cur_max > max_mjd):
max_mjd = cur_max
if (min_mjd < 0) or (max_mjd < 0):
# No meaningful values for FIRST_SAMPLE and LAST_SAMPLE:
# use the slow approach of crawling through all the datasets
# in the file
hdf5_file.visititems(find_extrema)
computed_range = True
else:
computed_range = False
return min_mjd, max_mjd, computed_range
def _open_file(filepath, filemode):
"Open a HDF5 file, applying a decompression step if necessary"
suffix = Path(filepath).suffix
if suffix not in HDF5_FILE_SUFFIXES:
raise HDF5ReadError(f'Unknown file suffix {suffix} for file "{filepath}"')
decompressor_fn = None
if suffix in HDF5_GZIP_FILE_SUFFIXES:
decompressor_fn = gzip.decompress
elif suffix in HDF5_BZIP2_FILE_SUFFIXES:
decompressor_fn = bz2.decompress
elif suffix in HDF5_ZSTD_FILE_SUFFIXES:
decompressor_fn = pyzstd.decompress
elif suffix in HDF5_XZ_FILE_SUFFIXES:
decompressor_fn = lzma.decompress
elif suffix in HDF5_RAW_FILE_SUFFIXES:
# In this case we just call h5py.File with no pre-loading
return h5py.File(filepath, filemode)
assert filemode == "r", "Compressed HDF5 files are read-only"
assert (
decompressor_fn is not None
), f"Unhandled suffix {suffix} in DataFile._open_file"
with open(filepath, "rb") as inpf:
stream = BytesIO(decompressor_fn(inpf.read()))
return h5py.File(stream, filemode)
[docs]class DataFile:
"""A HDF5 file containing timelines acquired by Strip
This is basically a high-level wrapper over a `h5py.File`
object. It assumes that the HDF5 file was saved by the acquisition
software used in Bologna and Tenerife, and it provides some tools
to navigate through the data saved in the file.
Creating a `DataFile` object does not automatically open the file;
this is done to preserve space. The file is lazily opened once you
call one of the methods that need to access file data.
The two methods you are going to use most of the time are:
- :meth:`load_hk`
- :meth:`load_sci`
You can access these class fields directly:
- ``filepath``: a ``Path`` object containing the full path of the
HDF5 file
- ``datetime``: a Python ``datetime`` object containing the time
when the acquisition started
- ``mjd_range``: a pair of ``float`` numbers representing the
MJD of the first and last sample in the file. To initialize
this field, you must call ``DataFile.read_file_metadata``
first.
- ``computed_mjd_range``: a Boolean that is set to ``True`` if
the field ``mjd_range`` was computed through a complete
scanning of the datasets in the file or read from the
file attributes. (In the former case, the user might
want to save the MJD range back in the file and have it
ready for the future, as the scanning of a HDF5 file can
take considerable time.)
- ``hdf5_groups``: a list of ``str`` objects containing the names
of the groups in the HDF5 file. To initialize this field,
you must call ``DataFile.read_file_metadata`` first.
- ``polarimeters``: a Python ``set`` object containing the names
of the polarimeters whose measurements have been saved in
this file. To initialize this field, you must call
``DataFile.read_file_metadata`` first.
- ``hdf5_file``: if the file has been opened using
:meth:`read_file_metadata`, this is the `h5py.File` object.
- ``tags``: a list of Tag objects; you must call
:meth:`read_file_metadata` before reading it.
This class can be used in ``with`` statements; in this case, it will
automatically open and close the file::
with DataFile(myfile) as inpf:
# The variable "inpf" is a DataFile object in this context
"""
def __init__(self, filepath, filemode="r"):
self.filepath = Path(filepath)
self.filemode = filemode
self.mjd_range = None
self.hdf5_groups = []
self.tags = None
self.hdf5_file = None
def __str__(self):
return f'striptease.DataFile("{self.filepath}")'
def __enter__(self):
# Force opening the file and reading the metadata
self.read_file_metadata()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.hdf5_file.close()
del self.hdf5_file
[docs] def load_hk(self, group, subgroup, par, verbose=False):
"""Loads scientific data from one detector of a given polarimeter
Args:
group (str): Either ``BIAS`` or ``DAQ``
subgroup (str): Name of the housekeeping group. It can either
be ``POL_XY`` or ``BOARD_X``, with `X` being the
letter identifying the module, and `Y` the polarimeter
number within the module. Possible examples are
``POL_G0`` and ``BOARD_Y``.
par (str): Name of the housekeeping parameter,
e.g. ``ID4_DIV``.
verbose (bool): whether to echo the HK being loaded. Default is FALSE
Returns:
A tuple containing two NumPy arrays: the stream of times
(using the astropy.time.Time datatype), and the stream of
data.
Example::
from striptease.hdf5files import DataFile
f = DataFile(filename)
time, data = f.load_hk("POL_Y6", "BIAS", "VG4A_SET")
"""
if not self.hdf5_groups:
self.read_file_metadata()
if verbose:
print(f"{group.upper()}, {subgroup.upper()}, {par.upper()}")
datahk = self.hdf5_file[subgroup.upper()][group.upper()][par.upper()]
hk_time = Time(datahk["m_jd"], format="mjd")
hk_data = datahk["value"]
return hk_time, hk_data
[docs] def load_sci(self, polarimeter, data_type, detector=[]):
"""Loads scientific data from one detector of a given polarimeter
Args:
polarimeter (str): Name of the polarimeter, in the form
``POL_XY`` or ``XY`` for short, with `X` being the
module letter and `Y` the polarimeter number within
the module.
data_type (str): Type of data to load, either ``DEM`` or
``PWR``.
detector (str): Either ``Q1``, ``Q2``, ``U1`` or ``U2``.
You can also pass a list, e.g., ``["Q1", "Q2"]``. If
no value is provided for this parameter, all the four
detectors will be returned.
Returns:
A tuple containing two NumPy arrays: the stream of times
(using the astropy.time.Time datatype), and the stream of
data. For multiple detectors, the latter will be a list
of tuples, where each column is named either ``DEMnn`` or
``PWRnn``, where ``nn`` is the name of the detector.
Examples::
from striptease.hdf5files import DataFile
import numpy as np
f = DataFile(filename)
# Load the output of only one detector
time, data = my_data.load_sci("POL_G0", "DEM", "Q1")
print(f"Q1 mean output: {np.mean(data)}")
# Load the output of several detectors at once
time, data = my_data.load_sci("POL_G0", "DEM", ("Q1", "Q2"))
print(f"Q1 mean output: {np.mean(data['DEMQ1'])}")
# Load the output of all the four detectors
time, data = my_data.load_sci("POL_G0", "DEM")
print(f"Q1 mean output: {np.mean(data['DEMQ1'])}")
"""
if not self.hdf5_groups:
self.read_file_metadata()
if len(polarimeter) == 2:
polarimeter = "POL_" + polarimeter.upper()
if not data_type.upper() in VALID_DATA_TYPES:
raise ValueError(f"Invalid data type {data_type}")
data_type = data_type.upper()
scidata = self.hdf5_file[polarimeter]["pol_data"]
scitime = Time(scidata["m_jd"], format="mjd")
if isinstance(detector, str):
if not detector.upper() in VALID_DETECTORS:
raise ValueError(f"Invalid detector {detector}")
detector = detector.upper()
column_selector = f"{data_type}{detector}"
else:
if not detector:
detector = ["Q1", "Q2", "U1", "U2"]
column_selector = tuple([f"{data_type}{x}" for x in detector])
return scitime, scidata[column_selector]
[docs] def get_average_biases(
self, polarimeter, time_range=None, calibration_tables=None
) -> BiasConfiguration:
"""Return a :class:`BiasConfiguration` object containing the average
values of biases for a polarimeter.
The parameter `polarimeter` must be a string containing the
name of the polarimeter, e.g., ``Y0``. The parameter
`time_range`, if specified, is a 2-element tuple containing
the start and end MJDs to consider in the average. If
`calibration_tables` is specified, it must be an instance of
the :class:`.CalibrationTables` class.
The return value of this function is a :class:`BiasConfiguration` object
If `calibration_tables` is specified, the values returned by
this method are calibrated to physical units; otherwise, they
are expressed in ADUs.
"""
result = {}
hk_name_to_parameter = {"VPIN": "vphsw", "IPIN": "iphsw"}
for param_name in hk_name_to_parameter.keys():
for phsw_pin in (0, 1, 2, 3):
times, values = self.load_hk(
group="BIAS",
subgroup=f"POL_{polarimeter}",
par=f"{param_name}{phsw_pin}_HK",
)
average = extract_mean_from_time_range(times, values, time_range)
if calibration_tables:
average = calibration_tables.adu_to_physical_units(
polarimeter=polarimeter,
hk=hk_name_to_parameter[param_name],
component=phsw_pin,
value=average,
)
x = f"{param_name}{phsw_pin}".lower()
result[x] = average
parameter_to_hk_name = {"vgate": "vg", "vdrain": "vd", "idrain": "id"}
for parameter in parameter_to_hk_name.keys():
for amplifier in ["0", "1", "2", "3", "4", "4A", "5", "5A"]:
try:
times, values = self.load_hk(
group="BIAS",
subgroup=f"POL_{polarimeter}",
par=f"{parameter_to_hk_name[parameter]}{amplifier}_HK",
)
except KeyError:
# This usually happens with names like "VD4A_HK";
# we simply ignore them
continue
average = extract_mean_from_time_range(times, values, time_range)
if calibration_tables:
average = calibration_tables.adu_to_physical_units(
polarimeter=polarimeter,
hk=parameter,
component=f"H{amplifier}",
value=average,
)
x = f"{parameter_to_hk_name[parameter]}{amplifier}".lower()
result[x] = average
return BiasConfiguration(**result)