Source code for hera_pspec.uvpspec

import numpy as np
from collections import OrderedDict as odict
import os, copy, shutil, operator, ast, fnmatch
from pyuvdata import utils as uvutils
import h5py
import warnings
import json

from . import conversions, noise, version, __version__, pspecbeam, grouping, utils, uvpspec_utils as uvputils
from .parameter import PSpecParam
from .uvwindow import UVWindow

[docs]class UVPSpec(object): """ An object for storing power spectra generated by hera_pspec and a file-format for its data and meta-data. """
[docs] def __init__(self): """ An object for storing power spectra and associated metadata generated by hera_pspec. """ # Summary attributes self._Ntimes = PSpecParam("Ntimes", description="Number of unique times.", expected_type=int) self._Ntpairs = PSpecParam("Ntpairs", description="Number of unique time-pairs.", expected_type=int) self._Nbltpairs = PSpecParam("Nbltpairs", description="Total number of baseline-time-pairs.", expected_type=int) self._Nblpairs = PSpecParam("Nblpairs", description="Total number of baseline-pairs.", expected_type=int) self._Nspwdlys = PSpecParam("Nspwdlys", description="Total number of delay bins across all spectral windows.", expected_type=int) self._Nspwfreqs = PSpecParam("Nspwfreqs", description="Total number of frequency bins across all spectral windows.", expected_type=int) self._Nspws = PSpecParam("Nspws", description="Number of unique spectral windows.", expected_type=int) self._Ndlys = PSpecParam("Ndlys", description="Number of unique delay bins.", expected_type=int) self._Nfreqs = PSpecParam("Nfreqs", description="Number of unique frequency bins in the data.", expected_type=int) self._Npols = PSpecParam("Npols", description="Number of polarizations in the data.", expected_type=int) self._history = PSpecParam("history", description='The file history.', expected_type=str) desc = ('A compressed json string version of the r_params dictionary of pspecdata object.' 'uvpspec only stores each unique weighting along with a list of unique baselines' 'corresponding to that weighting.') self._r_params = PSpecParam("r_params", description = desc, expected_type = str) # Data attributes desc = "A string indicating the covariance model of cov_array. Options are ['dsets', 'empirical']. See PSpecData.pspec() for details." self._cov_model = PSpecParam("cov_model", description=desc, expected_type=str) desc = "A boolean indicating if the window functions stored in window_function_array are exact or not." self._exact_windows = PSpecParam("exact_windows", description=desc, expected_type=bool) desc = "Power spectrum data dictionary with spw integer as keys and values as complex ndarrays." self._data_array = PSpecParam("data_array", description=desc, expected_type=np.complex128, form="(Nbltpairs, spw_Ndlys, Npols)") desc = "Power spectrum covariance dictionary with spw integer as keys and values as float ndarrays, stored separately for real and imaginary parts." self._cov_array_real = PSpecParam("cov_array_real", description=desc, expected_type=np.float64, form="(Nbltpairs, spw_Ndlys, spw_Ndlys, Npols)") self._cov_array_imag = PSpecParam("cov_array_imag", description=desc, expected_type=np.float64, form="(Nbltpairs, spw_Ndlys, spw_Ndlys, Npols)") desc = "Window function dictionary of bandpowers." self._window_function_array = PSpecParam("window_function_array", description=desc, expected_type=np.float64, form="(Nbltpairs, spw_Ndlys, spw_Ndlys, Npols)") desc = "Dictionary of bandpowers given the kperp grid used to compute the window functions." self._window_function_kperp = PSpecParam("window_function_kperp", description=desc, expected_type=np.float64, form="(Nbltpairs, spw_Ndlys, spw_Ndlys, Npols)") desc = "Dictionary of bandpowers given the kparallel grid used to compute the window functions." self._window_function_kpara = PSpecParam("window_function_kpara", description=desc, expected_type=np.float64, form="(Nbltpairs, spw_Ndlys, spw_Ndlys, Npols)") desc = "Weight dictionary for original two datasets. The second axis holds [dset1_wgts, dset2_wgts] in that order." self._wgt_array = PSpecParam("wgt_array", description=desc, expected_type=np.float64, form="(Nbltpairs, spw_Nfreqs, 2, Npols)") desc = "Integration time dictionary. This holds the average integration time [seconds] of each delay spectrum in the data. " \ "This is not necessarily equal to the integration time of the visibility data: If data have been coherently averaged " \ "(i.e. averaged before squaring), than this is the sum of each spectrum's integration time." self._integration_array = PSpecParam("integration_array", description=desc, expected_type=np.float64, form="(Nbltpairs, Npols)") desc = "Nsample dictionary, if the pspectra have been incoherently averaged (i.e. averaged after squaring), this is " \ "the effective number of samples in that average (float type). This is not the same as the pyuvdata.UVData nsample_array." self._nsample_array = PSpecParam("nsample_array", description=desc, expected_type=np.float64, form="(Nbltpairs, Npols)") desc = ("Power spectrum stats array with stats type and spw integer as keys and values as complex ndarrays with same shape" " as data_array") self._stats_array = PSpecParam("stats_array", description=desc, expected_type=np.complex128, form="(Nbltpairs, Ndlys, Npols)") self._spw_array = PSpecParam("spw_array", description="Integer array holding unique spectral windows.", form="(Nspws,)", expected_type=np.uint16) self._spw_dly_array = PSpecParam("spw_dly_array", description="Spw integer array for the dly_array.", form="(Nspwdlys,)", expected_type=np.uint16) self._spw_freq_array = PSpecParam("spw_freq_array", description="Spw integer array for the freq_array.", form="(Nspwfreqs,)", expected_type=np.uint16) self._freq_array = PSpecParam("freq_array", description="Frequency array of the original data in Hz.", form="(Nfreqs,)", expected_type=np.float64) self._dly_array = PSpecParam("dly_array", description="Delay array in seconds.", form="(Nspwdlys,)", expected_type=np.float64) desc = "Polarization pair integer, made up of two polarization integers concatenated in a standardized way." self._polpair_array = PSpecParam("polpair_array", description=desc, form="(Npols,)", expected_type=np.int32) self._lst_1_array = PSpecParam("lst_1_array", description="LST array of the first bl in the bl-pair [radians].", form="(Nbltpairs,)", expected_type=np.float64) self._lst_2_array = PSpecParam("lst_2_array", description="LST array of the second bl in the bl-pair [radians].", form="(Nbltpairs,)", expected_type=np.float64) self._lst_avg_array = PSpecParam("lst_avg_array", description="Average of the lst_1_array and lst_2_array [radians].", form="(Nbltpairs,)", expected_type=np.float64) self._time_1_array = PSpecParam("time_1_array", description="Time array of the first bl in the bl-pair [Julian Date].", form="(Nbltpairs,)", expected_type=np.float64) self._time_1_array = PSpecParam("time_2_array", description="Time array of the second bl in the bl-pair [Julian Date].", form="(Nbltpairs,)", expected_type=np.float64) self._time_avg_array = PSpecParam("time_avg_array", description="Average of the time_1_array and time_2_array [Julian Date].", form='(Nbltpairs,)', expected_type=np.float64) self._blpair_array = PSpecParam("blpair_array", description="Baseline-pair integer for all baseline-pair times.", form="(Nbltpairs,)", expected_type=np.int64) self._scalar_array = PSpecParam("scalar_array", description="Power spectrum normalization scalar from pspecbeam module.", expected_type=np.float64, form="(Nspws, Npols)") # Baseline attributes self._Nbls = PSpecParam("Nbls", description="Number of unique baseline integers.", expected_type=int) self._bl_vecs = PSpecParam("bl_vecs", description="ndarray of baseline separation vectors in the ITRF frame [meters]. To get it in ENU frame see self.get_ENU_bl_vecs().", expected_type=np.float64, form="(Nbls,)") self._bl_array = PSpecParam("bl_array", description="All unique baseline (antenna-pair) integers.", expected_type=np.int32, form="(Nbls,)") # Misc Attributes self._channel_width = PSpecParam("channel_width", description="width of visibility frequency channels in Hz.", form="(Nspwfreqs,)", expected_type=np.float64) self._telescope_location = PSpecParam("telescope_location", description="telescope location in ECEF frame [meters]. To get it in Lat/Lon/Alt see pyuvdata.utils.LatLonAlt_from_XYZ().", expected_type=np.float64) self._weighting = PSpecParam("weighting", description="Form of data weighting used when forming power spectra.", expected_type=str) self.set_symmetric_taper = PSpecParam("symmetric_taper", description="Specify whether Taper was applied symmetrically (True) or to the left(False).", expected_type=str) self._norm = PSpecParam("norm", description="Normalization method adopted in OQE (M matrix).", expected_type=str) self._taper = PSpecParam("taper", description='Taper function applied to visibility data before FT. See uvtools.dspec.gen_window for options."', expected_type=str) self._vis_units = PSpecParam("vis_units", description="Units of the original visibility data used to form the power spectra.", expected_type=str) self._norm_units = PSpecParam("norm_units", description="Power spectra normalization units, i.e. telescope units [Hz str] or cosmological [(h^-3) Mpc^3].", expected_type=str) self._labels = PSpecParam("labels", description="Array of dataset string labels.", expected_type=str) self._label_1_array = PSpecParam("label_1_array", description="Integer array w/ shape of data that indexes labels and gives label of dset1.", form="(Nspws, Nbltpairs, Npols)", expected_type=np.int32) self._label_2_array = PSpecParam("label_2_array", description="Integer array w/ shape of data that indexes labels and gives label of dset2.", form="(Nspws, Nbltpairs, Npols)", expected_type=np.int32) self._folded = PSpecParam("folded", description="if power spectra are folded (i.e. averaged) onto purely positive delay axis. Default is False", expected_type=bool) self._beamfile = PSpecParam("beamfile", description="filename of beam-model used to normalized pspectra.", expected_type=str) self._OmegaP = PSpecParam("OmegaP", description="Integral of unitless beam power over the sky [steradians].", form="(Nbeam_freqs, Npols)", expected_type=np.float64) self._OmegaPP = PSpecParam("OmegaP", description="Integral of unitless beam power squared over the sky [steradians].", form="(Nbeam_freqs, Npols)", expected_type=np.float64) self._beam_freqs = PSpecParam("beam_freqs", description="Frequency bins of the OmegaP and OmegaPP beam-integral arrays [Hz].", form="(Nbeam_freqs,)", expected_type=np.float64) self._cosmo = PSpecParam("cosmo", description="Instance of conversion.Cosmo_Conversions class.", expected_type=conversions.Cosmo_Conversions) # Collect all parameters: required and non-required self._all_params = sorted( [ p[1:] for p in fnmatch.filter(self.__dict__.keys(), '_*')]) # Specify required params: these are required for read / write and # self.check() self._req_params = ["Ntimes", "Ntpairs", "Nbltpairs", "Nblpairs", "Nspws", "Ndlys", "Npols", "Nfreqs", "history", "Nspwdlys", "Nspwfreqs", "r_params", "data_array", "wgt_array", "integration_array", "spw_array", "freq_array", "dly_array", "polpair_array", "lst_1_array", "lst_2_array", "time_1_array", "time_2_array", "blpair_array", "Nbls", "bl_vecs", "bl_array", "channel_width", "telescope_location", "weighting", "vis_units", "norm_units", "taper", "norm", "nsample_array", "lst_avg_array", "time_avg_array", "folded", "scalar_array", "labels", "label_1_array", "label_2_array", "spw_dly_array", "spw_freq_array", "symmetric_taper"] # All parameters must fall into one and only one of the following # groups, which are used in __eq__ self._immutables = ["Ntimes", "Ntpairs", "Nbltpairs", "Nblpairs", "Nspwdlys", "Nspwfreqs", "Nspws", "Ndlys", "Npols", "Nfreqs", "history", "r_params", "cov_model", "Nbls", "weighting", "vis_units", "norm", "norm_units", "taper", "cosmo", "beamfile", 'folded', 'exact_windows'] self._ndarrays = ["spw_array", "freq_array", "dly_array", "polpair_array", "lst_1_array", "lst_avg_array", "time_avg_array", "channel_width", "lst_2_array", "time_1_array", "time_2_array", "blpair_array", "OmegaP", "OmegaPP", "beam_freqs", "bl_vecs", "bl_array", "telescope_location", "scalar_array", "labels", "label_1_array", "label_2_array", "spw_freq_array", "spw_dly_array"] self._dicts = ["data_array", "wgt_array", "integration_array", "window_function_array", "window_function_kperp", "window_function_kpara", "nsample_array", "cov_array_real", "cov_array_imag"] self._dicts_of_dicts = ["stats_array"] # define which attributes are considered meta data. Large attrs should # be constructed as datasets self._meta_dsets = ["lst_1_array", "lst_2_array", "time_1_array", "time_2_array", "blpair_array", "bl_vecs", "bl_array", "lst_avg_array", "time_avg_array", "OmegaP", "OmegaPP", "label_1_array", "label_2_array", "spw_dly_array", "spw_freq_array"] self._meta_attrs = sorted( set(self._all_params) - set(self._dicts) \ - set(self._meta_dsets) - set(self._dicts_of_dicts)) self._meta = sorted(set(self._meta_dsets).union(set(self._meta_attrs))) # Attributes that appeared in previous versions of UVPSpec and may be present # in HDF5 files written by these older versions. We can look for these attributes # when we read the files and # and deal with them appropriately to support backwards compatibility. self._meta_deprecated = ["Nblpairts"] self._meta_dsets_deprecated = [] # check all params are covered assert len( set(self._all_params) - set(self._dicts) \ - set(self._immutables) - set(self._ndarrays) \ - set(self._dicts_of_dicts) ) == 0 # Default parameter values self.folded = False # Set function docstrings self.get_red_bls.__func__.__doc__ = uvputils._get_red_bls.__doc__ self.get_red_blpairs.__func__.__doc__ = uvputils._get_red_blpairs.__doc__
[docs] def get_cov(self, key, component='real', omit_flags=False): """ Slice into covariance array with a specified data key in the format (spw, ((ant1, ant2),(ant3, ant4)), (pol1, pol2)) or (spw, blpair-integer, pol-integer) where spw is the spectral window integer, ant1 etc. are integers, and pol is either a tuple of polarization strings (ex. ('XX', 'YY')) or integers (ex. (-5,-5)). Parameters ---------- key: tuple Contains the baseline-pair, spw, polpair keys component : str "real" or "imag". Indicating which cov_array the function calls. omit_flags : bool, optional If True, remove time integrations (or spectra) that came from visibility data that were completely flagged across the spectral window (i.e. integration == 0). Returns ------- data : complex ndarray Shape (Ntimes, Ndlys, Ndlys) """ spw, blpairts, polpair = self.key_to_indices(key, omit_flags=omit_flags) # Need to deal with folded data! # if data has been folded, return only positive delays if component == 'real': if hasattr(self,'cov_array_real'): if self.folded: Ndlys = np.count_nonzero(self.spw_dly_array == spw) return self.cov_array_real[spw][blpairts, -(Ndlys-Ndlys//2-1):, -(Ndlys-Ndlys//2-1):, polpair] else: return self.cov_array_real[spw][blpairts, :, :, polpair] else: raise AttributeError("No covariance array has been calculated.") elif component == 'imag': if hasattr(self,'cov_array_imag'): if self.folded: Ndlys = np.count_nonzero(self.spw_dly_array == spw) return self.cov_array_imag[spw][blpairts, -(Ndlys-Ndlys//2-1):, -(Ndlys-Ndlys//2-1):, polpair] else: return self.cov_array_imag[spw][blpairts, :, :, polpair] else: raise AttributeError("No covariance array has been calculated.") else: raise ValueError("No types besides real and imag.")
[docs] def get_window_function(self, key, omit_flags=False): """ Slice into window_function array with a specified data key in the format (spw, ((ant1, ant2),(ant3, ant4)), (pol1, pol2)) or (spw, blpair-integer, pol-integer) where spw is the spectral window integer, ant1 etc. are integers, and pol is either a tuple of polarization strings (ex. ('XX', 'YY')) or integers (ex. (-5,-5)). Parameters ---------- key: tuple Contains the baseline-pair, spw, polpair keys omit_flags : bool, optional If True, remove time integrations (or spectra) that came from visibility data that were completely flagged across the spectral window (i.e. integration == 0). Returns ------- data : complex ndarray Shape (Ntimes, Ndlys, Ndlys) """ spw, blpairts, polpair = self.key_to_indices(key, omit_flags=omit_flags) # Need to deal with folded data! # if data has been folded, return only positive delays if self.folded: Ndlys = np.count_nonzero(self.spw_dly_array == spw) return self.window_function_array[spw][blpairts, -(Ndlys-Ndlys//2-1):, -(Ndlys-Ndlys//2-1):, polpair] else: return self.window_function_array[spw][blpairts, ..., polpair]
[docs] def get_data(self, key, omit_flags=False): """ Slice into data_array with a specified data key in the format (spw, ((ant1, ant2), (ant3, ant4)), (pol1, pol2)) or (spw, blpair-integer, polpair) where spw is the spectral window integer, ant1 etc. are integers, and polpair is either a tuple of polarization strings/ints or a polarizatio-pair integer. Parameters ---------- key : tuple Contains the baseline-pair, spw, polpair keys omit_flags : bool, optional If True, remove time integrations (or spectra) that came from visibility data that were completely flagged across the spectral window (i.e. integration == 0). Returns ------- data : complex ndarray Shape (Ntimes, Ndlys) """ spw, blpairts, polpair = self.key_to_indices(key, omit_flags=omit_flags) # if data has been folded, return only positive delays if self.folded: Ndlys = np.count_nonzero(self.spw_dly_array == spw) return self.data_array[spw][blpairts, -(Ndlys-Ndlys//2-1):, polpair] # else return all delays else: return self.data_array[spw][blpairts, :, polpair]
[docs] def get_wgts(self, key, omit_flags=False): """ Slice into wgt_array with a specified data key in the format (spw, ((ant1, ant2), (ant3, ant4)), (pol1, pol2)) or (spw, blpair-integer, polpair-integer) where spw is the spectral window integer, ant1 etc. are integers, and polpair is either a polarization pair tuple or integer. Parameters ---------- key : tuple Contains the baseline-pair, spw, polpair keys omit_flags : bool, optional If True, remove time integrations (or spectra) that came from visibility data that were completely flagged across the spectral window (i.e. integration == 0). Returns ------- wgts : float ndarray Has shape (Ntimes, Nfreqs, 2), where the last axis holds [wgt_1, wgt_2] in that order. """ spw, blpairts, polpair = self.key_to_indices(key, omit_flags=omit_flags) return self.wgt_array[spw][blpairts, :, :, polpair]
[docs] def get_integrations(self, key, omit_flags=False): """ Slice into integration_array with a specified data key in the format:: (spw, ((ant1, ant2), (ant3, ant4)), (pol1, pol2)) or (spw, blpair-integer, polpair-integer) where spw is the spectral window integer, ant1 etc. are integers, and polpair is either a polarization pair tuple or integer. Parameters ---------- key : tuple Contains the baseline-pair, spw, polpair keys omit_flags : bool, optional If True, remove time integrations (or spectra) that came from visibility data that were completely flagged across the spectral window (i.e. integration == 0). Returns ------- data : float ndarray Has shape (Ntimes,) """ spw, blpairts, polpair = self.key_to_indices(key, omit_flags=omit_flags) return self.integration_array[spw][blpairts, polpair]
[docs] def get_r_params(self): """ Return an `r_params` dictionary. In a `PSpecData` object, the `r_params` are stored as a dictionary with one entry per baseline. In a `UVPSpec` object, the dictionary is compressed so that a single `r_param` entry corresponds to multiple baselines and is stored as a JSON format string. This function reads the compressed string and returns the dictionary with the correct following fields and structure. Returns ------- r_params : dict Dictionary of r_params for this object. """ return uvputils.decompress_r_params(self.r_params)
[docs] def get_nsamples(self, key, omit_flags=False): """ Slice into nsample_array with a specified data key in the format (spw, ((ant1, ant2), (ant3, ant4)), (pol1, pol2)) or (spw, blpair-integer, polpair-integer) where spw is the spectral window integer, ant1 etc. are integers, and polpair is either a polarization pair tuple or integer. Parameters ---------- key : tuple Contains the baseline-pair, spw, polpair keys omit_flags : bool, optional If True, remove time integrations (or spectra) that came from visibility data that were completely flagged across the spectral window (i.e. integration == 0). Returns ------- data : float ndarray with shape (Ntimes,) """ spw, blpairts, polpair = self.key_to_indices(key, omit_flags=omit_flags) return self.nsample_array[spw][blpairts, polpair]
[docs] def get_dlys(self, key): """ Get array of delays given a spectral window selection. Parameters ---------- key : int, or tuple with integer Spectral window selection Returns ------- dlys : float ndarray, contains delays of pspectra in nanosec, given spw """ indices = self.spw_to_dly_indices(key) return self.dly_array[indices]
[docs] def get_blpair_seps(self): """ For each baseline-pair, get the average baseline separation in ENU frame in meters between its bl1 and bl2. Ordering matches self.get_blpairs() Returns ------- blp_avg_sep : float ndarray shape=(Nbltpairs,) """ # get list of bl separations bl_vecs = self.get_ENU_bl_vecs() bls = self.bl_array.tolist() blseps = np.array([np.linalg.norm(bl_vecs[bls.index(bl)]) for bl in self.bl_array]) # construct empty blp_avg_sep array blp_avg_sep = np.empty(self.Nbltpairs, float) # construct blpair_bls blpairs = _ordered_unique(self.blpair_array) bl1 = np.floor(blpairs / 1e6) blpair_bls = np.vstack([bl1, blpairs - bl1*1e6]).astype(np.int32).T # iterate over blpairs for blp, bl in zip(blpairs, blpair_bls): avg_sep = np.mean([blseps[bls.index(bl[0])], blseps[bls.index(bl[1])]]) inds = self.blpair_to_indices(blp) blp_avg_sep[inds] = avg_sep return blp_avg_sep
[docs] def get_blpair_blvecs(self, use_second_bl=False): """ For each baseline-pair, get the baseline vector in ENU frame of the first baseline in the pair. Ordering matches self.get_blpairs() Parameters ---------- use_second_bl : bool If True, return the vector of the second bl in the blpair. Returns ------- ndarray shape=(Nblpairs,) vector in ENU frame of the baseline """ # get list of bl vectors bl_vecs = self.get_ENU_bl_vecs() bl_array = self.bl_array.tolist() # construct blpair_bls i = 0 if use_second_bl: i = 1 blpairs = _ordered_unique(self.blpair_array) bls = [self.antnums_to_bl(self.blpair_to_antnums(blp)[i]) for blp in blpairs] blp_blvecs = [] for bl in bls: blp_blvecs.append(bl_vecs[bl_array.index(bl)]) return np.array(blp_blvecs)
[docs] def get_kperps(self, spw, little_h=True): """ Get transverse (perpendicular) cosmological wavevector for each baseline-pair given an adopted cosmology and a spw selection. Parameters ---------- spw : int, choice of spectral window little_h : boolean, optional Whether to have cosmological length units be h^-1 Mpc or Mpc Default: h^-1 Mpc Returns ------- k_perp : float ndarray, transverse wave-vectors, shape=(Nblpairs,) """ # assert cosmo assert hasattr(self, 'cosmo'), \ "self.cosmo must exist to form cosmological " \ "wave-vectors. See self.set_cosmology()" # calculate mean redshift of band avg_z = self.cosmo.f2z( np.mean(self.freq_array[self.spw_to_freq_indices(spw)]) ) # get kperps blpair_seps = self.get_blpair_seps() k_perp = blpair_seps * self.cosmo.bl_to_kperp(avg_z, little_h=little_h) return k_perp
[docs] def get_kparas(self, spw, little_h=True): """ Get radial (parallel) cosmological wavevectors for power spectra given an adopted cosmology and a spw selection. Parameters ---------- spw : int Choice of spectral window little_h : boolean, optional Whether to have cosmological length units be h^-1 Mpc or Mpc Default: h^-1 Mpc Returns (k_perp, k_para) ------- k_para : float ndarray Radial wave-vectors, shape=(Ndlys given spw) """ # assert cosmo assert hasattr(self, 'cosmo'), \ "self.cosmo must exist to form cosmological " \ "wave-vectors. See self.set_cosmology()" # calculate mean redshift of band avg_z = self.cosmo.f2z( np.mean(self.freq_array[self.spw_to_freq_indices(spw)]) ) # get kparas k_para = self.get_dlys(spw) \ * self.cosmo.tau_to_kpara(avg_z, little_h=little_h) return k_para
[docs] def get_blpairs(self): """ Returns a list of all blpair tuples in the data_array. """ return [self.blpair_to_antnums(blp) for blp in _ordered_unique(self.blpair_array)]
[docs] def get_polpairs(self): """ Returns a list of all unique polarization pairs in the data_array. """ polpairs = [uvputils.polpair_int2tuple(pp, pol_strings=True) for pp in self.polpair_array] return sorted(set(polpairs))
[docs] def get_all_keys(self): """ Returns a list of all possible tuple keys in the data_array, in the format: (spectral window, baseline-pair, polarization-string) """ # get unique blpair tuples blps = self.get_blpairs() all_keys = [] # loop over spw and pols and add to keys for spw in range(self.Nspws): for p in self.polpair_array: pol1, pol2 = uvputils.polpair_int2tuple(p) pstr = (uvutils.polnum2str(pol1), uvutils.polnum2str(pol2)) all_keys.extend([(spw, blp, pstr) for blp in blps]) return all_keys
[docs] def get_spw_ranges(self, spws=None): """ Get the frequency range and Nfreqs of each spectral window in the object. Parameters ---------- spws : list of spw integers, optional Default is all spws present in data. Returns ------- spw_ranges : list of len-4 tuples Tuples: (freq_start, freq_end, Nfreqs, Ndlys) in Hz. Contains start, stop and Nfreqs and Ndlys [Hz] of each spectral window. To turn this into the frequency array of the spectral window use `spw_freqs = np.linspace(*spw_range[:3], endpoint=False)` """ # type check if isinstance(spws, (int, np.integer)): spws = [spws] if spws is None: spws = np.arange(self.Nspws) spw_ranges = [] # iterate over spectral windows for spw in spws: spw_freqs = self.freq_array[self.spw_to_freq_indices(spw)] Nfreqs = len(spw_freqs) spw_df = np.median(np.diff(spw_freqs)) Ndlys = len(self.spw_to_dly_indices(spw)) spw_ranges.append( (spw_freqs.min(), spw_freqs.min() + Nfreqs * spw_df, Nfreqs, Ndlys) ) return spw_ranges
[docs] def get_stats(self, stat, key, omit_flags=False): """ Returns a statistic from the stats_array dictionary. Parameters ---------- stat : str The statistic to return. key : tuple A spw-blpair-polpair key parseable by UVPSpec.key_to_indices. omit_flags : bool, optional If True, remove time integrations (or spectra) that came from visibility data that were completely flagged across the spectral window (i.e. integration == 0). Returns ------- statistic : array_like A 2D (Ntimes, Ndlys) complex array holding desired bandpower statistic. """ if not hasattr(self, "stats_array"): raise AttributeError("No stats have been entered to this UVPSpec object") assert stat in self.stats_array.keys(), "Statistic name {} not found in stat keys.".format(stat) spw, blpairts, polpair = self.key_to_indices(key, omit_flags=omit_flags) statistic = self.stats_array[stat] # if bandpowers have been folded, return only positive delays if self.folded: Ndlys = np.count_nonzero(self.spw_dly_array == spw) return statistic[spw][blpairts, -(Ndlys-Ndlys//2-1):, polpair] else: return statistic[spw][blpairts, :, polpair]
[docs] def set_stats(self, stat, key, statistic): """ Set a statistic waterfall (Ntimes, Ndlys) in the stats_array given the selection of spw, blpairts and polpair in key. Parameters ---------- stat : string Name of the statistic. key : tuple A spw-blpair-polpair key parseable by UVPSpec.key_to_indices. statistic : ndarray 2D ndarray with statistics to set. Must conform to shape of the slice of data_array produced by the specified key. """ spw, blpairts, polpair = self.key_to_indices(key) statistic = np.asarray(statistic) data_shape = self.data_array[spw][blpairts, :, polpair].shape if data_shape != statistic.shape: print(data_shape, statistic.shape) errmsg = "Input array shape {} must match " \ "data_array shape {}.".format(statistic.shape, data_shape) raise ValueError(errmsg) if not hasattr(self, "stats_array"): self.stats_array = odict() dtype = statistic.dtype if stat not in self.stats_array.keys(): self.stats_array[stat] = \ odict([[i, np.nan * np.ones(self.data_array[i].shape, dtype=dtype)] for i in range(self.Nspws)]) self.stats_array[stat][spw][blpairts, :, polpair] = statistic
[docs] def set_stats_slice(self, stat, m, b, above=False, val=1e20): """ For each baseline, set all delay bins in stats_array that fall above or below y = bl_len * m + b [nanosec] equal to val. Useful for downweighting foregrounds in spherical average. Parameters ---------- stat : str, name of stat in stat_array to set m : float, coefficient of bl_len [nanosec / meter] b : float, offset [nanosec] above : bool, if True, set stats above line, else set below line val : float, value to replace in stats_array """ assert stat in self.stats_array, "{} not found in stats_array".format(stat) blpairs = self.get_blpairs() bllens = self.get_blpair_seps() # iterate over spws for i in range(self.Nspws): # get this spw dlys dlys = self.get_dlys(i) * 1e9 # iterate over blpairs for blp in blpairs: # get time-blpair inds tinds = self.blpair_to_indices(blp) # get dly indices if above: dinds = np.abs(dlys) > (bllens[tinds].mean() * m + b) else: dinds = np.abs(dlys) < (bllens[tinds].mean() * m + b) # set stat_array for tind in tinds: self.stats_array[stat][i][tind, dinds, :] = val
[docs] def convert_to_deltasq(self, inplace=True): """ Convert from P(k) to Delta^2(k) by multiplying by k^3 / (2pi^2). The units of the output is therefore the current units (self.units) times (h^3) Mpc^-3, where the h^3 is only included if h^-3 is in self.norm_units Parameters ---------- inplace : bool, optional If True edit and overwrite arrays in self, else make a copy of self and return. Default: True. Notes: ------ Alters data_array, stats_array, and cov_array if present, as well as metadata like norm_units """ # copy object if inplace: uvp = self else: uvp = copy.deepcopy(self) # determine if little_h is required little_h = 'h^-3' in uvp.norm_units # loop over spectral windows for spw in range(uvp.Nspws): # get k vectors k_perp = uvp.get_kperps(spw, little_h=little_h) k_para = uvp.get_kparas(spw, little_h=little_h) k_mag = np.sqrt(k_perp[:, None, None]**2 + k_para[None, :, None]**2) # shape of (Nbltpairs, spw_Ndlys, Npols) coeff = k_mag**3 / (2 * np.pi**2) # multiply into data uvp.data_array[spw] *= coeff # update stats array if hasattr(uvp, 'stats_array'): for stat in uvp.stats_array: uvp.stats_array[stat][spw] *= coeff # update cov array if hasattr(uvp, 'cov_array_real'): cov = uvp.cov_array_real[spw] uvp.cov_array_real[spw] = np.einsum("tip,tijp,tjp->tijp", coeff, cov, coeff) cov = uvp.cov_array_imag[spw] uvp.cov_array_imag[spw] = np.einsum("tip,tijp,tjp->tijp", coeff, cov, coeff) # edit units uvp.norm_units += " k^3 / (2pi^2)" if inplace == False: return uvp
[docs] def blpair_to_antnums(self, blpair): """ Convert baseline-pair integer to nested tuple of antenna numbers. Parameters ---------- blpair : i12 int baseline-pair integer ID Returns ------- antnums : tuple Nested tuple containing baseline-pair antenna numbers. Ex. ((ant1, ant2), (ant3, ant4)) """ return uvputils._blpair_to_antnums(blpair)
[docs] def antnums_to_blpair(self, antnums): """ Convert nested tuple of antenna numbers to baseline-pair integer. Parameters ---------- antnums : tuple nested tuple containing integer antenna numbers for a baseline-pair. Ex. ((ant1, ant2), (ant3, ant4)) Returns ------- blpair : i12 integer baseline-pair integer """ return uvputils._antnums_to_blpair(antnums)
[docs] def bl_to_antnums(self, bl): """ Convert baseline (antenna-pair) integer to nested tuple of antenna numbers. Parameters ---------- bl : i6 int baseline integer ID Returns ------- antnums : tuple tuple containing baseline antenna numbers. Ex. (ant1, ant2) """ return uvputils._bl_to_antnums(bl)
[docs] def antnums_to_bl(self, antnums): """ Convert tuple of antenna numbers to baseline integer. Parameters ---------- antnums : tuple Tuple containing integer antenna numbers for a baseline. Ex. (ant1, ant2) Returns ------- bl : i6 integer Baseline integer. """ return uvputils._antnums_to_bl(antnums)
[docs] def blpair_to_indices(self, blpair): """ Convert a baseline-pair nested tuple ((ant1, ant2), (ant3, ant4)) or a baseline-pair integer into indices to index the blpairts axis of data_array. Parameters ---------- blpair : tuples or ints Nested tuple or blpair i12 integer, Ex. ((1, 2), (3, 4)), or list of blpairs. """ # convert blpair to integer if fed as tuple if isinstance(blpair, tuple): blpair = [self.antnums_to_blpair(blpair)] elif isinstance(blpair, (np.integer, int)): blpair = [blpair] elif isinstance(blpair, list): if isinstance(blpair[0], tuple): blpair = [self.antnums_to_blpair(blp) for blp in blpair] # assert exists in data assert np.array([b in self.blpair_array for b in blpair]).all(), \ "blpairs {} not all found in data".format(blpair) return np.arange(self.Nbltpairs)[ np.logical_or.reduce([self.blpair_array == b for b in blpair])]
[docs] def spw_to_freq_indices(self, spw): """ Convert a spectral window integer into a list of indices to index into the spw_freq_array or freq_array. Parameters ---------- spw : int or tuple Spectral window index, or spw tuple from get_spw_ranges(), or list of either. Returns ------- freq_indices : array_like Index array for specified spw(s) in spw_freq_array. """ # convert to list if int if isinstance(spw, (int, np.integer, tuple)): spw = [spw] # if spws is a list of spw tuples, convert to spw indices if np.all([isinstance(s, tuple) for s in spw]): spw_ranges = self.get_spw_ranges() spw = [spw_ranges.index(s) for s in spw] # assert exists in data assert np.array([s in self.spw_freq_array for s in spw]).all(), \ "spws {} not all found in data".format(spw) # get select boolean array select = np.logical_or.reduce([self.spw_freq_array == s for s in spw]) # get indices freq_indices = np.arange(self.Nspwfreqs)[select] return freq_indices
[docs] def spw_to_dly_indices(self, spw): """ Convert a spectral window integer into a list of indices to index into the spw_dly_array or dly_array. Parameters ---------- spw : int or tuple Spectral window index, or spw tuple from get_spw_ranges(), or list of either. Returns ------- dly_indices : array_like Index array for specified spw(s) in spw_dly_array. """ # convert to list if int if isinstance(spw, (int, np.integer, tuple)): spw = [spw] # if spws is a list of spw tuples, convert to spw indices if np.all([isinstance(s, tuple) for s in spw]): spw_ranges = self.get_spw_ranges() spw = [spw_ranges.index(s) for s in spw] # assert exists in data assert np.array([s in self.spw_dly_array for s in spw]).all(), \ "spws {} not all found in data".format(spw) # get select boolean array select = np.logical_or.reduce([self.spw_dly_array == s for s in spw]) if self.folded: select[self.dly_array < 1e-10] = False # get array dly_indices = np.arange(self.Nspwdlys)[select] return dly_indices
[docs] def spw_indices(self, spw): """ Convert a spectral window integer into a list of indices to index into the spw_array. Parameters ---------- spw : int or tuple Spectral window index, or spw tuple from get_spw_ranges(), or list of either. Returns ------- spw_indices : array_like Index array for specified spw(s) in spw_array. """ # convert to list if int if isinstance(spw, (int, np.integer, tuple)): spw = [spw] # if spws is a list of spw tuples, convert to spw indices if np.all([isinstance(s, tuple) for s in spw]): spw_ranges = self.get_spw_ranges() spw = [spw_ranges.index(s) for s in spw] # assert exists in data assert np.array([s in self.spw_array for s in spw]).all(), \ "spws {} not all found in data".format(spw) # get select boolean array #select = reduce(operator.add, [self.spw_array == s for s in spw]) select = np.logical_or.reduce([self.spw_array == s for s in spw]) # get array spw_indices = np.arange(self.Nspws)[select] return spw_indices
[docs] def polpair_to_indices(self, polpair): """ Map a polarization-pair integer or tuple to its index in polpair_array. Parameters ---------- polpair : (list of) int or tuple or str Polarization-pair integer or tuple of polarization strings/ints. Alternatively, if a single string is given, will assume that the specified polarization is to be cross-correlated withitself, e.g. 'XX' implies ('XX', 'XX'). Returns ------- indices : int Index of polpair in polpair_array. """ # Convert to list if not already a list if isinstance(polpair, (tuple, int, np.integer, str)): polpair = [polpair,] elif not isinstance(polpair, (list, np.ndarray)): raise TypeError("polpair must be list of tuple or int or str") # Convert strings to ints polpair = [uvputils.polpair_tuple2int((p,p)) if isinstance(p, str) else p for p in polpair] # Convert list items to standard format (polpair integers) polpair = [uvputils.polpair_tuple2int(p) if isinstance(p, tuple) else p for p in polpair] # Ensure all pols exist in data assert np.array([p in self.polpair_array for p in polpair]).all(), \ "pols {} not all found in data".format(polpair) idxs = np.logical_or.reduce([self.polpair_array == pp for pp in polpair]) indices = np.arange(self.polpair_array.size)[idxs] return indices
[docs] def time_to_indices(self, time, blpairs=None): """ Convert a time [Julian Date] from self.time_avg_array to an array that indexes the elements at which it appears in that array. Can optionally take a blpair selection to further select the indices at which both that time and blpair are satisfied. Parameters ---------- time : float Julian Date time from self.time_avg_array, Ex: 2458042.12242 blpairs : tuple or int, optional List of blpair tuples or integers that further selects the elements at which both time and blpairs are satisfied. Returns ------- indices : integer ndarray Contains indices at which selection is satisfied along the blpairts axis. """ time_select = np.isclose(self.time_avg_array, time, rtol=1e-10) if blpairs is None: return np.arange(self.Nbltpairs)[time_select] else: blp_select = np.zeros(self.Nbltpairs, bool) if isinstance(blpairs, (tuple, int, np.integer)): blpairs = [blpairs] for blp in blpairs: blp_select[self.blpair_to_indices(blp)] = True time_select *= blp_select return np.arange(self.Nbltpairs)[time_select]
[docs] def key_to_indices(self, key, omit_flags=False): """ Convert a data key into relevant slice arrays. A data key takes the form (spw_integer, ((ant1, ant2), (ant3, ant4)), (pol1, pol2)) or (spw, blpair-integer, pol-integer) where spw is the spectral window integer, ant1 etc. are integers, and polpair is either a polarization-pair integer or tuple. The key can also be a dictionary in the form:: key = { 'spw' : spw_integer, 'blpair' : ((ant1, ant2), (ant3, ant4)) 'polpair' : (pol1, pol2) } and it will parse the dictionary for you. Parameters ---------- key : tuple Baseline-pair, spw, and pol-pair key. omit_flags : bool, optional If True, remove time integrations (or spectra) that came from visibility data that were completely flagged across the spectral window (i.e. integration == 0). Returns ------- spw : int Spectral window index. blpairts : list List of integers to apply along blpairts axis. polpair : int Polarization pair index. """ # assert key length assert len(key) == 3, "length of key must be 3: (spw, blpair, polpair)" if isinstance(key, (odict, dict)): key = (key['spw'], key['blpair'], key['polpair']) # assign key elements spw_ind = key[0] blpair = key[1] polpair = key[2] # assert types assert isinstance(spw_ind, (int, np.integer)), "spw must be an integer" assert isinstance(blpair, (int, np.integer, tuple)), \ "blpair must be an integer or nested tuple" assert isinstance(polpair, (tuple, np.integer, int, str)), \ "polpair must be a tuple, integer, or str: %s / %s" % (polpair, key) # convert polpair string to tuple if isinstance(polpair, str): polpair = (polpair, polpair) # convert blpair to int if not int if isinstance(blpair, tuple): blpair = self.antnums_to_blpair(blpair) # convert pol to int if not int if isinstance(polpair, tuple): polpair = uvputils.polpair_tuple2int(polpair) # check attributes exist in data assert spw_ind in self.spw_freq_array and spw_ind in self.spw_dly_array, \ "spw {} not found in data".format(spw_ind) assert blpair in self.blpair_array, \ "blpair {} not found in data".format(blpair) assert polpair in self.polpair_array, \ "polpair {} not found in data".format(polpair) # index polarization array polpair_ind = self.polpair_to_indices(polpair) if isinstance(polpair_ind, (list, np.ndarray)): assert len(polpair_ind) == 1, \ "only one polpair can be specified in key_to_indices" polpair_ind = polpair_ind[0] # index blpairts blpairts_inds = self.blpair_to_indices(blpair) # omit flagged spectra: i.e. when integration_array == 0.0 if omit_flags: integs = self.integration_array[spw_ind][blpairts_inds, polpair_ind] keep = ~np.isclose(integs, 0.0) blpairts_inds = blpairts_inds[keep] return spw_ind, blpairts_inds, polpair_ind
[docs] def select(self, spws=None, bls=None, only_pairs_in_bls=True, blpairs=None, times=None, lsts=None, polpairs=None, inplace=True, run_check=True): """ Select function for selecting out certain slices of the data. Parameters ---------- spws : list, optional List of spectral window integers to select. If None, all will be selected. Default: None. bls : list of i6 baseline integers or baseline tuples, optional Example: (2,3). Select all baseline-pairs whose first _or_ second baseline are in bls list. This changes if only_pairs_in_bls == True. If None, all baselines are selected (subject to other options). Default: None. only_pairs_in_bls : bool, optional If True, keep only baseline-pairs whose first _and_ second baseline are found in bls list. Default: True. blpairs : list of baseline-pair tuples or integers, optional List of baseline-pairs to keep. If bls is also fed, this list is concatenated onto the baseline-pair list constructed from the bls selection. If None, all valid baseline pairs are selected. Default: None. times : array_like, optional Float ndarray of times from the time_avg_array to select. If None, all times are kept. Default: None. lsts : array_like, optional Float ndarray of lsts from the lst_avg_array to select. If None, all lsts are kept. Default: None. polpairs : list of tuple or int or str, optional List of polarization-pairs to keep. If None, all polarizations are kept. Default: None. (Strings are expanded into polarization pairs, and are not treated as individual polarizations, e.g. 'XX' becomes ('XX,'XX').) inplace : bool, optional If True, edit and overwrite arrays in self, else make a copy of self and return. Default: True. run_check : bool, optional If True, run uvp.check() after selection Returns ------- uvp : UVPSpec, optional If inplace=False, return a new UVPSpec object containing only the selected data. """ if inplace: uvp = self else: uvp = copy.deepcopy(self) uvputils._select(uvp, spws=spws, bls=bls, only_pairs_in_bls=only_pairs_in_bls, blpairs=blpairs, times=times, lsts=lsts, polpairs=polpairs) if run_check: uvp.check() if inplace == False: return uvp
[docs] def get_ENU_bl_vecs(self): """ Return baseline vector array in TOPO (ENU) frame in meters, with matched ordering of self.bl_vecs. Returns ------- blvecs : array_like Array of baseline vectors. """ return uvutils.ENU_from_ECEF( (self.bl_vecs + self.telescope_location), \ *uvutils.LatLonAlt_from_XYZ(self.telescope_location[None]))
[docs] def read_from_group(self, grp, just_meta=False, spws=None, bls=None, blpairs=None, times=None, lsts=None, polpairs=None, only_pairs_in_bls=False): """ Clear current UVPSpec object and load in data from specified HDF5 group. Parameters ---------- grp : HDF5 group HDF5 group to load data from. just_meta : bool, optional If True, read-in metadata but ignore data, wgts and integration arrays. Default: False. spws : list of tuple, optional List of spectral window integers to select. Default: None (loads all channels). bls : list of i6 baseline integers or baseline tuples Select all baseline-pairs whose first _or_ second baseline are in the list. This changes if only_pairs_in_bls == True. Example tuple: (2, 3). Default: None (loads all bl pairs). blpairs : list of baseline-pair tuples or integers List of baseline pairs to keep. If bls is also fed, this list is concatenated onto the baseline-pair list constructed from from the bls selection. times : float ndarray Times from the time_avg_array to keep. lsts : float ndarray lsts from the lst_avg_array to keep. polpairs : list of tuple or int or str List of polarization-pair integers, tuples, or strings to keep. Strings are expanded into polarization pairs, and are not treated as individual polarizations, e.g. 'XX' becomes ('XX,'XX'). only_pairs_in_bls : bool, optional If True, keep only baseline-pairs whose first _and_ second baseline are found in the 'bls' list. Default: False. """ # Make sure the group is a UVPSpec object assert 'pspec_type' in grp.attrs, "This object is not a UVPSpec object" pstype = grp.attrs['pspec_type'] pstype = pstype.decode() if isinstance(pstype, bytes) else pstype assert pstype == 'UVPSpec', "This object is not a UVPSpec object" # Clear all data in the current object self._clear() # Load-in meta data including deprecated. for k in grp.attrs: if k in self._meta_attrs or k in self._meta_deprecated: val = grp.attrs[k] if isinstance(val, bytes): val = val.decode() # bytes -> str setattr(self, k, val) for k in grp: if k in self._meta_dsets or k in self._meta_dsets_deprecated: setattr(self, k, grp[k][:]) # Backwards compatibility: pol_array exists (not polpair_array) if 'pol_array' in grp.attrs: warnings.warn("Stored UVPSpec contains pol_array attr, which has " "been superseded by polpair_array. Converting " "automatically.", UserWarning) pol_arr = grp.attrs['pol_array'] # Convert to polpair array polpair_arr = [uvputils.polpair_tuple2int((p,p)) for p in pol_arr] setattr(self, 'polpair_array', np.array(polpair_arr)) # Backwards compatibility: exact_windows if 'exact_windows' not in grp.attrs: setattr(self, 'exact_windows', False) # Use _select() to pick out only the requested baselines/spws if just_meta: uvputils._select(self, spws=spws, bls=bls, lsts=lsts, only_pairs_in_bls=only_pairs_in_bls, blpairs=blpairs, times=times, polpairs=polpairs) else: uvputils._select(self, spws=spws, bls=bls, lsts=lsts, only_pairs_in_bls=only_pairs_in_bls, blpairs=blpairs, times=times, polpairs=polpairs, h5file=grp) # handle cosmo if hasattr(self, 'cosmo'): self.cosmo = conversions.Cosmo_Conversions( **ast.literal_eval(self.cosmo) ) # BACKWARDS COMPATIBILITY for files with Nblpairts. # Check whether we are reading a pre Nbltpairs file by looking for the # Nblpairts field. if hasattr(self, "Nblpairts"): reading_old_version = True setattr(self, "Nbltpairs", self.Nblpairts) else: reading_old_version = False if reading_old_version: # If we are reading a power spectrum object that was written in the pre-Nbltpairs days # then make sure to correctly set Ntimes and Ntpairs. self.Ntimes = len(np.unique(np.hstack([self.time_1_array, self.time_2_array]))) self.Ntpairs = len(set([(t1, t2) for t1, t2 in zip(self.time_1_array, self.time_2_array)])) # Clear deprecated attributes. for dattr in self._meta_deprecated: if hasattr(self, dattr): delattr(self, dattr) for dattr in self._meta_dsets_deprecated: if hasattr(self, dattr): delattr(self, dattr) # If we are reading an UVPSpec object created before # UVData switched to future_array_shapes try: cw = getattr(self, "_channel_width").value except AttributeError: pass else: setattr(self, "_channel_width", np.atleast_1d(getattr(self, "_channel_width").value)) self.check(just_meta=just_meta)
[docs] def read_hdf5(self, filepath, just_meta=False, spws=None, bls=None, blpairs=None, times=None, lsts=None, polpairs=None, only_pairs_in_bls=False): """ Clear current UVPSpec object and load in data from an HDF5 file. Parameters ---------- filepath : str Path to HDF5 file to read from. just_meta : bool, optional If True, read-in metadata but ignore data, wgts and integration arrays. Default: False. spws : list of tuple, optional List of spectral window integers to select. Default: None (loads all channels). bls : list of i6 baseline integers or baseline tuples Select all baseline-pairs whose first _or_ second baseline are in the list. This changes if only_pairs_in_bls == True. Example tuple: (2, 3). Default: None (loads all bl pairs). blpairs : list of baseline-pair tuples or integers List of baseline pairs to keep. If bls is also fed, this list is concatenated onto the baseline-pair list constructed from from the bls selection. times : float ndarray Times from the time_avg_array to keep. lsts : float ndarray lsts from the lst_avg_array to keep. polpairs : list of polpair ints or tuples or str List of polarization-pair integers or tuples to keep. Strings are expanded into polarization pairs, and are not treated as individual polarizations, e.g. 'XX' becomes ('XX,'XX'). only_pairs_in_bls : bool, optional If True, keep only baseline-pairs whose first _and_ second baseline are found in the 'bls' list. Default: False. """ # Open file descriptor and read data with h5py.File(filepath, 'r') as f: self.read_from_group(f, just_meta=just_meta, spws=spws, bls=bls, times=times, lsts=lsts, polpairs=polpairs, only_pairs_in_bls=only_pairs_in_bls)
[docs] def write_to_group(self, group, run_check=True): """ Write UVPSpec data into an HDF5 group. Parameters ---------- group : HDF5 group The handle of the HDF5 group that the UVPSpec object should be written into. run_check : bool, optional Whether to run a validity check on the UVPSpec object before writing it to the HDF5 group. Default: True. """ # Run check if run_check: self.check() # Check whether the group already contains info # TODO # Write meta data for k in self._meta_attrs: if hasattr(self, k): # handle cosmo if k == 'cosmo': group.attrs['cosmo'] = str(getattr(self, k).get_params()) continue this_attr = getattr(self, k) # Do Unicode/string conversions, as HDF5 struggles with them if k == 'labels': this_attr = [np.string_(lbl) for lbl in this_attr] # Store attribute in group group.attrs[k] = this_attr for k in self._meta_dsets: if hasattr(self, k): group.create_dataset(k, data=getattr(self, k)) # Iterate over spectral windows and create datasets for i in np.unique(self.spw_array): group.create_dataset("data_spw{}".format(i), data=self.data_array[i], dtype=np.complex128) group.create_dataset("wgt_spw{}".format(i), data=self.wgt_array[i], dtype=np.float64) group.create_dataset("integration_spw{}".format(i), data=self.integration_array[i], dtype=np.float64) group.create_dataset("nsample_spw{}".format(i), data=self.nsample_array[i], dtype=float) if hasattr(self, "window_function_array"): group.create_dataset("window_function_spw{}".format(i), data=self.window_function_array[i], dtype=np.float64) if self.exact_windows: group.create_dataset("window_function_kperp_spw{}".format(i), data=self.window_function_kperp[i], dtype=np.float64) group.create_dataset("window_function_kpara_spw{}".format(i), data=self.window_function_kpara[i], dtype=np.float64) if hasattr(self, "cov_array_real"): group.create_dataset("cov_real_spw{}".format(i), data=self.cov_array_real[i], dtype=np.float64) group.create_dataset("cov_imag_spw{}".format(i), data=self.cov_array_imag[i], dtype=np.float64) # Store any statistics arrays if hasattr(self, "stats_array"): for s in self.stats_array.keys(): data = self.stats_array[s] for i in np.unique(self.spw_array): group.create_dataset("stats_{}_{}".format(s, i), data=data[i], dtype=data[i].dtype) # denote as a uvpspec object group.attrs['pspec_type'] = self.__class__.__name__
[docs] def write_hdf5(self, filepath, overwrite=False, run_check=True): """ Write a UVPSpec object to HDF5 file. Parameters ---------- filepath : str Filepath for output file. overwrite : bool, optional Whether to overwrite output file if it exists. Default: False. run_check : bool, optional Run UVPSpec validity check before writing to file. Default: True. """ # Check output if os.path.exists(filepath) and overwrite is False: raise IOError("{} exists, not overwriting...".format(filepath)) elif os.path.exists(filepath) and overwrite is True: print("{} exists, overwriting...".format(filepath)) os.remove(filepath) # Write file with h5py.File(filepath, 'w') as f: self.write_to_group(f, run_check=run_check)
[docs] def set_cosmology(self, new_cosmo, overwrite=False, new_beam=None, verbose=True): """ Set the cosmology for this UVPSpec object by passing an instance of conversions.Cosmo_Conversions and assigning it to self.cosmo, in addition to re-computing power spectrum normalization quantities like self.scalar_array. Because this will attempt to recompute the scalar_array, the beam-related metadata (OmegaP, OmegaPP and beam_freqs) must exist in self. If they do not, or if you'd like to overwrite them with a new beam, you can pass a UVBeam object or path to a beam file via the new_beam kwarg. If self.cosmo already exists then you are attempting to overwrite the currently adopted cosmology, which will only proceed if overwrite is True. If overwrite == True, then this module will overwrite self.cosmo. It will also recompute the power spectrum normalization scalar (using pspecbeam) and will overwrite the values in self.scalar_array. It will then propagate these re-normalization changes into the data_array by multiplying the data by (new_scalar_array / old_scalar_array). Parameters ---------- new_cosmo : Cosmo_Conversions or dict Cosmo_Conversions instance or cosmological parameter dictionary. The new cosmology you want to adopt for this UVPSpec object. overwrite : bool, optional If True, overwrite self.cosmo if it already exists. Default: False. new_beam : PSpecBeamBase sublcass or str pspecbeam.PSpecBeamUV object or path to beam file. The new beam you want to adopt for this UVPSpec object. verbose : bool, optional If True, report feedback to stdout. Default: True. """ if hasattr(self, 'cosmo') and overwrite == False: print("self.cosmo exists and overwrite == False, not overwriting...") return else: if (not hasattr(self, 'OmegaP') \ or not hasattr(self, 'OmegaPP') \ or not hasattr(self, 'beam_freqs')) \ and new_beam is None: print("In order to set the cosmology, self.OmegaP, " \ "self.OmegaPP and self.beam_freqs must exist, or you " \ "need to pass a PSpecBeamUV object or a path to a beam " \ "file as new_beam.") return # overwrite beam quantities if new_beam is not None: if verbose: print("Updating beam data with {}".format(new_beam)) if isinstance(new_beam, str): # PSpecBeamUV will adopt a default cosmology upon instantiation, # but this doesn't matterfor what we need from it new_beam = pspecbeam.PSpecBeamUV(new_beam) self.OmegaP, self.OmegaPP = new_beam.get_Omegas(self.polpair_array) self.beam_freqs = new_beam.beam_freqs # Update cosmo if isinstance(new_cosmo, (dict, odict)): new_cosmo = conversions.Cosmo_Conversions(**new_cosmo) if verbose: print("setting cosmology: \n{}".format(new_cosmo)) self.cosmo = new_cosmo # Update scalar_array if verbose: print("Updating scalar array and re-normalizing power spectra") for spw in range(self.Nspws): for j, polpair in enumerate(self.polpair_array): scalar = self.compute_scalar(spw, polpair, num_steps=1000, little_h=True, noise_scalar=False) # renormalize power spectra with new scalar self.data_array[spw][:, :, j] *= scalar/self.scalar_array[spw, j] # update self.scalar_array element self.scalar_array[spw, j] = scalar # Update self.units if pspectra were not originally in cosmological units if "Mpc" not in self.norm_units: self.norm_units = "h^-3 Mpc^3"
[docs] def get_exact_window_functions(self, ftbeam=None, spw_array=None, verbose=False, inplace=True, add_to_history='', x_orientation=None): """ Obtain the exact window functions corresponding to UVPSpec object. Depending on the format of the UVPSpec object given as input, the shape of the output array will be different: (Nblpts, Ndlys, Nkperp, Nkpara, Npols). Weights can be applied to the different baseline pairs to facilitate spherical average later. Parameters ---------- ftbeam : str or FTBeam object, optional Definition of the beam Fourier transform to be used. Options include; - Root name of the file to use, without the polarisation Ex : FT_beam_HERA_dipole (+ path) - '' for computation from beam simulations (slow) - FTBeam object. Make sure the polarisation and bandwidths are compatible with uvp. spw_array : list of ints, optional Spectral window indices. If None, the window functions are computed on all the uvp.spw_ranges, successively. Default: None. verbose : bool, optional If True, print progress, warnings and debugging info to stdout. inplace : bool, optional If True (default value), the UVPspec attribute window_function_array is filled with the values computed in the function, and window_function_kperp_bins and window_function_kpara_bins array are added as attributes. If False, returns kperp_bins, kpara_bins and window functions computed. Automatically set to False if blpair_groups is not None (that is, if the window functions are not computed on all the blpairs). add_to_history : str, optional Added text to add to file history. x_orientation: str, optional Orientation in cardinal direction east or north of X dipole. Default keeps polarization in X and Y basis. Used to convert polstr to polnum and conversely. Default: None. """ if self.exact_windows and inplace: warnings.warn("Exact window functions already computed, overwriting...") blpair_groups, blpair_lens, _ = self.get_red_blpairs() # check spw input and create array of spws to loop over if spw_array is None: # if no spw specified, use attribute spw_array = self.spw_array else: spw_array = spw_array if isinstance(spw_array, (list, tuple, np.ndarray)) else [int(spw_array)] # check if spw given is in uvp assert np.all([spw in self.spw_array for spw in spw_array]), \ "input spw is not in UVPSpec.spw_array." if inplace: # set inplace to False inplace = False warnings.warn('inplace set to False because you are not considering' \ ' all spectral windows in object.') # Create new window function array window_function_array = odict() window_function_kperp, window_function_kpara = odict(), odict() # Iterate over spectral windows for spw in spw_array: if verbose: print('spw = {}'.format(spw)) spw_window_function = [] spw_wf_kperp_bins, spw_wf_kpara_bins = [], [] # Iterate over polarizations for i, p in enumerate(self.polpair_array): # initialise UVWindow object uvw = UVWindow.from_uvpspec(self, ipol=i, spw=spw, ftbeam=ftbeam, x_orientation=x_orientation, verbose=verbose) # extract kperp bins the window functions corresponding to the baseline # lengths given as input kperp_bins = uvw.get_kperp_bins(blpair_lens) kpara_bins = uvw.get_kpara_bins(uvw.freq_array) pol_window_function = np.zeros((self.Nbltpairs, self.get_dlys(spw).size, kperp_bins.size, kpara_bins.size)) # Iterate over baseline-pair groups for j, blpg in enumerate(blpair_groups): if verbose: print('\rComputing for bl group {} of {}...'.format(j+1, len(blpair_groups)), end='') # window functions identical for all times window_function_blg = uvw.get_cylindrical_wf(blpair_lens[j], kperp_bins = kperp_bins, kpara_bins = kpara_bins) if np.isnan(window_function_blg).any(): warnings.warn('nan values in the window functions at spw={}, blpair_lens={:.2f}'.format(spw, blpair_lens[j])) # shape of window_function: (Ndlys, Nkperp, Nkpara) # Iterate within a baseline-pair group for k, blp in enumerate(blpg): # iterate over baselines within group, which all have the same window function for iblts in self.blpair_to_indices(blp): pol_window_function[iblts, :, :, :] = np.copy(window_function_blg) if verbose: print('\rComputed wf for baseline-pair groups {} of {}.'.format(len(blpair_groups),len(blpair_groups))) # Append to lists (spectral window) spw_window_function.append(pol_window_function) spw_wf_kperp_bins.append(kperp_bins.value) spw_wf_kpara_bins.append(kpara_bins.value) # Append to dictionaries window_function_array[spw] = np.moveaxis(spw_window_function, 0, -1) window_function_kperp[spw] = np.moveaxis(spw_wf_kperp_bins, 0, -1) window_function_kpara[spw] = np.moveaxis(spw_wf_kpara_bins, 0, -1) if inplace: self.window_function_array = window_function_array self.window_function_kperp = window_function_kperp self.window_function_kpara = window_function_kpara if np.all(spw_array==self.spw_array): self.exact_windows = True # Add to history self.history = "Computed exact window functions [{}]\n{}\n{}\n{}".format(__version__, add_to_history, '-'*40, self.history) # Validity check self.check() else: return window_function_kperp, window_function_kpara, window_function_array
[docs] def check(self, just_meta=False): """ Run checks to make sure metadata and data arrays are properly defined and in the right format. Raises AssertionErrors if checks fail. Parameters ---------- just_meta : bool, optional If True, only check metadata. Default: False. """ # iterate over all possible parameters for p in self._all_params: # only enforce existance if not just_meta if not just_meta: if p in self._req_params: assert hasattr(self, p), \ "required parameter {} doesn't exist".format(p) # if attribute exists, check its type if hasattr(self, p): a = getattr(self, '_' + p) if hasattr(a, 'expected_type'): err_msg = "attribute {} does not have expected data type {}".format(p, a.expected_type) # ndarrays if p in self._ndarrays: assert isinstance(getattr(self, p), np.ndarray), \ "attribute {} needs to be an ndarray, {}".format(p, getattr(self, p).tostring()) if issubclass(getattr(self, p).dtype.type, a.expected_type): pass else: # try to cast into its dtype try: setattr(self, p, getattr(self, p).astype(a.expected_type)) except: raise ValueError(err_msg) # dicts elif p in self._dicts: assert isinstance(getattr(self, p), (dict, odict)), \ "attribute {} needs to be a dictionary".format(p) # iterate over keys for k in getattr(self, p).keys(): assert isinstance(getattr(self, p)[k], np.ndarray), \ "values of attribute {} need to be ndarrays".format(p) assert issubclass(getattr(self, p)[k].dtype.type, a.expected_type), err_msg # immutables elif p in self._immutables: if not isinstance(getattr(self, p), a.expected_type): try: setattr(self, p, a.expected_type(getattr(self, p))) except: raise AssertionError(err_msg) elif p in self._dicts_of_dicts: assert isinstance(getattr(self, p), (dict, odict)) for k in getattr(self, p).keys(): assert isinstance(getattr(self, p)[k], (dict, odict)) for j in getattr(self, p)[k].keys(): assert isinstance(getattr(self, p)[k][j], np.ndarray) try: getattr(self, p)[k][j] = a.expected_type(getattr(self, p)[k][j]) except: raise AssertionError(err_msg) # check spw convention assert set(self.spw_array) == set(np.arange(self.Nspws)), "spw_array must be np.arange(Nspws)" # check choice of window functions if self.exact_windows: assert hasattr(self, 'window_function_array') and hasattr(self, 'window_function_kperp'), \ "Error with window functions: object has exact_windows=True but no related arrays stored."
def _clear(self): """ Clear UVPSpec of all parameters. Warning: this cannot be undone. """ for p in self._all_params: if hasattr(self, p): delattr(self, p) def __str__(self): """ Output useful info about UVPSpec object. """ s = "" s += " ATTRIBUTES\n" s += "-"*12 + "\n" for k in self._meta_attrs: if hasattr(self, k) and 'history' not in k: y = getattr(self, k) if isinstance(y, np.ndarray): s += "%18s: shape %s\n" % (k, y.shape) else: s += "%18s: %s\n" % (k, y) s += "\n DATASETS\n" s += "-"*12 + "\n" for k in self._meta_dsets: if hasattr(self, k): s += "%18s: shape %s\n" % (k, getattr(self, k).shape) return s def __eq__(self, other, params=None, verbose=False): """ Check equivalence between attributes of two UVPSpec objects """ if params is None: params = self._all_params try: for p in params: if p not in self._req_params \ and (not hasattr(self, p) and not hasattr(other, p)): continue if p in self._immutables: assert getattr(self, p) == getattr(other, p) elif p in self._ndarrays: if issubclass(getattr(self, p).dtype.type, str): assert np.all(getattr(self, p) == getattr(other, p)) else: assert np.isclose(getattr(self, p), getattr(other, p)).all() elif p in self._dicts: for i in getattr(self, p): assert np.isclose(getattr(self, p)[i],\ getattr(other, p)[i]).all() except AssertionError: if verbose: print("UVPSpec parameter '{}' not equivalent between {} and {}" \ "".format(p, self.__repr__(), other.__repr__())) return False return True def __add__(self, other, verbose=False): """ Combine the data of two UVPSpec objects together along a single axis """ return combine_uvpspec([self, other], verbose=verbose) @property def units(self): """ Return power spectrum units. See self.vis_units and self.norm_units. """ return "({})^2 {}".format(self.vis_units, self.norm_units)
[docs] def generate_noise_spectra(self, spw, polpair, Tsys, blpairs=None, little_h=True, form='Pk', num_steps=2000, component='real', scalar=None): """ Generate the expected RMS noise power spectrum given a selection of spectral window, system temp. [K], and polarization. This estimate is constructed as: P_N = scalar * (Tsys * 1e3)^2 / (integration_time) / sqrt(Nincoherent) [mK^2 h^-3 Mpc^3] where scalar is the cosmological and beam scalar (i.e. X2Y * Omega_eff) calculated from pspecbeam with noise_scalar = True, integration_time is in seconds and comes from self.integration_array and Nincoherent is the number of incoherent averaging samples and comes from `self.nsample_array`. If `component` is `real` or `imag`, P_N is divided by an additional factor of sqrt(2). If the polarizations specified are pseudo Stokes pol (I, Q, U or V) then an extra factor of 2 is divided. If `form` is `DelSq` then a factor of `|k|^3 / (2pi^2)` is multiplied. If real is True, a factor of sqrt(2) is divided to account for discarding imaginary noise component. For more details, see hera_pspec.noise.Sensitivity.calc_P_N, and Cheng et al. 2018. The default units of P_N are mK^2 h^-3 Mpc^3 (if little_h=True and form='Pk'), but is different if those kwargs are altered. Parameters ---------- spw : int Spectral window index to generate noise curve for. polpair : tuple or int or str Polarization-pair selection in form of tuple (e.g. ('I', 'Q')) or polpair int. Strings are expanded into polarization pairs, e.g. 'XX' becomes ('XX,'XX'). Tsys : dictionary, float or array System temperature in Kelvin for each blpair. Key is blpair-integer, value is Tsys float or ndarray. If fed as an ndarray, shape=(Ntpairs,) blpairs : list List of unique blair tuples or i12 integers to calculate noise spectrum for default is to calculate for baseline pairs. little_h : bool, optional Whether to have cosmological length units be h^-1 Mpc or Mpc Default: h^-1 Mpc form : str, optional Form of power spectra, whetehr P(k) or Delta^2(k). Valid options are 'Pk' or 'DelSq'. Default: 'Pk'. num_steps : int, optional Number of frequency bins to use in integrating power spectrum scalar in pspecbeam. Default: 2000. component : str Options=['real', 'imag', 'abs']. If component is real or imag, divide by an extra factor of sqrt(2). scalar : float Optional noise scalar used to convert from Tsys to cosmological units output from pspecbeam.compute_pspec_scalar(...,noise_scalar=True) Default is None. If None provided, will calculate scalar in function. Returns ------- P_N : dict Dictionary containing blpair integers as keys and float ndarrays of noise power spectra as values, with ndarrays having shape (Ntimes, Ndlys). """ # Check for str polpair and convert to integer if isinstance(polpair, str): polpair = uvputils.polpair_tuple2int((polpair, polpair)) # Assert polarization-pair type if isinstance(polpair, tuple): polpair = uvputils.polpair_tuple2int(polpair) # Get polarization index polpair_ind = self.polpair_to_indices(polpair) # Calculate scalar if scalar is None: scalar = self.compute_scalar(spw, polpair, num_steps=num_steps, little_h=little_h, noise_scalar=True) # Get k vectors if form == 'DelSq': k_perp = self.get_kperps(spw, little_h=little_h) k_para = self.get_kparas(spw, little_h=little_h) k_mag = np.sqrt(k_perp[:, None]**2 + k_para[None, :]**2) # get blpairs if blpairs is None: blpairs = _ordered_unique(self.blpair_array) elif isinstance(blpairs[0], tuple): blpairs = [self.antnums_to_blpair(blp) for blp in blpairs] # Get delays dlys = self.get_dlys(spw) # handle Tsys if not isinstance(Tsys, (dict, odict)): if not isinstance(Tsys, np.ndarray): Tsys = np.ones(self.Ntpairs) * Tsys Tsys = dict([(blp, Tsys) for blp in blpairs]) # Iterate over blpairs to get P_N P_N = odict() for i, blp in enumerate(blpairs): # get indices inds = self.blpair_to_indices(blp) assert isinstance(Tsys[blp], (float, int)) \ or Tsys[blp].shape[0] == self.Ntpairs, \ "Tsys must be a float or an ndarray with shape[0] == Ntpairs" P_blp = [] # iterate over time axis for j, ind in enumerate(inds): # get integration time and n_samp t_int = self.integration_array[spw][ind, polpair_ind] n_samp = self.nsample_array[spw][ind, polpair_ind] # get kvecs if form == 'DelSq': k = k_mag[ind] else: k = None # Get noise power spectrum pn = noise.calc_P_N(scalar, Tsys[blp][j], t_int, k=k, Nincoherent=n_samp, form=form, component=component) # Put into appropriate form if form == 'Pk': pn = np.ones(len(dlys), float) * pn # append to P_blp P_blp.append(pn) P_N[blp] = np.array(P_blp) return P_N
[docs] def average_spectra(self, blpair_groups=None, time_avg=False, blpair_weights=None, error_field=None, error_weights=None, inplace=True, add_to_history=''): """ Average power spectra across the baseline-pair-time axis, weighted by each spectrum's integration time. This is an "incoherent" average, in the sense that this averages power spectra, rather than visibility data. The `nsample_array` and `integration_array` will be updated to reflect the averaging. In the case of averaging across baseline pairs, the resultant averaged spectrum is assigned to the zeroth blpair in the group. In the case of time averaging, the time and LST arrays are assigned to the mean of the averaging window. Note that this is designed to be separate from spherical binning in k: here we are not connecting k_perp modes to k_para modes. However, if blpairs holds groups of iso baseline separation, then this is equivalent to cylindrical binning in 3D k-space. If you want help constructing baseline-pair groups from baseline groups, see `self.get_blpair_groups_from_bl_groups`. Parameters ---------- blpair_groups : list List of list of baseline-pair group tuples or integers. All power spectra in a baseline-pair group are averaged together. If a baseline-pair exists in more than one group, a warning is raised. Examples:: blpair_groups = [ [((1, 2), (1, 2)), ((2, 3), (2, 3))], [((4, 6), (4, 6))]] blpair_groups = [ [1002001002, 2003002003], [4006004006] ] time_avg : bool, optional If True, average power spectra across the time axis. Default: False. blpair_weights : list, optional List of float or int weights dictating the relative weight of each baseline-pair when performing the average. This is useful for bootstrapping. This should have the same shape as blpair_groups if specified. The weights are automatically normalized within each baseline-pair group. Default: None (all baseline pairs have unity weights). error_field: string or list, optional If errorbars have been entered into stats_array, will do a weighted sum to shrink the error bars down to the size of the averaged data_array. Error_field strings be keys of stats_array. If list, does this for every specified key. Every stats_array key that is not specified is thrown out of the new averaged object. error_weights: string, optional error_weights specify which kind of errors we use for weights during averaging power spectra. The weights are defined as $w_i = 1/ sigma_i^2$, where $sigma_i$ is taken from the relevant field of stats_array. If `error_weight` is set to `None`, which means we just use the integration time as weights. If `error_weights` is specified, then it also gets appended to `error_field` as a list. Default: None. inplace : bool, optional If True, edit data in self, else make a copy and return. Default: True. add_to_history : str, optional Added text to add to file history. Notes ----- Currently, every baseline-pair in a blpair group must have the same Ntpairs, unless time_avg=True. Future versions may support baseline-pair averaging of heterogeneous time arrays. This includes the scenario of repeated blpairs (e.g. in bootstrapping), which will return multiple copies of their time_array. """ if inplace: grouping.average_spectra(self, blpair_groups=blpair_groups, time_avg=time_avg, error_field=error_field, error_weights=error_weights, blpair_weights=blpair_weights, inplace=True, add_to_history=add_to_history) else: return grouping.average_spectra(self, blpair_groups=blpair_groups, time_avg=time_avg, error_field=error_field, error_weights=error_weights, blpair_weights=blpair_weights, inplace=False, add_to_history=add_to_history)
[docs] def fold_spectra(self): """ Average bandpowers from matching positive and negative delay bins onto a purely positive delay axis. Negative delay bins are still populated, but are filled with zero power. This is an in-place operation. Will only work if self.folded == False, i.e. data is currently unfolded across negative and positive delay. Because this averages the data, the nsample array is multiplied by a factor of 2. WARNING: This operation cannot be undone. """ grouping.fold_spectra(self)
[docs] def get_blpair_groups_from_bl_groups(self, blgroups, only_pairs_in_bls=False): """ Get baseline pair matches from a list of baseline groups. Parameters ---------- blgroups : list List of baseline groups, which themselves are lists of baseline tuples or baseline i6 integers. Ex: [ [(1, 2), (2, 3), (3, 4)], [(1, 4), (2, 5)] ] only_pairs_in_bls : bool, optional If True, select only baseline-pairs whose first _and_ second baseline are both found in each baseline group. Default: False. Returns ------- blpair_groups : list List of blpair groups, which themselves are lists of blpair integers. """ blpair_groups = [] for blg in blgroups: blp_select = uvputils._get_blpairs_from_bls(self, blg, only_pairs_in_bls=only_pairs_in_bls) blp = sorted(set(self.blpair_array[blp_select])) if len(blp) > 0: blpair_groups.append(blp) return blpair_groups
def get_red_bls(self, bl_len_tol=1., bl_ang_tol=1.): return uvputils._get_red_bls(self, bl_len_tol=bl_len_tol, bl_ang_tol=bl_ang_tol) def get_red_blpairs(self, bl_len_tol=1., bl_ang_tol=1.): return uvputils._get_red_blpairs(self, bl_len_tol=bl_len_tol, bl_ang_tol=bl_ang_tol)
[docs] def compute_scalar(self, spw, polpair, num_steps=1000, little_h=True, noise_scalar=False): """ Compute power spectrum normalization scalar given an adopted cosmology and a beam model. See pspecbeam.PSpecBeamBase.compute_pspec_scalar for details. Parameters ---------- spw : integer Spectral window selection. polpair : tuple or int or str Which polarization pair to calculate the scalar for. num_steps : int, optional Number of integration bins along frequency in computing scalar. Default: 1000. little_h : bool, optional Whether to use h^-1 units or not. Default: True. noise_scalar : bool, optional If True calculate noise pspec scalar, else calculate normal pspec scalar. See pspecbeam.py for difference between normal scalar and noise scalar. Default: False. Returns ------- scalar : float Power spectrum normalization scalar. """ # make assertions assert hasattr(self, 'cosmo'), \ "self.cosmo object must exist to compute scalar. See self.set_cosmology()" assert hasattr(self, 'OmegaP') and hasattr(self, "OmegaPP") and hasattr(self, "beam_freqs"), "self.OmegaP, "\ "self.OmegaPP and self.beam_freqs must exist to compute scalar." # get freq array of selected spw spw_freqs = self.freq_array[self.spw_to_freq_indices(spw)] # compute scalar OP = self.OmegaP[:, self.polpair_to_indices(polpair)].squeeze() OPP = self.OmegaPP[:, self.polpair_to_indices(polpair)].squeeze() scalar = pspecbeam._compute_pspec_scalar(self.cosmo, self.beam_freqs, OPP / OP**2, spw_freqs, num_steps=num_steps, taper=self.taper, little_h=little_h, noise_scalar=noise_scalar) return scalar
def combine_uvpspec(uvps, merge_history=True, verbose=True): """ Combine (concatenate) multiple UVPSpec objects into a single object, combining along one of either spectral window [spw], baseline-pair-times [blpairts], or polarization [pol]. Certain meta-data of all of the UVPSpec objs must match exactly, see get_uvp_overlap for details. In addition, one can only combine data along a single data axis, with the condition that all other axes match exactly. Parameters ---------- uvps : list A list of UVPSpec objects to combine. merge_history : bool If True, merge all histories. Else use zeroth object's history. Returns ------- u : UVPSpec object A UVPSpec object with the data of all the inputs combined. """ # Perform type checks and get concatenation axis (uvps, concat_ax, new_spws, new_blpts, new_polpairs, static_meta) = get_uvp_overlap(uvps, just_meta=False, verbose=verbose) Nuvps = len(uvps) # Create a new uvp u = UVPSpec() # Sort new data axes new_spws = sorted(new_spws) new_blpts = sorted(new_blpts) new_polpairs = [new_polpairs[i] for i in np.argsort(np.abs(new_polpairs))] Nspws = len(new_spws) Nbltpairs = len(new_blpts) Ntpairs = len(set([(t1, t2) for blp, t1, t2 in new_blpts])) Npols = len(new_polpairs) # Store optional attrs only if all uvps have them store_cov = np.all([hasattr(uvp, 'cov_array_real') for uvp in uvps]) store_window = np.all([hasattr(uvp, 'window_function_array') for uvp in uvps]) exact_windows = np.all([uvp.exact_windows for uvp in uvps]) store_stats = np.all([hasattr(uvp, 'stats_array') for uvp in uvps]) # Create new empty data arrays and fill spw arrays u.data_array = odict() u.integration_array = odict() u.wgt_array = odict() u.nsample_array = odict() if store_window: u.window_function_array = odict() if exact_windows: u.window_function_kperp = odict() u.window_function_kpara = odict() if store_cov: # ensure cov model is the same for all uvps if len(set([uvp.cov_model for uvp in uvps])) > 1: store_cov = False else: u.cov_array_real = odict() u.cov_array_imag = odict() u.cov_model = uvps[0].cov_model if store_stats: # get shared stats keys stored_stats = [set(uvp.stats_array.keys()) for uvp in uvps] for i in range(len(stored_stats)): stored_stats[0].intersection_update(stored_stats[i]) stored_stats = stored_stats[0] # if nothing in common, set to False if len(stored_stats) == 0: store_stats = False else: u.stats_array = odict([(stat, odict()) for stat in stored_stats]) u.scalar_array = np.empty((Nspws, Npols), float) u.freq_array, u.spw_array, u.dly_array = [], [], [] u.spw_dly_array, u.spw_freq_array = [], [] # Loop over new spectral windows and setup arrays for i, spw in enumerate(new_spws): # Initialize new arrays u.data_array[i] = np.empty((Nbltpairs, spw[3], Npols), np.complex128) u.integration_array[i] = np.empty((Nbltpairs, Npols), np.float64) u.wgt_array[i] = np.empty((Nbltpairs, spw[2], 2, Npols), np.float64) # spw[2] == Nfreqs (wgt_array is not resampled if Ndlys != Nfreqs, # so needs to keep this shape) u.nsample_array[i] = np.empty((Nbltpairs, Npols), np.float64) if store_window: if exact_windows: Nkperp = uvps[0].window_function_kperp[i][:, 0].size Nkpara = uvps[0].window_function_kpara[i][:, 0].size u.window_function_array[i] = np.empty((Nbltpairs, spw[3], Nkperp, Nkpara, Npols), np.float64) else: u.window_function_array[i] = np.empty((Nbltpairs, spw[3], spw[3], Npols), np.float64) if store_cov: u.cov_array_real[i] = np.empty((Nbltpairs, spw[3], spw[3], Npols), np.float64) u.cov_array_imag[i] = np.empty((Nbltpairs, spw[3], spw[3], Npols), np.float64) if store_stats: for stat in stored_stats: u.stats_array[stat][i] = np.empty((Nbltpairs, spw[3], Npols), np.complex128) # Set frequencies and delays: if concat_ax == 'spw' this is changed below # assumes spw metadata are the same for all uvps u.spw_array = uvps[0].spw_array.copy() u.spw_freq_array = uvps[0].spw_freq_array.copy() u.spw_dly_array = uvps[0].spw_dly_array.copy() u.freq_array = uvps[0].freq_array.copy() u.dly_array = uvps[0].dly_array.copy() u.Nfreqs = len(np.unique(u.freq_array)) u.Nspwfreqs = len(u.spw_freq_array) u.Ndlys = len(np.unique(u.dly_array)) u.Nspwdlys = len(u.spw_dly_array) # other metadata u.polpair_array = np.array(new_polpairs) # Number of spectral windows, delays etc. u.Nspws = Nspws u.Nbltpairs = Nbltpairs u.Npols = Npols # Prepare time and label arrays u.time_1_array, u.time_2_array = np.empty(Nbltpairs, np.float64), \ np.empty(Nbltpairs, np.float64) u.time_avg_array, u.lst_avg_array = np.empty(Nbltpairs, np.float64), \ np.empty(Nbltpairs, np.float64) u.lst_1_array, u.lst_2_array = np.empty(Nbltpairs, np.float64), \ np.empty(Nbltpairs, np.float64) u.blpair_array = np.empty(Nbltpairs, np.int64) u.labels = sorted(set(np.concatenate([uvp.labels for uvp in uvps]))) u.label_1_array = np.empty((Nspws, Nbltpairs, Npols), np.int32) u.label_2_array = np.empty((Nspws, Nbltpairs, Npols), np.int32) # get each uvp's data axes uvp_spws = [_uvp.get_spw_ranges() for _uvp in uvps] uvp_blpts = [list(zip(_uvp.blpair_array, _uvp.time_1_array, _uvp.time_2_array)) for _uvp in uvps] uvp_polpairs = [_uvp.polpair_array.tolist() for _uvp in uvps] # Construct dict of label indices, to be used for re-ordering later u_lbls = {lbl: ll for ll, lbl in enumerate(u.labels)} # Concatenate r_params arrays #check that r_params are either all empty or all full. _r_param_strs = [_uvp.r_params for _uvp in uvps] if '' in _r_param_strs: if not np.all(np.asarray([rp == '' for rp in _r_param_strs])): raise ValueError("All r_params must be set or empty." "Combining empty with non-empty r_params" "is not yet supported.") _r_params = [_uvp.get_r_params() for _uvp in uvps] #check for conflicts by iterating through each key in the first _uvp, store #in list of new keys. r_params = {} for _r_param in _r_params: for rkey in _r_param: if not rkey in r_params: r_params[rkey] = _r_param[rkey] elif r_params[rkey] != _r_param[rkey]: #For now, we won't support inconsistent weightings. raise ValueError("Conflict between weightings" "Only consistent weightings are supported!") # fill in data arrays depending on concat ax if concat_ax == 'spw': u.spw_array = np.arange(Nspws, dtype=np.int32) freq_array, dly_array = [], [] spw_freq_array, spw_dly_array = [], [] # Concatenate spectral windows for i, spw in enumerate(new_spws): # get index of this new spw in uvps and its spw index l = [spw in _u for _u in uvp_spws].index(True) m = [spw == _spw for _spw in uvp_spws[l]].index(True) # freq metadata spw_freq_inds = np.where(uvps[l].spw_freq_array == m)[0] freq_array.extend(uvps[l].freq_array[spw_freq_inds]) spw_freq_array.extend(np.ones(len(spw_freq_inds), dtype=np.int32) * i) # dly metadata spw_dly_inds = np.where(uvps[l].spw_dly_array == m)[0] dly_array.extend(uvps[l].dly_array[spw_dly_inds]) spw_dly_array.extend(np.ones(len(spw_dly_inds), dtype=np.int32) * i) # Lookup indices of new_blpts in the uvp_blpts[l] array blpts_idxs = uvputils._fast_lookup_blpairts(uvp_blpts[l], new_blpts) if i == 0: blpts_idxs0 = blpts_idxs.copy() # Loop over polarizations for k, p in enumerate(new_polpairs): q = uvp_polpairs[l].index(p) u.scalar_array[i, k] = uvps[l].scalar_array[m, q] # Loop over blpair-times for j, blpt in enumerate(new_blpts): # get blpts indices n = blpts_idxs[j] # Data/weight/integration arrays u.data_array[i][j, :, k] = uvps[l].data_array[m][n, :, q] u.wgt_array[i][j, :, :, k] = uvps[l].wgt_array[m][n, :, :, q] u.integration_array[i][j, k] = uvps[l].integration_array[m][n, q] u.nsample_array[i][j, k] = uvps[l].nsample_array[m][n, q] # Labels lbl1 = uvps[l].label_1_array[m, n, q] lbl2 = uvps[l].label_2_array[m, n, q] u.label_1_array[i, j, k] = u_lbls[uvps[l].labels[lbl1]] u.label_2_array[i, j, k] = u_lbls[uvps[l].labels[lbl2]] if store_cov: u.cov_array_real[i][j, :, :, k] = uvps[l].cov_array_real[m][n, :, :, q] u.cov_array_imag[i][j, :, :, k] = uvps[l].cov_array_imag[m][n, :, :, q] if store_window: if exact_windows: u.window_function_array[i][j,:, :, :,k] = uvps[l].window_function_array[m][n, :, :, :, q] else: u.window_function_array[i][j,:,:,k] = uvps[l].window_function_array[m][n, :, :, q] if store_stats: for stat in stored_stats: u.stats_array[stat][i][j, :, k] = uvps[l].stats_array[stat][m][n, :, q] u.freq_array = np.array(freq_array) u.dly_array = np.array(dly_array) u.spw_freq_array = np.array(spw_freq_array) u.spw_dly_array = np.array(spw_dly_array) u.Nfreqs = len(np.unique(u.freq_array)) u.Nspwfreqs = len(u.spw_freq_array) u.Ndlys = len(np.unique(u.dly_array)) u.Nspwdlys = len(u.spw_dly_array) # Populate new LST, time, and blpair arrays for j, blpt in enumerate(new_blpts): n = blpts_idxs0[j] u.time_1_array[j] = uvps[0].time_1_array[n] u.time_2_array[j] = uvps[0].time_2_array[n] u.time_avg_array[j] = uvps[0].time_avg_array[n] u.lst_1_array[j] = uvps[0].lst_1_array[n] u.lst_2_array[j] = uvps[0].lst_2_array[n] u.lst_avg_array[j] = uvps[0].lst_avg_array[n] u.blpair_array[j] = uvps[0].blpair_array[n] elif concat_ax == 'blpairts': is_in = [uvputils._is_in(_blpts, new_blpts) for _blpts in uvp_blpts] # Concatenate blpair-times for j, blpt in enumerate(new_blpts): l = [isn[j] for isn in is_in].index(True) n = uvp_blpts[l].index(blpt) # Loop over spectral windows for i, spw in enumerate(new_spws): m = [spw == _spw for _spw in uvp_spws[l]].index(True) # Loop over polarizations for k, p in enumerate(new_polpairs): q = uvp_polpairs[l].index(p) u.data_array[i][j, :, k] = uvps[l].data_array[m][n, :, q] u.wgt_array[i][j, :, :, k] = uvps[l].wgt_array[m][n, :, :, q] u.integration_array[i][j, k] = uvps[l].integration_array[m][n, q] u.nsample_array[i][j, k] = uvps[l].nsample_array[m][n, q] if store_window: if exact_windows: u.window_function_array[i][j, :, :, :, k] = uvps[l].window_function_array[m][n, :, :, :, q] else: u.window_function_array[i][j, :, :, k] = uvps[l].window_function_array[m][n, :, :, q] if store_cov: u.cov_array_real[i][j, :, :, k] = uvps[l].cov_array_real[m][n, :, :, q] u.cov_array_imag[i][j, :, :, k] = uvps[l].cov_array_imag[m][n, :, :, q] if store_stats: for stat in stored_stats: u.stats_array[stat][i][j, :, k] = uvps[l].stats_array[stat][m][n, :, q] # Labels lbl1 = uvps[l].label_1_array[m, n, q] lbl2 = uvps[l].label_2_array[m, n, q] u.label_1_array[i, j, k] = u_lbls[uvps[l].labels[lbl1]] u.label_2_array[i, j, k] = u_lbls[uvps[l].labels[lbl2]] # scalar array, only do once per spw and pol if j == 0: u.scalar_array[i, k] = uvps[l].scalar_array[m, q] # Populate new LST, time, and blpair arrays u.time_1_array[j] = uvps[l].time_1_array[n] u.time_2_array[j] = uvps[l].time_2_array[n] u.time_avg_array[j] = uvps[l].time_avg_array[n] u.lst_1_array[j] = uvps[l].lst_1_array[n] u.lst_2_array[j] = uvps[l].lst_2_array[n] u.lst_avg_array[j] = uvps[l].lst_avg_array[n] u.blpair_array[j] = uvps[l].blpair_array[n] elif concat_ax == 'polpairs': # Concatenate polarizations for k, p in enumerate(new_polpairs): l = [p in _polpairs for _polpairs in uvp_polpairs].index(True) q = uvp_polpairs[l].index(p) # Get mapping of blpair-time indices between old UVPSpec objects # and the new one blpts_idxs = uvputils._fast_lookup_blpairts(uvp_blpts[l], new_blpts) if k == 0: blpts_idxs0 = blpts_idxs.copy() # Loop over spectral windows for i, spw in enumerate(new_spws): m = [spw == _spw for _spw in uvp_spws[l]].index(True) u.scalar_array[i,k] = uvps[l].scalar_array[m,q] # Loop over blpair-times for j, blpt in enumerate(new_blpts): n = blpts_idxs[j] u.data_array[i][j, :, k] = uvps[l].data_array[m][n, :, q] if store_window: if exact_windows: u.window_function_array[i][j, :, :, :, k] = uvps[l].window_function_array[m][n, :, :, :, q] else: u.window_function_array[i][j, :, :, k] = uvps[l].window_function_array[m][n, :, :, q] if store_cov: u.cov_array_real[i][j, :, :, k] = uvps[l].cov_array_real[m][n, :, :, q] u.cov_array_imag[i][j, :, :, k] = uvps[l].cov_array_imag[m][n, :, :, q] if store_stats: for stat in stored_stats: u.stats_array[stat][i][j, :, k] = uvps[l].stats_array[stat][m][n, :, q] u.wgt_array[i][j, :, :, k] = uvps[l].wgt_array[m][n, :, :, q] u.integration_array[i][j, k] = uvps[l].integration_array[m][n, q] u.nsample_array[i][j, k] = uvps[l].nsample_array[m][n, q] # Labels lbl1 = uvps[l].label_1_array[m, n, q] lbl2 = uvps[l].label_2_array[m, n, q] u.label_1_array[i, j, k] = u_lbls[uvps[l].labels[lbl1]] u.label_2_array[i, j, k] = u_lbls[uvps[l].labels[lbl2]] for j, blpt in enumerate(new_blpts): n = blpts_idxs0[j] u.time_1_array[j] = uvps[0].time_1_array[n] u.time_2_array[j] = uvps[0].time_2_array[n] u.time_avg_array[j] = uvps[0].time_avg_array[n] u.lst_1_array[j] = uvps[0].lst_1_array[n] u.lst_2_array[j] = uvps[0].lst_2_array[n] u.lst_avg_array[j] = uvps[0].lst_avg_array[n] u.blpair_array[j] = uvps[0].blpair_array[n] else: # Make sure we have properly identified the concat_ax raise ValueError("concat_ax {} not recognized.".format(concat_ax)) # Set baselines u.Nblpairs = len(np.unique(u.blpair_array)) uvp_bls = [uvp.bl_array for uvp in uvps] new_bls = np.unique(np.hstack(uvp_bls)) u.bl_array = np.array(new_bls) u.Nbls = len(u.bl_array) u.bl_vecs = [] u.Ntpairs = Ntpairs u.Nbltpairs = Nbltpairs for b, bl in enumerate(new_bls): l = [bl in _bls for _bls in uvp_bls].index(True) h = [bl == _bl for _bl in uvp_bls[l]].index(True) u.bl_vecs.append(uvps[l].bl_vecs[h]) u.bl_vecs = np.array(u.bl_vecs) u.Ntimes = len(np.unique(np.hstack([u.time_1_array, u.time_2_array]))) if merge_history: u.history = "".join([uvp.history for uvp in uvps]) else: u.history = uvps[0].history u.labels = np.array(u.labels, str) u.r_params = uvputils.compress_r_params(r_params) for k in static_meta.keys(): setattr(u, k, static_meta[k]) # Run check to make sure the new UVPSpec object is valid u.check() return u def get_uvp_overlap(uvps, just_meta=True, verbose=True): """ Given a list of UVPSpec objects or a list of paths to UVPSpec objects, find a single data axis within ['spw', 'blpairts', 'pol'] where *all* uvpspec objects contain non-overlapping data. Overlapping data are delay spectra that have identical spw, blpair-time and pol metadata between each other. If two uvps are completely overlapping (i.e. there is not non-overlapping data axis) an error is raised. If there are multiple non-overlapping data axes between all uvpspec pairs in uvps, an error is raised. All uvpspec objects must have certain attributes that agree exactly. These include: 'channel_width', 'telescope_location', 'weighting', 'OmegaP', 'beam_freqs', 'OmegaPP', 'beamfile', 'norm', 'taper', 'vis_units', 'norm_units', 'folded', 'cosmo', 'scalar' Parameters ---------- uvps : list List of UVPSpec objects or list of string paths to UVPSpec objects just_meta : boolean, optional If uvps is a list of strings, when loading-in each uvpspec, only load its metadata. verbose : bool, optional print feedback to standard output Returns ------- uvps : list List of input UVPSpec objects concat_ax : str Data axis ['spw', 'blpairts', 'pols'] across data can be concatenated unique_spws : list List of unique spectral window tuples (spw_freq_start, spw_freq_end, spw_Nfreqs, spw_Ndlys) across all input uvps unique_blpts : list List of unique baseline-pair-time tuples (blpair_integer, time_1_array JD float, time_2_array JD float) across all input uvps unique_polpairs : list List of unique polarization-pair integers across all input uvps """ # type check assert isinstance(uvps, (list, tuple, np.ndarray)), \ "uvps must be fed as a list" assert isinstance(uvps[0], (UVPSpec, str)), \ "uvps must be fed as a list of UVPSpec objects or strings" Nuvps = len(uvps) # load uvps if fed as strings if isinstance(uvps[0], str): _uvps = [] for u in uvps: uvp = UVPSpec() uvp.read_hdf5(u, just_meta=just_meta) _uvps.append(uvp) uvps = _uvps # ensure static metadata agree between all objects static_meta = ['channel_width', 'telescope_location', 'weighting', 'OmegaP', 'beam_freqs', 'OmegaPP', 'beamfile', 'norm', 'taper', 'vis_units', 'norm_units', 'folded', 'cosmo', 'exact_windows'] for m in static_meta: for u in uvps[1:]: if hasattr(uvps[0], m) and hasattr(u, m): assert uvps[0].__eq__(u, params=[m]), \ "Cannot concatenate UVPSpec objs: not all agree on '{}' attribute".format(m) else: assert not hasattr(uvps[0], m) and not hasattr(u, m), \ "Cannot concatenate UVPSpec objs: not all agree on '{}' attribute".format(m) # get static metadata values static_meta = odict([(k, getattr(uvps[0], k, None)) for k in static_meta if getattr(uvps[0], k, None) is not None]) # create unique data axis lists unique_spws = [] unique_blpts = [] unique_polpairs = [] data_concat_axes = odict() blpts_comb = [] # Combined blpair + time # find unique items for uvp1 in uvps: for s in uvp1.get_spw_ranges(): if s not in unique_spws: unique_spws.append(s) for p in uvp1.polpair_array: if p not in unique_polpairs: unique_polpairs.append(p) uvp1_blpts_comb = [(blp, t1, t2) for blp, t1, t2 in zip(uvp1.blpair_array, uvp1.time_1_array, uvp1.time_2_array)] blpts_comb.extend(uvp1_blpts_comb) # Old way using np.unique and complex coding did not support # two times. unique_blpts = list(set(blpts_comb)) # iterate over uvps for i, uvp1 in enumerate(uvps): # get uvp1 sets and append to unique lists uvp1_spws = uvp1.get_spw_ranges() uvp1_blpts = list(zip(uvp1.blpair_array, uvp1.time_1_array, uvp1.time_2_array)) uvp1_polpairs = uvp1.polpair_array # iterate over uvps for j, uvp2 in enumerate(uvps): if j <= i: continue # get uvp2 sets uvp2_spws = uvp2.get_spw_ranges() uvp2_blpts = list(zip(uvp2.blpair_array, uvp2.time_1_array, uvp2.time_2_array)) uvp2_polpairs = uvp2.polpair_array # determine if uvp1 and uvp2 are an identical match spw_match = len(set(uvp1_spws) ^ set(uvp2_spws)) == 0 blpts_match = len(set(uvp1_blpts) ^ set(uvp2_blpts)) == 0 polpair_match = len(set(uvp1_polpairs) ^ set(uvp2_polpairs)) == 0 # ensure no partial-overlaps if not spw_match: assert len(set(uvp1_spws) & set(uvp2_spws)) == 0, \ "uvp {} and {} have partial overlap across spw, cannot combine".format(i, j) if not blpts_match: assert len(set(uvp1_blpts) & set(uvp2_blpts)) == 0, \ "uvp {} and {} have partial overlap across blpairts, cannot combine".format(i, j) if not polpair_match: assert len(set(uvp1_polpairs) & set(uvp2_polpairs)) == 0, \ "uvp {} and {} have partial overlap across pol-pair, cannot combine".format(i, j) # assert all except 1 axis overlaps matches = [spw_match, blpts_match, polpair_match] assert sum(matches) != 3, \ "uvp {} and {} have completely overlapping data, cannot combine".format(i, j) assert sum(matches) > 1, \ "uvp {} and {} are non-overlapping across multiple data axes, cannot combine".format(i, j) concat_ax = ['spw', 'blpairts', 'polpairs'][matches.index(False)] data_concat_axes[(i, j)] = concat_ax if verbose: print("uvp {} and {} are concatable across {} axis".format(i, j, concat_ax)) # assert all uvp pairs have the same (single) non-overlap (concat) axis err_msg = "Non-overlapping data in uvps span multiple data axes:\n{}" \ "".format("\n".join( ["{} & {}: {}".format(i[0][0],i[0][1],i[1]) for i in data_concat_axes.items()] )) assert len(set(data_concat_axes.values())) == 1, err_msg # perform additional checks given concat ax if concat_ax == 'blpairts': # check scalar_array assert np.all([np.isclose(uvps[0].scalar_array, u.scalar_array) for u in uvps[1:]]), \ "scalar_array must be the same for all uvps given " \ "concatenation along blpairts." return uvps, concat_ax, unique_spws, unique_blpts, \ unique_polpairs, static_meta def _ordered_unique(arr): """ Get the unique elements of an array while preserving order. """ arr = np.asarray(arr) _, idx = np.unique(arr, return_index=True) return arr[np.sort(idx)]