Source code for hera_pspec.container

import numpy as np
import h5py
import argparse
import time
from functools import wraps
import warnings

from . import uvpspec, __version__, utils


def transactional(fn):
    """
    Handle 'transactional' operations on PSpecContainer, where the HDF5 file is
    opened and then closed again for every operation. This is done when
    keep_open = False.

    For calling a @transactional function within another @transactional
    function, feed the kwarg nested=True in the nested function call, and it
    will not close the container upon exit (whether it returns normally or
    raises) even if keep_open = False, until the outer-most @transactional
    function exits, in which case it will close the container if
    keep_open = False.

    Parameters
    ----------
    fn : callable
        Method of PSpecContainer (first positional argument is the container).

    Returns
    -------
    wrapper : callable
        Wrapped method. It accepts an extra optional keyword argument
        `nested` (default False), which is consumed here and not passed on
        to `fn`.
    """
    @wraps(fn)
    def wrapper(*args, **kwargs):
        psc = args[0]  # self object

        # Open HDF5 file if needed
        if psc.data is None:
            psc._open()

        # If passed 'nested' kwarg, get it (removed so fn never sees it)
        nested = kwargs.pop('nested', False)

        # Run function. The file is closed afterwards on BOTH the success and
        # the error path, but only by the outer-most call: a nested call that
        # raises must not pull the file out from under an outer call that
        # may catch the error and keep working.
        try:
            return fn(*args, **kwargs)
        finally:
            if not psc.keep_open and not nested:
                psc._close()

    return wrapper


