import numpy as np
from collections import OrderedDict as odict
import os, copy, shutil, operator, ast, fnmatch
from pyuvdata import utils as uvutils
import h5py
import warnings
import json
from . import conversions, noise, version, __version__, pspecbeam, grouping, utils, uvpspec_utils as uvputils
from .parameter import PSpecParam
from .uvwindow import UVWindow
[docs]class UVPSpec(object):
"""
An object for storing power spectra generated by hera_pspec and a file-format
for its data and meta-data.
"""
[docs] def __init__(self):
"""
An object for storing power spectra and associated metadata generated
by hera_pspec.
"""
# Summary attributes
self._Ntimes = PSpecParam("Ntimes", description="Number of unique times.", expected_type=int)
self._Ntpairs = PSpecParam("Ntpairs", description="Number of unique time-pairs.", expected_type=int)
self._Nbltpairs = PSpecParam("Nbltpairs", description="Total number of baseline-time-pairs.", expected_type=int)
self._Nblpairs = PSpecParam("Nblpairs", description="Total number of baseline-pairs.", expected_type=int)
self._Nspwdlys = PSpecParam("Nspwdlys", description="Total number of delay bins across all spectral windows.", expected_type=int)
self._Nspwfreqs = PSpecParam("Nspwfreqs", description="Total number of frequency bins across all spectral windows.", expected_type=int)
self._Nspws = PSpecParam("Nspws", description="Number of unique spectral windows.", expected_type=int)
self._Ndlys = PSpecParam("Ndlys", description="Number of unique delay bins.", expected_type=int)
self._Nfreqs = PSpecParam("Nfreqs", description="Number of unique frequency bins in the data.", expected_type=int)
self._Npols = PSpecParam("Npols", description="Number of polarizations in the data.", expected_type=int)
self._history = PSpecParam("history", description='The file history.', expected_type=str)
desc = ('A compressed json string version of the r_params dictionary of pspecdata object.'
'uvpspec only stores each unique weighting along with a list of unique baselines'
'corresponding to that weighting.')
self._r_params = PSpecParam("r_params", description = desc, expected_type = str)
# Data attributes
desc = "A string indicating the covariance model of cov_array. Options are ['dsets', 'empirical']. See PSpecData.pspec() for details."
self._cov_model = PSpecParam("cov_model", description=desc, expected_type=str)
desc = "A boolean indicating if the window functions stored in window_function_array are exact or not."
self._exact_windows = PSpecParam("exact_windows", description=desc, expected_type=bool)
desc = "Power spectrum data dictionary with spw integer as keys and values as complex ndarrays."
self._data_array = PSpecParam("data_array", description=desc, expected_type=np.complex128, form="(Nbltpairs, spw_Ndlys, Npols)")
desc = "Power spectrum covariance dictionary with spw integer as keys and values as float ndarrays, stored separately for real and imaginary parts."
self._cov_array_real = PSpecParam("cov_array_real", description=desc, expected_type=np.float64, form="(Nbltpairs, spw_Ndlys, spw_Ndlys, Npols)")
self._cov_array_imag = PSpecParam("cov_array_imag", description=desc, expected_type=np.float64, form="(Nbltpairs, spw_Ndlys, spw_Ndlys, Npols)")
desc = "Window function dictionary of bandpowers."
self._window_function_array = PSpecParam("window_function_array", description=desc, expected_type=np.float64, form="(Nbltpairs, spw_Ndlys, spw_Ndlys, Npols)")
desc = "Dictionary of bandpowers given the kperp grid used to compute the window functions."
self._window_function_kperp = PSpecParam("window_function_kperp", description=desc, expected_type=np.float64, form="(Nbltpairs, spw_Ndlys, spw_Ndlys, Npols)")
desc = "Dictionary of bandpowers given the kparallel grid used to compute the window functions."
self._window_function_kpara = PSpecParam("window_function_kpara", description=desc, expected_type=np.float64, form="(Nbltpairs, spw_Ndlys, spw_Ndlys, Npols)")
desc = "Weight dictionary for original two datasets. The second axis holds [dset1_wgts, dset2_wgts] in that order."
self._wgt_array = PSpecParam("wgt_array", description=desc, expected_type=np.float64, form="(Nbltpairs, spw_Nfreqs, 2, Npols)")
desc = "Integration time dictionary. This holds the average integration time [seconds] of each delay spectrum in the data. " \
"This is not necessarily equal to the integration time of the visibility data: If data have been coherently averaged " \
"(i.e. averaged before squaring), than this is the sum of each spectrum's integration time."
self._integration_array = PSpecParam("integration_array", description=desc, expected_type=np.float64, form="(Nbltpairs, Npols)")
desc = "Nsample dictionary, if the pspectra have been incoherently averaged (i.e. averaged after squaring), this is " \
"the effective number of samples in that average (float type). This is not the same as the pyuvdata.UVData nsample_array."
self._nsample_array = PSpecParam("nsample_array", description=desc, expected_type=np.float64, form="(Nbltpairs, Npols)")
desc = ("Power spectrum stats array with stats type and spw integer as keys and values as complex ndarrays with same shape"
" as data_array")
self._stats_array = PSpecParam("stats_array", description=desc, expected_type=np.complex128, form="(Nbltpairs, Ndlys, Npols)")
self._spw_array = PSpecParam("spw_array", description="Integer array holding unique spectral windows.", form="(Nspws,)", expected_type=np.uint16)
self._spw_dly_array = PSpecParam("spw_dly_array", description="Spw integer array for the dly_array.", form="(Nspwdlys,)", expected_type=np.uint16)
self._spw_freq_array = PSpecParam("spw_freq_array", description="Spw integer array for the freq_array.", form="(Nspwfreqs,)", expected_type=np.uint16)
self._freq_array = PSpecParam("freq_array", description="Frequency array of the original data in Hz.", form="(Nfreqs,)", expected_type=np.float64)
self._dly_array = PSpecParam("dly_array", description="Delay array in seconds.", form="(Nspwdlys,)", expected_type=np.float64)
desc = "Polarization pair integer, made up of two polarization integers concatenated in a standardized way."
self._polpair_array = PSpecParam("polpair_array", description=desc, form="(Npols,)", expected_type=np.int32)
self._lst_1_array = PSpecParam("lst_1_array", description="LST array of the first bl in the bl-pair [radians].", form="(Nbltpairs,)", expected_type=np.float64)
self._lst_2_array = PSpecParam("lst_2_array", description="LST array of the second bl in the bl-pair [radians].", form="(Nbltpairs,)", expected_type=np.float64)
self._lst_avg_array = PSpecParam("lst_avg_array", description="Average of the lst_1_array and lst_2_array [radians].", form="(Nbltpairs,)", expected_type=np.float64)
self._time_1_array = PSpecParam("time_1_array", description="Time array of the first bl in the bl-pair [Julian Date].", form="(Nbltpairs,)", expected_type=np.float64)
self._time_1_array = PSpecParam("time_2_array", description="Time array of the second bl in the bl-pair [Julian Date].", form="(Nbltpairs,)", expected_type=np.float64)
self._time_avg_array = PSpecParam("time_avg_array", description="Average of the time_1_array and time_2_array [Julian Date].", form='(Nbltpairs,)', expected_type=np.float64)
self._blpair_array = PSpecParam("blpair_array", description="Baseline-pair integer for all baseline-pair times.", form="(Nbltpairs,)", expected_type=np.int64)
self._scalar_array = PSpecParam("scalar_array", description="Power spectrum normalization scalar from pspecbeam module.", expected_type=np.float64, form="(Nspws, Npols)")
# Baseline attributes
self._Nbls = PSpecParam("Nbls", description="Number of unique baseline integers.", expected_type=int)
self._bl_vecs = PSpecParam("bl_vecs", description="ndarray of baseline separation vectors in the ITRF frame [meters]. To get it in ENU frame see self.get_ENU_bl_vecs().", expected_type=np.float64, form="(Nbls,)")
self._bl_array = PSpecParam("bl_array", description="All unique baseline (antenna-pair) integers.", expected_type=np.int32, form="(Nbls,)")
# Misc Attributes
self._channel_width = PSpecParam("channel_width", description="width of visibility frequency channels in Hz.", form="(Nspwfreqs,)", expected_type=np.float64)
self._telescope_location = PSpecParam("telescope_location", description="telescope location in ECEF frame [meters]. To get it in Lat/Lon/Alt see pyuvdata.utils.LatLonAlt_from_XYZ().", expected_type=np.float64)
self._weighting = PSpecParam("weighting", description="Form of data weighting used when forming power spectra.", expected_type=str)
self.set_symmetric_taper = PSpecParam("symmetric_taper", description="Specify whether Taper was applied symmetrically (True) or to the left(False).", expected_type=str)
self._norm = PSpecParam("norm", description="Normalization method adopted in OQE (M matrix).", expected_type=str)
self._taper = PSpecParam("taper", description='Taper function applied to visibility data before FT. See uvtools.dspec.gen_window for options."', expected_type=str)
self._vis_units = PSpecParam("vis_units", description="Units of the original visibility data used to form the power spectra.", expected_type=str)
self._norm_units = PSpecParam("norm_units", description="Power spectra normalization units, i.e. telescope units [Hz str] or cosmological [(h^-3) Mpc^3].", expected_type=str)
self._labels = PSpecParam("labels", description="Array of dataset string labels.", expected_type=str)
self._label_1_array = PSpecParam("label_1_array", description="Integer array w/ shape of data that indexes labels and gives label of dset1.", form="(Nspws, Nbltpairs, Npols)", expected_type=np.int32)
self._label_2_array = PSpecParam("label_2_array", description="Integer array w/ shape of data that indexes labels and gives label of dset2.", form="(Nspws, Nbltpairs, Npols)", expected_type=np.int32)
self._folded = PSpecParam("folded", description="if power spectra are folded (i.e. averaged) onto purely positive delay axis. Default is False", expected_type=bool)
self._beamfile = PSpecParam("beamfile", description="filename of beam-model used to normalized pspectra.", expected_type=str)
self._OmegaP = PSpecParam("OmegaP", description="Integral of unitless beam power over the sky [steradians].", form="(Nbeam_freqs, Npols)", expected_type=np.float64)
self._OmegaPP = PSpecParam("OmegaP", description="Integral of unitless beam power squared over the sky [steradians].", form="(Nbeam_freqs, Npols)", expected_type=np.float64)
self._beam_freqs = PSpecParam("beam_freqs", description="Frequency bins of the OmegaP and OmegaPP beam-integral arrays [Hz].", form="(Nbeam_freqs,)", expected_type=np.float64)
self._cosmo = PSpecParam("cosmo", description="Instance of conversion.Cosmo_Conversions class.", expected_type=conversions.Cosmo_Conversions)
# Collect all parameters: required and non-required
self._all_params = sorted( [ p[1:] for p in
fnmatch.filter(self.__dict__.keys(), '_*')])
# Specify required params: these are required for read / write and
# self.check()
self._req_params = ["Ntimes", "Ntpairs", "Nbltpairs", "Nblpairs",
"Nspws", "Ndlys", "Npols", "Nfreqs", "history",
"Nspwdlys", "Nspwfreqs", "r_params",
"data_array", "wgt_array", "integration_array",
"spw_array", "freq_array", "dly_array",
"polpair_array",
"lst_1_array", "lst_2_array", "time_1_array",
"time_2_array", "blpair_array", "Nbls",
"bl_vecs", "bl_array", "channel_width",
"telescope_location", "weighting", "vis_units",
"norm_units", "taper", "norm", "nsample_array",
"lst_avg_array", "time_avg_array", "folded",
"scalar_array", "labels", "label_1_array",
"label_2_array", "spw_dly_array", "spw_freq_array",
"symmetric_taper"]
# All parameters must fall into one and only one of the following
# groups, which are used in __eq__
self._immutables = ["Ntimes", "Ntpairs", "Nbltpairs", "Nblpairs", "Nspwdlys",
"Nspwfreqs", "Nspws", "Ndlys", "Npols", "Nfreqs",
"history", "r_params", "cov_model",
"Nbls", "weighting", "vis_units",
"norm", "norm_units", "taper", "cosmo", "beamfile",
'folded', 'exact_windows']
self._ndarrays = ["spw_array", "freq_array", "dly_array",
"polpair_array", "lst_1_array", "lst_avg_array",
"time_avg_array", "channel_width",
"lst_2_array", "time_1_array", "time_2_array",
"blpair_array", "OmegaP", "OmegaPP", "beam_freqs",
"bl_vecs", "bl_array", "telescope_location",
"scalar_array", "labels", "label_1_array",
"label_2_array", "spw_freq_array", "spw_dly_array"]
self._dicts = ["data_array", "wgt_array", "integration_array", "window_function_array",
"window_function_kperp", "window_function_kpara",
"nsample_array", "cov_array_real", "cov_array_imag"]
self._dicts_of_dicts = ["stats_array"]
# define which attributes are considered meta data. Large attrs should
# be constructed as datasets
self._meta_dsets = ["lst_1_array", "lst_2_array", "time_1_array",
"time_2_array", "blpair_array", "bl_vecs",
"bl_array", "lst_avg_array", "time_avg_array",
"OmegaP", "OmegaPP", "label_1_array",
"label_2_array", "spw_dly_array", "spw_freq_array"]
self._meta_attrs = sorted(
set(self._all_params) - set(self._dicts) \
- set(self._meta_dsets) - set(self._dicts_of_dicts))
self._meta = sorted(set(self._meta_dsets).union(set(self._meta_attrs)))
# Attributes that appeared in previous versions of UVPSpec and may be present
# in HDF5 files written by these older versions. We can look for these attributes
# when we read the files and
# and deal with them appropriately to support backwards compatibility.
self._meta_deprecated = ["Nblpairts"]
self._meta_dsets_deprecated = []
# check all params are covered
assert len( set(self._all_params) - set(self._dicts) \
- set(self._immutables) - set(self._ndarrays) \
- set(self._dicts_of_dicts) ) == 0
# Default parameter values
self.folded = False
# Set function docstrings
self.get_red_bls.__func__.__doc__ = uvputils._get_red_bls.__doc__
self.get_red_blpairs.__func__.__doc__ = uvputils._get_red_blpairs.__doc__
[docs] def get_cov(self, key, component='real', omit_flags=False):
"""
Slice into covariance array with a specified data key in the format
(spw, ((ant1, ant2),(ant3, ant4)), (pol1, pol2))
or
(spw, blpair-integer, pol-integer)
where spw is the spectral window integer, ant1 etc. are integers,
and pol is either a tuple of polarization strings (ex. ('XX', 'YY'))
or integers (ex. (-5,-5)).
Parameters
----------
key: tuple
Contains the baseline-pair, spw, polpair keys
component : str
"real" or "imag". Indicating which cov_array the function calls.
omit_flags : bool, optional
If True, remove time integrations (or spectra) that
came from visibility data that were completely flagged
across the spectral window (i.e. integration == 0).
Returns
-------
data : complex ndarray
Shape (Ntimes, Ndlys, Ndlys)
"""
spw, blpairts, polpair = self.key_to_indices(key, omit_flags=omit_flags)
# Need to deal with folded data!
# if data has been folded, return only positive delays
if component == 'real':
if hasattr(self,'cov_array_real'):
if self.folded:
Ndlys = np.count_nonzero(self.spw_dly_array == spw)
return self.cov_array_real[spw][blpairts, -(Ndlys-Ndlys//2-1):, -(Ndlys-Ndlys//2-1):, polpair]
else:
return self.cov_array_real[spw][blpairts, :, :, polpair]
else:
raise AttributeError("No covariance array has been calculated.")
elif component == 'imag':
if hasattr(self,'cov_array_imag'):
if self.folded:
Ndlys = np.count_nonzero(self.spw_dly_array == spw)
return self.cov_array_imag[spw][blpairts, -(Ndlys-Ndlys//2-1):, -(Ndlys-Ndlys//2-1):, polpair]
else:
return self.cov_array_imag[spw][blpairts, :, :, polpair]
else:
raise AttributeError("No covariance array has been calculated.")
else:
raise ValueError("No types besides real and imag.")
[docs] def get_window_function(self, key, omit_flags=False):
"""
Slice into window_function array with a specified data key in the format
(spw, ((ant1, ant2),(ant3, ant4)), (pol1, pol2))
or
(spw, blpair-integer, pol-integer)
where spw is the spectral window integer, ant1 etc. are integers,
and pol is either a tuple of polarization strings (ex. ('XX', 'YY'))
or integers (ex. (-5,-5)).
Parameters
----------
key: tuple
Contains the baseline-pair, spw, polpair keys
omit_flags : bool, optional
If True, remove time integrations (or spectra) that
came from visibility data that were completely flagged
across the spectral window (i.e. integration == 0).
Returns
-------
data : complex ndarray
Shape (Ntimes, Ndlys, Ndlys)
"""
spw, blpairts, polpair = self.key_to_indices(key, omit_flags=omit_flags)
# Need to deal with folded data!
# if data has been folded, return only positive delays
if self.folded:
Ndlys = np.count_nonzero(self.spw_dly_array == spw)
return self.window_function_array[spw][blpairts, -(Ndlys-Ndlys//2-1):, -(Ndlys-Ndlys//2-1):, polpair]
else:
return self.window_function_array[spw][blpairts, ..., polpair]
[docs] def get_data(self, key, omit_flags=False):
"""
Slice into data_array with a specified data key in the format
(spw, ((ant1, ant2), (ant3, ant4)), (pol1, pol2))
or
(spw, blpair-integer, polpair)
where spw is the spectral window integer, ant1 etc. are integers,
and polpair is either a tuple of polarization strings/ints or a
polarizatio-pair integer.
Parameters
----------
key : tuple
Contains the baseline-pair, spw, polpair keys
omit_flags : bool, optional
If True, remove time integrations (or spectra) that
came from visibility data that were completely flagged
across the spectral window (i.e. integration == 0).
Returns
-------
data : complex ndarray
Shape (Ntimes, Ndlys)
"""
spw, blpairts, polpair = self.key_to_indices(key, omit_flags=omit_flags)
# if data has been folded, return only positive delays
if self.folded:
Ndlys = np.count_nonzero(self.spw_dly_array == spw)
return self.data_array[spw][blpairts, -(Ndlys-Ndlys//2-1):, polpair]
# else return all delays
else:
return self.data_array[spw][blpairts, :, polpair]
[docs] def get_wgts(self, key, omit_flags=False):
"""
Slice into wgt_array with a specified data key in the format
(spw, ((ant1, ant2), (ant3, ant4)), (pol1, pol2))
or
(spw, blpair-integer, polpair-integer)
where spw is the spectral window integer, ant1 etc. are integers,
and polpair is either a polarization pair tuple or integer.
Parameters
----------
key : tuple
Contains the baseline-pair, spw, polpair keys
omit_flags : bool, optional
If True, remove time integrations (or spectra) that
came from visibility data that were completely flagged
across the spectral window (i.e. integration == 0).
Returns
-------
wgts : float ndarray
Has shape (Ntimes, Nfreqs, 2), where the last axis holds
[wgt_1, wgt_2] in that order.
"""
spw, blpairts, polpair = self.key_to_indices(key, omit_flags=omit_flags)
return self.wgt_array[spw][blpairts, :, :, polpair]
[docs] def get_integrations(self, key, omit_flags=False):
"""
Slice into integration_array with a specified data key in the format::
(spw, ((ant1, ant2), (ant3, ant4)), (pol1, pol2))
or
(spw, blpair-integer, polpair-integer)
where spw is the spectral window integer, ant1 etc. are integers,
and polpair is either a polarization pair tuple or integer.
Parameters
----------
key : tuple
Contains the baseline-pair, spw, polpair keys
omit_flags : bool, optional
If True, remove time integrations (or spectra) that
came from visibility data that were completely flagged
across the spectral window (i.e. integration == 0).
Returns
-------
data : float ndarray
Has shape (Ntimes,)
"""
spw, blpairts, polpair = self.key_to_indices(key, omit_flags=omit_flags)
return self.integration_array[spw][blpairts, polpair]
[docs] def get_r_params(self):
"""
Return an `r_params` dictionary.
In a `PSpecData` object, the `r_params` are stored as a dictionary with
one entry per baseline.
In a `UVPSpec` object, the dictionary is compressed so that a single
`r_param` entry corresponds to multiple baselines and is stored as a
JSON format string.
This function reads the compressed string and returns the dictionary
with the correct following fields and structure.
Returns
-------
r_params : dict
Dictionary of r_params for this object.
"""
return uvputils.decompress_r_params(self.r_params)
[docs] def get_nsamples(self, key, omit_flags=False):
"""
Slice into nsample_array with a specified data key in the format
(spw, ((ant1, ant2), (ant3, ant4)), (pol1, pol2))
or
(spw, blpair-integer, polpair-integer)
where spw is the spectral window integer, ant1 etc. are integers,
and polpair is either a polarization pair tuple or integer.
Parameters
----------
key : tuple
Contains the baseline-pair, spw, polpair keys
omit_flags : bool, optional
If True, remove time integrations (or spectra) that
came from visibility data that were completely flagged
across the spectral window (i.e. integration == 0).
Returns
-------
data : float ndarray with shape (Ntimes,)
"""
spw, blpairts, polpair = self.key_to_indices(key, omit_flags=omit_flags)
return self.nsample_array[spw][blpairts, polpair]
[docs] def get_dlys(self, key):
"""
Get array of delays given a spectral window selection.
Parameters
----------
key : int, or tuple with integer
Spectral window selection
Returns
-------
dlys : float ndarray, contains delays of pspectra in nanosec, given spw
"""
indices = self.spw_to_dly_indices(key)
return self.dly_array[indices]
    def get_blpair_seps(self):
        """
        For each baseline-pair, get the average baseline separation in ENU
        frame in meters between its bl1 and bl2. Ordering matches
        self.get_blpairs()

        Returns
        -------
        blp_avg_sep : float ndarray
            shape=(Nbltpairs,)
        """
        # get list of bl separations (norm of each ENU baseline vector)
        bl_vecs = self.get_ENU_bl_vecs()
        bls = self.bl_array.tolist()
        blseps = np.array([np.linalg.norm(bl_vecs[bls.index(bl)])
                           for bl in self.bl_array])
        # construct empty blp_avg_sep array; filled per-blpair below
        blp_avg_sep = np.empty(self.Nbltpairs, float)
        # construct blpair_bls: the blpair integer packs its two baseline
        # integers as blp = bl1 * 1e6 + bl2, so divide/mod by 1e6 unpacks them
        blpairs = _ordered_unique(self.blpair_array)
        bl1 = np.floor(blpairs / 1e6)
        blpair_bls = np.vstack([bl1, blpairs - bl1*1e6]).astype(np.int32).T
        # iterate over blpairs, writing the mean of the two baseline
        # separations into every blpair-time slot of that blpair
        for blp, bl in zip(blpairs, blpair_bls):
            avg_sep = np.mean([blseps[bls.index(bl[0])], blseps[bls.index(bl[1])]])
            inds = self.blpair_to_indices(blp)
            blp_avg_sep[inds] = avg_sep
        return blp_avg_sep
[docs] def get_blpair_blvecs(self, use_second_bl=False):
"""
For each baseline-pair, get the baseline vector in ENU
frame of the first baseline in the pair. Ordering matches
self.get_blpairs()
Parameters
----------
use_second_bl : bool
If True, return the vector of the second bl in the blpair.
Returns
-------
ndarray shape=(Nblpairs,)
vector in ENU frame of the baseline
"""
# get list of bl vectors
bl_vecs = self.get_ENU_bl_vecs()
bl_array = self.bl_array.tolist()
# construct blpair_bls
i = 0
if use_second_bl:
i = 1
blpairs = _ordered_unique(self.blpair_array)
bls = [self.antnums_to_bl(self.blpair_to_antnums(blp)[i]) for blp in blpairs]
blp_blvecs = []
for bl in bls:
blp_blvecs.append(bl_vecs[bl_array.index(bl)])
return np.array(blp_blvecs)
[docs] def get_kperps(self, spw, little_h=True):
"""
Get transverse (perpendicular) cosmological wavevector for each
baseline-pair given an adopted cosmology and a spw selection.
Parameters
----------
spw : int, choice of spectral window
little_h : boolean, optional
Whether to have cosmological length units be h^-1 Mpc or Mpc
Default: h^-1 Mpc
Returns
-------
k_perp : float ndarray, transverse wave-vectors, shape=(Nblpairs,)
"""
# assert cosmo
assert hasattr(self, 'cosmo'), \
"self.cosmo must exist to form cosmological " \
"wave-vectors. See self.set_cosmology()"
# calculate mean redshift of band
avg_z = self.cosmo.f2z(
np.mean(self.freq_array[self.spw_to_freq_indices(spw)]) )
# get kperps
blpair_seps = self.get_blpair_seps()
k_perp = blpair_seps * self.cosmo.bl_to_kperp(avg_z, little_h=little_h)
return k_perp
[docs] def get_kparas(self, spw, little_h=True):
"""
Get radial (parallel) cosmological wavevectors for
power spectra given an adopted cosmology and a spw selection.
Parameters
----------
spw : int
Choice of spectral window
little_h : boolean, optional
Whether to have cosmological length units be h^-1 Mpc or Mpc
Default: h^-1 Mpc
Returns (k_perp, k_para)
-------
k_para : float ndarray
Radial wave-vectors, shape=(Ndlys given spw)
"""
# assert cosmo
assert hasattr(self, 'cosmo'), \
"self.cosmo must exist to form cosmological " \
"wave-vectors. See self.set_cosmology()"
# calculate mean redshift of band
avg_z = self.cosmo.f2z(
np.mean(self.freq_array[self.spw_to_freq_indices(spw)]) )
# get kparas
k_para = self.get_dlys(spw) \
* self.cosmo.tau_to_kpara(avg_z, little_h=little_h)
return k_para
[docs] def get_blpairs(self):
"""
Returns a list of all blpair tuples in the data_array.
"""
return [self.blpair_to_antnums(blp)
for blp in _ordered_unique(self.blpair_array)]
[docs] def get_polpairs(self):
"""
Returns a list of all unique polarization pairs in the data_array.
"""
polpairs = [uvputils.polpair_int2tuple(pp, pol_strings=True)
for pp in self.polpair_array]
return sorted(set(polpairs))
[docs] def get_all_keys(self):
"""
Returns a list of all possible tuple keys in the data_array, in the
format:
(spectral window, baseline-pair, polarization-string)
"""
# get unique blpair tuples
blps = self.get_blpairs()
all_keys = []
# loop over spw and pols and add to keys
for spw in range(self.Nspws):
for p in self.polpair_array:
pol1, pol2 = uvputils.polpair_int2tuple(p)
pstr = (uvutils.polnum2str(pol1), uvutils.polnum2str(pol2))
all_keys.extend([(spw, blp, pstr) for blp in blps])
return all_keys
[docs] def get_spw_ranges(self, spws=None):
"""
Get the frequency range and Nfreqs of each spectral window in the object.
Parameters
----------
spws : list of spw integers, optional
Default is all spws present in data.
Returns
-------
spw_ranges : list of len-4 tuples
Tuples: (freq_start, freq_end, Nfreqs, Ndlys) in Hz. Contains
start, stop and Nfreqs and Ndlys [Hz] of each spectral window.
To turn this into the frequency array of the spectral window use
`spw_freqs = np.linspace(*spw_range[:3], endpoint=False)`
"""
# type check
if isinstance(spws, (int, np.integer)):
spws = [spws]
if spws is None:
spws = np.arange(self.Nspws)
spw_ranges = []
# iterate over spectral windows
for spw in spws:
spw_freqs = self.freq_array[self.spw_to_freq_indices(spw)]
Nfreqs = len(spw_freqs)
spw_df = np.median(np.diff(spw_freqs))
Ndlys = len(self.spw_to_dly_indices(spw))
spw_ranges.append( (spw_freqs.min(), spw_freqs.min() + Nfreqs * spw_df, Nfreqs, Ndlys) )
return spw_ranges
[docs] def get_stats(self, stat, key, omit_flags=False):
"""
Returns a statistic from the stats_array dictionary.
Parameters
----------
stat : str
The statistic to return.
key : tuple
A spw-blpair-polpair key parseable by UVPSpec.key_to_indices.
omit_flags : bool, optional
If True, remove time integrations (or spectra) that
came from visibility data that were completely flagged
across the spectral window (i.e. integration == 0).
Returns
-------
statistic : array_like
A 2D (Ntimes, Ndlys) complex array holding desired bandpower
statistic.
"""
if not hasattr(self, "stats_array"):
raise AttributeError("No stats have been entered to this UVPSpec object")
assert stat in self.stats_array.keys(), "Statistic name {} not found in stat keys.".format(stat)
spw, blpairts, polpair = self.key_to_indices(key, omit_flags=omit_flags)
statistic = self.stats_array[stat]
# if bandpowers have been folded, return only positive delays
if self.folded:
Ndlys = np.count_nonzero(self.spw_dly_array == spw)
return statistic[spw][blpairts, -(Ndlys-Ndlys//2-1):, polpair]
else:
return statistic[spw][blpairts, :, polpair]
[docs] def set_stats(self, stat, key, statistic):
"""
Set a statistic waterfall (Ntimes, Ndlys) in the stats_array
given the selection of spw, blpairts and polpair in key.
Parameters
----------
stat : string
Name of the statistic.
key : tuple
A spw-blpair-polpair key parseable by UVPSpec.key_to_indices.
statistic : ndarray
2D ndarray with statistics to set. Must conform to shape of the
slice of data_array produced by the specified key.
"""
spw, blpairts, polpair = self.key_to_indices(key)
statistic = np.asarray(statistic)
data_shape = self.data_array[spw][blpairts, :, polpair].shape
if data_shape != statistic.shape:
print(data_shape, statistic.shape)
errmsg = "Input array shape {} must match " \
"data_array shape {}.".format(statistic.shape, data_shape)
raise ValueError(errmsg)
if not hasattr(self, "stats_array"):
self.stats_array = odict()
dtype = statistic.dtype
if stat not in self.stats_array.keys():
self.stats_array[stat] = \
odict([[i,
np.nan * np.ones(self.data_array[i].shape, dtype=dtype)]
for i in range(self.Nspws)])
self.stats_array[stat][spw][blpairts, :, polpair] = statistic
[docs] def set_stats_slice(self, stat, m, b, above=False, val=1e20):
"""
For each baseline, set all delay bins in stats_array that fall
above or below y = bl_len * m + b [nanosec] equal to val.
Useful for downweighting foregrounds in spherical average.
Parameters
----------
stat : str, name of stat in stat_array to set
m : float, coefficient of bl_len [nanosec / meter]
b : float, offset [nanosec]
above : bool, if True, set stats above line, else set below line
val : float, value to replace in stats_array
"""
assert stat in self.stats_array, "{} not found in stats_array".format(stat)
blpairs = self.get_blpairs()
bllens = self.get_blpair_seps()
# iterate over spws
for i in range(self.Nspws):
# get this spw dlys
dlys = self.get_dlys(i) * 1e9
# iterate over blpairs
for blp in blpairs:
# get time-blpair inds
tinds = self.blpair_to_indices(blp)
# get dly indices
if above:
dinds = np.abs(dlys) > (bllens[tinds].mean() * m + b)
else:
dinds = np.abs(dlys) < (bllens[tinds].mean() * m + b)
# set stat_array
for tind in tinds:
self.stats_array[stat][i][tind, dinds, :] = val
[docs] def convert_to_deltasq(self, inplace=True):
"""
Convert from P(k) to Delta^2(k) by multiplying by k^3 / (2pi^2).
The units of the output is therefore the current units (self.units) times
(h^3) Mpc^-3, where the h^3 is only included if h^-3 is in self.norm_units
Parameters
----------
inplace : bool, optional
If True edit and overwrite arrays in self, else make a copy of
self and return. Default: True.
Notes:
------
Alters data_array, stats_array, and cov_array if present,
as well as metadata like norm_units
"""
# copy object
if inplace:
uvp = self
else:
uvp = copy.deepcopy(self)
# determine if little_h is required
little_h = 'h^-3' in uvp.norm_units
# loop over spectral windows
for spw in range(uvp.Nspws):
# get k vectors
k_perp = uvp.get_kperps(spw, little_h=little_h)
k_para = uvp.get_kparas(spw, little_h=little_h)
k_mag = np.sqrt(k_perp[:, None, None]**2 + k_para[None, :, None]**2)
# shape of (Nbltpairs, spw_Ndlys, Npols)
coeff = k_mag**3 / (2 * np.pi**2)
# multiply into data
uvp.data_array[spw] *= coeff
# update stats array
if hasattr(uvp, 'stats_array'):
for stat in uvp.stats_array:
uvp.stats_array[stat][spw] *= coeff
# update cov array
if hasattr(uvp, 'cov_array_real'):
cov = uvp.cov_array_real[spw]
uvp.cov_array_real[spw] = np.einsum("tip,tijp,tjp->tijp", coeff, cov, coeff)
cov = uvp.cov_array_imag[spw]
uvp.cov_array_imag[spw] = np.einsum("tip,tijp,tjp->tijp", coeff, cov, coeff)
# edit units
uvp.norm_units += " k^3 / (2pi^2)"
if inplace == False:
return uvp
[docs] def blpair_to_antnums(self, blpair):
"""
Convert baseline-pair integer to nested tuple of antenna numbers.
Parameters
----------
blpair : i12 int
baseline-pair integer ID
Returns
-------
antnums : tuple
Nested tuple containing baseline-pair antenna numbers.
Ex. ((ant1, ant2), (ant3, ant4))
"""
return uvputils._blpair_to_antnums(blpair)
[docs] def antnums_to_blpair(self, antnums):
"""
Convert nested tuple of antenna numbers to baseline-pair integer.
Parameters
----------
antnums : tuple
nested tuple containing integer antenna numbers for a baseline-pair.
Ex. ((ant1, ant2), (ant3, ant4))
Returns
-------
blpair : i12 integer
baseline-pair integer
"""
return uvputils._antnums_to_blpair(antnums)
[docs] def bl_to_antnums(self, bl):
"""
Convert baseline (antenna-pair) integer to nested tuple of antenna
numbers.
Parameters
----------
bl : i6 int
baseline integer ID
Returns
-------
antnums : tuple
tuple containing baseline antenna numbers. Ex. (ant1, ant2)
"""
return uvputils._bl_to_antnums(bl)
[docs] def antnums_to_bl(self, antnums):
"""
Convert tuple of antenna numbers to baseline integer.
Parameters
----------
antnums : tuple
Tuple containing integer antenna numbers for a baseline.
Ex. (ant1, ant2)
Returns
-------
bl : i6 integer
Baseline integer.
"""
return uvputils._antnums_to_bl(antnums)
[docs] def blpair_to_indices(self, blpair):
"""
Convert a baseline-pair nested tuple ((ant1, ant2), (ant3, ant4)) or
a baseline-pair integer into indices to index the blpairts axis of
data_array.
Parameters
----------
blpair : tuples or ints
Nested tuple or blpair i12 integer, Ex. ((1, 2), (3, 4)),
or list of blpairs.
"""
# convert blpair to integer if fed as tuple
if isinstance(blpair, tuple):
blpair = [self.antnums_to_blpair(blpair)]
elif isinstance(blpair, (np.integer, int)):
blpair = [blpair]
elif isinstance(blpair, list):
if isinstance(blpair[0], tuple):
blpair = [self.antnums_to_blpair(blp) for blp in blpair]
# assert exists in data
assert np.array([b in self.blpair_array for b in blpair]).all(), \
"blpairs {} not all found in data".format(blpair)
return np.arange(self.Nbltpairs)[
np.logical_or.reduce([self.blpair_array == b for b in blpair])]
[docs] def spw_to_freq_indices(self, spw):
"""
Convert a spectral window integer into a list of indices to index
into the spw_freq_array or freq_array.
Parameters
----------
spw : int or tuple
Spectral window index, or spw tuple from get_spw_ranges(), or list
of either.
Returns
-------
freq_indices : array_like
Index array for specified spw(s) in spw_freq_array.
"""
# convert to list if int
if isinstance(spw, (int, np.integer, tuple)):
spw = [spw]
# if spws is a list of spw tuples, convert to spw indices
if np.all([isinstance(s, tuple) for s in spw]):
spw_ranges = self.get_spw_ranges()
spw = [spw_ranges.index(s) for s in spw]
# assert exists in data
assert np.array([s in self.spw_freq_array for s in spw]).all(), \
"spws {} not all found in data".format(spw)
# get select boolean array
select = np.logical_or.reduce([self.spw_freq_array == s for s in spw])
# get indices
freq_indices = np.arange(self.Nspwfreqs)[select]
return freq_indices
[docs] def spw_to_dly_indices(self, spw):
"""
Convert a spectral window integer into a list of indices to index
into the spw_dly_array or dly_array.
Parameters
----------
spw : int or tuple
Spectral window index, or spw tuple from get_spw_ranges(), or list
of either.
Returns
-------
dly_indices : array_like
Index array for specified spw(s) in spw_dly_array.
"""
# convert to list if int
if isinstance(spw, (int, np.integer, tuple)):
spw = [spw]
# if spws is a list of spw tuples, convert to spw indices
if np.all([isinstance(s, tuple) for s in spw]):
spw_ranges = self.get_spw_ranges()
spw = [spw_ranges.index(s) for s in spw]
# assert exists in data
assert np.array([s in self.spw_dly_array for s in spw]).all(), \
"spws {} not all found in data".format(spw)
# get select boolean array
select = np.logical_or.reduce([self.spw_dly_array == s for s in spw])
if self.folded:
select[self.dly_array < 1e-10] = False
# get array
dly_indices = np.arange(self.Nspwdlys)[select]
return dly_indices
def spw_indices(self, spw):
    """
    Convert a spectral window integer into a list of indices to index
    into the spw_array.

    Parameters
    ----------
    spw : int or tuple
        Spectral window index, or spw tuple from get_spw_ranges(), or list
        of either.

    Returns
    -------
    spw_indices : array_like
        Index array for specified spw(s) in spw_array.
    """
    # normalize scalar input (int or single spw tuple) to a list
    spws = [spw] if isinstance(spw, (int, np.integer, tuple)) else spw
    # spw tuples come from get_spw_ranges(); map each back to its index
    if np.all([isinstance(s, tuple) for s in spws]):
        ranges = self.get_spw_ranges()
        spws = [ranges.index(s) for s in spws]
    # every requested spw must appear in the data
    assert np.array([s in self.spw_array for s in spws]).all(), \
        "spws {} not all found in data".format(spws)
    # indices of spw_array entries matching any requested spw
    mask = np.isin(self.spw_array, spws)
    return np.arange(self.Nspws)[mask]
def polpair_to_indices(self, polpair):
    """
    Map a polarization-pair integer or tuple to its index in polpair_array.

    Parameters
    ----------
    polpair : (list of) int or tuple or str
        Polarization-pair integer or tuple of polarization strings/ints.
        Alternatively, if a single string is given, will assume that the
        specified polarization is to be cross-correlated with itself,
        e.g. 'XX' implies ('XX', 'XX').

    Returns
    -------
    indices : int
        Index of polpair in polpair_array.
    """
    # wrap a scalar specifier in a list; reject non list-like input
    if isinstance(polpair, (tuple, int, np.integer, str)):
        pairs = [polpair]
    elif isinstance(polpair, (list, np.ndarray)):
        pairs = polpair
    else:
        raise TypeError("polpair must be list of tuple or int or str")
    # normalize every entry to a polpair integer
    normalized = []
    for p in pairs:
        if isinstance(p, str):
            # a bare string means the pol cross-correlated with itself
            p = uvputils.polpair_tuple2int((p, p))
        if isinstance(p, tuple):
            p = uvputils.polpair_tuple2int(p)
        normalized.append(p)
    # every requested polpair must appear in the data
    assert np.array([p in self.polpair_array for p in normalized]).all(), \
        "pols {} not all found in data".format(normalized)
    mask = np.isin(self.polpair_array, normalized)
    return np.arange(self.polpair_array.size)[mask]
def time_to_indices(self, time, blpairs=None):
    """
    Convert a time [Julian Date] from self.time_avg_array to an array that
    indexes the elements at which it appears in that array. Can optionally
    take a blpair selection to further select the indices at which both
    that time and blpair are satisfied.

    Parameters
    ----------
    time : float
        Julian Date time from self.time_avg_array, Ex: 2458042.12242

    blpairs : tuple or int, optional
        List of blpair tuples or integers that further selects the elements
        at which both time and blpairs are satisfied.

    Returns
    -------
    indices : integer ndarray
        Contains indices at which selection is satisfied along the
        blpairts axis.
    """
    # rows whose averaged time matches the requested JD (tight rtol)
    match = np.isclose(self.time_avg_array, time, rtol=1e-10)
    if blpairs is not None:
        # accept a single blpair (tuple or integer) as well as a list
        if isinstance(blpairs, (tuple, int, np.integer)):
            blpairs = [blpairs]
        # mark rows belonging to any of the requested blpairs
        blp_mask = np.zeros(self.Nbltpairs, bool)
        for blp in blpairs:
            blp_mask[self.blpair_to_indices(blp)] = True
        match = match & blp_mask
    return np.arange(self.Nbltpairs)[match]
def key_to_indices(self, key, omit_flags=False):
    """
    Convert a data key into relevant slice arrays. A data key takes the form

    (spw_integer, ((ant1, ant2), (ant3, ant4)), (pol1, pol2))

    or

    (spw, blpair-integer, pol-integer)

    where spw is the spectral window integer, ant1 etc. are integers,
    and polpair is either a polarization-pair integer or tuple.

    The key can also be a dictionary in the form::

        key = {
            'spw' : spw_integer,
            'blpair' : ((ant1, ant2), (ant3, ant4))
            'polpair' : (pol1, pol2)
        }

    and it will parse the dictionary for you.

    Parameters
    ----------
    key : tuple
        Baseline-pair, spw, and pol-pair key.

    omit_flags : bool, optional
        If True, remove time integrations (or spectra) that
        came from visibility data that were completely flagged
        across the spectral window (i.e. integration == 0).

    Returns
    -------
    spw : int
        Spectral window index.

    blpairts : list
        List of integers to apply along blpairts axis.

    polpair : int
        Polarization pair index.
    """
    # both tuple and dict forms must carry exactly three entries
    assert len(key) == 3, "length of key must be 3: (spw, blpair, polpair)"
    # dict form is unpacked into the canonical tuple ordering
    if isinstance(key, (odict, dict)):
        key = (key['spw'], key['blpair'], key['polpair'])
    spw_ind, blpair, polpair = key
    # validate the types of each key element
    assert isinstance(spw_ind, (int, np.integer)), "spw must be an integer"
    assert isinstance(blpair, (int, np.integer, tuple)), \
        "blpair must be an integer or nested tuple"
    assert isinstance(polpair, (tuple, np.integer, int, str)), \
        "polpair must be a tuple, integer, or str: %s / %s" % (polpair, key)
    # a bare string means the pol cross-correlated with itself
    if isinstance(polpair, str):
        polpair = (polpair, polpair)
    # canonicalize blpair and polpair to integers
    if isinstance(blpair, tuple):
        blpair = self.antnums_to_blpair(blpair)
    if isinstance(polpair, tuple):
        polpair = uvputils.polpair_tuple2int(polpair)
    # each element must exist in the data
    assert spw_ind in self.spw_freq_array and spw_ind in self.spw_dly_array, \
        "spw {} not found in data".format(spw_ind)
    assert blpair in self.blpair_array, \
        "blpair {} not found in data".format(blpair)
    assert polpair in self.polpair_array, \
        "polpair {} not found in data".format(polpair)
    # resolve the (single) polarization-pair index
    pol_ind = self.polpair_to_indices(polpair)
    if isinstance(pol_ind, (list, np.ndarray)):
        assert len(pol_ind) == 1, \
            "only one polpair can be specified in key_to_indices"
        pol_ind = pol_ind[0]
    # rows along the blpairts axis for this baseline-pair
    blpts_inds = self.blpair_to_indices(blpair)
    # optionally drop fully-flagged spectra (integration == 0)
    if omit_flags:
        integs = self.integration_array[spw_ind][blpts_inds, pol_ind]
        blpts_inds = blpts_inds[~np.isclose(integs, 0.0)]
    return spw_ind, blpts_inds, pol_ind
def select(self, spws=None, bls=None, only_pairs_in_bls=True, blpairs=None,
           times=None, lsts=None, polpairs=None, inplace=True,
           run_check=True):
    """
    Select out certain slices of the data.

    Parameters
    ----------
    spws : list, optional
        Spectral window integers to keep. If None, keep all.
        Default: None.

    bls : list of i6 baseline integers or baseline tuples, optional
        Example: (2, 3). Keep all baseline-pairs whose first _or_ second
        baseline is in this list; behavior changes if
        only_pairs_in_bls == True. If None, keep all baselines (subject
        to other options). Default: None.

    only_pairs_in_bls : bool, optional
        If True, keep only baseline-pairs whose first _and_ second
        baselines are found in the bls list. Default: True.

    blpairs : list of baseline-pair tuples or integers, optional
        Baseline-pairs to keep. If bls is also fed, this list is
        concatenated onto the baseline-pair list constructed from the bls
        selection. If None, keep all valid baseline-pairs. Default: None.

    times : array_like, optional
        Float ndarray of times from the time_avg_array to keep. If None,
        keep all times. Default: None.

    lsts : array_like, optional
        Float ndarray of lsts from the lst_avg_array to keep. If None,
        keep all lsts. Default: None.

    polpairs : list of tuple or int or str, optional
        Polarization-pairs to keep. If None, keep all. (Strings are
        expanded into polarization pairs, e.g. 'XX' becomes ('XX', 'XX'),
        and are not treated as individual polarizations.) Default: None.

    inplace : bool, optional
        If True, edit and overwrite arrays in self, else make a copy of
        self and return. Default: True.

    run_check : bool, optional
        If True, run uvp.check() after selection.

    Returns
    -------
    uvp : UVPSpec, optional
        Only returned when inplace=False: a new UVPSpec object containing
        only the selected data.
    """
    # operate on self directly, or on a deep copy for out-of-place use
    target = self if inplace else copy.deepcopy(self)
    # the shared helper performs the actual down-selection
    uvputils._select(target, spws=spws, bls=bls,
                     only_pairs_in_bls=only_pairs_in_bls,
                     blpairs=blpairs, times=times, lsts=lsts,
                     polpairs=polpairs)
    if run_check:
        target.check()
    # note: comparison with == False is deliberate (historical behavior)
    if inplace == False:
        return target
def get_ENU_bl_vecs(self):
    """
    Return baseline vector array in TOPO (ENU) frame in meters, with
    matched ordering of self.bl_vecs.

    Returns
    -------
    blvecs : array_like
        Array of baseline vectors.
    """
    # baseline vectors are stored relative to the telescope; shift to ECEF
    ecef_vecs = self.bl_vecs + self.telescope_location
    # reference point for the ENU frame is the telescope location itself
    lat_lon_alt = uvutils.LatLonAlt_from_XYZ(self.telescope_location[None])
    return uvutils.ENU_from_ECEF(ecef_vecs, *lat_lon_alt)
def read_from_group(self, grp, just_meta=False, spws=None, bls=None,
                    blpairs=None, times=None, lsts=None, polpairs=None,
                    only_pairs_in_bls=False):
    """
    Clear current UVPSpec object and load in data from specified HDF5 group.

    Parameters
    ----------
    grp : HDF5 group
        HDF5 group to load data from.

    just_meta : bool, optional
        If True, read-in metadata but ignore data, wgts and integration
        arrays. Default: False.

    spws : list of tuple, optional
        List of spectral window integers to select. Default: None (loads
        all channels).

    bls : list of i6 baseline integers or baseline tuples
        Select all baseline-pairs whose first _or_ second baseline are in
        the list. This changes if only_pairs_in_bls == True.
        Example tuple: (2, 3). Default: None (loads all bl pairs).

    blpairs : list of baseline-pair tuples or integers
        List of baseline pairs to keep. If bls is also fed, this list is
        concatenated onto the baseline-pair list constructed from the
        bls selection.

    times : float ndarray
        Times from the time_avg_array to keep.

    lsts : float ndarray
        lsts from the lst_avg_array to keep.

    polpairs : list of tuple or int or str
        List of polarization-pair integers, tuples, or strings to keep.
        Strings are expanded into polarization pairs, and are not treated
        as individual polarizations, e.g. 'XX' becomes ('XX', 'XX').

    only_pairs_in_bls : bool, optional
        If True, keep only baseline-pairs whose first _and_ second baseline
        are found in the 'bls' list. Default: False.
    """
    # Make sure the group is a UVPSpec object
    assert 'pspec_type' in grp.attrs, "This object is not a UVPSpec object"
    pstype = grp.attrs['pspec_type']
    # h5py may return string attributes as bytes; normalize to str
    pstype = pstype.decode() if isinstance(pstype, bytes) else pstype
    assert pstype == 'UVPSpec', "This object is not a UVPSpec object"
    # Clear all data in the current object
    self._clear()
    # Load-in meta data including deprecated.
    for k in grp.attrs:
        if k in self._meta_attrs or k in self._meta_deprecated:
            val = grp.attrs[k]
            if isinstance(val, bytes): val = val.decode() # bytes -> str
            setattr(self, k, val)
    # Load-in metadata datasets (read fully into memory with [:])
    for k in grp:
        if k in self._meta_dsets or k in self._meta_dsets_deprecated:
            setattr(self, k, grp[k][:])
    # Backwards compatibility: pol_array exists (not polpair_array)
    if 'pol_array' in grp.attrs:
        warnings.warn("Stored UVPSpec contains pol_array attr, which has "
                      "been superseded by polpair_array. Converting "
                      "automatically.", UserWarning)
        pol_arr = grp.attrs['pol_array']
        # Convert to polpair array: each single pol becomes (pol, pol)
        polpair_arr = [uvputils.polpair_tuple2int((p,p)) for p in pol_arr]
        setattr(self, 'polpair_array', np.array(polpair_arr))
    # Backwards compatibility: exact_windows missing in older files
    if 'exact_windows' not in grp.attrs:
        setattr(self, 'exact_windows', False)
    # Use _select() to pick out only the requested baselines/spws.
    # When loading data (not just metadata), the group handle is passed
    # so _select can read only the selected slices from disk.
    if just_meta:
        uvputils._select(self, spws=spws, bls=bls, lsts=lsts,
                         only_pairs_in_bls=only_pairs_in_bls,
                         blpairs=blpairs, times=times, polpairs=polpairs)
    else:
        uvputils._select(self, spws=spws, bls=bls, lsts=lsts,
                         only_pairs_in_bls=only_pairs_in_bls,
                         blpairs=blpairs, times=times, polpairs=polpairs,
                         h5file=grp)
    # handle cosmo: stored as a stringified parameter dict; rebuild object
    if hasattr(self, 'cosmo'):
        self.cosmo = conversions.Cosmo_Conversions(
            **ast.literal_eval(self.cosmo) )
    # BACKWARDS COMPATIBILITY for files with Nblpairts.
    # Check whether we are reading a pre Nbltpairs file by looking for the
    # Nblpairts field.
    if hasattr(self, "Nblpairts"):
        reading_old_version = True
        setattr(self, "Nbltpairs", self.Nblpairts)
    else:
        reading_old_version = False
    if reading_old_version:
        # If we are reading a power spectrum object that was written in the pre-Nbltpairs days
        # then make sure to correctly set Ntimes and Ntpairs.
        self.Ntimes = len(np.unique(np.hstack([self.time_1_array, self.time_2_array])))
        self.Ntpairs = len(set([(t1, t2) for t1, t2 in zip(self.time_1_array, self.time_2_array)]))
    # Clear deprecated attributes.
    for dattr in self._meta_deprecated:
        if hasattr(self, dattr):
            delattr(self, dattr)
    for dattr in self._meta_dsets_deprecated:
        if hasattr(self, dattr):
            delattr(self, dattr)
    # If we are reading an UVPSpec object created before
    # UVData switched to future_array_shapes, promote the scalar
    # channel width to a length-1 array.
    try:
        cw = getattr(self, "_channel_width").value  # NOTE(review): cw unused; only probes attribute existence
    except AttributeError:
        pass
    else:
        setattr(self, "_channel_width", np.atleast_1d(getattr(self, "_channel_width").value))
    # Final validity check (metadata only if just_meta)
    self.check(just_meta=just_meta)
def read_hdf5(self, filepath, just_meta=False, spws=None, bls=None,
              blpairs=None, times=None, lsts=None, polpairs=None,
              only_pairs_in_bls=False):
    """
    Clear current UVPSpec object and load in data from an HDF5 file.

    Parameters
    ----------
    filepath : str
        Path to HDF5 file to read from.

    just_meta : bool, optional
        If True, read-in metadata but ignore data, wgts and integration
        arrays. Default: False.

    spws : list of tuple, optional
        List of spectral window integers to select. Default: None (loads
        all channels).

    bls : list of i6 baseline integers or baseline tuples
        Select all baseline-pairs whose first _or_ second baseline are in
        the list. This changes if only_pairs_in_bls == True.
        Example tuple: (2, 3). Default: None (loads all bl pairs).

    blpairs : list of baseline-pair tuples or integers
        List of baseline pairs to keep. If bls is also fed, this list is
        concatenated onto the baseline-pair list constructed from the
        bls selection.

    times : float ndarray
        Times from the time_avg_array to keep.

    lsts : float ndarray
        lsts from the lst_avg_array to keep.

    polpairs : list of polpair ints or tuples or str
        List of polarization-pair integers or tuples to keep.
        Strings are expanded into polarization pairs, and are not treated
        as individual polarizations, e.g. 'XX' becomes ('XX', 'XX').

    only_pairs_in_bls : bool, optional
        If True, keep only baseline-pairs whose first _and_ second baseline
        are found in the 'bls' list. Default: False.
    """
    # Open file descriptor and read data
    with h5py.File(filepath, 'r') as f:
        # BUGFIX: forward blpairs to read_from_group; previously it was
        # dropped here, so blpair selection was silently ignored when
        # reading from file.
        self.read_from_group(f, just_meta=just_meta, spws=spws, bls=bls,
                             blpairs=blpairs, times=times, lsts=lsts,
                             polpairs=polpairs,
                             only_pairs_in_bls=only_pairs_in_bls)
def write_to_group(self, group, run_check=True):
    """
    Write UVPSpec data into an HDF5 group.

    Parameters
    ----------
    group : HDF5 group
        The handle of the HDF5 group that the UVPSpec object should be
        written into.

    run_check : bool, optional
        Whether to run a validity check on the UVPSpec object before
        writing it to the HDF5 group. Default: True.
    """
    # Run check
    if run_check: self.check()
    # Check whether the group already contains info
    # TODO
    # Write meta data
    for k in self._meta_attrs:
        if hasattr(self, k):
            # handle cosmo: store the parameter dict as a string attr
            if k == 'cosmo':
                group.attrs['cosmo'] = str(getattr(self, k).get_params())
                continue
            this_attr = getattr(self, k)
            # Do Unicode/string conversions, as HDF5 struggles with them
            if k == 'labels':
                # NOTE(review): np.string_ was removed in NumPy 2.0
                # (np.bytes_ is the alias going forward) -- confirm the
                # numpy version pin before upgrading.
                this_attr = [np.string_(lbl) for lbl in this_attr]
            # Store attribute in group
            group.attrs[k] = this_attr
    # Write metadata datasets
    for k in self._meta_dsets:
        if hasattr(self, k):
            group.create_dataset(k, data=getattr(self, k))
    # Iterate over spectral windows and create datasets
    for i in np.unique(self.spw_array):
        group.create_dataset("data_spw{}".format(i),
                             data=self.data_array[i],
                             dtype=np.complex128)
        group.create_dataset("wgt_spw{}".format(i),
                             data=self.wgt_array[i],
                             dtype=np.float64)
        group.create_dataset("integration_spw{}".format(i),
                             data=self.integration_array[i],
                             dtype=np.float64)
        group.create_dataset("nsample_spw{}".format(i),
                             data=self.nsample_array[i],
                             dtype=float)
        # window functions are optional; only write when present
        if hasattr(self, "window_function_array"):
            group.create_dataset("window_function_spw{}".format(i),
                                 data=self.window_function_array[i],
                                 dtype=np.float64)
            # exact windows carry extra kperp/kpara bin arrays
            if self.exact_windows:
                group.create_dataset("window_function_kperp_spw{}".format(i),
                                     data=self.window_function_kperp[i],
                                     dtype=np.float64)
                group.create_dataset("window_function_kpara_spw{}".format(i),
                                     data=self.window_function_kpara[i],
                                     dtype=np.float64)
        # covariance arrays are optional; real and imag stored separately
        if hasattr(self, "cov_array_real"):
            group.create_dataset("cov_real_spw{}".format(i),
                                 data=self.cov_array_real[i],
                                 dtype=np.float64)
            group.create_dataset("cov_imag_spw{}".format(i),
                                 data=self.cov_array_imag[i],
                                 dtype=np.float64)
    # Store any statistics arrays
    if hasattr(self, "stats_array"):
        for s in self.stats_array.keys():
            data = self.stats_array[s]
            for i in np.unique(self.spw_array):
                group.create_dataset("stats_{}_{}".format(s, i),
                                     data=data[i], dtype=data[i].dtype)
    # denote as a uvpspec object
    group.attrs['pspec_type'] = self.__class__.__name__
def write_hdf5(self, filepath, overwrite=False, run_check=True):
    """
    Write a UVPSpec object to HDF5 file.

    Parameters
    ----------
    filepath : str
        Filepath for output file.

    overwrite : bool, optional
        Whether to overwrite output file if it exists. Default: False.

    run_check : bool, optional
        Run UVPSpec validity check before writing to file. Default: True.
    """
    # Refuse to clobber an existing file unless explicitly allowed.
    # Identity comparisons with True/False are deliberate: only the
    # actual booleans trigger these branches (historical behavior).
    exists = os.path.exists(filepath)
    if exists and overwrite is False:
        raise IOError("{} exists, not overwriting...".format(filepath))
    if exists and overwrite is True:
        print("{} exists, overwriting...".format(filepath))
        os.remove(filepath)
    # Write file
    with h5py.File(filepath, 'w') as f:
        self.write_to_group(f, run_check=run_check)
def set_cosmology(self, new_cosmo, overwrite=False, new_beam=None,
                  verbose=True):
    """
    Set the cosmology for this UVPSpec object by passing an instance of
    conversions.Cosmo_Conversions and assigning it to self.cosmo, in
    addition to re-computing power spectrum normalization quantities like
    self.scalar_array. Because this will attempt to recompute the
    scalar_array, the beam-related metadata (OmegaP, OmegaPP and
    beam_freqs) must exist in self.

    If they do not, or if you'd like to overwrite them with a new beam, you
    can pass a UVBeam object or path to a beam file via the new_beam kwarg.

    If self.cosmo already exists then you are attempting to overwrite the
    currently adopted cosmology, which will only proceed if overwrite is
    True.

    If overwrite == True, then this module will overwrite self.cosmo. It
    will also recompute the power spectrum normalization scalar (using
    pspecbeam) and will overwrite the values in self.scalar_array. It will
    then propagate these re-normalization changes into the data_array by
    multiplying the data by (new_scalar_array / old_scalar_array).

    Parameters
    ----------
    new_cosmo : Cosmo_Conversions or dict
        Cosmo_Conversions instance or cosmological parameter dictionary.
        The new cosmology you want to adopt for this UVPSpec object.

    overwrite : bool, optional
        If True, overwrite self.cosmo if it already exists. Default: False.

    new_beam : PSpecBeamBase subclass or str
        pspecbeam.PSpecBeamUV object or path to beam file. The new beam you
        want to adopt for this UVPSpec object.

    verbose : bool, optional
        If True, report feedback to stdout. Default: True.
    """
    # refuse to replace an existing cosmology unless explicitly allowed
    if hasattr(self, 'cosmo') and overwrite == False:
        print("self.cosmo exists and overwrite == False, not overwriting...")
        return
    else:
        # beam info (or a replacement beam) is required to recompute scalars
        if (not hasattr(self, 'OmegaP') \
            or not hasattr(self, 'OmegaPP') \
            or not hasattr(self, 'beam_freqs')) \
            and new_beam is None:
            print("In order to set the cosmology, self.OmegaP, " \
                  "self.OmegaPP and self.beam_freqs must exist, or you " \
                  "need to pass a PSpecBeamUV object or a path to a beam " \
                  "file as new_beam.")
            return
    # overwrite beam quantities
    if new_beam is not None:
        if verbose: print("Updating beam data with {}".format(new_beam))
        if isinstance(new_beam, str):
            # PSpecBeamUV will adopt a default cosmology upon instantiation,
            # but this doesn't matter for what we need from it
            new_beam = pspecbeam.PSpecBeamUV(new_beam)
        self.OmegaP, self.OmegaPP = new_beam.get_Omegas(self.polpair_array)
        self.beam_freqs = new_beam.beam_freqs
    # Update cosmo: accept a plain parameter dict as well as an object
    if isinstance(new_cosmo, (dict, odict)):
        new_cosmo = conversions.Cosmo_Conversions(**new_cosmo)
    if verbose: print("setting cosmology: \n{}".format(new_cosmo))
    self.cosmo = new_cosmo
    # Update scalar_array
    if verbose:
        print("Updating scalar array and re-normalizing power spectra")
    for spw in range(self.Nspws):
        for j, polpair in enumerate(self.polpair_array):
            scalar = self.compute_scalar(spw, polpair, num_steps=1000,
                                         little_h=True, noise_scalar=False)
            # renormalize power spectra with new scalar
            self.data_array[spw][:, :, j] *= scalar/self.scalar_array[spw, j]
            # update self.scalar_array element
            self.scalar_array[spw, j] = scalar
    # Update self.units if pspectra were not originally in cosmological units
    if "Mpc" not in self.norm_units:
        self.norm_units = "h^-3 Mpc^3"
def get_exact_window_functions(self, ftbeam=None, spw_array=None,
                               verbose=False, inplace=True, add_to_history='',
                               x_orientation=None):
    """
    Obtain the exact window functions corresponding to UVPSpec object.

    Depending on the format of the UVPSpec object given as input, the shape
    of the output array will be different:
    (Nblpts, Ndlys, Nkperp, Nkpara, Npols).

    Weights can be applied to the different baseline pairs to facilitate
    spherical average later.

    Parameters
    ----------
    ftbeam : str or FTBeam object, optional
        Definition of the beam Fourier transform to be used.
        Options include;
            - Root name of the file to use, without the polarisation
            Ex : FT_beam_HERA_dipole (+ path)
            - '' for computation from beam simulations (slow)
            - FTBeam object. Make sure the polarisation and bandwidths
            are compatible with uvp.

    spw_array : list of ints, optional
        Spectral window indices. If None, the window functions are computed
        on all the uvp.spw_ranges, successively. Default: None.

    verbose : bool, optional
        If True, print progress, warnings and debugging info to stdout.

    inplace : bool, optional
        If True (default value), the UVPSpec attribute
        window_function_array is filled with the values computed in the
        function, and window_function_kperp_bins and
        window_function_kpara_bins array are added as attributes.
        If False, returns kperp_bins, kpara_bins and window functions
        computed. Automatically set to False if spw_array is not None
        (that is, if the window functions are not computed on all spws).

    add_to_history : str, optional
        Added text to add to file history.

    x_orientation: str, optional
        Orientation in cardinal direction east or north of X dipole.
        Default keeps polarization in X and Y basis.
        Used to convert polstr to polnum and conversely. Default: None.
    """
    # warn when recomputing in place over existing exact windows
    if self.exact_windows and inplace:
        warnings.warn("Exact window functions already computed, overwriting...")
    # redundant baseline-pair groups share a single window function
    blpair_groups, blpair_lens, _ = self.get_red_blpairs()
    # check spw input and create array of spws to loop over
    if spw_array is None:
        # if no spw specified, use attribute
        spw_array = self.spw_array
    else:
        spw_array = spw_array if isinstance(spw_array, (list, tuple, np.ndarray)) else [int(spw_array)]
        # check if spw given is in uvp
        assert np.all([spw in self.spw_array for spw in spw_array]), \
            "input spw is not in UVPSpec.spw_array."
        if inplace:
            # set inplace to False: partial spw coverage cannot safely
            # overwrite the full attribute arrays
            inplace = False
            warnings.warn('inplace set to False because you are not considering' \
                          ' all spectral windows in object.')
    # Create new window function array
    window_function_array = odict()
    window_function_kperp, window_function_kpara = odict(), odict()
    # Iterate over spectral windows
    for spw in spw_array:
        if verbose: print('spw = {}'.format(spw))
        spw_window_function = []
        spw_wf_kperp_bins, spw_wf_kpara_bins = [], []
        # Iterate over polarizations
        for i, p in enumerate(self.polpair_array):
            # initialise UVWindow object
            uvw = UVWindow.from_uvpspec(self, ipol=i, spw=spw, ftbeam=ftbeam,
                                        x_orientation=x_orientation, verbose=verbose)
            # extract kperp bins the window functions corresponding to the baseline
            # lengths given as input
            kperp_bins = uvw.get_kperp_bins(blpair_lens)
            kpara_bins = uvw.get_kpara_bins(uvw.freq_array)
            # one window function per (blpairts row, delay, kperp, kpara)
            pol_window_function = np.zeros((self.Nbltpairs, self.get_dlys(spw).size, kperp_bins.size, kpara_bins.size))
            # Iterate over baseline-pair groups
            for j, blpg in enumerate(blpair_groups):
                if verbose:
                    print('\rComputing for bl group {} of {}...'.format(j+1, len(blpair_groups)), end='')
                # window functions identical for all times
                window_function_blg = uvw.get_cylindrical_wf(blpair_lens[j],
                                                             kperp_bins = kperp_bins,
                                                             kpara_bins = kpara_bins)
                if np.isnan(window_function_blg).any():
                    warnings.warn('nan values in the window functions at spw={}, blpair_lens={:.2f}'.format(spw, blpair_lens[j]))
                # shape of window_function: (Ndlys, Nkperp, Nkpara)
                # Iterate within a baseline-pair group
                for k, blp in enumerate(blpg):
                    # iterate over baselines within group, which all have the same window function
                    for iblts in self.blpair_to_indices(blp):
                        pol_window_function[iblts, :, :, :] = np.copy(window_function_blg)
            if verbose:
                print('\rComputed wf for baseline-pair groups {} of {}.'.format(len(blpair_groups),len(blpair_groups)))
            # Append to lists (spectral window)
            spw_window_function.append(pol_window_function)
            spw_wf_kperp_bins.append(kperp_bins.value)
            spw_wf_kpara_bins.append(kpara_bins.value)
        # Append to dictionaries (move the pol axis to the end)
        window_function_array[spw] = np.moveaxis(spw_window_function, 0, -1)
        window_function_kperp[spw] = np.moveaxis(spw_wf_kperp_bins, 0, -1)
        window_function_kpara[spw] = np.moveaxis(spw_wf_kpara_bins, 0, -1)
    if inplace:
        self.window_function_array = window_function_array
        self.window_function_kperp = window_function_kperp
        self.window_function_kpara = window_function_kpara
        # only flag exact windows when every spw was covered
        if np.all(spw_array==self.spw_array):
            self.exact_windows = True
        # Add to history
        self.history = "Computed exact window functions [{}]\n{}\n{}\n{}".format(__version__, add_to_history, '-'*40, self.history)
        # Validity check
        self.check()
    else:
        return window_function_kperp, window_function_kpara, window_function_array
def check(self, just_meta=False):
    """
    Run checks to make sure metadata and data arrays are properly defined
    and in the right format. Raises AssertionErrors if checks fail.

    Parameters
    ----------
    just_meta : bool, optional
        If True, only check metadata. Default: False.
    """
    # iterate over all possible parameters
    for p in self._all_params:
        # only enforce existance if not just_meta
        if not just_meta:
            if p in self._req_params:
                assert hasattr(self, p), \
                    "required parameter {} doesn't exist".format(p)
        # if attribute exists, check its type
        if hasattr(self, p):
            # the PSpecParam descriptor lives under a leading underscore
            a = getattr(self, '_' + p)
            if hasattr(a, 'expected_type'):
                err_msg = "attribute {} does not have expected data type {}".format(p, a.expected_type)
                # ndarrays
                if p in self._ndarrays:
                    assert isinstance(getattr(self, p), np.ndarray), \
                        "attribute {} needs to be an ndarray, {}".format(p, getattr(self, p).tostring())
                    if issubclass(getattr(self, p).dtype.type,
                                  a.expected_type):
                        pass
                    else:
                        # try to cast into its dtype
                        try:
                            setattr(self, p,
                                    getattr(self, p).astype(a.expected_type))
                        except:
                            raise ValueError(err_msg)
                # dicts: every value must be an ndarray of the expected dtype
                elif p in self._dicts:
                    assert isinstance(getattr(self, p), (dict, odict)), \
                        "attribute {} needs to be a dictionary".format(p)
                    # iterate over keys
                    for k in getattr(self, p).keys():
                        assert isinstance(getattr(self, p)[k], np.ndarray), \
                            "values of attribute {} need to be ndarrays".format(p)
                        assert issubclass(getattr(self, p)[k].dtype.type, a.expected_type), err_msg
                # immutables: cast scalars into the expected type if needed
                elif p in self._immutables:
                    if not isinstance(getattr(self, p), a.expected_type):
                        try:
                            setattr(self, p, a.expected_type(getattr(self, p)))
                        except:
                            raise AssertionError(err_msg)
                # dicts of dicts: two levels of keys, ndarray leaves
                elif p in self._dicts_of_dicts:
                    assert isinstance(getattr(self, p), (dict, odict))
                    for k in getattr(self, p).keys():
                        assert isinstance(getattr(self, p)[k], (dict, odict))
                        for j in getattr(self, p)[k].keys():
                            assert isinstance(getattr(self, p)[k][j], np.ndarray)
                            try:
                                getattr(self, p)[k][j] = a.expected_type(getattr(self, p)[k][j])
                            except:
                                raise AssertionError(err_msg)
    # check spw convention: spw indices must be exactly 0..Nspws-1
    assert set(self.spw_array) == set(np.arange(self.Nspws)), "spw_array must be np.arange(Nspws)"
    # check choice of window functions
    if self.exact_windows:
        assert hasattr(self, 'window_function_array') and hasattr(self, 'window_function_kperp'), \
            "Error with window functions: object has exact_windows=True but no related arrays stored."
def _clear(self):
    """
    Clear UVPSpec of all parameters. Warning: this cannot be undone.
    """
    # drop every currently-defined attribute listed in _all_params
    for name in self._all_params:
        if hasattr(self, name):
            delattr(self, name)
def __str__(self):
    """
    Output useful info about UVPSpec object.
    """
    # collect lines, then join once at the end
    lines = [" ATTRIBUTES", "-" * 12]
    for name in self._meta_attrs:
        # histories are long; leave them out of the summary
        if hasattr(self, name) and 'history' not in name:
            val = getattr(self, name)
            if isinstance(val, np.ndarray):
                lines.append("%18s: shape %s" % (name, val.shape))
            else:
                lines.append("%18s: %s" % (name, val))
    lines.append("\n DATASETS")
    lines.append("-" * 12)
    for name in self._meta_dsets:
        if hasattr(self, name):
            lines.append("%18s: shape %s" % (name, getattr(self, name).shape))
    return "\n".join(lines) + "\n"
def __eq__(self, other, params=None, verbose=False):
    """ Check equivalence between attributes of two UVPSpec objects """
    # optionally restrict the comparison to a subset of parameters
    if params is None:
        params = self._all_params
    # any failed assertion below short-circuits to False via the except
    try:
        for p in params:
            # optional params absent from both objects are considered equal
            if p not in self._req_params \
            and (not hasattr(self, p) and not hasattr(other, p)):
                continue
            if p in self._immutables:
                # scalars compared exactly
                assert getattr(self, p) == getattr(other, p)
            elif p in self._ndarrays:
                if issubclass(getattr(self, p).dtype.type, str):
                    # string arrays compared exactly
                    assert np.all(getattr(self, p) == getattr(other, p))
                else:
                    # numeric arrays compared approximately
                    assert np.isclose(getattr(self, p), getattr(other, p)).all()
            elif p in self._dicts:
                # dict values are ndarrays; compare per key
                for i in getattr(self, p):
                    assert np.isclose(getattr(self, p)[i],\
                                      getattr(other, p)[i]).all()
    except AssertionError:
        # p still holds the first mismatching parameter name here
        if verbose:
            print("UVPSpec parameter '{}' not equivalent between {} and {}" \
                  "".format(p, self.__repr__(), other.__repr__()))
        return False
    return True
def __add__(self, other, verbose=False):
    """
    Combine the data of two UVPSpec objects together along a single axis
    """
    # delegate to the module-level combine_uvpspec helper
    return combine_uvpspec([self, other], verbose=verbose)
@property
def units(self):
    """
    Return power spectrum units. See self.vis_units and self.norm_units.
    """
    # combine visibility units (squared) with the normalization units
    vis, norm = self.vis_units, self.norm_units
    return "({})^2 {}".format(vis, norm)
def generate_noise_spectra(self, spw, polpair, Tsys, blpairs=None,
                           little_h=True, form='Pk', num_steps=2000,
                           component='real', scalar=None):
    """
    Generate the expected RMS noise power spectrum given a selection of
    spectral window, system temp. [K], and polarization. This estimate is
    constructed as:

    P_N = scalar * (Tsys * 1e3)^2 / (integration_time) / sqrt(Nincoherent)
    [mK^2 h^-3 Mpc^3]

    where scalar is the cosmological and beam scalar (i.e. X2Y * Omega_eff)
    calculated from pspecbeam with noise_scalar = True, integration_time is
    in seconds and comes from self.integration_array and Nincoherent is the
    number of incoherent averaging samples and comes from
    `self.nsample_array`. If `component` is `real` or `imag`, P_N is
    divided by an additional factor of sqrt(2).

    If the polarizations specified are pseudo Stokes pol (I, Q, U or V)
    then an extra factor of 2 is divided.
    If `form` is `DelSq` then a factor of `|k|^3 / (2pi^2)` is multiplied.

    For more details, see hera_pspec.noise.Sensitivity.calc_P_N, and
    Cheng et al. 2018.

    The default units of P_N are mK^2 h^-3 Mpc^3 (if little_h=True and
    form='Pk'), but is different if those kwargs are altered.

    Parameters
    ----------
    spw : int
        Spectral window index to generate noise curve for.
    polpair : tuple or int or str
        Polarization-pair selection in form of tuple (e.g. ('I', 'Q')) or
        polpair int. Strings are expanded into polarization pairs, e.g.
        'XX' becomes ('XX', 'XX').
    Tsys : dictionary, float or array
        System temperature in Kelvin for each blpair. Key is
        blpair-integer, value is a Tsys float or an ndarray of
        shape=(Ntpairs,). A bare float or ndarray is broadcast to all
        baseline pairs.
    blpairs : list
        List of unique blpair tuples or i12 integers to calculate the
        noise spectrum for. Default is to calculate for all baseline
        pairs in the object.
    little_h : bool, optional
        Whether to have cosmological length units be h^-1 Mpc or Mpc.
        Default: h^-1 Mpc.
    form : str, optional
        Form of power spectra, whether P(k) or Delta^2(k). Valid options
        are 'Pk' or 'DelSq'. Default: 'Pk'.
    num_steps : int, optional
        Number of frequency bins to use in integrating power spectrum
        scalar in pspecbeam. Default: 2000.
    component : str
        Options=['real', 'imag', 'abs'].
        If component is real or imag, divide by an extra factor of sqrt(2).
    scalar : float
        Optional noise scalar used to convert from Tsys to cosmological
        units, output from
        pspecbeam.compute_pspec_scalar(..., noise_scalar=True).
        Default is None. If None provided, will calculate scalar in
        function.

    Returns
    -------
    P_N : dict
        Dictionary containing blpair integers as keys and float ndarrays
        of noise power spectra as values, with ndarrays having shape
        (Ntimes, Ndlys).
    """
    # Check for str polpair and convert to integer
    if isinstance(polpair, str):
        polpair = uvputils.polpair_tuple2int((polpair, polpair))

    # Assert polarization-pair type
    if isinstance(polpair, tuple):
        polpair = uvputils.polpair_tuple2int(polpair)

    # Get polarization index
    polpair_ind = self.polpair_to_indices(polpair)

    # Calculate scalar if not supplied by the caller
    if scalar is None:
        scalar = self.compute_scalar(spw, polpair, num_steps=num_steps,
                                     little_h=little_h, noise_scalar=True)

    # Get k vectors (only needed for the Delta^2 form)
    if form == 'DelSq':
        k_perp = self.get_kperps(spw, little_h=little_h)
        k_para = self.get_kparas(spw, little_h=little_h)
        k_mag = np.sqrt(k_perp[:, None]**2 + k_para[None, :]**2)

    # get blpairs
    if blpairs is None:
        blpairs = _ordered_unique(self.blpair_array)
    elif isinstance(blpairs[0], tuple):
        blpairs = [self.antnums_to_blpair(blp) for blp in blpairs]

    # Get delays
    dlys = self.get_dlys(spw)

    # handle Tsys: broadcast a single float/array to all blpairs
    if not isinstance(Tsys, (dict, odict)):
        if not isinstance(Tsys, np.ndarray):
            Tsys = np.ones(self.Ntpairs) * Tsys
        Tsys = dict([(blp, Tsys) for blp in blpairs])

    # Iterate over blpairs to get P_N
    P_N = odict()
    for i, blp in enumerate(blpairs):
        # get time indices for this baseline-pair
        inds = self.blpair_to_indices(blp)
        T_blp = Tsys[blp]
        assert isinstance(T_blp, (float, int)) \
            or T_blp.shape[0] == self.Ntpairs, \
            "Tsys must be a float or an ndarray with shape[0] == Ntpairs"
        P_blp = []
        # iterate over time axis
        for j, ind in enumerate(inds):
            # get integration time and n_samp
            t_int = self.integration_array[spw][ind, polpair_ind]
            n_samp = self.nsample_array[spw][ind, polpair_ind]

            # get kvecs
            if form == 'DelSq':
                k = k_mag[ind]
            else:
                k = None

            # BUGFIX: a Tsys dict may legally carry scalar values (the
            # assert above allows it); only index into per-time arrays,
            # otherwise use the scalar directly instead of T_blp[j],
            # which raised TypeError for scalars.
            if isinstance(T_blp, (float, int)):
                tsys_j = T_blp
            else:
                tsys_j = T_blp[j]

            # Get noise power spectrum
            pn = noise.calc_P_N(scalar, tsys_j, t_int, k=k,
                                Nincoherent=n_samp, form=form,
                                component=component)

            # Put into appropriate form
            if form == 'Pk':
                pn = np.ones(len(dlys), float) * pn

            # append to P_blp
            P_blp.append(pn)

        P_N[blp] = np.array(P_blp)

    return P_N
def average_spectra(self, blpair_groups=None, time_avg=False,
                    blpair_weights=None, error_field=None, error_weights=None,
                    inplace=True, add_to_history=''):
    """
    Average power spectra across the baseline-pair-time axis, weighted by
    each spectrum's integration time (or by user-supplied error weights).

    This is an "incoherent" average: power spectra are averaged rather
    than visibility data. The `nsample_array` and `integration_array`
    are updated to reflect the averaging. When averaging over baseline
    pairs, the result is assigned to the zeroth blpair of each group;
    when time averaging, the time and LST arrays become the mean of each
    averaging window.

    Note this is distinct from spherical binning in k: k_perp modes are
    not connected to k_para modes here. If the blpair groups hold
    iso-length baseline separations, however, this is equivalent to
    cylindrical binning in 3D k-space. See
    `self.get_blpair_groups_from_bl_groups` for help constructing
    baseline-pair groups from baseline groups.

    Parameters
    ----------
    blpair_groups : list
        List of lists of baseline-pair tuples or integers; all power
        spectra in a group are averaged together. A warning is raised if
        a blpair appears in more than one group. Examples::

            blpair_groups = [ [((1, 2), (1, 2)), ((2, 3), (2, 3))],
                              [((4, 6), (4, 6))]]
            blpair_groups = [ [1002001002, 2003002003], [4006004006] ]

    time_avg : bool, optional
        If True, also average across the time axis. Default: False.
    blpair_weights : list, optional
        Relative weight of each baseline-pair in the average; same shape
        as blpair_groups if given, normalized within each group. Useful
        for bootstrapping. Default: None (unity weights).
    error_field : string or list, optional
        Key(s) of stats_array whose errorbars are propagated through the
        weighted average; stats_array keys not listed are dropped from
        the averaged object.
    error_weights : string, optional
        stats_array key defining the averaging weights as
        w_i = 1 / sigma_i^2. If None, integration time is used as the
        weight. If specified, it is also appended to error_field.
        Default: None.
    inplace : bool, optional
        If True, edit data in self; else return an averaged copy.
        Default: True.
    add_to_history : str, optional
        Text appended to the file history.

    Notes
    -----
    Currently, every baseline-pair in a blpair group must have the same
    Ntpairs unless time_avg=True. This includes repeated blpairs (e.g.
    in bootstrapping), which will return multiple copies of their
    time_array.
    """
    # Delegate to grouping; only propagate the return value for copies.
    result = grouping.average_spectra(self,
                                      blpair_groups=blpair_groups,
                                      time_avg=time_avg,
                                      error_field=error_field,
                                      error_weights=error_weights,
                                      blpair_weights=blpair_weights,
                                      inplace=inplace,
                                      add_to_history=add_to_history)
    if not inplace:
        return result
def fold_spectra(self):
    """
    Average each pair of matching positive and negative delay bins onto
    a purely positive delay axis, in place. Negative delay bins remain
    but are filled with zero power, and the nsample array is multiplied
    by a factor of 2 to reflect the averaging. Requires
    self.folded == False, i.e. the data currently span both negative and
    positive delays.

    WARNING: this operation cannot be undone.
    """
    grouping.fold_spectra(self)
def get_blpair_groups_from_bl_groups(self, blgroups, only_pairs_in_bls=False):
    """
    Get baseline pair matches from a list of baseline groups.

    Parameters
    ----------
    blgroups : list
        List of baseline groups, which themselves are lists of baseline
        tuples or baseline i6 integers.
        Ex: [ [(1, 2), (2, 3), (3, 4)], [(1, 4), (2, 5)] ]
    only_pairs_in_bls : bool, optional
        If True, select only baseline-pairs whose first _and_ second
        baseline are both found in each baseline group. Default: False.

    Returns
    -------
    blpair_groups : list
        List of blpair groups, which themselves are lists of blpair
        integers. Groups with no matches are omitted.
    """
    groups = []
    for group in blgroups:
        # boolean selection over self.blpair_array for this bl group
        selection = uvputils._get_blpairs_from_bls(
            self, group, only_pairs_in_bls=only_pairs_in_bls)
        matched = sorted(set(self.blpair_array[selection]))
        if matched:
            groups.append(matched)
    return groups
def get_red_bls(self, bl_len_tol=1., bl_ang_tol=1.):
    """
    Group this object's baselines into redundant sets.

    Delegates to uvpspec_utils._get_red_bls. Two baselines are treated
    as redundant when their lengths agree to within bl_len_tol and their
    orientations to within bl_ang_tol (presumably meters and degrees --
    confirm against uvpspec_utils). See that helper for the return
    format.
    """
    return uvputils._get_red_bls(self, bl_len_tol=bl_len_tol,
                                 bl_ang_tol=bl_ang_tol)
def get_red_blpairs(self, bl_len_tol=1., bl_ang_tol=1.):
    """
    Group this object's baseline-pairs into redundant sets.

    Delegates to uvpspec_utils._get_red_blpairs. Redundancy is judged by
    baseline length to within bl_len_tol and orientation to within
    bl_ang_tol (presumably meters and degrees -- confirm against
    uvpspec_utils). See that helper for the return format.
    """
    return uvputils._get_red_blpairs(self, bl_len_tol=bl_len_tol,
                                     bl_ang_tol=bl_ang_tol)
def compute_scalar(self, spw, polpair, num_steps=1000, little_h=True,
                   noise_scalar=False):
    """
    Compute power spectrum normalization scalar given an adopted cosmology
    and a beam model. See pspecbeam.PSpecBeamBase.compute_pspec_scalar for
    details.

    Parameters
    ----------
    spw : integer
        Spectral window selection.
    polpair : tuple or int or str
        Which polarization pair to calculate the scalar for.
    num_steps : int, optional
        Number of integration bins along frequency in computing scalar.
        Default: 1000.
    little_h : bool, optional
        Whether to use h^-1 units or not. Default: True.
    noise_scalar : bool, optional
        If True calculate noise pspec scalar, else calculate normal pspec
        scalar. See pspecbeam.py for difference between normal scalar and
        noise scalar. Default: False.

    Returns
    -------
    scalar : float
        Power spectrum normalization scalar.
    """
    # required attributes
    assert hasattr(self, 'cosmo'), \
        "self.cosmo object must exist to compute scalar. See self.set_cosmology()"
    assert hasattr(self, 'OmegaP') and hasattr(self, "OmegaPP") and hasattr(self, "beam_freqs"), "self.OmegaP, "\
        "self.OmegaPP and self.beam_freqs must exist to compute scalar."

    # frequencies belonging to the requested spectral window
    spw_freqs = self.freq_array[self.spw_to_freq_indices(spw)]

    # beam integrals for the requested polarization pair
    pol_idx = self.polpair_to_indices(polpair)
    omega_p = self.OmegaP[:, pol_idx].squeeze()
    omega_pp = self.OmegaPP[:, pol_idx].squeeze()

    # delegate the integral to pspecbeam
    return pspecbeam._compute_pspec_scalar(self.cosmo, self.beam_freqs,
                                           omega_pp / omega_p**2, spw_freqs,
                                           num_steps=num_steps,
                                           taper=self.taper,
                                           little_h=little_h,
                                           noise_scalar=noise_scalar)
def combine_uvpspec(uvps, merge_history=True, verbose=True):
    """
    Combine (concatenate) multiple UVPSpec objects into a single object,
    combining along one of either spectral window [spw], baseline-pair-times
    [blpairts], or polarization [pol]. Certain meta-data of all of the UVPSpec
    objs must match exactly, see get_uvp_overlap for details.

    In addition, one can only combine data along a single data axis, with the
    condition that all other axes match exactly.

    Parameters
    ----------
    uvps : list
        A list of UVPSpec objects to combine.
    merge_history : bool
        If True, merge all histories. Else use zeroth object's history.
    verbose : bool, optional
        If True, print feedback to standard output. Default: True.

    Returns
    -------
    u : UVPSpec object
        A UVPSpec object with the data of all the inputs combined.
    """
    # Perform type checks and get concatenation axis
    (uvps, concat_ax, new_spws, new_blpts, new_polpairs,
     static_meta) = get_uvp_overlap(uvps, just_meta=False, verbose=verbose)
    Nuvps = len(uvps)  # NOTE(review): currently unused

    # Create a new uvp
    u = UVPSpec()

    # Sort new data axes
    new_spws = sorted(new_spws)
    new_blpts = sorted(new_blpts)
    new_polpairs = [new_polpairs[i] for i in np.argsort(np.abs(new_polpairs))]
    Nspws = len(new_spws)
    Nbltpairs = len(new_blpts)
    # unique (time_1, time_2) pairs across all blpair-times
    Ntpairs = len(set([(t1, t2) for blp, t1, t2 in new_blpts]))
    Npols = len(new_polpairs)

    # Store optional attrs only if all uvps have them
    store_cov = np.all([hasattr(uvp, 'cov_array_real') for uvp in uvps])
    store_window = np.all([hasattr(uvp, 'window_function_array') for uvp in uvps])
    exact_windows = np.all([uvp.exact_windows for uvp in uvps])
    store_stats = np.all([hasattr(uvp, 'stats_array') for uvp in uvps])

    # Create new empty data arrays and fill spw arrays
    u.data_array = odict()
    u.integration_array = odict()
    u.wgt_array = odict()
    u.nsample_array = odict()
    if store_window:
        u.window_function_array = odict()
        if exact_windows:
            u.window_function_kperp = odict()
            u.window_function_kpara = odict()
    if store_cov:
        # ensure cov model is the same for all uvps
        if len(set([uvp.cov_model for uvp in uvps])) > 1:
            store_cov = False
        else:
            u.cov_array_real = odict()
            u.cov_array_imag = odict()
            u.cov_model = uvps[0].cov_model
    if store_stats:
        # get shared stats keys (intersection across all objects)
        stored_stats = [set(uvp.stats_array.keys()) for uvp in uvps]
        for i in range(len(stored_stats)):
            stored_stats[0].intersection_update(stored_stats[i])
        stored_stats = stored_stats[0]
        # if nothing in common, set to False
        if len(stored_stats) == 0:
            store_stats = False
        else:
            u.stats_array = odict([(stat, odict()) for stat in stored_stats])

    u.scalar_array = np.empty((Nspws, Npols), float)
    u.freq_array, u.spw_array, u.dly_array = [], [], []
    u.spw_dly_array, u.spw_freq_array = [], []

    # Loop over new spectral windows and setup arrays
    for i, spw in enumerate(new_spws):
        # Initialize new arrays
        u.data_array[i] = np.empty((Nbltpairs, spw[3], Npols), np.complex128)
        u.integration_array[i] = np.empty((Nbltpairs, Npols), np.float64)
        u.wgt_array[i] = np.empty((Nbltpairs, spw[2], 2, Npols), np.float64)
        # spw[2] == Nfreqs (wgt_array is not resampled if Ndlys != Nfreqs,
        # so needs to keep this shape)
        u.nsample_array[i] = np.empty((Nbltpairs, Npols), np.float64)
        if store_window:
            if exact_windows:
                Nkperp = uvps[0].window_function_kperp[i][:, 0].size
                Nkpara = uvps[0].window_function_kpara[i][:, 0].size
                u.window_function_array[i] = np.empty((Nbltpairs, spw[3], Nkperp, Nkpara, Npols), np.float64)
            else:
                u.window_function_array[i] = np.empty((Nbltpairs, spw[3], spw[3], Npols), np.float64)
        if store_cov:
            u.cov_array_real[i] = np.empty((Nbltpairs, spw[3], spw[3], Npols), np.float64)
            u.cov_array_imag[i] = np.empty((Nbltpairs, spw[3], spw[3], Npols), np.float64)
        if store_stats:
            for stat in stored_stats:
                u.stats_array[stat][i] = np.empty((Nbltpairs, spw[3], Npols), np.complex128)

    # Set frequencies and delays: if concat_ax == 'spw' this is changed below
    # assumes spw metadata are the same for all uvps
    u.spw_array = uvps[0].spw_array.copy()
    u.spw_freq_array = uvps[0].spw_freq_array.copy()
    u.spw_dly_array = uvps[0].spw_dly_array.copy()
    u.freq_array = uvps[0].freq_array.copy()
    u.dly_array = uvps[0].dly_array.copy()
    u.Nfreqs = len(np.unique(u.freq_array))
    u.Nspwfreqs = len(u.spw_freq_array)
    u.Ndlys = len(np.unique(u.dly_array))
    u.Nspwdlys = len(u.spw_dly_array)

    # other metadata
    u.polpair_array = np.array(new_polpairs)

    # Number of spectral windows, delays etc.
    u.Nspws = Nspws
    u.Nbltpairs = Nbltpairs
    u.Npols = Npols

    # Prepare time and label arrays
    u.time_1_array, u.time_2_array = np.empty(Nbltpairs, np.float64), \
                                     np.empty(Nbltpairs, np.float64)
    u.time_avg_array, u.lst_avg_array = np.empty(Nbltpairs, np.float64), \
                                        np.empty(Nbltpairs, np.float64)
    u.lst_1_array, u.lst_2_array = np.empty(Nbltpairs, np.float64), \
                                   np.empty(Nbltpairs, np.float64)
    u.blpair_array = np.empty(Nbltpairs, np.int64)
    u.labels = sorted(set(np.concatenate([uvp.labels for uvp in uvps])))
    u.label_1_array = np.empty((Nspws, Nbltpairs, Npols), np.int32)
    u.label_2_array = np.empty((Nspws, Nbltpairs, Npols), np.int32)

    # get each uvp's data axes
    uvp_spws = [_uvp.get_spw_ranges() for _uvp in uvps]
    uvp_blpts = [list(zip(_uvp.blpair_array, _uvp.time_1_array, _uvp.time_2_array))
                 for _uvp in uvps]
    uvp_polpairs = [_uvp.polpair_array.tolist() for _uvp in uvps]

    # Construct dict of label indices, to be used for re-ordering later
    u_lbls = {lbl: ll for ll, lbl in enumerate(u.labels)}

    # Concatenate r_params arrays
    # check that r_params are either all empty or all full.
    _r_param_strs = [_uvp.r_params for _uvp in uvps]
    if '' in _r_param_strs:
        if not np.all(np.asarray([rp == '' for rp in _r_param_strs])):
            raise ValueError("All r_params must be set or empty."
                             "Combining empty with non-empty r_params"
                             "is not yet supported.")
    _r_params = [_uvp.get_r_params() for _uvp in uvps]
    # check for conflicts by iterating through each key in the first _uvp, store
    # in list of new keys.
    r_params = {}
    for _r_param in _r_params:
        for rkey in _r_param:
            if not rkey in r_params:
                r_params[rkey] = _r_param[rkey]
            elif r_params[rkey] != _r_param[rkey]:
                # For now, we won't support inconsistent weightings.
                raise ValueError("Conflict between weightings"
                                 "Only consistent weightings are supported!")

    # fill in data arrays depending on concat ax
    if concat_ax == 'spw':
        u.spw_array = np.arange(Nspws, dtype=np.int32)
        freq_array, dly_array = [], []
        spw_freq_array, spw_dly_array = [], []

        # Concatenate spectral windows
        for i, spw in enumerate(new_spws):
            # get index of this new spw in uvps and its spw index
            l = [spw in _u for _u in uvp_spws].index(True)
            m = [spw == _spw for _spw in uvp_spws[l]].index(True)

            # freq metadata
            spw_freq_inds = np.where(uvps[l].spw_freq_array == m)[0]
            freq_array.extend(uvps[l].freq_array[spw_freq_inds])
            spw_freq_array.extend(np.ones(len(spw_freq_inds), dtype=np.int32) * i)

            # dly metadata
            spw_dly_inds = np.where(uvps[l].spw_dly_array == m)[0]
            dly_array.extend(uvps[l].dly_array[spw_dly_inds])
            spw_dly_array.extend(np.ones(len(spw_dly_inds), dtype=np.int32) * i)

            # Lookup indices of new_blpts in the uvp_blpts[l] array
            blpts_idxs = uvputils._fast_lookup_blpairts(uvp_blpts[l], new_blpts)
            if i == 0: blpts_idxs0 = blpts_idxs.copy()

            # Loop over polarizations
            for k, p in enumerate(new_polpairs):
                q = uvp_polpairs[l].index(p)
                u.scalar_array[i, k] = uvps[l].scalar_array[m, q]

                # Loop over blpair-times
                for j, blpt in enumerate(new_blpts):
                    # get blpts indices
                    n = blpts_idxs[j]

                    # Data/weight/integration arrays
                    u.data_array[i][j, :, k] = uvps[l].data_array[m][n, :, q]
                    u.wgt_array[i][j, :, :, k] = uvps[l].wgt_array[m][n, :, :, q]
                    u.integration_array[i][j, k] = uvps[l].integration_array[m][n, q]
                    u.nsample_array[i][j, k] = uvps[l].nsample_array[m][n, q]

                    # Labels (re-indexed into the merged labels list)
                    lbl1 = uvps[l].label_1_array[m, n, q]
                    lbl2 = uvps[l].label_2_array[m, n, q]
                    u.label_1_array[i, j, k] = u_lbls[uvps[l].labels[lbl1]]
                    u.label_2_array[i, j, k] = u_lbls[uvps[l].labels[lbl2]]
                    if store_cov:
                        u.cov_array_real[i][j, :, :, k] = uvps[l].cov_array_real[m][n, :, :, q]
                        u.cov_array_imag[i][j, :, :, k] = uvps[l].cov_array_imag[m][n, :, :, q]
                    if store_window:
                        if exact_windows:
                            u.window_function_array[i][j,:, :, :,k] = uvps[l].window_function_array[m][n, :, :, :, q]
                        else:
                            u.window_function_array[i][j,:,:,k] = uvps[l].window_function_array[m][n, :, :, q]
                    if store_stats:
                        for stat in stored_stats:
                            u.stats_array[stat][i][j, :, k] = uvps[l].stats_array[stat][m][n, :, q]

        u.freq_array = np.array(freq_array)
        u.dly_array = np.array(dly_array)
        u.spw_freq_array = np.array(spw_freq_array)
        u.spw_dly_array = np.array(spw_dly_array)
        u.Nfreqs = len(np.unique(u.freq_array))
        u.Nspwfreqs = len(u.spw_freq_array)
        u.Ndlys = len(np.unique(u.dly_array))
        u.Nspwdlys = len(u.spw_dly_array)

        # Populate new LST, time, and blpair arrays
        for j, blpt in enumerate(new_blpts):
            n = blpts_idxs0[j]
            u.time_1_array[j] = uvps[0].time_1_array[n]
            u.time_2_array[j] = uvps[0].time_2_array[n]
            u.time_avg_array[j] = uvps[0].time_avg_array[n]
            u.lst_1_array[j] = uvps[0].lst_1_array[n]
            u.lst_2_array[j] = uvps[0].lst_2_array[n]
            u.lst_avg_array[j] = uvps[0].lst_avg_array[n]
            u.blpair_array[j] = uvps[0].blpair_array[n]

    elif concat_ax == 'blpairts':
        # which uvp contains each new blpair-time
        is_in = [uvputils._is_in(_blpts, new_blpts)
                 for _blpts in uvp_blpts]

        # Concatenate blpair-times
        for j, blpt in enumerate(new_blpts):
            l = [isn[j] for isn in is_in].index(True)
            n = uvp_blpts[l].index(blpt)

            # Loop over spectral windows
            for i, spw in enumerate(new_spws):
                m = [spw == _spw for _spw in uvp_spws[l]].index(True)

                # Loop over polarizations
                for k, p in enumerate(new_polpairs):
                    q = uvp_polpairs[l].index(p)
                    u.data_array[i][j, :, k] = uvps[l].data_array[m][n, :, q]
                    u.wgt_array[i][j, :, :, k] = uvps[l].wgt_array[m][n, :, :, q]
                    u.integration_array[i][j, k] = uvps[l].integration_array[m][n, q]
                    u.nsample_array[i][j, k] = uvps[l].nsample_array[m][n, q]
                    if store_window:
                        if exact_windows:
                            u.window_function_array[i][j, :, :, :, k] = uvps[l].window_function_array[m][n, :, :, :, q]
                        else:
                            u.window_function_array[i][j, :, :, k] = uvps[l].window_function_array[m][n, :, :, q]
                    if store_cov:
                        u.cov_array_real[i][j, :, :, k] = uvps[l].cov_array_real[m][n, :, :, q]
                        u.cov_array_imag[i][j, :, :, k] = uvps[l].cov_array_imag[m][n, :, :, q]
                    if store_stats:
                        for stat in stored_stats:
                            u.stats_array[stat][i][j, :, k] = uvps[l].stats_array[stat][m][n, :, q]

                    # Labels (re-indexed into the merged labels list)
                    lbl1 = uvps[l].label_1_array[m, n, q]
                    lbl2 = uvps[l].label_2_array[m, n, q]
                    u.label_1_array[i, j, k] = u_lbls[uvps[l].labels[lbl1]]
                    u.label_2_array[i, j, k] = u_lbls[uvps[l].labels[lbl2]]

                    # scalar array, only do once per spw and pol
                    if j == 0:
                        u.scalar_array[i, k] = uvps[l].scalar_array[m, q]

            # Populate new LST, time, and blpair arrays
            u.time_1_array[j] = uvps[l].time_1_array[n]
            u.time_2_array[j] = uvps[l].time_2_array[n]
            u.time_avg_array[j] = uvps[l].time_avg_array[n]
            u.lst_1_array[j] = uvps[l].lst_1_array[n]
            u.lst_2_array[j] = uvps[l].lst_2_array[n]
            u.lst_avg_array[j] = uvps[l].lst_avg_array[n]
            u.blpair_array[j] = uvps[l].blpair_array[n]

    elif concat_ax == 'polpairs':
        # Concatenate polarizations
        for k, p in enumerate(new_polpairs):
            l = [p in _polpairs for _polpairs in uvp_polpairs].index(True)
            q = uvp_polpairs[l].index(p)

            # Get mapping of blpair-time indices between old UVPSpec objects
            # and the new one
            blpts_idxs = uvputils._fast_lookup_blpairts(uvp_blpts[l], new_blpts)
            if k == 0: blpts_idxs0 = blpts_idxs.copy()

            # Loop over spectral windows
            for i, spw in enumerate(new_spws):
                m = [spw == _spw for _spw in uvp_spws[l]].index(True)
                u.scalar_array[i,k] = uvps[l].scalar_array[m,q]

                # Loop over blpair-times
                for j, blpt in enumerate(new_blpts):
                    n = blpts_idxs[j]
                    u.data_array[i][j, :, k] = uvps[l].data_array[m][n, :, q]
                    if store_window:
                        if exact_windows:
                            u.window_function_array[i][j, :, :, :, k] = uvps[l].window_function_array[m][n, :, :, :, q]
                        else:
                            u.window_function_array[i][j, :, :, k] = uvps[l].window_function_array[m][n, :, :, q]
                    if store_cov:
                        u.cov_array_real[i][j, :, :, k] = uvps[l].cov_array_real[m][n, :, :, q]
                        u.cov_array_imag[i][j, :, :, k] = uvps[l].cov_array_imag[m][n, :, :, q]
                    if store_stats:
                        for stat in stored_stats:
                            u.stats_array[stat][i][j, :, k] = uvps[l].stats_array[stat][m][n, :, q]
                    u.wgt_array[i][j, :, :, k] = uvps[l].wgt_array[m][n, :, :, q]
                    u.integration_array[i][j, k] = uvps[l].integration_array[m][n, q]
                    u.nsample_array[i][j, k] = uvps[l].nsample_array[m][n, q]

                    # Labels (re-indexed into the merged labels list)
                    lbl1 = uvps[l].label_1_array[m, n, q]
                    lbl2 = uvps[l].label_2_array[m, n, q]
                    u.label_1_array[i, j, k] = u_lbls[uvps[l].labels[lbl1]]
                    u.label_2_array[i, j, k] = u_lbls[uvps[l].labels[lbl2]]

        # Populate new LST, time, and blpair arrays
        for j, blpt in enumerate(new_blpts):
            n = blpts_idxs0[j]
            u.time_1_array[j] = uvps[0].time_1_array[n]
            u.time_2_array[j] = uvps[0].time_2_array[n]
            u.time_avg_array[j] = uvps[0].time_avg_array[n]
            u.lst_1_array[j] = uvps[0].lst_1_array[n]
            u.lst_2_array[j] = uvps[0].lst_2_array[n]
            u.lst_avg_array[j] = uvps[0].lst_avg_array[n]
            u.blpair_array[j] = uvps[0].blpair_array[n]
    else:
        # Make sure we have properly identified the concat_ax
        raise ValueError("concat_ax {} not recognized.".format(concat_ax))

    # Set baselines
    u.Nblpairs = len(np.unique(u.blpair_array))
    uvp_bls = [uvp.bl_array for uvp in uvps]
    new_bls = np.unique(np.hstack(uvp_bls))
    u.bl_array = np.array(new_bls)
    u.Nbls = len(u.bl_array)
    u.bl_vecs = []
    u.Ntpairs = Ntpairs
    u.Nbltpairs = Nbltpairs
    for b, bl in enumerate(new_bls):
        # pull each baseline vector from the first uvp that has it
        l = [bl in _bls for _bls in uvp_bls].index(True)
        h = [bl == _bl for _bl in uvp_bls[l]].index(True)
        u.bl_vecs.append(uvps[l].bl_vecs[h])
    u.bl_vecs = np.array(u.bl_vecs)
    u.Ntimes = len(np.unique(np.hstack([u.time_1_array, u.time_2_array])))
    if merge_history:
        u.history = "".join([uvp.history for uvp in uvps])
    else:
        u.history = uvps[0].history
    u.labels = np.array(u.labels, str)
    u.r_params = uvputils.compress_r_params(r_params)
    for k in static_meta.keys():
        setattr(u, k, static_meta[k])

    # Run check to make sure the new UVPSpec object is valid
    u.check()
    return u
def get_uvp_overlap(uvps, just_meta=True, verbose=True):
    """
    Given a list of UVPSpec objects or a list of paths to UVPSpec objects,
    find a single data axis within ['spw', 'blpairts', 'polpairs'] where
    *all* uvpspec objects contain non-overlapping data. Overlapping data are
    delay spectra that have identical spw, blpair-time and pol metadata
    between each other. If two uvps are completely overlapping (i.e. there
    is not non-overlapping data axis) an error is raised. If there are
    multiple non-overlapping data axes between all uvpspec pairs in uvps,
    an error is raised.

    All uvpspec objects must have certain attributes that agree exactly.
    These include:
        'channel_width', 'telescope_location', 'weighting', 'OmegaP',
        'beam_freqs', 'OmegaPP', 'beamfile', 'norm', 'taper', 'vis_units',
        'norm_units', 'folded', 'cosmo', 'exact_windows'

    Parameters
    ----------
    uvps : list
        List of UVPSpec objects or list of string paths to UVPSpec objects
    just_meta : boolean, optional
        If uvps is a list of strings, when loading-in each uvpspec, only
        load its metadata.
    verbose : bool, optional
        print feedback to standard output

    Returns
    -------
    uvps : list
        List of input UVPSpec objects
    concat_ax : str
        Data axis ['spw', 'blpairts', 'polpairs'] across which data can be
        concatenated
    unique_spws : list
        List of unique spectral window tuples (spw_freq_start, spw_freq_end,
        spw_Nfreqs, spw_Ndlys) across all input uvps
    unique_blpts : list
        List of unique baseline-pair-time tuples (blpair_integer,
        time_1_array JD float, time_2_array JD float) across all input uvps
    unique_polpairs : list
        List of unique polarization-pair integers across all input uvps
    static_meta : odict
        Dict of the static metadata values shared by all input uvps.
    """
    # type check
    assert isinstance(uvps, (list, tuple, np.ndarray)), \
        "uvps must be fed as a list"
    assert isinstance(uvps[0], (UVPSpec, str)), \
        "uvps must be fed as a list of UVPSpec objects or strings"
    Nuvps = len(uvps)

    # load uvps if fed as strings
    if isinstance(uvps[0], str):
        _uvps = []
        for u in uvps:
            uvp = UVPSpec()
            uvp.read_hdf5(u, just_meta=just_meta)
            _uvps.append(uvp)
        uvps = _uvps

    # ensure static metadata agree between all objects
    static_meta = ['channel_width', 'telescope_location', 'weighting',
                   'OmegaP', 'beam_freqs', 'OmegaPP', 'beamfile', 'norm',
                   'taper', 'vis_units', 'norm_units', 'folded', 'cosmo', 'exact_windows']
    for m in static_meta:
        for u in uvps[1:]:
            # each attribute must be present on all objects or on none,
            # and equal wherever present
            if hasattr(uvps[0], m) and hasattr(u, m):
                assert uvps[0].__eq__(u, params=[m]), \
                    "Cannot concatenate UVPSpec objs: not all agree on '{}' attribute".format(m)
            else:
                assert not hasattr(uvps[0], m) and not hasattr(u, m), \
                    "Cannot concatenate UVPSpec objs: not all agree on '{}' attribute".format(m)

    # get static metadata values
    static_meta = odict([(k, getattr(uvps[0], k, None)) for k in static_meta
                         if getattr(uvps[0], k, None) is not None])

    # create unique data axis lists
    unique_spws = []
    unique_blpts = []
    unique_polpairs = []
    data_concat_axes = odict()

    blpts_comb = []  # Combined blpair + time

    # find unique items
    for uvp1 in uvps:
        for s in uvp1.get_spw_ranges():
            if s not in unique_spws: unique_spws.append(s)
        for p in uvp1.polpair_array:
            if p not in unique_polpairs: unique_polpairs.append(p)

        uvp1_blpts_comb = [(blp, t1, t2) for blp, t1, t2 in zip(uvp1.blpair_array, uvp1.time_1_array, uvp1.time_2_array)]
        blpts_comb.extend(uvp1_blpts_comb)

    # Old way using np.unique and complex coding did not support
    # two times.
    unique_blpts = list(set(blpts_comb))

    # iterate over uvps
    for i, uvp1 in enumerate(uvps):
        # get uvp1 sets and append to unique lists
        uvp1_spws = uvp1.get_spw_ranges()
        uvp1_blpts = list(zip(uvp1.blpair_array, uvp1.time_1_array, uvp1.time_2_array))
        uvp1_polpairs = uvp1.polpair_array

        # iterate over uvps (upper-triangular pairs only)
        for j, uvp2 in enumerate(uvps):
            if j <= i: continue

            # get uvp2 sets
            uvp2_spws = uvp2.get_spw_ranges()
            uvp2_blpts = list(zip(uvp2.blpair_array, uvp2.time_1_array, uvp2.time_2_array))
            uvp2_polpairs = uvp2.polpair_array

            # determine if uvp1 and uvp2 are an identical match
            # (empty symmetric difference == same set)
            spw_match = len(set(uvp1_spws) ^ set(uvp2_spws)) == 0
            blpts_match = len(set(uvp1_blpts) ^ set(uvp2_blpts)) == 0
            polpair_match = len(set(uvp1_polpairs) ^ set(uvp2_polpairs)) == 0

            # ensure no partial-overlaps
            if not spw_match:
                assert len(set(uvp1_spws) & set(uvp2_spws)) == 0, \
                    "uvp {} and {} have partial overlap across spw, cannot combine".format(i, j)
            if not blpts_match:
                assert len(set(uvp1_blpts) & set(uvp2_blpts)) == 0, \
                    "uvp {} and {} have partial overlap across blpairts, cannot combine".format(i, j)
            if not polpair_match:
                assert len(set(uvp1_polpairs) & set(uvp2_polpairs)) == 0, \
                    "uvp {} and {} have partial overlap across pol-pair, cannot combine".format(i, j)

            # assert all except 1 axis overlaps
            matches = [spw_match, blpts_match, polpair_match]
            assert sum(matches) != 3, \
                "uvp {} and {} have completely overlapping data, cannot combine".format(i, j)
            assert sum(matches) > 1, \
                "uvp {} and {} are non-overlapping across multiple data axes, cannot combine".format(i, j)

            # the one non-matching axis is the concatenation axis
            concat_ax = ['spw', 'blpairts', 'polpairs'][matches.index(False)]
            data_concat_axes[(i, j)] = concat_ax
            if verbose:
                print("uvp {} and {} are concatable across {} axis".format(i, j, concat_ax))

    # assert all uvp pairs have the same (single) non-overlap (concat) axis
    err_msg = "Non-overlapping data in uvps span multiple data axes:\n{}" \
              "".format("\n".join( ["{} & {}: {}".format(i[0][0],i[0][1],i[1])
                                    for i in data_concat_axes.items()] ))
    assert len(set(data_concat_axes.values())) == 1, err_msg

    # perform additional checks given concat ax
    if concat_ax == 'blpairts':
        # check scalar_array
        assert np.all([np.isclose(uvps[0].scalar_array, u.scalar_array)
                       for u in uvps[1:]]), \
            "scalar_array must be the same for all uvps given " \
            "concatenation along blpairts."

    return uvps, concat_ax, unique_spws, unique_blpts, \
           unique_polpairs, static_meta
def _ordered_unique(arr):
"""
Get the unique elements of an array while preserving order.
"""
arr = np.asarray(arr)
_, idx = np.unique(arr, return_index=True)
return arr[np.sort(idx)]