import numpy as np
from collections import OrderedDict as odict
import os, copy, shutil, operator, ast, fnmatch
from hera_pspec import conversions, noise, version, pspecbeam, grouping
from hera_pspec import uvpspec_utils as uvputils
from hera_pspec.parameter import PSpecParam
from pyuvdata import uvutils as uvutils
import h5py
class UVPSpec(object):
    """
    An object for storing power spectra generated by hera_pspec and a
    file-format for its data and meta-data.
    """
def __init__(self):
    """
    An object for storing power spectra and associated metadata generated
    by hera_pspec.
    """
    # Summary attributes
    self._Ntimes = PSpecParam("Ntimes", description="Number of unique times.", expected_type=int)
    self._Nblpairts = PSpecParam("Nblpairts", description="Total number of baseline-pair times.", expected_type=int)
    self._Nblpairs = PSpecParam("Nblpairs", description='Total number of baseline-pairs.', expected_type=int)
    self._Nspwdlys = PSpecParam("Nspwdlys", description="Total number of delay bins across all sub-bands.", expected_type=int)
    self._Nspws = PSpecParam("Nspws", description="Number of spectral windows.", expected_type=int)
    self._Ndlys = PSpecParam("Ndlys", description="Total number of delay bins.", expected_type=int)
    self._Nfreqs = PSpecParam("Nfreqs", description="Total number of frequency bins in the original data.", expected_type=int)
    self._Npols = PSpecParam("Npols", description="Number of polarizations in the data.", expected_type=int)
    self._history = PSpecParam("history", description='The file history.', expected_type=str)
    # Data attributes
    desc = "Power spectrum data dictionary with spw integer as keys and values as complex ndarrays."
    self._data_array = PSpecParam("data_array", description=desc, expected_type=dict, form="(Nblpairts, Ndlys, Npols)")
    desc = "Weight dictionary for original two datasets. The second axis holds [dset1_wgts, dset2_wgts] in that order."
    self._wgt_array = PSpecParam("wgt_array", description=desc, expected_type=dict, form="(Nblpairts, Nfreqs, 2, Npols)")
    desc = "Integration time dictionary. This holds the average integration time [seconds] of each delay spectrum in the data. " \
           "This is not necessarily equal to the integration time of the visibility data: If data have been coherently averaged " \
           "(i.e. averaged before squaring), than this is the sum of each spectrum's integration time."
    self._integration_array = PSpecParam("integration_array", description=desc, expected_type=dict, form="(Nblpairts, Npols)")
    desc = "Nsample dictionary, if the pspectra have been incoherently averaged (i.e. averaged after squaring), this is " \
           "the effective number of samples in that average (float type). This is not the same as the pyuvdata.UVData nsample_array."
    self._nsample_array = PSpecParam("nsample_array", description=desc, expected_type=dict, form="(Nblpairts, Npols)")
    self._spw_array = PSpecParam("spw_array", description="Spw integer array.", form="(Nspwdlys,)")
    self._freq_array = PSpecParam("freq_array", description="Frequency array of the original data in Hz.", form="(Nspwdlys,)")
    self._dly_array = PSpecParam("dly_array", description="Delay array in seconds.", form="(Nspwdlys,)")
    desc = "Polarization integers of power spectra. Stokes 1:4 (I,Q,U,V); circular -1:-4 (RR,LL,RL,LR); linear -5:-8 (XX,YY,XY,YX)"
    self._pol_array = PSpecParam("pol_array", description=desc, form="(Npols,)")
    self._lst_1_array = PSpecParam("lst_1_array", description="LST array of the first bl in the bl-pair [radians].", form="(Nblpairts,)")
    self._lst_2_array = PSpecParam("lst_2_array", description="LST array of the second bl in the bl-pair [radians].", form="(Nblpairts,)")
    self._lst_avg_array = PSpecParam("lst_avg_array", description="Average of the lst_1_array and lst_2_array [radians].", form="(Nblpairts,)")
    self._time_1_array = PSpecParam("time_1_array", description="Time array of the first bl in the bl-pair [Julian Date].", form="(Nblpairts,)")
    # FIX: this was previously assigned to self._time_1_array, clobbering the
    # line above and leaving time_2_array out of self._all_params below.
    self._time_2_array = PSpecParam("time_2_array", description="Time array of the second bl in the bl-pair [Julian Date].", form="(Nblpairts,)")
    self._time_avg_array = PSpecParam("time_avg_array", description="Average of the time_1_array and time_2_array [Julian Date].", form='(Nblpairts,)')
    self._blpair_array = PSpecParam("blpair_array", description="Baseline-pair integer for all baseline-pair times.", form="(Nblpairts,)")
    # Baseline attributes
    self._Nbls = PSpecParam("Nbls", description="Number of unique baseline integers.", expected_type=int)
    self._bl_vecs = PSpecParam("bl_vecs", description="ndarray of baseline separation vectors in the ITRF frame [meters]. To get it in ENU frame see self.get_ENU_bl_vecs().", expected_type=np.ndarray, form="(Nbls,)")
    self._bl_array = PSpecParam("bl_array", description="All unique baseline (antenna-pair) integers.", expected_type=np.ndarray, form="(Nbls,)")
    # Misc Attributes
    self._channel_width = PSpecParam("channel_width", description="width of visibility frequency channels in Hz.", expected_type=float)
    self._telescope_location = PSpecParam("telescope_location", description="telescope location in ECEF frame [meters]. To get it in Lat/Lon/Alt see pyuvdata.utils.LatLonAlt_from_XYZ().", expected_type=np.ndarray)
    self._weighting = PSpecParam("weighting", description="Form of data weighting used when forming power spectra.", expected_type=str)
    self._norm = PSpecParam("norm", description="Normalization method adopted in OQE (M matrix).", expected_type=str)
    # FIX: removed stray trailing double-quote inside the description string
    self._taper = PSpecParam("taper", description='Taper function applied to visibility data before FT.', expected_type=str)
    self._vis_units = PSpecParam("vis_units", description="Units of the original visibility data used to form the power spectra.", expected_type=str)
    self._norm_units = PSpecParam("norm_units", description="Power spectra normalization units, i.e. telescope units [Hz str] or cosmological [(h^-3) Mpc^3].", expected_type=str)
    self._filename1 = PSpecParam("filename1", description="filename of data from first dataset", expected_type=str)
    self._filename2 = PSpecParam("filename2", description="filename of data from second dataset", expected_type=str)
    self._label1 = PSpecParam("label1", description="label of data from first dataset", expected_type=str)
    self._label2 = PSpecParam("label2", description="label of data from second dataset", expected_type=str)
    self._git_hash = PSpecParam("git_hash", description="GIT hash of hera_pspec when pspec was generated.", expected_type=str)
    self._folded = PSpecParam("folded", description="if power spectra are folded (i.e. averaged) onto purely positive delay axis. Default is False", expected_type=bool)
    self._scalar_array = PSpecParam("scalar_array", description="Power spectrum normalization scalar from pspecbeam module.", expected_type=np.ndarray, form="(Nspws, Npols)")
    self._beamfile = PSpecParam("beamfile", description="filename of beam-model used to normalized pspectra.", expected_type=str)
    self._OmegaP = PSpecParam("OmegaP", description="Integral of unitless beam power over the sky [steradians].", form="(Nbeam_freqs, Npols)")
    # FIX: parameter name was mistakenly "OmegaP" (copy-paste error)
    self._OmegaPP = PSpecParam("OmegaPP", description="Integral of unitless beam power squared over the sky [steradians].", form="(Nbeam_freqs, Npols)")
    self._beam_freqs = PSpecParam("beam_freqs", description="Frequency bins of the OmegaP and OmegaPP beam-integral arrays [Hz].", form="(Nbeam_freqs,)")
    self._cosmo = PSpecParam("cosmo", description="Instance of conversion.Cosmo_Conversions class.")
    # collect all parameters: required and non-required
    # (every attribute assigned above begins with "_"; strip the prefix)
    self._all_params = sorted(map(lambda p: p[1:], fnmatch.filter(self.__dict__.keys(), '_*')))
    # specify required params: these are required for read / write and self.check()
    self._req_params = ["Ntimes", "Nblpairts", "Nblpairs", "Nspwdlys", "Nspws", "Ndlys",
                        "Npols", "Nfreqs", "history", "data_array", "wgt_array", "integration_array",
                        "spw_array", "freq_array", "dly_array", "pol_array", "lst_1_array",
                        "lst_2_array", "time_1_array", "time_2_array", "blpair_array", "Nbls",
                        "bl_vecs", "bl_array", "channel_width", "telescope_location", "weighting",
                        "vis_units", "norm_units", "taper", "norm", "git_hash", "nsample_array",
                        'lst_avg_array', 'time_avg_array', 'folded', "scalar_array"]
    # all parameters must fall into one and only one of the following groups, which are used in __eq__
    self._immutables = ["Ntimes", "Nblpairts", "Nblpairs", "Nspwdlys", "Nspws", "Ndlys",
                        "Npols", "Nfreqs", "history", "Nbls", "channel_width", "weighting",
                        "vis_units", "filename1", "filename2", "label1", "label2", "norm",
                        "norm_units", "taper", "git_hash", "cosmo", "beamfile" ,'folded']
    self._ndarrays = ["spw_array", "freq_array", "dly_array", "pol_array", "lst_1_array",
                      'lst_avg_array', 'time_avg_array', "lst_2_array", "time_1_array",
                      "time_2_array", "blpair_array", "OmegaP", "OmegaPP", "beam_freqs",
                      "bl_vecs", "bl_array", "telescope_location", "scalar_array"]
    self._dicts = ["data_array", "wgt_array", "integration_array", "nsample_array"]
    # define which attributes are considred meta data. Large attrs should be constructed as datasets
    self._meta_dsets = ["lst_1_array", "lst_2_array", "time_1_array", "time_2_array", "blpair_array",
                        "bl_vecs", "bl_array", 'lst_avg_array', 'time_avg_array', 'OmegaP', 'OmegaPP']
    self._meta_attrs = sorted(set(self._all_params) - set(self._dicts) - set(self._meta_dsets))
    self._meta = sorted(set(self._meta_dsets).union(set(self._meta_attrs)))
    # check all params are covered by exactly one of the __eq__ groups
    assert len(set(self._all_params) - set(self._dicts) - set(self._immutables) - set(self._ndarrays)) == 0
    # Default parameter values
    self.folded = False
    self.git_hash = version.git_hash