class PSpecContainer(object):
    """
    Container class for managing multiple UVPSpec objects.
    """

    def __init__(self, filename, mode='r', keep_open=True, swmr=False,
                 tsleep=0.1, maxiter=1):
        """
        Manage a collection of UVPSpec objects that are stored in a
        structured HDF5 file.

        Note: one should not create new groups or datasets with SWMR. See
        page 6 of
        https://support.hdfgroup.org/HDF5/docNewFeatures/SWMR/HDF5_SWMR_Users_Guide.pdf
        for SWMR limitations.

        Parameters
        ----------
        filename : str
            Path to HDF5 file to store power spectra in.

        mode : str
            Whether to load the HDF5 file as read/write ('rw') or read-only
            ('r'). If 'rw' is specified and the file doesn't exist, an empty
            one will be created.

        keep_open : bool, optional
            Whether the HDF5 file should be kept open, or opened and then
            closed again each time an operation is performed. Setting
            `keep_open=False` is helpful for multi-process access patterns.
            Default: True (keep file open).

        swmr : bool, optional
            Enable Single-Writer Multiple-Reader (SWMR) feature of HDF5.
            Note that SWMR can only be used on POSIX-compliant filesystems,
            and so may not work on some network filesystems.
            Default: False (do not use SWMR)

        tsleep : float, optional
            Time to wait in seconds after each failed attempt at opening the
            file.

        maxiter : int, optional
            Maximum number of retries after a failed attempt to open the file
            (useful for concurrent access when file may be locked temporarily
            by other processes). The file is tried up to maxiter + 1 times.

        Raises
        ------
        ValueError
            If mode is not 'r' or 'rw'.
        """
        self.filename = filename
        self.keep_open = keep_open
        self.mode = mode
        self.tsleep = tsleep
        self.maxiter = maxiter
        self.swmr = swmr
        if mode not in ['r', 'rw']:
            raise ValueError("Must set mode to either 'r' or 'rw'.")

        # Open file ready for reading and/or writing (if not in
        # transactional mode)
        self.data = None
        if keep_open:
            self._open()

    def _open(self):
        """
        Open HDF5 file ready for reading/writing. Does nothing if the file is
        already open.

        When swmr=True, this method uses HDF5's single writer, multiple reader
        (SWMR) mode, which allows multiple handles to exist for the same file
        at the same time, as long as only one is in rw mode. The rw instance
        should be the *first* one that is created; if a read-only instance is
        already open when a rw instance is created, an error will be raised
        by h5py.

        In 'rw' mode the file header is updated and the file is tagged with
        a 'pspec_type' attribute after opening.

        Raises
        ------
        NotImplementedError
            If a read-only SWMR handle is requested with HDF5 < 1.10.
        OSError
            If the file still cannot be opened after maxiter retries.
        """
        if self.data is not None:
            return

        # Convert user-specified mode to a mode that HDF5 recognizes. We only
        # allow non-destructive operations!
        if self.mode == 'rw':
            mode = 'a'
        else:
            mode = 'r'

        # The swmr flag at open time only applies to read-only handles; a
        # writer switches SWMR mode on after opening (see below)
        swmr = bool(self.swmr) and self.mode == 'r'

        # check HDF5 version if swmr
        if swmr:
            hdf5_version = h5py.version.hdf5_version
            # Compare (major, minor) as integers: a float comparison would
            # read "1.10" as 1.1 and wrongly accept e.g. "1.8" (1.8 > 1.1)
            major_minor = tuple(int(v) for v in hdf5_version.split('.')[:2])
            if major_minor < (1, 10):
                raise NotImplementedError("HDF5 version is {}: must "
                                          "be >= 1.10 for SWMR".format(hdf5_version))

        # Try to open the file, retrying (the file may be locked temporarily
        # by another process)
        Ncount = 0
        while True:
            try:
                self.data = h5py.File(self.filename, mode, libver='latest',
                                      swmr=swmr)
                if self.mode == 'rw':
                    try:
                        # Enable single writer, multiple reader mode on HDF5
                        # file. This allows multiple handles to exist for the
                        # same file at the same time, as long as only one is
                        # in rw mode
                        if self.swmr:
                            self.data.swmr_mode = True
                    except ValueError:
                        pass
                break
            except (IOError, OSError):
                # raise Exception if exceeded maxiter
                if Ncount >= self.maxiter:
                    if self.mode == 'rw':
                        raise OSError(
                            "Failed to open HDF5 file. Another process may "
                            "be holding it open; use \nkeep_open=False to "
                            "help prevent this from happening (single "
                            "process), or use the\nlock kwarg (multiple "
                            "processes).")
                    else:
                        raise

                # sleep and try again
                Ncount += 1
                time.sleep(self.tsleep)

        # Update header info
        if self.mode == 'rw':
            # Update header
            self._update_header()

            # Denote as Container
            if 'pspec_type' not in self.data.attrs:
                self.data.attrs['pspec_type'] = self.__class__.__name__

    def _close(self):
        """
        Close HDF5 file. Does nothing if file is already closed.
        """
        if self.data is None:
            return
        self.data.close()
        self.data = None

    def _store_pspec(self, pspec_group, uvp):
        """
        Store a UVPSpec object as group of datasets within the HDF5 file.

        Parameters
        ----------
        pspec_group : HDF5 group
            HDF5 group to store power spectrum data in.

        uvp : UVPSpec
            Object containing power spectrum and related data.

        Raises
        ------
        IOError
            If the container was opened read-only.
        """
        if self.mode == 'r':
            raise IOError("HDF5 file was opened read-only; cannot write to file.")

        # Get data and attributes from UVPSpec object (stored in dicts)
        assert isinstance(uvp, uvpspec.UVPSpec)

        # Write UVPSpec to group
        uvp.write_to_group(pspec_group, run_check=True)

    def _load_pspec(self, pspec_group, **kwargs):
        """
        Load a new UVPSpec object from a HDF5 group.

        Parameters
        ----------
        pspec_group : HDF5 group
            Group containing datasets that contain power spectrum and
            supporting information, in a standard format expected by UVPSpec.

        kwargs : dict, optional
            UVPSpec.read_from_group partial IO keyword arguments

        Returns
        -------
        uvp : UVPSpec
            Returns a UVPSpec object constructed from the input HDF5 group.

        Raises
        ------
        TypeError
            If the group is not tagged (pspec_type attribute) as a UVPSpec.
        """
        # Check that group is tagged as containing UVPSpec (pspec_type
        # attribute)
        if 'pspec_type' not in pspec_group.attrs:
            raise TypeError("HDF5 group is not tagged as a UVPSpec object.")

        # Convert bytes -> str if needed
        pspec_type = pspec_group.attrs['pspec_type']
        if isinstance(pspec_type, bytes):
            pspec_type = pspec_type.decode()
        if pspec_type != uvpspec.UVPSpec.__name__:
            raise TypeError("HDF5 group is not tagged as a UVPSpec object.")

        # Create new UVPSpec object and fill with data from this group
        uvp = uvpspec.UVPSpec()
        uvp.read_from_group(pspec_group, **kwargs)
        return uvp

    def _update_header(self):
        """
        Update the header in the HDF5 file with useful metadata, including
        the version of hera_pspec.

        Raises
        ------
        ValueError
            If the file has no 'header' group and SWMR is enabled (new groups
            cannot be created under SWMR).
        """
        if 'header' in self.data:
            hdr = self.data['header']
        elif not self.swmr:
            hdr = self.data.create_group('header')
        else:
            raise ValueError("Cannot create a header group with SWMR")

        # Check if versions of hera_pspec are the same
        # with backward-compatibility if file generated
        # before version control was introduced
        if 'hera_pspec.git_hash' in hdr.attrs or (
                'hera_pspec.version' in hdr.attrs
                and hdr.attrs['hera_pspec.version'] != __version__):
            warnings.warn("HDF5 file was created by a different version of hera_pspec")

        if not self.swmr:
            hdr.attrs['hera_pspec.version'] = __version__

    @transactional
    def set_pspec(self, group, psname, pspec, overwrite=False):
        """
        Store a delay power spectrum in the container.

        Parameters
        ----------
        group : str
            Which group the power spectrum belongs to.

        psname : str or list of str
            The name(s) of the power spectrum to store within the group.

        pspec : UVPSpec or list of UVPSpec
            Power spectrum object(s) to store in the container.

        overwrite : bool, optional
            If the power spectrum already exists in the file, whether it
            should overwrite it or raise an error. Default: False (does not
            overwrite).

        Raises
        ------
        IOError
            If the container was opened read-only.
        ValueError
            If several groups are given, if psname/pspec lists are
            mismatched, or if SWMR would need a new group or dataset.
        TypeError
            If pspec (or an item of a pspec list) is not a UVPSpec.
        AttributeError
            If the power spectrum exists and overwrite=False.
        """
        if self.mode == 'r':
            raise IOError("HDF5 file was opened read-only; cannot write to file.")

        if isinstance(group, (tuple, list, dict)):
            raise ValueError("Only one group can be specified at a time.")

        # Handle input arguments that are iterable (i.e. sequences, but not str)
        if isinstance(psname, list):
            if isinstance(pspec, list) and len(pspec) == len(psname):
                # Validate every item first so that a bad entry does not
                # leave the list partially written
                for _pspec in pspec:
                    if not isinstance(_pspec, uvpspec.UVPSpec):
                        raise TypeError("pspec lists must only contain UVPSpec "
                                        "objects.")

                # Recursively call set_pspec() on each item of the list;
                # nested=True keeps a keep_open=False file open between items
                for _psname, _pspec in zip(psname, pspec):
                    self.set_pspec(group, _psname, _pspec,
                                   overwrite=overwrite, nested=True)
                return
            else:
                # Raise exception if psname is a list, but pspec is not
                raise ValueError("If psname is a list, pspec must be a list of "
                                 "the same length.")
        if isinstance(pspec, list) and not isinstance(psname, list):
            raise ValueError("If pspec is a list, psname must also be a list.")
        # No lists should pass beyond this point

        # check for swmr write of new group or dataset
        if self.swmr:
            tree = self.tree(return_str=False, nested=True)
            if group not in tree or psname not in tree[group]:
                raise ValueError("Cannot write new group or dataset with SWMR")

        # Check that input is of the correct type
        if not isinstance(pspec, uvpspec.UVPSpec):
            print("pspec:", type(pspec), pspec)
            raise TypeError("pspec must be a UVPSpec object.")

        key1 = "%s" % group
        key2 = "%s" % psname

        # Check that the group exists
        if key1 not in self.data:
            grp = self.data.create_group(key1)
        else:
            grp = self.data[key1]

        # Check that the psname exists
        if key2 not in grp:
            # Create group if it doesn't exist
            psgrp = grp.create_group(key2)
        elif overwrite:
            # Delete group and recreate
            del grp[key2]
            psgrp = grp.create_group(key2)
        else:
            raise AttributeError(
                "Power spectrum %s/%s already exists and overwrite=False."
                % (key1, key2))

        # Add power spectrum to this group
        self._store_pspec(psgrp, pspec)

        # Store info about what kind of power spectra are in the group
        psgrp.attrs['pspec_type'] = pspec.__class__.__name__

    @transactional
    def get_pspec(self, group, psname=None, **kwargs):
        """
        Get a UVPSpec power spectrum object from a given group.

        Parameters
        ----------
        group : str
            Which group the power spectrum belongs to.

        psname : str, optional
            The name of the power spectrum to return. If None, extract all
            available power spectra.

        kwargs : dict
            UVPSpec.read_from_group partial IO keyword arguments

        Returns
        -------
        uvp : UVPSpec or list of UVPSpec
            The specified power spectrum as a UVPSpec object (or a list of all
            power spectra in the group, if psname was not specified).

        Raises
        ------
        KeyError
            If the group, or the named power spectrum, does not exist.
        """
        # Check that group is in keys and extract it if so
        key1 = "%s" % group
        if key1 not in self.data:
            raise KeyError("No group named '%s'" % key1)
        grp = self.data[key1]

        # If psname was specified, check that it exists and extract
        if psname is not None:
            key2 = "%s" % psname

            # Load power spectrum if it exists
            if key2 not in grp:
                raise KeyError("No pspec named '%s' in group '%s'" % (key2, key1))
            return self._load_pspec(grp[key2], **kwargs)

        # Otherwise, extract all available power spectra
        uvp = []

        def pspec_filter(n, obj):
            # Any object tagged with 'pspec_type' is treated as a power spectrum
            if 'pspec_type' in obj.attrs:
                uvp.append(self._load_pspec(obj, **kwargs))

        # Traverse the entire set of groups/datasets looking for pspecs
        grp.visititems(pspec_filter)  # This adds power spectra to the uvp list
        return uvp

    @transactional
    def spectra(self, group):
        """
        Return list of available power spectra.

        Parameters
        ----------
        group : str
            Which group to list power spectra from.

        Returns
        -------
        ps_list : list of str
            List of names (paths relative to the group) of power spectra in
            the group.

        Raises
        ------
        KeyError
            If the group does not exist.
        """
        # Check that group is in keys and extract it if so
        key1 = "%s" % group
        if key1 not in self.data:
            raise KeyError("No group named '%s'" % key1)
        grp = self.data[key1]

        # Filter to look for pspec objects
        ps_list = []

        def pspec_filter(n, obj):
            if 'pspec_type' in obj.attrs:
                ps_list.append(n)

        # Traverse the entire set of groups/datasets looking for pspecs
        grp.visititems(pspec_filter)
        return ps_list

    @transactional
    def groups(self):
        """
        Return list of groups in the container.

        Returns
        -------
        group_list : list of str
            List of group names (the 'header' group is excluded).
        """
        groups = list(self.data.keys())
        if u'header' in groups:
            groups.remove(u'header')
        return groups

    @transactional
    def tree(self, return_str=True):
        """
        Output a string containing a tree diagram of groups and the power
        spectra that they contain.

        Parameters
        ----------
        return_str : bool, optional
            If True, return the tree as a string, otherwise return as a
            dictionary

        Returns
        -------
        str or dict
            Tree structure of HDF5 file
        """
        grps = self.groups(nested=True)
        tree = dict()
        for grp in grps:
            tree[grp] = self.spectra(grp, nested=True)

        if not return_str:
            return tree

        lines = []
        for grp in grps:
            lines.append("(%s)\n" % grp)
            for pspec in tree[grp]:
                lines.append(" |--%s\n" % pspec)
        return "".join(lines)

    @transactional
    def save(self):
        """
        Force HDF5 file to flush to disk.
        """
        self.data.flush()

    def __del__(self):
        """
        Make sure that HDF5 file is closed on destruct.
        """
        # Best effort only: attributes may be missing if __init__ failed, and
        # h5py may already be partially torn down at interpreter exit
        try:
            self._close()
        except Exception:
            pass
