Source code for hera_pspec.uvpspec

import numpy as np
from collections import OrderedDict as odict
import os, copy, shutil, operator, ast, fnmatch
from hera_pspec import conversions, noise, version, pspecbeam, grouping
from hera_pspec import uvpspec_utils as uvputils
from hera_pspec.parameter import PSpecParam
from pyuvdata import uvutils as uvutils
import h5py


class UVPSpec(object):
    """
    An object for storing power spectra generated by hera_pspec, and a
    file-format (HDF5-backed) for its data and meta-data.
    """
[docs] def __init__(self): """ An object for storing power spectra and associated metadata generated by hera_pspec. """ # Summary attributes self._Ntimes = PSpecParam("Ntimes", description="Number of unique times.", expected_type=int) self._Nblpairts = PSpecParam("Nblpairts", description="Total number of baseline-pair times.", expected_type=int) self._Nblpairs = PSpecParam("Nblpairs", description='Total number of baseline-pairs.', expected_type=int) self._Nspwdlys = PSpecParam("Nspwdlys", description="Total number of delay bins across all sub-bands.", expected_type=int) self._Nspws = PSpecParam("Nspws", description="Number of spectral windows.", expected_type=int) self._Ndlys = PSpecParam("Ndlys", description="Total number of delay bins.", expected_type=int) self._Nfreqs = PSpecParam("Nfreqs", description="Total number of frequency bins in the original data.", expected_type=int) self._Npols = PSpecParam("Npols", description="Number of polarizations in the data.", expected_type=int) self._history = PSpecParam("history", description='The file history.', expected_type=str) # Data attributes desc = "Power spectrum data dictionary with spw integer as keys and values as complex ndarrays." self._data_array = PSpecParam("data_array", description=desc, expected_type=dict, form="(Nblpairts, Ndlys, Npols)") desc = "Weight dictionary for original two datasets. The second axis holds [dset1_wgts, dset2_wgts] in that order." self._wgt_array = PSpecParam("wgt_array", description=desc, expected_type=dict, form="(Nblpairts, Nfreqs, 2, Npols)") desc = "Integration time dictionary. This holds the average integration time [seconds] of each delay spectrum in the data. " \ "This is not necessarily equal to the integration time of the visibility data: If data have been coherently averaged " \ "(i.e. averaged before squaring), than this is the sum of each spectrum's integration time." 
self._integration_array = PSpecParam("integration_array", description=desc, expected_type=dict, form="(Nblpairts, Npols)") desc = "Nsample dictionary, if the pspectra have been incoherently averaged (i.e. averaged after squaring), this is " \ "the effective number of samples in that average (float type). This is not the same as the pyuvdata.UVData nsample_array." self._nsample_array = PSpecParam("nsample_array", description=desc, expected_type=dict, form="(Nblpairts, Npols)") self._spw_array = PSpecParam("spw_array", description="Spw integer array.", form="(Nspwdlys,)") self._freq_array = PSpecParam("freq_array", description="Frequency array of the original data in Hz.", form="(Nspwdlys,)") self._dly_array = PSpecParam("dly_array", description="Delay array in seconds.", form="(Nspwdlys,)") desc = "Polarization integers of power spectra. Stokes 1:4 (I,Q,U,V); circular -1:-4 (RR,LL,RL,LR); linear -5:-8 (XX,YY,XY,YX)" self._pol_array = PSpecParam("pol_array", description=desc, form="(Npols,)") self._lst_1_array = PSpecParam("lst_1_array", description="LST array of the first bl in the bl-pair [radians].", form="(Nblpairts,)") self._lst_2_array = PSpecParam("lst_2_array", description="LST array of the second bl in the bl-pair [radians].", form="(Nblpairts,)") self._lst_avg_array = PSpecParam("lst_avg_array", description="Average of the lst_1_array and lst_2_array [radians].", form="(Nblpairts,)") self._time_1_array = PSpecParam("time_1_array", description="Time array of the first bl in the bl-pair [Julian Date].", form="(Nblpairts,)") self._time_1_array = PSpecParam("time_2_array", description="Time array of the second bl in the bl-pair [Julian Date].", form="(Nblpairts,)") self._time_avg_array = PSpecParam("time_avg_array", description="Average of the time_1_array and time_2_array [Julian Date].", form='(Nblpairts,)') self._blpair_array = PSpecParam("blpair_array", description="Baseline-pair integer for all baseline-pair times.", form="(Nblpairts,)") # Baseline 
attributes self._Nbls = PSpecParam("Nbls", description="Number of unique baseline integers.", expected_type=int) self._bl_vecs = PSpecParam("bl_vecs", description="ndarray of baseline separation vectors in the ITRF frame [meters]. To get it in ENU frame see self.get_ENU_bl_vecs().", expected_type=np.ndarray, form="(Nbls,)") self._bl_array = PSpecParam("bl_array", description="All unique baseline (antenna-pair) integers.", expected_type=np.ndarray, form="(Nbls,)") # Misc Attributes self._channel_width = PSpecParam("channel_width", description="width of visibility frequency channels in Hz.", expected_type=float) self._telescope_location = PSpecParam("telescope_location", description="telescope location in ECEF frame [meters]. To get it in Lat/Lon/Alt see pyuvdata.utils.LatLonAlt_from_XYZ().", expected_type=np.ndarray) self._weighting = PSpecParam("weighting", description="Form of data weighting used when forming power spectra.", expected_type=str) self._norm = PSpecParam("norm", description="Normalization method adopted in OQE (M matrix).", expected_type=str) self._taper = PSpecParam("taper", description='Taper function applied to visibility data before FT."', expected_type=str) self._vis_units = PSpecParam("vis_units", description="Units of the original visibility data used to form the power spectra.", expected_type=str) self._norm_units = PSpecParam("norm_units", description="Power spectra normalization units, i.e. 
telescope units [Hz str] or cosmological [(h^-3) Mpc^3].", expected_type=str) self._filename1 = PSpecParam("filename1", description="filename of data from first dataset", expected_type=str) self._filename2 = PSpecParam("filename2", description="filename of data from second dataset", expected_type=str) self._label1 = PSpecParam("label1", description="label of data from first dataset", expected_type=str) self._label2 = PSpecParam("label2", description="label of data from second dataset", expected_type=str) self._git_hash = PSpecParam("git_hash", description="GIT hash of hera_pspec when pspec was generated.", expected_type=str) self._folded = PSpecParam("folded", description="if power spectra are folded (i.e. averaged) onto purely positive delay axis. Default is False", expected_type=bool) self._scalar_array = PSpecParam("scalar_array", description="Power spectrum normalization scalar from pspecbeam module.", expected_type=np.ndarray, form="(Nspws, Npols)") self._beamfile = PSpecParam("beamfile", description="filename of beam-model used to normalized pspectra.", expected_type=str) self._OmegaP = PSpecParam("OmegaP", description="Integral of unitless beam power over the sky [steradians].", form="(Nbeam_freqs, Npols)") self._OmegaPP = PSpecParam("OmegaP", description="Integral of unitless beam power squared over the sky [steradians].", form="(Nbeam_freqs, Npols)") self._beam_freqs = PSpecParam("beam_freqs", description="Frequency bins of the OmegaP and OmegaPP beam-integral arrays [Hz].", form="(Nbeam_freqs,)") self._cosmo = PSpecParam("cosmo", description="Instance of conversion.Cosmo_Conversions class.") # collect all parameters: required and non-required self._all_params = sorted(map(lambda p: p[1:], fnmatch.filter(self.__dict__.keys(), '_*'))) # specify required params: these are required for read / write and self.check() self._req_params = ["Ntimes", "Nblpairts", "Nblpairs", "Nspwdlys", "Nspws", "Ndlys", "Npols", "Nfreqs", "history", "data_array", "wgt_array", 
"integration_array", "spw_array", "freq_array", "dly_array", "pol_array", "lst_1_array", "lst_2_array", "time_1_array", "time_2_array", "blpair_array", "Nbls", "bl_vecs", "bl_array", "channel_width", "telescope_location", "weighting", "vis_units", "norm_units", "taper", "norm", "git_hash", "nsample_array", 'lst_avg_array', 'time_avg_array', 'folded', "scalar_array"] # all parameters must fall into one and only one of the following groups, which are used in __eq__ self._immutables = ["Ntimes", "Nblpairts", "Nblpairs", "Nspwdlys", "Nspws", "Ndlys", "Npols", "Nfreqs", "history", "Nbls", "channel_width", "weighting", "vis_units", "filename1", "filename2", "label1", "label2", "norm", "norm_units", "taper", "git_hash", "cosmo", "beamfile" ,'folded'] self._ndarrays = ["spw_array", "freq_array", "dly_array", "pol_array", "lst_1_array", 'lst_avg_array', 'time_avg_array', "lst_2_array", "time_1_array", "time_2_array", "blpair_array", "OmegaP", "OmegaPP", "beam_freqs", "bl_vecs", "bl_array", "telescope_location", "scalar_array"] self._dicts = ["data_array", "wgt_array", "integration_array", "nsample_array"] # define which attributes are considred meta data. Large attrs should be constructed as datasets self._meta_dsets = ["lst_1_array", "lst_2_array", "time_1_array", "time_2_array", "blpair_array", "bl_vecs", "bl_array", 'lst_avg_array', 'time_avg_array', 'OmegaP', 'OmegaPP'] self._meta_attrs = sorted(set(self._all_params) - set(self._dicts) - set(self._meta_dsets)) self._meta = sorted(set(self._meta_dsets).union(set(self._meta_attrs))) # check all params are covered assert len(set(self._all_params) - set(self._dicts) - set(self._immutables) - set(self._ndarrays)) == 0 # Default parameter values self.folded = False self.git_hash = version.git_hash
[docs] def get_data(self, key, *args): """ Slice into data_array with a specified data key in the format (spw, ((ant1, ant2), (ant3, ant4)), pol) or (spw, blpair-integer, pol) where spw is the spectral window integer, ant1 etc. are integers, and pol is either a polarization string (ex. 'XX') or integer (ex. -5). Parameters ---------- key : tuple Baseline-pair key Returns ------- data : complex ndarray Shape (Ntimes, Ndlys) """ spw, blpairts, pol = self.key_to_indices(key, *args) # if data has been folded, return only positive delays if self.folded: Ndlys = self.data_array[spw].shape[1] return self.data_array[spw][blpairts, Ndlys//2+1:, pol] # else return all delays else: return self.data_array[spw][blpairts, :, pol]
[docs] def get_wgts(self, key, *args): """ Slice into wgt_array with a specified data key in the format (spw, ((ant1, ant2), (ant3, ant4)), pol) or (spw, blpair-integer, pol) where spw is the spectral window integer, ant1 etc. are integers, and pol is either a polarization string (ex. 'XX') or integer (ex. -5). Parameters ---------- key : tuple, baseline-pair key Returns ------- wgts : float ndarray Has shape (2, Ntimes, Ndlys), where the zeroth axis holds [wgt_1, wgt_2] in that order. """ spw, blpairts, pol = self.key_to_indices(key, *args) return self.wgt_array[spw][blpairts, :, :, pol]
[docs] def get_integrations(self, key, *args): """ Slice into integration_array with a specified data key in the format:: (spw, ((ant1, ant2), (ant3, ant4)), pol) or:: (spw, blpair-integer, pol) where spw is the spectral window integer, ant1 etc. are integers, and pol is either a polarization string (ex. 'XX') or integer (ex. -5). Parameters ---------- key : tuple Baseline-pair key Returns ------- data : float ndarray Has shape (Ntimes,) """ spw, blpairts, pol = self.key_to_indices(key, *args) return self.integration_array[spw][blpairts, pol]
[docs] def get_nsamples(self, key, *args): """ Slice into nsample_array with a specified data key in the format (spw, ((ant1, ant2), (ant3, ant4)), pol) or (spw, blpair-integer, pol) where spw is the spectral window integer, ant1 etc. are integers, and pol is either a polarization string (ex. 'XX') or integer (ex. -5). Parameters ---------- key : tuple, baseline-pair key Returns ------- data : float ndarray with shape (Ntimes,) """ spw, blpairts, pol = self.key_to_indices(key, *args) return self.nsample_array[spw][blpairts, pol]
[docs] def get_dlys(self, key): """ Get array of delays given a spectral window selection. Parameters ---------- key : int, or tuple with integer Spectral window selection Returns ------- dlys : float ndarray, contains delays in nanosec of pspectra given spw """ indices = self.spw_to_indices(key) return self.dly_array[indices]
[docs] def get_blpair_seps(self): """ For each baseline-pair, get the average baseline separation in ENU frame in meters. Returns ------- blp_avg_sep : float ndarray shape=(Nblpairts,) """ # get list of bl separations bl_vecs = self.get_ENU_bl_vecs() bls = self.bl_array.tolist() blseps = np.array(map(lambda bl: np.linalg.norm(bl_vecs[bls.index(bl)]), self.bl_array)) # construct empty blp_avg_sep array blp_avg_sep = np.empty(self.Nblpairts, np.float) # construct blpair_bls blpairs = np.unique(self.blpair_array) bl1 = np.floor(blpairs / 1e6) blpair_bls = np.vstack([bl1, blpairs - bl1*1e6]).astype(np.int).T # iterate over blpairs for blp, bl in zip(blpairs, blpair_bls): avg_sep = np.mean([blseps[bls.index(bl[0])], blseps[bls.index(bl[1])]]) inds = self.blpair_to_indices(blp) blp_avg_sep[inds] = avg_sep return blp_avg_sep
[docs] def get_kperps(self, spw, little_h=True): """ Get transverse (perpendicular) cosmological wavevector for each baseline-pair given an adopted cosmology and a spw selection. Parameters ---------- spw : int, choice of spectral window little_h : boolean, optional Whether to have cosmological length units be h^-1 Mpc or Mpc Default: h^-1 Mpc Returns ------- k_perp : float ndarray, transverse wave-vectors, shape=(Nblpairs,) """ # assert cosmo assert hasattr(self, 'cosmo'), "self.cosmo must exist to form cosmological " \ "wave-vectors. See self.set_cosmology()" # calculate mean redshift of band avg_z = self.cosmo.f2z(np.mean(self.freq_array[self.spw_to_indices(spw)])) # get kperps blpair_seps = self.get_blpair_seps() k_perp = blpair_seps * self.cosmo.bl_to_kperp(avg_z, little_h=little_h) return k_perp
    def get_kparas(self, spw, little_h=True):
        """
        Get radial (parallel) cosmological wavevectors for power spectra given
        an adopted cosmology and a spw selection.

        Parameters
        ----------
        spw : int, choice of spectral window

        little_h : boolean, optional
            Whether to have cosmological length units be h^-1 Mpc or Mpc
            Default: h^-1 Mpc

        Returns
        -------
        k_para : float ndarray, radial wave-vectors, shape=(Ndlys given spw)
        """
        # assert cosmo
        assert hasattr(self, 'cosmo'), "self.cosmo must exist to form cosmological " \
                                       "wave-vectors. See self.set_cosmology()"
        # calculate mean redshift of band
        avg_z = self.cosmo.f2z(np.mean(self.freq_array[self.spw_to_indices(spw)]))
        # get kparas: delay [s] times the tau -> k_parallel conversion
        k_para = self.get_dlys(spw) * self.cosmo.tau_to_kpara(avg_z, little_h=little_h)
        return k_para