def get_data(self, key, *args):
    """
    Slice into data_array with a specified data key in the format
    (spw, ((ant1, ant2), (ant3, ant4)), pol) or (spw, blpair-integer, pol),
    where spw is the spectral window integer, ant1 etc. are integers, and
    pol is either a polarization string (ex. 'XX') or integer (ex. -5).

    Parameters
    ----------
    key : tuple
        Baseline-pair key.

    Returns
    -------
    data : complex ndarray
        Shape (Ntimes, Ndlys).
    """
    spw, blpairts, pol = self.key_to_indices(key, *args)
    spec = self.data_array[spw]
    # unfolded spectra: return the full delay axis
    if not self.folded:
        return spec[blpairts, :, pol]
    # folded spectra: return only the positive-delay half of the axis
    ndlys = spec.shape[1]
    return spec[blpairts, ndlys // 2 + 1:, pol]
def get_wgts(self, key, *args):
    """
    Slice into wgt_array with a specified data key in the format
    (spw, ((ant1, ant2), (ant3, ant4)), pol) or (spw, blpair-integer, pol),
    where spw is the spectral window integer, ant1 etc. are integers, and
    pol is either a polarization string (ex. 'XX') or integer (ex. -5).

    Parameters
    ----------
    key : tuple
        Baseline-pair key.

    Returns
    -------
    wgts : float ndarray
        Shape (2, Ntimes, Ndlys); the zeroth axis holds [wgt_1, wgt_2]
        in that order.
    """
    spw, blpairts, pol = self.key_to_indices(key, *args)
    spw_wgts = self.wgt_array[spw]
    return spw_wgts[blpairts, :, :, pol]
def get_integrations(self, key, *args):
    """
    Slice into integration_array with a specified data key in the format
    (spw, ((ant1, ant2), (ant3, ant4)), pol) or (spw, blpair-integer, pol),
    where spw is the spectral window integer, ant1 etc. are integers, and
    pol is either a polarization string (ex. 'XX') or integer (ex. -5).

    Parameters
    ----------
    key : tuple
        Baseline-pair key.

    Returns
    -------
    data : float ndarray
        Average integration times [seconds], shape (Ntimes,).
    """
    spw, blpairts, pol = self.key_to_indices(key, *args)
    spw_ints = self.integration_array[spw]
    return spw_ints[blpairts, pol]
def get_nsamples(self, key, *args):
    """
    Slice into nsample_array with a specified data key in the format
    (spw, ((ant1, ant2), (ant3, ant4)), pol) or (spw, blpair-integer, pol),
    where spw is the spectral window integer, ant1 etc. are integers, and
    pol is either a polarization string (ex. 'XX') or integer (ex. -5).

    Parameters
    ----------
    key : tuple
        Baseline-pair key.

    Returns
    -------
    data : float ndarray
        Effective number of incoherent samples, shape (Ntimes,).
    """
    spw, blpairts, pol = self.key_to_indices(key, *args)
    spw_nsamples = self.nsample_array[spw]
    return spw_nsamples[blpairts, pol]
def get_dlys(self, key):
    """
    Get the array of delays for a given spectral window selection.

    Parameters
    ----------
    key : int, or tuple with integer
        Spectral window selection.

    Returns
    -------
    dlys : float ndarray
        Delays of the power spectra for the given spw [seconds].
    """
    # spw_to_indices handles int/list input and the folded case
    return self.dly_array[self.spw_to_indices(key)]
def get_blpair_seps(self):
    """
    For each baseline-pair, get the average baseline separation in the
    ENU frame in meters.

    Returns
    -------
    blp_avg_sep : float ndarray
        Shape (Nblpairts,).
    """
    # baseline separation lengths, ordered like self.bl_array
    enu_vecs = self.get_ENU_bl_vecs()
    bl_list = self.bl_array.tolist()
    seps = np.array([np.linalg.norm(enu_vecs[bl_list.index(b)])
                     for b in self.bl_array])
    # output array, filled per baseline-pair below
    blp_avg_sep = np.empty(self.Nblpairts, np.float)
    # decompose each unique blpair integer into its two baseline integers
    unique_blps = np.unique(self.blpair_array)
    first_bl = np.floor(unique_blps / 1e6)
    pair_bls = np.vstack([first_bl, unique_blps - first_bl * 1e6]).astype(np.int).T
    # average the two baseline separations and broadcast to all times
    for blp, (b1, b2) in zip(unique_blps, pair_bls):
        mean_sep = np.mean([seps[bl_list.index(b1)], seps[bl_list.index(b2)]])
        blp_avg_sep[self.blpair_to_indices(blp)] = mean_sep
    return blp_avg_sep
def get_kperps(self, spw, little_h=True):
    """
    Get the transverse (perpendicular) cosmological wavevector for each
    baseline-pair, given the adopted cosmology and a spw selection.

    Parameters
    ----------
    spw : int
        Choice of spectral window.
    little_h : boolean, optional
        Whether cosmological length units are h^-1 Mpc or Mpc.
        Default: h^-1 Mpc.

    Returns
    -------
    k_perp : float ndarray
        Transverse wave-vectors, shape (Nblpairs,).
    """
    assert hasattr(self, 'cosmo'), "self.cosmo must exist to form cosmological " \
                                   "wave-vectors. See self.set_cosmology()"
    # mean redshift of the selected band
    avg_z = self.cosmo.f2z(np.mean(self.freq_array[self.spw_to_indices(spw)]))
    # convert baseline separations [m] into k_perp at that redshift
    return self.get_blpair_seps() * self.cosmo.bl_to_kperp(avg_z, little_h=little_h)
def get_kparas(self, spw, little_h=True):
    """
    Get the radial (parallel) cosmological wavevectors for the power
    spectra, given the adopted cosmology and a spw selection.

    Parameters
    ----------
    spw : int
        Choice of spectral window.
    little_h : boolean, optional
        Whether cosmological length units are h^-1 Mpc or Mpc.
        Default: h^-1 Mpc.

    Returns
    -------
    k_para : float ndarray
        Radial wave-vectors, shape (Ndlys given spw,).
    """
    assert hasattr(self, 'cosmo'), "self.cosmo must exist to form cosmological " \
                                   "wave-vectors. See self.set_cosmology()"
    # mean redshift of the selected band
    avg_z = self.cosmo.f2z(np.mean(self.freq_array[self.spw_to_indices(spw)]))
    # convert delays [s] into k_para at that redshift
    return self.get_dlys(spw) * self.cosmo.tau_to_kpara(avg_z, little_h=little_h)
def convert_to_deltasq(self, little_h=True, inplace=True):
    """
    Convert from P(k) to Delta^2(k) by multiplying by k^3 / (2pi^2).
    The units of the output are therefore the current units (self.units)
    times h^3 Mpc^-3, where the h^3 is only included if little_h == True.

    Parameters
    ----------
    little_h : boolean, optional
        Whether cosmological length units are h^-1 Mpc or Mpc.
    inplace : boolean
        If True edit and overwrite arrays in self, else make a copy of
        self and return.
    """
    uvp = self if inplace else copy.deepcopy(self)
    # apply the k^3 / (2pi^2) factor per spectral window
    for spw in range(uvp.Nspws):
        kperp = uvp.get_kperps(spw, little_h=little_h)
        kpara = uvp.get_kparas(spw, little_h=little_h)
        # |k| over the (blpairts, dlys) grid, broadcast over pols
        kmag = np.sqrt(kperp[:, None, None]**2 + kpara[None, :, None]**2)
        uvp.data_array[spw] *= kmag**3 / (2*np.pi**2)
    # record the change of normalization units
    uvp.norm_units = "k^3 / (2pi^2)"
    if not inplace:
        return uvp
def blpair_to_antnums(self, blpair):
    """
    Convert baseline-pair integer to nested tuple of antenna numbers.

    Parameters
    ----------
    blpair : i12 int
        Baseline-pair integer ID.

    Returns
    -------
    antnums : tuple
        Nested tuple containing baseline-pair antenna numbers.
        Ex. ((ant1, ant2), (ant3, ant4))
    """
    # delegate the integer decoding to the uvpspec_utils helper
    return uvputils._blpair_to_antnums(blpair)
def antnums_to_blpair(self, antnums):
    """
    Convert nested tuple of antenna numbers to baseline-pair integer.

    Parameters
    ----------
    antnums : tuple
        Nested tuple containing integer antenna numbers for a baseline-pair.
        Ex. ((ant1, ant2), (ant3, ant4))

    Returns
    -------
    blpair : i12 integer
        Baseline-pair integer.
    """
    # delegate the integer encoding to the uvpspec_utils helper
    return uvputils._antnums_to_blpair(antnums)
def bl_to_antnums(self, bl):
    """
    Convert baseline (antenna-pair) integer to tuple of antenna numbers.

    Parameters
    ----------
    bl : i6 int
        Baseline integer ID.

    Returns
    -------
    antnums : tuple
        Tuple containing baseline antenna numbers. Ex. (ant1, ant2)
    """
    # delegate the integer decoding to the uvpspec_utils helper
    return uvputils._bl_to_antnums(bl)
def antnums_to_bl(self, antnums):
    """
    Convert tuple of antenna numbers to baseline integer.

    Parameters
    ----------
    antnums : tuple
        Tuple containing integer antenna numbers for a baseline.
        Ex. (ant1, ant2)

    Returns
    -------
    bl : i6 integer
        Baseline integer.
    """
    # delegate the integer encoding to the uvpspec_utils helper
    return uvputils._antnums_to_bl(antnums)
def blpair_to_indices(self, blpair):
    """
    Convert a baseline-pair nested tuple ((ant1, ant2), (ant3, ant4)) or
    a baseline-pair integer into indices that index the blpairts axis of
    data_array.

    Parameters
    ----------
    blpair : tuples or ints
        Nested tuple or blpair i12 integer, Ex. ((1, 2), (3, 4)),
        or list of blpairs.
    """
    # normalize the input into a list of blpair integers
    if isinstance(blpair, tuple):
        blpairs = [self.antnums_to_blpair(blpair)]
    elif isinstance(blpair, (np.int, int)):
        blpairs = [blpair]
    elif isinstance(blpair, list) and isinstance(blpair[0], tuple):
        blpairs = [self.antnums_to_blpair(blp) for blp in blpair]
    else:
        blpairs = blpair
    # each requested blpair must exist in the data
    assert np.array([b in self.blpair_array for b in blpairs]).all(), "blpairs {} not all found in data".format(blpairs)
    # boolean-OR the per-blpair masks, then convert to integer indices
    mask = reduce(operator.add, [self.blpair_array == b for b in blpairs])
    return np.arange(self.Nblpairts)[mask]
def spw_to_indices(self, spw):
    """
    Convert a spectral window integer into a list of indices to index
    into the spwdlys axis of dly_array and/or freq_array.

    If self.folded == False, return indices for both positive and negative
    delay bins; if self.folded == True, return indices for only positive
    delay bins.

    Parameters
    ----------
    spw : int
        Spectral window index or list of indices.
    """
    # normalize to a list of spw integers
    spws = [spw] if isinstance(spw, (np.int, int)) else spw
    # each requested spw must exist in the data
    assert np.array([s in self.spw_array for s in spws]).all(), "spws {} not all found in data".format(spws)
    # boolean-OR the per-spw masks
    select = reduce(operator.add, [self.spw_array == s for s in spws])
    if self.folded:
        # drop non-positive delay bins when spectra are folded
        select[self.dly_array < 1e-10] = False
    return np.arange(self.Nspwdlys)[select]
def pol_to_indices(self, pol):
    """
    Map a polarization integer or str to its index in pol_array.

    Parameters
    ----------
    pol : str or int
        Polarization string (ex. 'XX') or integer (ex. -5), or a list of
        strs or ints.

    Returns
    -------
    indices : int ndarray
        Indices of pol in pol_array.
    """
    # convert pol to a list of integers
    if isinstance(pol, (str, np.str)):
        pol = [uvutils.polstr2num(pol)]
    elif isinstance(pol, (int, np.int)):
        pol = [pol]
    elif isinstance(pol, (list, tuple)):
        # FIX: build a new list instead of assigning into the input, which
        # mutated the caller's list and raised TypeError for tuple input
        pol = [uvutils.polstr2num(p) if isinstance(p, (np.str, str)) else p
               for p in pol]
    # ensure all pols exist in data
    assert np.array([p in self.pol_array for p in pol]).all(), "pols {} not all found in data".format(pol)
    indices = np.arange(self.Npols)[reduce(operator.add, [self.pol_array == p for p in pol])]
    return indices
def time_to_indices(self, time, blpairs=None):
    """
    Convert a time [Julian Date] from self.time_avg_array to an array that
    indexes the elements at which it appears in that array. Can optionally
    take a blpair selection to further restrict the indices to those where
    both the time and blpair are satisfied.

    Parameters
    ----------
    time : float
        Julian Date time from self.time_avg_array, Ex: 2458042.12242
    blpairs : tuple or int, optional
        List of blpair tuples or integers that further selects the elements
        at which both time and blpairs are satisfied.

    Returns
    -------
    indices : integer ndarray
        Indices at which the selection is satisfied along the
        blpairts axis.
    """
    # float-safe match against the average-time array
    matches = np.isclose(self.time_avg_array, time, rtol=1e-10)
    if blpairs is not None:
        if isinstance(blpairs, (tuple, int, np.int)):
            blpairs = [blpairs]
        # AND the time mask with the union of the blpair masks
        keep = np.zeros(self.Nblpairts, np.bool)
        for blp in blpairs:
            keep[self.blpair_to_indices(blp)] = True
        matches *= keep
    return np.arange(self.Nblpairts)[matches]
def key_to_indices(self, key, *args):
    """
    Convert a data key into relevant slice arrays. A data key takes the form

        (spw_integer, ((ant1, ant2), (ant3, ant4)), pol_string)

    or

        (spw, blpair-integer, pol)

    where spw is the spectral window integer, ant1 etc. are integers,
    and pol is either a polarization string (ex. 'XX') or integer (ex. -5).

    One can also expand the key into the argument slots, such that
    key=spw, args[0]=blpair and args[1]=pol.

    The key can also be a dictionary in the form::

        key = {
            'spw' : spw_integer,
            'blpair' : ((ant1, ant2), (ant3, ant4))
            'pol' : pol_string
        }

    and it will parse the dictionary for you.

    Parameters
    ----------
    key : tuple
        Baseline-pair key.

    Returns
    -------
    spw : int
        Spectral window index.
    blpairts : list
        List of integers to apply along blpairts axis.
    pol : int
        Polarization index.
    """
    # assert key length: either a single 3-element key (tuple or dict),
    # or the key expanded across key + two extra positional args
    if len(args) == 0:
        assert len(key) == 3, "length of key must be 3."
        if isinstance(key, (odict, dict)):
            key = (key['spw'], key['blpair'], key['pol'])
    elif len(args) > 0:
        assert len(args) == 2, "length of key must be 3."
        assert isinstance(args[0], (tuple, int, np.int)) and isinstance(args[1], (np.str, str, int, np.int)), "key must be ordered as (spw, blpair, pol)"
        key = (key, args[0], args[1])
    # assign key elements
    spw = key[0]
    blpair = key[1]
    pol = key[2]
    # assert types
    assert isinstance(spw, (int, np.int)), "spw must be an integer"
    assert isinstance(blpair, (int, np.int, tuple)), "blpair must be an integer or nested tuple"
    assert isinstance(pol, (np.str, str, np.int, int)), "pol must be a string or integer"
    # convert blpair to int if fed as a nested antenna-number tuple
    if type(blpair) == tuple:
        blpair = self.antnums_to_blpair(blpair)
    # convert pol to int if fed as a polarization string
    if type(pol) in (str, np.str):
        pol = uvutils.polstr2num(pol)
    # check attributes exist in data
    assert spw in self.spw_array, "spw {} not found in data".format(spw)
    assert blpair in self.blpair_array, "blpair {} not found in data".format(blpair)
    assert pol in self.pol_array, "pol {} not found in data".format(pol)
    # index polarization array
    pol = self.pol_to_indices(pol)
    # index blpairts
    blpairts = self.blpair_to_indices(blpair)
    return spw, blpairts, pol
def select(self, spws=None, bls=None, only_pairs_in_bls=True, blpairs=None,
           times=None, pols=None, inplace=True):
    """
    Select out certain slices of the data.

    Parameters
    ----------
    spws : list, optional
        List of spectral window integers to select. If None, all will be
        selected. Default: None.
    bls : list of i6 baseline integers or baseline tuples, optional
        Example: (2,3). Select all baseline-pairs whose first _or_ second
        baseline are in bls list. This changes if only_pairs_in_bls == True.
        If None, all baselines are selected (subject to other options).
        Default: None.
    only_pairs_in_bls : bool, optional
        If True, keep only baseline-pairs whose first _and_ second baseline
        are found in bls list. Default: True.
    blpairs : list of baseline-pair tuples or integers, optional
        List of baseline-pairs to keep. If bls is also fed, this list is
        concatenated onto the baseline-pair list constructed from the
        bls selection. If None, all valid baseline pairs are selected.
        Default: None.
    times : array_like, optional
        Float ndarray of times from the time_avg_array to select. If None,
        all times are kept. Default: None.
    pols : list of str or int, optional
        List of polarizations to keep. See pyuvdata.utils.polstr2num for
        acceptable options. If None, all polarizations are kept. Default:
        None.
    inplace : bool, optional
        If True, edit and overwrite arrays in self, else make a copy of
        self and return. Default: True.

    Returns
    -------
    uvp : UVPSpec, optional
        If inplace=False, return a new UVPSpec object containing only the
        selected data.
    """
    uvp = self if inplace else copy.deepcopy(self)
    # the heavy lifting happens in uvpspec_utils._select
    uvputils._select(uvp, spws=spws, bls=bls,
                     only_pairs_in_bls=only_pairs_in_bls,
                     blpairs=blpairs, times=times, pols=pols)
    if not inplace:
        return uvp
def get_ENU_bl_vecs(self):
    """
    Return the baseline vector array in the TOPO (ENU) frame in meters,
    with ordering matched to self.bl_vecs.

    Returns
    -------
    blvecs : array_like
        Array of baseline vectors.
    """
    # self.bl_vecs are stored relative to the telescope; shift to absolute
    # ECEF positions, then rotate into ENU about the telescope location
    lat_lon_alt = uvutils.LatLonAlt_from_XYZ(self.telescope_location)
    ecef_positions = (self.bl_vecs + self.telescope_location).T
    return uvutils.ENU_from_ECEF(ecef_positions, *lat_lon_alt).T
def read_from_group(self, grp, just_meta=False, spws=None, bls=None,
                    blpairs=None, times=None, pols=None,
                    only_pairs_in_bls=False):
    """
    Clear current UVPSpec object and load in data from specified HDF5 group.

    Parameters
    ----------
    grp : HDF5 group
        HDF5 group to load data from.
    just_meta : bool, optional
        If True, read-in metadata but ignore data, wgts and integration
        arrays. Default: False.
    spws : list of tuple, optional
        List of spectral window integers to select. Default: None (loads
        all channels).
    bls : list of i6 baseline integers or baseline tuples
        Select all baseline-pairs whose first _or_ second baseline are in
        the list. This changes if only_pairs_in_bls == True.
        Example tuple: (2, 3). Default: None (loads all bl pairs).
    blpairs : list of baseline-pair tuples or integers
        List of baseline pairs to keep. If bls is also fed, this list is
        concatenated onto the baseline-pair list constructed from the
        bls selection.
    times : float ndarray
        Times from the time_avg_array to keep.
    pols : list of str or int
        List of polarization strings or integers to keep. See
        pyuvdata.utils.polstr2num for acceptable options.
    only_pairs_in_bls : bool, optional
        If True, keep only baseline-pairs whose first _and_ second baseline
        are found in the 'bls' list. Default: False.
    """
    # Clear all data in the current object
    self._clear()
    # Load-in meta data: small attributes live in grp.attrs, large
    # metadata arrays live as datasets in the group itself
    for k in grp.attrs:
        if k in self._meta_attrs:
            setattr(self, k, grp.attrs[k])
    for k in grp:
        if k in self._meta_dsets:
            setattr(self, k, grp[k][:])
    # Use _select() to pick out only the requested baselines/spws.
    # When not just_meta, passing h5file makes _select also read the
    # data/wgt/integration/nsample datasets from the group.
    if just_meta:
        uvputils._select(self, spws=spws, bls=bls,
                         only_pairs_in_bls=only_pairs_in_bls,
                         blpairs=blpairs, times=times, pols=pols)
    else:
        uvputils._select(self, spws=spws, bls=bls,
                         only_pairs_in_bls=only_pairs_in_bls,
                         blpairs=blpairs, times=times, pols=pols,
                         h5file=grp)
    # handle cosmo: it is stored as a stringified parameter dict, so
    # rebuild the Cosmo_Conversions instance from it
    if hasattr(self, 'cosmo'):
        self.cosmo = conversions.Cosmo_Conversions(**ast.literal_eval(self.cosmo))
    self.check(just_meta=just_meta)
def read_hdf5(self, filepath, just_meta=False, spws=None, bls=None,
              blpairs=None, times=None, pols=None,
              only_pairs_in_bls=False):
    """
    Clear current UVPSpec object and load in data from an HDF5 file.

    Parameters
    ----------
    filepath : str
        Path to HDF5 file to read from.
    just_meta : bool, optional
        If True, read-in metadata but ignore data, wgts and integration
        arrays. Default: False.
    spws : list of tuple, optional
        List of spectral window integers to select. Default: None (loads
        all channels).
    bls : list of i6 baseline integers or baseline tuples
        Select all baseline-pairs whose first _or_ second baseline are in
        the list. This changes if only_pairs_in_bls == True.
        Example tuple: (2, 3). Default: None (loads all bl pairs).
    blpairs : list of baseline-pair tuples or integers
        List of baseline pairs to keep. If bls is also fed, this list is
        concatenated onto the baseline-pair list constructed from the
        bls selection.
    times : float ndarray
        Times from the time_avg_array to keep.
    pols : list of str or int
        List of polarization strings or integers to keep. See
        pyuvdata.utils.polstr2num for acceptable options.
    only_pairs_in_bls : bool, optional
        If True, keep only baseline-pairs whose first _and_ second baseline
        are found in the 'bls' list. Default: False.
    """
    # Open file descriptor and read data.
    # FIX: forward blpairs, times and pols to read_from_group; they were
    # previously dropped, silently ignoring those selection arguments.
    with h5py.File(filepath, 'r') as f:
        self.read_from_group(f, just_meta=just_meta, spws=spws, bls=bls,
                             blpairs=blpairs, times=times, pols=pols,
                             only_pairs_in_bls=only_pairs_in_bls)
def write_to_group(self, group, run_check=True):
    """
    Write UVPSpec data into an HDF5 group.

    Parameters
    ----------
    group : HDF5 group
        The handle of the HDF5 group that the UVPSpec object should be
        written into.
    run_check : bool, optional
        Whether to run a validity check on the UVPSpec object before
        writing it to the HDF5 group. Default: True.
    """
    # Run check
    if run_check:
        self.check()
    # Check whether the group already contains info
    # TODO
    # Write small metadata as group attributes
    for name in self._meta_attrs:
        if not hasattr(self, name):
            continue
        if name == 'cosmo':
            # cosmo is stored as a stringified parameter dictionary
            group.attrs['cosmo'] = str(getattr(self, name).get_params())
            continue
        group.attrs[name] = getattr(self, name)
    # Write large metadata arrays as datasets
    for name in self._meta_dsets:
        if hasattr(self, name):
            group.create_dataset(name, data=getattr(self, name))
    # Iterate over spectral windows and write one dataset per array type
    for i in np.unique(self.spw_array):
        for label, arrays, dtype in (("data", self.data_array, np.complex),
                                     ("wgt", self.wgt_array, np.float),
                                     ("integration", self.integration_array, np.float),
                                     ("nsample", self.nsample_array, np.float)):
            group.create_dataset("{}_spw{}".format(label, i),
                                 data=arrays[i], dtype=dtype)
def write_hdf5(self, filepath, overwrite=False, run_check=True):
    """
    Write a UVPSpec object to HDF5 file.

    Parameters
    ----------
    filepath : str
        Filepath for output file.
    overwrite : bool, optional
        Whether to overwrite output file if it exists. Default: False.
    run_check : bool, optional
        Run UVPSpec validity check before writing to file. Default: True.

    Raises
    ------
    IOError
        If filepath exists and overwrite is False.
    """
    # Check output
    if os.path.exists(filepath) and overwrite is False:
        raise IOError("{} exists, not overwriting...".format(filepath))
    elif os.path.exists(filepath) and overwrite is True:
        # FIX: use function-call print for consistency with the rest of
        # the file (e.g. set_cosmology), and py2/py3 compatibility
        print("{} exists, overwriting...".format(filepath))
        os.remove(filepath)
    # Write file
    with h5py.File(filepath, 'w') as f:
        self.write_to_group(f, run_check=run_check)
def set_cosmology(self, new_cosmo, overwrite=False, new_beam=None,
                  verbose=True):
    """
    Set the cosmology for this UVPSpec object by passing an instance of
    conversions.Cosmo_Conversions and assigning it to self.cosmo, in
    addition to re-computing power spectrum normalization quantities like
    self.scalar_array. Because this will attempt to recompute the
    scalar_array, the beam-related metadata (OmegaP, OmegaPP and
    beam_freqs) must exist in self.

    If they do not, or if you'd like to overwrite them with a new beam, you
    can pass a UVBeam object or path to a beam file via the new_beam kwarg.

    If self.cosmo already exists then you are attempting to overwrite the
    currently adopted cosmology, which will only proceed if overwrite is
    True.

    If overwrite == True, then this module will overwrite self.cosmo. It
    will also recompute the power spectrum normalization scalar (using
    pspecbeam) and will overwrite the values in self.scalar_array. It will
    then propagate these re-normalization changes into the data_array by
    multiplying the data by (new_scalar_array / old_scalar_array).

    Parameters
    ----------
    new_cosmo : Cosmo_Conversions or dict
        Cosmo_Conversions instance or cosmological parameter dictionary.
        The new cosmology you want to adopt for this UVPSpec object.
    overwrite : bool, optional
        If True, overwrite self.cosmo if it already exists. Default: False.
    new_beam : PSpecBeamUV or str
        pspecbeam.PSpecBeamUV object or path to beam file. The new beam you
        want to adopt for this UVPSpec object.
    verbose : bool, optional
        If True, report feedback to stdout. Default: True.
    """
    # refuse to silently replace an existing cosmology
    if hasattr(self, 'cosmo') and overwrite == False:
        print("self.cosmo exists and overwrite == False, not overwriting...")
        return
    else:
        # without beam integrals (or a new beam to compute them from),
        # the normalization scalar cannot be recomputed
        if (not hasattr(self, 'OmegaP') \
            or not hasattr(self, 'OmegaPP') \
            or not hasattr(self, 'beam_freqs')) \
            and new_beam is None:
            print "In order to set the cosmology, self.OmegaP, " \
                  "self.OmegaPP and self.beam_freqs must exist, or you " \
                  "need to pass a PSpecBeamUV object or a path to a beam " \
                  "file as new_beam."
            return
    # overwrite beam quantities
    if new_beam is not None:
        if verbose: print "Updating beam data with {}".format(new_beam)
        if isinstance(new_beam, (str, np.str)):
            # PSpecBeamUV will adopt a default cosmology upon instantiation,
            # but this doesn't matter for what we need from it
            new_beam = pspecbeam.PSpecBeamUV(new_beam)
        self.OmegaP, self.OmegaPP = new_beam.get_Omegas(self.pol_array)
        self.beam_freqs = new_beam.beam_freqs
    # Update cosmo: accept either a parameter dict or an instance
    if isinstance(new_cosmo, (dict, odict)):
        new_cosmo = conversions.Cosmo_Conversions(**new_cosmo)
    if verbose: print("setting cosmology: \n{}".format(new_cosmo))
    self.cosmo = new_cosmo
    # Update scalar_array
    if verbose:
        print("Updating scalar array and re-normalizing power spectra")
    for spw in range(self.Nspws):
        for j, pol in enumerate(self.pol_array):
            scalar = self.compute_scalar(spw, pol, num_steps=1000,
                                         little_h=True, noise_scalar=False)
            # renormalize power spectra with new scalar
            self.data_array[spw][:, :, j] *= scalar / self.scalar_array[spw, j]
            # update self.scalar_array element
            self.scalar_array[spw, j] = scalar
    # Update self.units if pspectra were not originally in cosmological units
    if "Mpc" not in self.norm_units:
        self.norm_units = "h^-3 Mpc^3"
def check(self, just_meta=False):
    """
    Run checks to make sure metadata and data arrays are properly defined
    and in the right format. Raises AssertionErrors if checks fail.

    Parameters
    ----------
    just_meta : bool, optional
        If True, only check metadata. Default: False.
    """
    # check required parameters exist
    if just_meta:
        # only the required params that are metadata
        req_metas = sorted(set(self._req_params).intersection(set(self._meta)))
        for p in req_metas:
            assert hasattr(self, p), "required parameter {} hasn't been defined".format(p)
    else:
        for p in self._req_params:
            assert hasattr(self, p), "required parameter {} hasn't been defined".format(p)
    # check data: dict of per-spw complex ndarrays
    # (np.min over the per-key booleans is True only if all keys pass)
    assert isinstance(self.data_array, (dict, odict)), "self.data_array must be a dictionary type"
    assert np.min(map(lambda k: self.data_array[k].dtype in (np.complex, complex, np.complex128), self.data_array.keys())), "self.data_array values must be complex type"
    # check wgts: dict of per-spw float ndarrays
    assert isinstance(self.wgt_array, (dict, odict)), "self.wgt_array must be a dictionary type"
    assert np.min(map(lambda k: self.wgt_array[k].dtype in (np.float, float), self.wgt_array.keys())), "self.wgt_array values must be float type"
    # check integration: dict of per-spw float ndarrays
    assert isinstance(self.integration_array, (dict, odict)), "self.integration_array must be a dictionary type"
    assert np.min(map(lambda k: self.integration_array[k].dtype in (np.float, float, np.float64), self.integration_array.keys())), "self.integration_array values must be float type"
    # check nsample: dict of per-spw float ndarrays
    assert isinstance(self.nsample_array, (dict, odict)), "self.nsample_array must be a dictionary type"
    assert np.min(map(lambda k: self.nsample_array[k].dtype in (np.float, float, np.float64), self.nsample_array.keys())), "self.nsample_array values must be float type"
def _clear(self):
"""
Clear UVPSpec of all parameters. Warning: this cannot be undone.
"""
for p in self._all_params:
if hasattr(self, p):
delattr(self, p)
def __str__(self):
"""
Output useful info about UVPSpec object.
"""
s = ""
s += " ATTRIBUTES\n"
s += "-"*12 + "\n"
for k in self._meta_attrs:
if hasattr(self, k) and 'history' not in k:
y = getattr(self, k)
if isinstance(y, np.ndarray):
s += "%18s: shape %s\n" % (k, y.shape)
else:
s += "%18s: %s\n" % (k, y)
s += "\n DATASETS\n"
s += "-"*12 + "\n"
for k in self._meta_dsets:
if hasattr(self, k):
s += "%18s: shape %s\n" % (k, getattr(self, k).shape)
return s
def __eq__(self, other):
""" Check equivalence between attributes of two UVPSpec objects """
try:
for p in self._all_params:
if p not in self._req_params \
and (not hasattr(self, p) and not hasattr(other, p)):
continue
if p in self._immutables:
assert getattr(self, p) == getattr(other, p)
elif p in self._ndarrays:
assert np.isclose(getattr(self, p), getattr(other, p)).all()
elif p in self._dicts:
for i in getattr(self, p):
assert np.isclose(getattr(self, p)[i], \
getattr(other, p)[i]).all()
except AssertionError:
return False
return True
@property
def units(self):
"""Return power spectrum units. See self.vis_units and self.norm_units."""
return "({})^2 {}".format(self.vis_units, self.norm_units)
[docs] def generate_noise_spectra(self, spw, pol, Tsys, blpairs=None, little_h=True,
form='Pk', num_steps=2000, real=True):
"""
Generate the expected 1-sigma noise power spectrum given a selection of
spectral window, system temp., and polarization. This estimate is
constructed as:
P_N = scalar * (Tsys * 1e3)^2 / (integration_time) / sqrt(Nincoherent)
[mK^2 h^-3 Mpc^3]
where scalar is the cosmological and beam scalar (i.e. X2Y * Omega_eff)
calculated from pspecbeam with noise_scalar = True, integration_time is
in seconds and comes from self.integration_array and Nincoherent is the
number of incoherent averaging samples and comes from
self.nsample_array.
If the polarization specified is a pseudo Stokes pol (I, Q, U or V)
then an extra factor of 2 is divided.
If form == 'DelSq' then a factor of k^3 / (2pi^2) is multiplied.
If real is True, a factor of sqrt(2) is divided to account for
discarding imaginary noise component.
For more details, see hera_pspec.noise.Sensitivity.calc_P_N, and
Cheng et al. 2018.
The default units of P_N are mK^2 h^-3 Mpc^3 (if little_h=True and
form='Pk'), but is different if those kwargs are altered.
Parameters
----------
spw : int
Spectral window index to generate noise curve for.
pol : str or int
Polarization selection in form of str (e.g. 'I' or 'Q' or 'xx') or
int (e.g. -5 or -6).
Tsys : float
System temperature in Kelvin.
blpairs : list
List of unique blair tuples or i12 integers to calculate noise
spectrum for default is to calculate for baseline pairs.
little_h : bool, optional
Whether to have cosmological length units be h^-1 Mpc or Mpc
Default: h^-1 Mpc
form : str, optional
Form of power spectra, whetehr P(k) or Delta^2(k). Valid options
are 'Pk' or 'DelSq'. Default: 'Pk'.
num_steps : int, optional
Number of frequency bins to use in integrating power spectrum
scalar in pspecbeam. Default: 2000.
real : bool, optional
If True assumes the real component of complex power spectrum is
used, and will divide P_N by an extra sqrt(2). Otherwise, assume
power spectra are complex and keep P_N as is. Default: True.
Returns
-------
P_N : dict
Dictionary containing blpair integers as keys and float ndarrays
of noise power spectra as values, with ndarrays having shape
(Ntimes, Ndlys).
"""
# Assert polarization type
if isinstance(pol, (np.int, int)):
pol = uvutils.polnum2str(pol)
# Get polarization index
pol_ind = self.pol_to_indices(pol)
# Get frequency band
freqs = self.freq_array[self.spw_to_indices(spw)]
# Calculate scalar
scalar = self.compute_scalar(spw, pol, num_steps=num_steps,
little_h=little_h, noise_scalar=True)
# Get k vectors
if form == 'DelSq':
k_perp = self.get_kperps(spw, little_h=little_h)
k_para = self.get_kparas(spw, little_h=little_h)
k_mag = np.sqrt(k_perp[:, None]**2 + k_para[None, :]**2)
# get blpairs
if blpairs is None:
blpairs = np.unique(self.blpair_array)
elif isinstance(blpairs[0], tuple):
blpairs = map(lambda blp: self.antnums_to_blpair(blp), blpairs)
# Get delays
dlys = self.get_dlys(spw)
# Iterate over blpairs to get P_N
P_N = odict()
for i, blp in enumerate(blpairs):
# get indices
inds = self.blpair_to_indices(blp)
P_blp = []
# iterate over time axis
for j, ind in enumerate(inds):
# get integration time and n_samp
t_int = self.integration_array[spw][ind, pol_ind]
n_samp = self.nsample_array[spw][ind, pol_ind]
# get kvecs
if form == 'DelSq':
k = k_mag[ind]
else:
k = None
# Get noise power spectrum
pn = noise.calc_P_N(scalar, Tsys, t_int, k=k,
Nincoherent=n_samp, form=form)
# Put into appropriate form
if form == 'Pk':
pn = np.ones(len(dlys), np.float) * pn
# If pseudo stokes pol (as opposed to linear or circular pol),
# divide by extra factor of 2
if isinstance(pol, (np.str, str)):
pol = uvutils.polstr2num(pol)
if pol in (1, 2, 3, 4): pn /= 2.0 # pseudo stokes pol
if real: pn /= np.sqrt(2) # if real divide by sqrt(2)
# append to P_blp
P_blp.append(pn)
P_N[blp] = np.array(P_blp)
return P_N
[docs] def average_spectra(self, blpair_groups=None, time_avg=False,
blpair_weights=None, inplace=True):
"""
Average power spectra across the baseline-pair-time axis, weighted by
each spectrum's integration time.
This is an "incoherent" average, in the sense that this averages power
spectra, rather than visibility data. The 'nsample_array' and
'integration_array' will be updated to reflect the averaging.
In the case of averaging across baseline pairs, the resultant averaged
spectrum is assigned to the zeroth blpair in the group. In the case of
time averaging, the time and LST arrays are assigned to the mean of the
averaging window.
Note that this is designed to be separate from spherical binning in k:
here we are not connecting k_perp modes to k_para modes. However, if
blpairs holds groups of iso baseline separation, then this is
equivalent to cylindrical binning in 3D k-space.
If you want help constructing baseline-pair groups from baseline
groups, see self.get_blpair_groups_from_bl_groups.
Parameters
----------
blpair_groups : list of baseline-pair groups
List of list of tuples or integers. All power spectra in a
baseline-pair group are averaged together. If a baseline-pair
exists in more than one group, a warning is raised. Examples::
blpair_groups = [ [((1, 2), (1, 2)), ((2, 3), (2, 3))],
[((4, 6), (4, 6))]]
blpair_groups = [ [1002001002, 2003002003], [4006004006] ]
time_avg : bool, optional
If True, average power spectra across the time axis. Default: False.
blpair_weights : list of weights (float or int), optional
Relative weight of each baseline-pair when performing the average.
This is useful for bootstrapping. This should have the same shape
as blpair_groups if specified. The weights are automatically
normalized within each baseline-pair group. Default: None (all
baseline pairs have unity weights).
inplace : bool, optional
If True, edit data in self, else make a copy and return. Default:
True.
Notes
-----
Currently, every baseline-pair in a blpair group must have the same
Ntimes, unless time_avg=True. Future versions may support
baseline-pair averaging of heterogeneous time arrays. This includes
the scenario of repeated blpairs (e.g. in bootstrapping), which will
return multiple copies of their time_array.
"""
if inplace:
grouping.average_spectra(self, blpair_groups=blpair_groups,
time_avg=time_avg,
blpair_weights=blpair_weights,
inplace=True)
else:
return grouping.average_spectra(self, blpair_groups=blpair_groups,
time_avg=time_avg,
blpair_weights=blpair_weights,
inplace=False)
[docs] def fold_spectra(self):
"""
Average bandpowers from matching positive and negative delay bins onto a
purely positive delay axis. Negative delay bins are still populated, but
are filled with zero power. This is an in-place operation.
Will only work if self.folded == False, i.e. data is currently unfolded
across negative and positive delay. Because this averages the data, the
nsample array is multiplied by a factor of 2.
WARNING: This operation cannot be undone.
"""
grouping.fold_spectra(self)
[docs] def get_blpair_groups_from_bl_groups(self, blgroups, only_pairs_in_bls=False):
"""
Get baseline pair matches from a list of baseline groups.
Parameters
----------
blgroups : list
List of baseline groups, which themselves are lists of baseline
tuples or baseline i6 integers.
Ex: [ [(1, 2), (2, 3), (3, 4)], [(1, 4), (2, 5)] ]
only_pairs_in_bls : bool, optional
If True, select only baseline-pairs whose first _and_ second
baseline are both found in each baseline group. Default: False.
Returns
-------
blpair_groups : list
List of blpair groups, which themselves are lists of blpair
integers.
"""
blpair_groups = []
for blg in blgroups:
blp_select = uvputils._get_blpairs_from_bls(self, blg, only_pairs_in_bls=only_pairs_in_bls)
blp = sorted(set(self.blpair_array[blp_select]))
if len(blp) > 0:
blpair_groups.append(blp)
return blpair_groups
[docs] def compute_scalar(self, spw, pol, num_steps=1000, little_h=True,
noise_scalar=False):
"""
Compute power spectrum normalization scalar given an adopted cosmology
and a beam model. See pspecbeam.PSpecBeamBase.compute_pspec_scalar for
details.
Parameters
----------
spw : integer
Spectral window selection.
pol : str or int
Which polarization to calculate the scalar for.
num_steps : int, optional
Number of integration bins along frequency in computing scalar.
Default: 1000.
little_h : bool, optional
Whether to use h^-1 units or not. Default: True.
noise_scalar : bool, optional
If True calculate noise pspec scalar, else calculate normal pspec
scalar. See pspecbeam.py for difference between normal scalar and
noise scalar. Default: False.
Returns
-------
scalar : float
Power spectrum normalization scalar.
"""
# make assertions
assert hasattr(self, 'cosmo'), "self.cosmo object must exist to compute scalar. See self.set_cosmology()"
assert hasattr(self, 'OmegaP') and hasattr(self, "OmegaPP") and hasattr(self, "beam_freqs"), "self.OmegaP, "\
"self.OmegaPP and self.beam_freqs must exist to compute scalar."
# get freq array of selected spw
spw_freqs = self.freq_array[self.spw_to_indices(spw)]
# compute scalar
OP = self.OmegaP[:, self.pol_to_indices(pol)].squeeze()
OPP = self.OmegaPP[:, self.pol_to_indices(pol)].squeeze()
scalar = pspecbeam._compute_pspec_scalar(self.cosmo, self.beam_freqs, OPP / OP**2, spw_freqs,
num_steps=num_steps, taper=self.taper, little_h=little_h,
noise_scalar=noise_scalar)
return scalar