def combine_psc_spectra(psc, groups=None, dset_split_str='_x_', ext_split_str='_',
                        merge_history=True, verbose=True, overwrite=False):
    """
    Iterate through a PSpecContainer and, within each specified group, combine
    UVPSpec (i.e. spectra) of similar name but varying psname extension.

    Power spectra to-be-merged are assumed to follow the naming convention

        dset1_x_dset2_ext1, dset1_x_dset2_ext2, ...

    where _x_ is the default dset_split_str, and _ is the default
    ext_split_str. The spectra names are first split by dset_split_str, and
    then by ext_split_str. In this particular case, all instances of
    dset1_x_dset2 and dset1_x_dset2_<ext> will be merged together.

    In order to merge spectra names with no dset distinction and only an
    extension, feed dset_split_str as '' or None. Example, to merge together:
    uvp_1, uvp_2, uvp_3, feed dset_split_str=None and ext_split_str='_'.

    Note this is a destructive and inplace operation, all of the *_ext1
    objects are removed after merge.

    Parameters
    ----------
    psc : PSpecContainer object or str
        A PSpecContainer object with one or more groups and spectra, or the
        filename of one (opened in 'rw' mode).

    groups : list
        A list of groupnames to operate on. Default is all groups.

    dset_split_str : str
        The pattern used to split dset1 from dset2 in the psname.

    ext_split_str : str
        The pattern used to split the dset name from its extension in the
        psname.

    merge_history : bool
        If True merge UVPSpec histories. Else use zeroth object's history.

    verbose : bool
        If True, report feedback to stdout.

    overwrite : bool
        If True, overwrite output spectra if they exist.

    Notes
    -----
    Failures while merging a particular spectrum are caught and (if verbose)
    reported; processing continues with the next spectrum.
    """
    # Load container
    if isinstance(psc, str):
        psc = PSpecContainer(psc, mode='rw')
    else:
        assert isinstance(psc, PSpecContainer)

    # Open the file once for the whole operation. Every container call below
    # passes nested=True, so a transactional (keep_open=False) container does
    # not close the file in between calls -- psc.data is accessed directly.
    psc._open()
    try:
        # Get groups
        _groups = psc.groups(nested=True)
        if groups is None:
            groups = _groups
        else:
            groups = [grp for grp in groups if grp in _groups]
        assert len(groups) > 0, "no specified groups exist in this Container object"

        # Iterate over groups
        for grp in groups:
            # Get spectra in this group
            spectra = list(psc.data[grp].keys())

            # Get unique spectra by splitting and then re-joining
            unique_spectra = []
            for spc in spectra:
                if dset_split_str == '' or dset_split_str is None:
                    sp = spc.split(ext_split_str)[0]
                else:
                    sp = utils.flatten([s.split(ext_split_str)
                                        for s in spc.split(dset_split_str)])[:2]
                    sp = dset_split_str.join(sp)
                if sp not in unique_spectra:
                    unique_spectra.append(sp)

            # Iterate over each unique spectra, and merge all spectra extensions
            for spc in unique_spectra:
                # check for overwrite
                if spc in spectra and not overwrite:
                    if verbose:
                        print("spectra {}/{} already exists and overwrite == False, "
                              "skipping...".format(grp, spc))
                    continue

                # get merge list: the base name itself plus its
                # "<base><ext_split_str><ext>" variants. A plain substring test
                # would also grab unrelated spectra (e.g. dset1_x_dset22_ext1
                # when merging dset1_x_dset2) and then delete them.
                prefix = spc + ext_split_str
                to_merge = [_sp for _sp in spectra
                            if _sp == spc or _sp.startswith(prefix)]

                try:
                    # merge
                    uvps = [psc.get_pspec(grp, uvp, nested=True) for uvp in to_merge]
                    if len(uvps) > 1:
                        merged_uvp = uvpspec.combine_uvpspec(
                            uvps, merge_history=merge_history, verbose=verbose)
                    else:
                        merged_uvp = uvps[0]

                    # write to file
                    psc.set_pspec(grp, spc, merged_uvp, overwrite=True, nested=True)

                    # if successful merge, remove uvps
                    for uvp_name in to_merge:
                        if uvp_name != spc:
                            del psc.data[grp][uvp_name]

                except Exception as exc:
                    # merge failed, so continue
                    if verbose:
                        print("uvp merge failed for spectra {}/{}, exception: "
                              "{}".format(grp, spc, exc))
    finally:
        # Honour transactional mode: release the file we opened above
        if not psc.keep_open:
            psc._close()


def get_combine_psc_spectra_argparser():
    """
    Build the command-line argument parser for combine_psc_spectra.

    Returns
    -------
    a : argparse.ArgumentParser
        Parser with a positional `filename` and the optional
        --dset_split_str, --ext_split_str and --verbose flags.
    """
    a = argparse.ArgumentParser(
        description="argument parser for hera_pspec.container.combine_psc_spectra")

    # Add list of arguments
    a.add_argument("filename", type=str,
                   help="Filename of HDF5 container (PSpecContainer) containing "
                        "groups / input power spectra.")
    a.add_argument("--dset_split_str", default='_x_', type=str,
                   help='The pattern used to split dset1 from dset2 in the '
                        'psname.')
    a.add_argument("--ext_split_str", default='_', type=str,
                   help='The pattern used to split the dset names from their '
                        'extension in the psname (if it exists).')
    a.add_argument("--verbose", default=False, action='store_true',
                   help='Report feedback to stdout.')
    return a