[docs] def convert_to_deltasq(self, little_h=True, inplace=True): """ Convert from P(k) to Delta^2(k) by multiplying by k^3 / (2pi^2). The units of the output is therefore the current units (self.units) times h^3 Mpc^-3, where the h^3 is only included if little_h == True. Parameters ---------- inplace : boolean If True edit and overwrite arrays in self, else make a copy of self and return. """ # copy object if inplace: uvp = self else: uvp = copy.deepcopy(self) # loop over spectral windows for spw in range(uvp.Nspws): # get k vectors k_perp = uvp.get_kperps(spw, little_h=little_h) k_para = uvp.get_kparas(spw, little_h=little_h) k_mag = np.sqrt(k_perp[:, None, None]**2 + k_para[None, :, None]**2) # multiply into data uvp.data_array[spw] *= k_mag**3 / (2*np.pi**2) # edit units uvp.norm_units = "k^3 / (2pi^2)" if inplace == False: return uvp
[docs] def blpair_to_antnums(self, blpair): """ Convert baseline-pair integer to nested tuple of antenna numbers. Parameters ---------- blpair : i12 int baseline-pair integer ID Returns ------- antnums : tuple Nested tuple containing baseline-pair antenna numbers. Ex. ((ant1, ant2), (ant3, ant4)) """ return uvputils._blpair_to_antnums(blpair)
[docs] def antnums_to_blpair(self, antnums): """ Convert nested tuple of antenna numbers to baseline-pair integer. Parameters ---------- antnums : tuple nested tuple containing integer antenna numbers for a baseline-pair. Ex. ((ant1, ant2), (ant3, ant4)) Returns ------- blpair : i12 integer baseline-pair integer """ return uvputils._antnums_to_blpair(antnums)
[docs] def bl_to_antnums(self, bl): """ Convert baseline (antenna-pair) integer to nested tuple of antenna numbers. Parameters ---------- bl : i6 int baseline integer ID Returns ------- antnums : tuple tuple containing baseline antenna numbers. Ex. (ant1, ant2) """ return uvputils._bl_to_antnums(bl)
[docs] def antnums_to_bl(self, antnums): """ Convert tuple of antenna numbers to baseline integer. Parameters ---------- antnums : tuple Tuple containing integer antenna numbers for a baseline. Ex. (ant1, ant2) Returns ------- bl : i6 integer Baseline integer. """ return uvputils._antnums_to_bl(antnums)
[docs] def blpair_to_indices(self, blpair): """ Convert a baseline-pair nested tuple ((ant1, ant2), (ant3, ant4)) or a baseline-pair integer into indices to index the blpairts axis of data_array. Parameters ---------- blpair : tuples or ints Nested tuple or blpair i12 integer, Ex. ((1, 2), (3, 4)), or list of blpairs. """ # convert blpair to integer if fed as tuple if isinstance(blpair, tuple): blpair = [self.antnums_to_blpair(blpair)] elif isinstance(blpair, (np.int, int)): blpair = [blpair] elif isinstance(blpair, list): if isinstance(blpair[0], tuple): blpair = map(lambda blp: self.antnums_to_blpair(blp), blpair) # assert exists in data assert np.array(map(lambda b: b in self.blpair_array, blpair)).all(), "blpairs {} not all found in data".format(blpair) return np.arange(self.Nblpairts)[reduce(operator.add, map(lambda b: self.blpair_array == b, blpair))]
[docs] def spw_to_indices(self, spw): """ Convert a spectral window integer into a list of indices to index into the spwdlys axis of dly_array and/or freq_array. If self.folded == False, return indices for both positive and negative delay bins, else if self.folded == True, returns indices for only positive delay bins. Parameters ---------- spw : int Spectral window index or list of indices. """ # convert to list if int if isinstance(spw, (np.int, int)): spw = [spw] # assert exists in data assert np.array(map(lambda s: s in self.spw_array, spw)).all(), "spws {} not all found in data".format(spw) # get select boolean array select = reduce(operator.add, map(lambda s: self.spw_array == s, spw)) if self.folded: select[self.dly_array < 1e-10] = False return np.arange(self.Nspwdlys)[select]
[docs] def pol_to_indices(self, pol): """ Map a polarization integer or str to its index in pol_array Parameters ---------- pol : str or int Polarization string (ex. 'XX') or integer (ex. -5), or a list of strs or ints. Returns ------- indices : int Index of pol in pol_array. """ # convert pol to int if str if isinstance(pol, (str, np.str)): pol = [uvutils.polstr2num(pol)] elif isinstance(pol, (int, np.int)): pol = [pol] elif isinstance(pol, (list, tuple)): for i in range(len(pol)): if isinstance(pol[i], (np.str, str)): pol[i] = uvutils.polstr2num(pol[i]) # ensure all pols exist in data assert np.array(map(lambda p: p in self.pol_array, pol)).all(), "pols {} not all found in data".format(pol) indices = np.arange(self.Npols)[reduce(operator.add, map(lambda p: self.pol_array == p, pol))] return indices
[docs] def time_to_indices(self, time, blpairs=None): """ Convert a time [Julian Date] from self.time_avg_array to an array that indexes the elements at which it appears in that array. Can optionally take a blpair selection to further select the indices at which both that time and blpair are satisfied. Parameters ---------- time : float Julian Date time from self.time_avg_array, Ex: 2458042.12242 blpairs : tuple or int, optional List of blpair tuples or integers that further selects the elements at which both time and blpairs are satisfied. Returns ------- indices : integer ndarray Contains indices at which selection is satisfied along the blpairts axis. """ time_select = np.isclose(self.time_avg_array, time, rtol=1e-10) if blpairs is None: return np.arange(self.Nblpairts)[time_select] else: blp_select = np.zeros(self.Nblpairts, np.bool) if isinstance(blpairs, (tuple, int, np.int)): blpairs = [blpairs] for blp in blpairs: blp_select[self.blpair_to_indices(blp)] = True time_select *= blp_select return np.arange(self.Nblpairts)[time_select]
    def key_to_indices(self, key, *args):
        """
        Convert a data key into relevant slice arrays. A data key takes the
        form (spw_integer, ((ant1, ant2), (ant3, ant4)), pol_string) or
        (spw, blpair-integer, pol), where spw is the spectral window integer,
        ant1 etc. are integers, and pol is either a polarization string
        (ex. 'XX') or integer (ex. -5).

        One can also expand the key into the arg slots, such that
        key=spw, args=(blpair, pol). The key can also be a dictionary in the
        form::

            key = {'spw' : spw_integer,
                   'blpair' : ((ant1, ant2), (ant3, ant4)),
                   'pol' : pol_string}

        and it will be parsed for you.

        Parameters
        ----------
        key : tuple
            Baseline-pair key.

        Returns
        -------
        spw : int
            Spectral window index.
        blpairts : list
            List of integers to apply along blpairts axis.
        pol : int
            Polarization index.
        """
        # assert key length; accept either a single 3-tuple/dict, or a
        # (key, blpair, pol) expansion across positional args
        if len(args) == 0:
            assert len(key) == 3, "length of key must be 3."
            if isinstance(key, (odict, dict)):
                key = (key['spw'], key['blpair'], key['pol'])
        elif len(args) > 0:
            assert len(args) == 2, "length of key must be 3."
            assert isinstance(args[0], (tuple, int, np.int)) and isinstance(args[1], (np.str, str, int, np.int)), "key must be ordered as (spw, blpair, pol)"
            key = (key, args[0], args[1])
        # assign key elements
        spw = key[0]
        blpair = key[1]
        pol = key[2]
        # assert types
        assert isinstance(spw, (int, np.int)), "spw must be an integer"
        assert isinstance(blpair, (int, np.int, tuple)), "blpair must be an integer or nested tuple"
        assert isinstance(pol, (np.str, str, np.int, int)), "pol must be a string or integer"
        # convert blpair to int if not int
        if type(blpair) == tuple:
            blpair = self.antnums_to_blpair(blpair)
        # convert pol to int if str
        if type(pol) in (str, np.str):
            pol = uvutils.polstr2num(pol)
        # check attributes exist in data
        assert spw in self.spw_array, "spw {} not found in data".format(spw)
        assert blpair in self.blpair_array, "blpair {} not found in data".format(blpair)
        assert pol in self.pol_array, "pol {} not found in data".format(pol)
        # index polarization array
        pol = self.pol_to_indices(pol)
        # index blpairts
        blpairts = self.blpair_to_indices(blpair)
        return spw, blpairts, pol
[docs] def select(self, spws=None, bls=None, only_pairs_in_bls=True, blpairs=None, times=None, pols=None, inplace=True): """ Select function for selecting out certain slices of the data. Parameters ---------- spws : list, optional List of spectral window integers to select. If None, all will be selected. Default: None. bls : list of i6 baseline integers or baseline tuples, optional Example: (2,3). Select all baseline-pairs whose first _or_ second baseline are in bls list. This changes if only_pairs_in_bls == True. If None, all baselines are selected (subject to other options). Default: None. only_pairs_in_bls : bool, optional If True, keep only baseline-pairs whose first _and_ second baseline are found in bls list. Default: True. blpairs : list of baseline-pair tuples or integers, optional List of baseline-pairs to keep. If bls is also fed, this list is concatenated onto the baseline-pair list constructed from the bls selection. If None, all valid baseline pairs are selected. Default: None. times : array_like, optional Float ndarray of times from the time_avg_array to select. If None, all times are kept. Default: None. pols : list of str or int, optional List of polarizations to keep. See pyuvdata.utils.polstr2num for acceptable options. If None, all polarizations are kept. Default: None. inplace : bool, optional If True, edit and overwrite arrays in self, else make a copy of self and return. Default: True. Returns ------- uvp : UVPSpec, optional If inplace=False, return a new UVPSpec object containing only the selected data. """ if inplace: uvp = self else: uvp = copy.deepcopy(self) uvputils._select(uvp, spws=spws, bls=bls, only_pairs_in_bls=only_pairs_in_bls, blpairs=blpairs, times=times, pols=pols) if inplace == False: return uvp
[docs] def get_ENU_bl_vecs(self): """ Return baseline vector array in TOPO (ENU) frame in meters, with matched ordering of self.bl_vecs. Returns ------- blvecs : array_like Array of baseline vectors. """ return uvutils.ENU_from_ECEF( (self.bl_vecs + self.telescope_location).T, \ *uvutils.LatLonAlt_from_XYZ(self.telescope_location)).T
[docs] def read_from_group(self, grp, just_meta=False, spws=None, bls=None, blpairs=None, times=None, pols=None, only_pairs_in_bls=False): """ Clear current UVPSpec object and load in data from specified HDF5 group. Parameters ---------- grp : HDF5 group HDF5 group to load data from. just_meta : bool, optional If True, read-in metadata but ignore data, wgts and integration arrays. Default: False. spws : list of tuple, optional List of spectral window integers to select. Default: None (loads all channels). bls : list of i6 baseline integers or baseline tuples Select all baseline-pairs whose first _or_ second baseline are in the list. This changes if only_pairs_in_bls == True. Example tuple: (2, 3). Default: None (loads all bl pairs). blpairs : list of baseline-pair tuples or integers List of baseline pairs to keep. If bls is also fed, this list is concatenated onto the baseline-pair list constructed from from the bls selection. times : float ndarray Times from the time_avg_array to keep. pols : list of str or int List of polarization strings or integers to keep. See pyuvdata.utils.polstr2num for acceptable options. only_pairs_in_bls : bool, optional If True, keep only baseline-pairs whose first _and_ second baseline are found in the 'bls' list. Default: False. """ # Clear all data in the current object self._clear() # Load-in meta data for k in grp.attrs: if k in self._meta_attrs: setattr(self, k, grp.attrs[k]) for k in grp: if k in self._meta_dsets: setattr(self, k, grp[k][:]) # Use _select() to pick out only the requested baselines/spws if just_meta: uvputils._select(self, spws=spws, bls=bls, only_pairs_in_bls=only_pairs_in_bls, blpairs=blpairs, times=times, pols=pols) else: uvputils._select(self, spws=spws, bls=bls, only_pairs_in_bls=only_pairs_in_bls, blpairs=blpairs, times=times, pols=pols, h5file=grp) # handle cosmo if hasattr(self, 'cosmo'): self.cosmo = conversions.Cosmo_Conversions(**ast.literal_eval(self.cosmo)) self.check(just_meta=just_meta)
[docs] def read_hdf5(self, filepath, just_meta=False, spws=None, bls=None, blpairs=None, times=None, pols=None, only_pairs_in_bls=False): """ Clear current UVPSpec object and load in data from an HDF5 file. Parameters ---------- filepath : str Path to HDF5 file to read from. just_meta : bool, optional If True, read-in metadata but ignore data, wgts and integration arrays. Default: False. spws : list of tuple, optional List of spectral window integers to select. Default: None (loads all channels). bls : list of i6 baseline integers or baseline tuples Select all baseline-pairs whose first _or_ second baseline are in the list. This changes if only_pairs_in_bls == True. Example tuple: (2, 3). Default: None (loads all bl pairs). blpairs : list of baseline-pair tuples or integers List of baseline pairs to keep. If bls is also fed, this list is concatenated onto the baseline-pair list constructed from from the bls selection. times : float ndarray Times from the time_avg_array to keep. pols : list of str or int List of polarization strings or integers to keep. See pyuvdata.utils.polstr2num for acceptable options. only_pairs_in_bls : bool, optional If True, keep only baseline-pairs whose first _and_ second baseline are found in the 'bls' list. Default: False. """ # Open file descriptor and read data with h5py.File(filepath, 'r') as f: self.read_from_group(f, just_meta=just_meta, spws=spws, bls=bls, only_pairs_in_bls=only_pairs_in_bls)
[docs] def write_to_group(self, group, run_check=True): """ Write UVPSpec data into an HDF5 group. Parameters ---------- group : HDF5 group The handle of the HDF5 group that the UVPSpec object should be written into. run_check : bool, optional Whether to run a validity check on the UVPSpec object before writing it to the HDF5 group. Default: True. """ # Run check if run_check: self.check() # Check whether the group already contains info # TODO # Write meta data for k in self._meta_attrs: if hasattr(self, k): # handle cosmo if k == 'cosmo': group.attrs['cosmo'] = str(getattr(self, k).get_params()) continue group.attrs[k] = getattr(self, k) for k in self._meta_dsets: if hasattr(self, k): group.create_dataset(k, data=getattr(self, k)) # Iterate over spectral windows and create datasets for i in np.unique(self.spw_array): group.create_dataset("data_spw{}".format(i), data=self.data_array[i], dtype=np.complex) group.create_dataset("wgt_spw{}".format(i), data=self.wgt_array[i], dtype=np.float) group.create_dataset("integration_spw{}".format(i), data=self.integration_array[i], dtype=np.float) group.create_dataset("nsample_spw{}".format(i), data=self.nsample_array[i], dtype=np.float)
[docs] def write_hdf5(self, filepath, overwrite=False, run_check=True): """ Write a UVPSpec object to HDF5 file. Parameters ---------- filepath : str Filepath for output file. overwrite : bool, optional Whether to overwrite output file if it exists. Default: False. run_check : bool, optional Run UVPSpec validity check before writing to file. Default: True. """ # Check output if os.path.exists(filepath) and overwrite is False: raise IOError("{} exists, not overwriting...".format(filepath)) elif os.path.exists(filepath) and overwrite is True: print "{} exists, overwriting...".format(filepath) os.remove(filepath) # Write file with h5py.File(filepath, 'w') as f: self.write_to_group(f, run_check=run_check)
    def set_cosmology(self, new_cosmo, overwrite=False, new_beam=None, verbose=True):
        """
        Set the cosmology for this UVPSpec object by passing an instance of
        conversions.Cosmo_Conversions and assigning it to self.cosmo, in
        addition to re-computing power spectrum normalization quantities like
        self.scalar_array.

        Because this will attempt to recompute the scalar_array, the
        beam-related metadata (OmegaP, OmegaPP and beam_freqs) must exist in
        self. If they do not, or if you'd like to overwrite them with a new
        beam, you can pass a UVBeam object or path to a beam file via the
        new_beam kwarg.

        If self.cosmo already exists then you are attempting to overwrite the
        currently adopted cosmology, which will only proceed if overwrite is
        True. If overwrite == True, then this module will overwrite
        self.cosmo. It will also recompute the power spectrum normalization
        scalar (using pspecbeam) and will overwrite the values in
        self.scalar_array. It will then propagate these re-normalization
        changes into the data_array by multiplying the data by
        (new_scalar_array / old_scalar_array).

        Parameters
        ----------
        new_cosmo : Cosmo_Conversions or dict
            Cosmo_Conversions instance or cosmological parameter dictionary.
            The new cosmology you want to adopt for this UVPSpec object.
        overwrite : bool, optional
            If True, overwrite self.cosmo if it already exists.
            Default: False.
        new_beam : PSpecBeamUV or str
            pspecbeam.PSpecBeamUV object or path to beam file. The new beam
            you want to adopt for this UVPSpec object.
        verbose : bool, optional
            If True, report feedback to stdout. Default: True.
        """
        # refuse to overwrite an existing cosmology unless explicitly allowed
        if hasattr(self, 'cosmo') and overwrite == False:
            print("self.cosmo exists and overwrite == False, not overwriting...")
            return
        else:
            # cannot recompute scalar_array without the beam integrals or a
            # replacement beam to derive them from
            if (not hasattr(self, 'OmegaP') \
                or not hasattr(self, 'OmegaPP') \
                or not hasattr(self, 'beam_freqs')) \
                and new_beam is None:
                print "In order to set the cosmology, self.OmegaP, " \
                      "self.OmegaPP and self.beam_freqs must exist, or you " \
                      "need to pass a PSpecBeamUV object or a path to a beam " \
                      "file as new_beam."
                return

        # overwrite beam quantities
        if new_beam is not None:
            if verbose: print "Updating beam data with {}".format(new_beam)
            if isinstance(new_beam, (str, np.str)):
                # PSpecBeamUV will adopt a default cosmology upon instantiation,
                # but this doesn't matter for what we need from it
                new_beam = pspecbeam.PSpecBeamUV(new_beam)
            self.OmegaP, self.OmegaPP = new_beam.get_Omegas(self.pol_array)
            self.beam_freqs = new_beam.beam_freqs

        # Update cosmo (accept a plain parameter dict as well)
        if isinstance(new_cosmo, (dict, odict)):
            new_cosmo = conversions.Cosmo_Conversions(**new_cosmo)
        if verbose: print("setting cosmology: \n{}".format(new_cosmo))
        self.cosmo = new_cosmo

        # Update scalar_array
        if verbose: print("Updating scalar array and re-normalizing power spectra")
        for spw in range(self.Nspws):
            for j, pol in enumerate(self.pol_array):
                scalar = self.compute_scalar(spw, pol, num_steps=1000, little_h=True, noise_scalar=False)
                # renormalize power spectra with new scalar
                self.data_array[spw][:, :, j] *= scalar / self.scalar_array[spw, j]
                # update self.scalar_array element
                self.scalar_array[spw, j] = scalar

        # Update self.units if pspectra were not originally in cosmological units
        if "Mpc" not in self.norm_units:
            self.norm_units = "h^-3 Mpc^3"
[docs] def check(self, just_meta=False): """ Run checks to make sure metadata and data arrays are properly defined and in the right format. Raises AssertionErrors if checks fail. Parameters ---------- just_meta : bool, optional If True, only check metadata. Default: False. """ # check required parameters exist if just_meta: req_metas = sorted(set(self._req_params).intersection(set(self._meta))) for p in req_metas: assert hasattr(self, p), "required parameter {} hasn't been defined".format(p) else: for p in self._req_params: assert hasattr(self, p), "required parameter {} hasn't been defined".format(p) # check data assert isinstance(self.data_array, (dict, odict)), "self.data_array must be a dictionary type" assert np.min(map(lambda k: self.data_array[k].dtype in (np.complex, complex, np.complex128), self.data_array.keys())), "self.data_array values must be complex type" # check wgts assert isinstance(self.wgt_array, (dict, odict)), "self.wgt_array must be a dictionary type" assert np.min(map(lambda k: self.wgt_array[k].dtype in (np.float, float), self.wgt_array.keys())), "self.wgt_array values must be float type" # check integration assert isinstance(self.integration_array, (dict, odict)), "self.integration_array must be a dictionary type" assert np.min(map(lambda k: self.integration_array[k].dtype in (np.float, float, np.float64), self.integration_array.keys())), "self.integration_array values must be float type" # check nsample assert isinstance(self.nsample_array, (dict, odict)), "self.nsample_array must be a dictionary type" assert np.min(map(lambda k: self.nsample_array[k].dtype in (np.float, float, np.float64), self.nsample_array.keys())), "self.nsample_array values must be float type"
    def _clear(self):
        """
        Clear UVPSpec of all parameters. Warning: this cannot be undone.
        """
        # delete every attribute named in the master parameter list
        for p in self._all_params:
            if hasattr(self, p):
                delattr(self, p)

    def __str__(self):
        """
        Output useful info about the UVPSpec object: one line per metadata
        attribute (ndarrays shown by shape) followed by one line per dataset.
        """
        s = ""
        s += " ATTRIBUTES\n"
        s += "-"*12 + "\n"
        for k in self._meta_attrs:
            # skip history, which is typically long free-form text
            if hasattr(self, k) and 'history' not in k:
                y = getattr(self, k)
                if isinstance(y, np.ndarray):
                    # show shape rather than dumping array contents
                    s += "%18s: shape %s\n" % (k, y.shape)
                else:
                    s += "%18s: %s\n" % (k, y)
        s += "\n DATASETS\n"
        s += "-"*12 + "\n"
        for k in self._meta_dsets:
            if hasattr(self, k):
                s += "%18s: shape %s\n" % (k, getattr(self, k).shape)
        return s

    def __eq__(self, other):
        """
        Check equivalence between attributes of two UVPSpec objects.

        Equality is decided by asserting equality of every parameter in
        self._all_params: immutables via ==, ndarrays via np.isclose, and
        dict-valued parameters element-by-element. Any failed assertion
        makes the objects compare unequal.

        NOTE(review): if exactly one of the two objects has an *optional*
        parameter defined, the `continue` guard below is skipped and
        getattr() on the other object raises AttributeError, which is NOT
        caught by the `except AssertionError` — it propagates to the caller
        instead of returning False. TODO confirm whether callers rely on
        this; consider catching AttributeError as well.
        """
        try:
            for p in self._all_params:
                # skip optional parameters absent from both objects
                if p not in self._req_params \
                and (not hasattr(self, p) and not hasattr(other, p)): continue

                if p in self._immutables:
                    assert getattr(self, p) == getattr(other, p)
                elif p in self._ndarrays:
                    # np.isclose tolerates floating-point round-off
                    assert np.isclose(getattr(self, p), getattr(other, p)).all()
                elif p in self._dicts:
                    # compare dict-valued parameters key by key
                    for i in getattr(self, p):
                        assert np.isclose(getattr(self, p)[i], \
                                          getattr(other, p)[i]).all()
        except AssertionError:
            return False

        return True

    @property
    def units(self):
        """Return power spectrum units. See self.vis_units and self.norm_units."""
        return "({})^2 {}".format(self.vis_units, self.norm_units)
[docs] def generate_noise_spectra(self, spw, pol, Tsys, blpairs=None, little_h=True, form='Pk', num_steps=2000, real=True): """ Generate the expected 1-sigma noise power spectrum given a selection of spectral window, system temp., and polarization. This estimate is constructed as: P_N = scalar * (Tsys * 1e3)^2 / (integration_time) / sqrt(Nincoherent) [mK^2 h^-3 Mpc^3] where scalar is the cosmological and beam scalar (i.e. X2Y * Omega_eff) calculated from pspecbeam with noise_scalar = True, integration_time is in seconds and comes from self.integration_array and Nincoherent is the number of incoherent averaging samples and comes from self.nsample_array. If the polarization specified is a pseudo Stokes pol (I, Q, U or V) then an extra factor of 2 is divided. If form == 'DelSq' then a factor of k^3 / (2pi^2) is multiplied. If real is True, a factor of sqrt(2) is divided to account for discarding imaginary noise component. For more details, see hera_pspec.noise.Sensitivity.calc_P_N, and Cheng et al. 2018. The default units of P_N are mK^2 h^-3 Mpc^3 (if little_h=True and form='Pk'), but is different if those kwargs are altered. Parameters ---------- spw : int Spectral window index to generate noise curve for. pol : str or int Polarization selection in form of str (e.g. 'I' or 'Q' or 'xx') or int (e.g. -5 or -6). Tsys : float System temperature in Kelvin. blpairs : list List of unique blair tuples or i12 integers to calculate noise spectrum for default is to calculate for baseline pairs. little_h : bool, optional Whether to have cosmological length units be h^-1 Mpc or Mpc Default: h^-1 Mpc form : str, optional Form of power spectra, whetehr P(k) or Delta^2(k). Valid options are 'Pk' or 'DelSq'. Default: 'Pk'. num_steps : int, optional Number of frequency bins to use in integrating power spectrum scalar in pspecbeam. Default: 2000. real : bool, optional If True assumes the real component of complex power spectrum is used, and will divide P_N by an extra sqrt(2). 
Otherwise, assume power spectra are complex and keep P_N as is. Default: True. Returns ------- P_N : dict Dictionary containing blpair integers as keys and float ndarrays of noise power spectra as values, with ndarrays having shape (Ntimes, Ndlys). """ # Assert polarization type if isinstance(pol, (np.int, int)): pol = uvutils.polnum2str(pol) # Get polarization index pol_ind = self.pol_to_indices(pol) # Get frequency band freqs = self.freq_array[self.spw_to_indices(spw)] # Calculate scalar scalar = self.compute_scalar(spw, pol, num_steps=num_steps, little_h=little_h, noise_scalar=True) # Get k vectors if form == 'DelSq': k_perp = self.get_kperps(spw, little_h=little_h) k_para = self.get_kparas(spw, little_h=little_h) k_mag = np.sqrt(k_perp[:, None]**2 + k_para[None, :]**2) # get blpairs if blpairs is None: blpairs = np.unique(self.blpair_array) elif isinstance(blpairs[0], tuple): blpairs = map(lambda blp: self.antnums_to_blpair(blp), blpairs) # Get delays dlys = self.get_dlys(spw) # Iterate over blpairs to get P_N P_N = odict() for i, blp in enumerate(blpairs): # get indices inds = self.blpair_to_indices(blp) P_blp = [] # iterate over time axis for j, ind in enumerate(inds): # get integration time and n_samp t_int = self.integration_array[spw][ind, pol_ind] n_samp = self.nsample_array[spw][ind, pol_ind] # get kvecs if form == 'DelSq': k = k_mag[ind] else: k = None # Get noise power spectrum pn = noise.calc_P_N(scalar, Tsys, t_int, k=k, Nincoherent=n_samp, form=form) # Put into appropriate form if form == 'Pk': pn = np.ones(len(dlys), np.float) * pn # If pseudo stokes pol (as opposed to linear or circular pol), # divide by extra factor of 2 if isinstance(pol, (np.str, str)): pol = uvutils.polstr2num(pol) if pol in (1, 2, 3, 4): pn /= 2.0 # pseudo stokes pol if real: pn /= np.sqrt(2) # if real divide by sqrt(2) # append to P_blp P_blp.append(pn) P_N[blp] = np.array(P_blp) return P_N
[docs] def average_spectra(self, blpair_groups=None, time_avg=False, blpair_weights=None, inplace=True): """ Average power spectra across the baseline-pair-time axis, weighted by each spectrum's integration time. This is an "incoherent" average, in the sense that this averages power spectra, rather than visibility data. The 'nsample_array' and 'integration_array' will be updated to reflect the averaging. In the case of averaging across baseline pairs, the resultant averaged spectrum is assigned to the zeroth blpair in the group. In the case of time averaging, the time and LST arrays are assigned to the mean of the averaging window. Note that this is designed to be separate from spherical binning in k: here we are not connecting k_perp modes to k_para modes. However, if blpairs holds groups of iso baseline separation, then this is equivalent to cylindrical binning in 3D k-space. If you want help constructing baseline-pair groups from baseline groups, see self.get_blpair_groups_from_bl_groups. Parameters ---------- blpair_groups : list of baseline-pair groups List of list of tuples or integers. All power spectra in a baseline-pair group are averaged together. If a baseline-pair exists in more than one group, a warning is raised. Examples:: blpair_groups = [ [((1, 2), (1, 2)), ((2, 3), (2, 3))], [((4, 6), (4, 6))]] blpair_groups = [ [1002001002, 2003002003], [4006004006] ] time_avg : bool, optional If True, average power spectra across the time axis. Default: False. blpair_weights : list of weights (float or int), optional Relative weight of each baseline-pair when performing the average. This is useful for bootstrapping. This should have the same shape as blpair_groups if specified. The weights are automatically normalized within each baseline-pair group. Default: None (all baseline pairs have unity weights). inplace : bool, optional If True, edit data in self, else make a copy and return. Default: True. 
Notes ----- Currently, every baseline-pair in a blpair group must have the same Ntimes, unless time_avg=True. Future versions may support baseline-pair averaging of heterogeneous time arrays. This includes the scenario of repeated blpairs (e.g. in bootstrapping), which will return multiple copies of their time_array. """ if inplace: grouping.average_spectra(self, blpair_groups=blpair_groups, time_avg=time_avg, blpair_weights=blpair_weights, inplace=True) else: return grouping.average_spectra(self, blpair_groups=blpair_groups, time_avg=time_avg, blpair_weights=blpair_weights, inplace=False)
[docs] def fold_spectra(self): """ Average bandpowers from matching positive and negative delay bins onto a purely positive delay axis. Negative delay bins are still populated, but are filled with zero power. This is an in-place operation. Will only work if self.folded == False, i.e. data is currently unfolded across negative and positive delay. Because this averages the data, the nsample array is multiplied by a factor of 2. WARNING: This operation cannot be undone. """ grouping.fold_spectra(self)
[docs] def get_blpair_groups_from_bl_groups(self, blgroups, only_pairs_in_bls=False): """ Get baseline pair matches from a list of baseline groups. Parameters ---------- blgroups : list List of baseline groups, which themselves are lists of baseline tuples or baseline i6 integers. Ex: [ [(1, 2), (2, 3), (3, 4)], [(1, 4), (2, 5)] ] only_pairs_in_bls : bool, optional If True, select only baseline-pairs whose first _and_ second baseline are both found in each baseline group. Default: False. Returns ------- blpair_groups : list List of blpair groups, which themselves are lists of blpair integers. """ blpair_groups = [] for blg in blgroups: blp_select = uvputils._get_blpairs_from_bls(self, blg, only_pairs_in_bls=only_pairs_in_bls) blp = sorted(set(self.blpair_array[blp_select])) if len(blp) > 0: blpair_groups.append(blp) return blpair_groups
[docs] def compute_scalar(self, spw, pol, num_steps=1000, little_h=True, noise_scalar=False): """ Compute power spectrum normalization scalar given an adopted cosmology and a beam model. See pspecbeam.PSpecBeamBase.compute_pspec_scalar for details. Parameters ---------- spw : integer Spectral window selection. pol : str or int Which polarization to calculate the scalar for. num_steps : int, optional Number of integration bins along frequency in computing scalar. Default: 1000. little_h : bool, optional Whether to use h^-1 units or not. Default: True. noise_scalar : bool, optional If True calculate noise pspec scalar, else calculate normal pspec scalar. See pspecbeam.py for difference between normal scalar and noise scalar. Default: False. Returns ------- scalar : float Power spectrum normalization scalar. """ # make assertions assert hasattr(self, 'cosmo'), "self.cosmo object must exist to compute scalar. See self.set_cosmology()" assert hasattr(self, 'OmegaP') and hasattr(self, "OmegaPP") and hasattr(self, "beam_freqs"), "self.OmegaP, "\ "self.OmegaPP and self.beam_freqs must exist to compute scalar." # get freq array of selected spw spw_freqs = self.freq_array[self.spw_to_indices(spw)] # compute scalar OP = self.OmegaP[:, self.pol_to_indices(pol)].squeeze() OPP = self.OmegaPP[:, self.pol_to_indices(pol)].squeeze() scalar = pspecbeam._compute_pspec_scalar(self.cosmo, self.beam_freqs, OPP / OP**2, spw_freqs, num_steps=num_steps, taper=self.taper, little_h=little_h, noise_scalar=noise_scalar) return scalar