Source code for hera_pspec.pspecdata

import numpy as np
import aipy
import pyuvdata
import copy, operator, itertools
from collections import OrderedDict as odict
import hera_cal as hc
from hera_pspec import uvpspec, utils, version
from hera_pspec import uvpspec_utils as uvputils
from pyuvdata import utils as uvutils


[docs]class PSpecData(object):
[docs] def __init__(self, dsets=[], wgts=[], labels=None, beam=None): """ Object to store multiple sets of UVData visibilities and perform operations such as power spectrum estimation on them. Parameters ---------- dsets : list or dict of UVData objects, optional Set of UVData objects containing the data that will be used to compute the power spectrum. If specified as a dict, the key names will be used to tag each dataset. Default: Empty list. wgts : list or dict of UVData objects, optional Set of UVData objects containing weights for the input data. Default: Empty list. labels : list of str, optional An ordered list of names/labels for each dataset, if dsets was specified as a list. If None, names will not be assigned to the datasets. If dsets was specified as a dict, the keys of that dict will be used instead of this. Default: None. beam : PspecBeam object, optional PspecBeam object containing information about the primary beam Default: None. """ self.clear_cov_cache() # Covariance matrix cache self.dsets = []; self.wgts = []; self.labels = [] self.Nfreqs = None self.spw_range = None self.spw_Nfreqs = None # Set R to identity by default self.R = self.I # Store the input UVData objects if specified if len(dsets) > 0: self.add(dsets, wgts, labels=labels) # Store a primary beam self.primary_beam = beam
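The constructor above accepts plain lists of UVData objects plus optional labels and a beam. A minimal, hypothetical usage sketch (the Miriad file paths and labels are placeholders, not part of this module; passing None weights means the UVData flags are used):

    import pyuvdata
    from hera_pspec import pspecdata

    # hypothetical input files; substitute real paths
    uvd1 = pyuvdata.UVData()
    uvd1.read_miriad("even.uv")
    uvd2 = pyuvdata.UVData()
    uvd2.read_miriad("odd.uv")

    # wgts of None -> the flags of each dataset are used as weights
    ds = pspecdata.PSpecData(dsets=[uvd1, uvd2], wgts=[None, None],
                             labels=["even", "odd"])
    print(ds)   # summary via __str__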
[docs] def add(self, dsets, wgts, labels=None): """ Add a dataset to the collection in this PSpecData object. Parameters ---------- dsets : UVData or list or dict UVData object or list of UVData objects containing data to add to the collection. wgts : UVData or list or dict UVData object or list of UVData objects containing weights to add to the collection. Must be the same length as dsets. If a weight is set to None, the flags of the corresponding dataset are used as weights. labels : list of str An ordered list of names/labels for each dataset, if dsets was specified as a list. If dsets was specified as a dict, the keys of that dict will be used instead. """ # Check for dicts and unpack into an ordered list if found if isinstance(dsets, dict): # Disallow labels kwarg if a dict was passed if labels is not None: raise ValueError("If 'dsets' is a dict, 'labels' cannot be " "specified.") if not isinstance(wgts, dict): raise TypeError("If 'dsets' is a dict, 'wgts' must also be " "a dict") # Unpack dsets and wgts dicts labels = dsets.keys() _dsets = [dsets[key] for key in labels] _wgts = [wgts[key] for key in labels] dsets = _dsets wgts = _wgts # Convert input args to lists if possible if isinstance(dsets, pyuvdata.UVData): dsets = [dsets,] if isinstance(wgts, pyuvdata.UVData): wgts = [wgts,] if isinstance(labels, str): labels = [labels,] if wgts is None: wgts = [wgts,] if isinstance(dsets, tuple): dsets = list(dsets) if isinstance(wgts, tuple): wgts = list(wgts) # Only allow UVData or lists if not isinstance(dsets, list) or not isinstance(wgts, list): raise TypeError("dsets and wgts must be UVData or lists of UVData") # Make sure enough weights were specified assert(len(dsets) == len(wgts)) if labels is not None: assert(len(dsets) == len(labels)) # Check that everything is a UVData object for d, w in zip(dsets, wgts): if not isinstance(d, pyuvdata.UVData): raise TypeError("Only UVData objects can be used as datasets.") if not isinstance(w, pyuvdata.UVData) and w is not None: raise TypeError("Only UVData objects (or None) can be used as " "weights.") # Append to list self.dsets += dsets self.wgts += wgts # Store labels (if they were set), appending rather than overwriting so # that repeated calls to add() preserve existing labels if labels is None: self.labels += [None for d in dsets] else: self.labels += labels # Store no. frequencies and no. times self.Nfreqs = self.dsets[0].Nfreqs self.Ntimes = self.dsets[0].Ntimes # Store the actual frequencies self.freqs = self.dsets[0].freq_array[0] self.spw_range = (0, self.Nfreqs) self.spw_Nfreqs = self.Nfreqs
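Because add() also accepts dicts, datasets can be tagged by name directly; the dict keys become the labels and the 'labels' kwarg must then be omitted. A short hypothetical sketch (uvd1, uvd2 as in the construction example above):

    ds = pspecdata.PSpecData()
    # dict keys become the dataset labels; wgts must then also be a dict
    ds.add(dsets={"even": uvd1, "odd": uvd2},
           wgts={"even": None, "odd": None})
    # note: label order follows the dict's key iteration order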
def __str__(self): """ Print basic info about this PSpecData object. """ # Basic info s = "PSpecData object\n" s += " %d datasets" % len(self.dsets) if len(self.dsets) == 0: return s # Dataset summary for i, d in enumerate(self.dsets): if self.labels[i] is None: s += " dset (%d): %d bls (freqs=%d, times=%d, pols=%d)\n" \ % (i, d.Nbls, d.Nfreqs, d.Ntimes, d.Npols) else: s += " dset '%s' (%d): %d bls (freqs=%d, times=%d, pols=%d)\n" \ % (self.labels[i], i, d.Nbls, d.Nfreqs, d.Ntimes, d.Npols) return s
[docs] def validate_datasets(self, verbose=True): """ Validate stored datasets and weights to make sure they are consistent with one another (e.g. have the same shape, baselines etc.). """ # check dsets and wgts have same number of elements if len(self.dsets) != len(self.wgts): raise ValueError("self.wgts does not have same length as self.dsets") # Check if dsets are all the same shape along freq axis Nfreqs = [d.Nfreqs for d in self.dsets] if np.unique(Nfreqs).size > 1: raise ValueError("all dsets must have the same Nfreqs") # Check shape along time axis Ntimes = [d.Ntimes for d in self.dsets] if np.unique(Ntimes).size > 1: raise ValueError("all dsets must have the same Ntimes") # raise warnings if times don't match lst_diffs = np.array(map(lambda dset: np.unique(self.dsets[0].lst_array) - np.unique(dset.lst_array), self.dsets[1:])) if np.max(np.abs(lst_diffs)) > 0.001: raise_warning("Warning: taking power spectra between LST bins misaligned by more than 15 seconds", verbose=verbose) # raise warning if frequencies don't match freq_diffs = np.array(map(lambda dset: np.unique(self.dsets[0].freq_array) - np.unique(dset.freq_array), self.dsets[1:])) if np.max(np.abs(freq_diffs)) > 0.001e6: raise_warning("Warning: taking power spectra between frequency bins misaligned by more than 0.001 MHz", verbose=verbose) # Check for the same polarizations pols = [] for d in self.dsets: pols.extend(d.polarization_array) if np.unique(pols).size > 1: raise ValueError("all dsets must have the same number and kind of polarizations: \n{}".format(pols)) # Check phase type phase_types = [] for d in self.dsets: phase_types.append(d.phase_type) if np.unique(phase_types).size > 1: raise ValueError("all datasets must have the same phase type (i.e. 'drift', 'phased', ...)\ncurrent phase types are {}".format(phase_types)) # Check phase centers if phase type is phased if 'phased' in set(phase_types): phase_ra = map(lambda d: d.phase_center_ra_degrees, self.dsets) phase_dec = map(lambda d: d.phase_center_dec_degrees, self.dsets) max_diff_ra = np.max(map(lambda d: np.diff(d), itertools.combinations(phase_ra, 2))) max_diff_dec = np.max(map(lambda d: np.diff(d), itertools.combinations(phase_dec, 2))) max_diff = np.sqrt(max_diff_ra**2 + max_diff_dec**2) if max_diff > 0.15: raise_warning("Warning: maximum phase-center difference between datasets is > 10 arcmin", verbose=verbose)
[docs] def check_key_in_dset(self, key, dset_ind): """ Check whether 'key' exists in the UVData object self.dsets[dset_ind]. Parameters ---------- key : tuple if length 1: assumed to be polarization number or string elif length 2: assumed to be antenna-number tuple (ant1, ant2) elif length 3: assumed to be antenna-number-polarization tuple (ant1, ant2, pol) dset_ind : int, the index of the dataset to-be-checked Returns ------- exists : bool True if the key exists, False otherwise """ #FIXME: Fix this to enable label keys # get iterable key = pyuvdata.utils.get_iterable(key) if isinstance(key, str): key = (key,) # check key is a tuple if isinstance(key, tuple) == False or len(key) not in (1, 2, 3): raise KeyError("key {} must be a length 1, 2 or 3 tuple".format(key)) try: _ = self.dsets[dset_ind]._key2inds(key) return True except KeyError: return False
[docs] def clear_cov_cache(self, keys=None): """ Clear stored covariance data (or some subset of it). Parameters ---------- keys : list of tuples, optional List of keys to remove from covariance matrix cache. If None, all keys will be removed. Default: None. """ if keys is None: self._C, self._Cempirical, self._I, self._iC = {}, {}, {}, {} self._iCt = {} else: for k in keys: try: del(self._C[k]) except(KeyError): pass try: del(self._Cempirical[k]) except(KeyError): pass try: del(self._I[k]) except(KeyError): pass try: del(self._iC[k]) except(KeyError): pass
[docs] def dset_idx(self, dset): """ Return the index of a dataset, regardless of whether it was specified as an integer or a string. Parameters ---------- dset : int or str Index or name of a dataset belonging to this PSpecData object. Returns ------- dset_idx : int Index of dataset. """ # Look up dset label if it's a string if isinstance(dset, str): if dset in self.labels: return self.labels.index(dset) else: raise KeyError("dset '%s' not found." % dset) elif isinstance(dset, int): return dset else: raise TypeError("dset must be either an int or string")
[docs] def blkey(self, dset, bl=None, pol=None): """ Return a key specifying a particular dataset, baseline, and (optionally) polarization, in the tuple format used by other methods of PSpecData. Parameters ---------- dset : int or str Index or name of a dataset belonging to this PSpecData object. bl : tuple, optional Baseline ID, specified as a tuple of antenna pairs, e.g. (10, 11). Default: None. pol : str, optional Polarization of the visibility, in linear (e.g. 'xx') or Stokes (e.g. 'I') notation, whatever is supported by the input UVData objects. Default: None (polarization will not be included). Returns ------- key : tuple Tuple containing dataset ID, baseline index (if specified), and polarization (if specified). """ key = () # Look up dset label if it's a string dset_idx = self.dset_idx(dset) key += (dset_idx,) # Add the baseline tuple if it was specified if bl is None: return key key += (bl,) # Polarization if pol is not None: key += (pol,) return key
[docs] def x(self, key): """ Get data for a given dataset and baseline, as specified in a standard key format. Parameters ---------- key : tuple Tuple containing dataset ID and baseline index. The first element of the tuple is the dataset index (or label), and the subsequent elements are the baseline ID. Returns ------- x : array_like Array of data from the requested UVData dataset and baseline. """ assert isinstance(key, tuple) dset, bl = self.blkey(dset=key[0], bl=key[1:]) spwmin, spwmax = self.spw_range[0], self.spw_range[1] return self.dsets[dset].get_data(bl).T[spwmin:spwmax, :]
[docs] def w(self, key): """ Get weights for a given dataset and baseline, as specified in a standard key format. Parameters ---------- key : tuple Tuple containing dataset ID and baseline index. The first element of the tuple is the dataset index, and the subsequent elements are the baseline ID. Returns ------- w : array_like Array of weights for the requested UVData dataset and baseline. """ assert isinstance(key, tuple) spwrange = self.spw_range dset, bl = self.blkey(dset=key[0], bl=key[1:]) if self.wgts[dset] is not None: return self.wgts[dset].get_data(bl).T[spwrange[0]:spwrange[1], :] else: # If weights were not specified, use the flags built in to the # UVData dataset object flags = self.dsets[dset].get_flags(bl).astype(float).T[spwrange[0]:spwrange[1], :] return 1. - flags # Flag=1 => weight=0
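Both x() and w() take the same key convention that pspec() constructs internally: the dataset index (or label), then the baseline antennas, then the polarization. A hypothetical sketch, assuming ds is a PSpecData instance as built above (antenna numbers and polarization are placeholders):

    key = (0, 24, 25, 'xx')     # dataset 0, baseline (24, 25), pol 'xx'
    data = ds.x(key)            # complex data, shape (spw_Nfreqs, Ntimes)
    wgts = ds.w(key)            # weights; equals 1 - flags when wgts[dset] is None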
[docs] def C(self, key): """ Estimate covariance matrices from the data. Parameters ---------- key : tuple Tuple containing indices of dataset and baselines. The first item specifies the index (ID) of a dataset in the collection, while subsequent indices specify the baseline index, in _key2inds format. Returns ------- C : array_like (Weighted) empirical covariance of data for baseline 'bl'. """ assert isinstance(key, tuple) key = (self.dset_idx(key[0]),) + key[1:] # Sanitize dataset name # Set covariance if it's not in the cache if not self._C.has_key(key): self.set_C( {key : utils.cov(self.x(key), self.w(key))} ) self._Cempirical[key] = self._C[key] # Return cached covariance return self._C[key]
[docs] def set_C(self, cov): """ Set the cached covariance matrix to a set of user-provided values. Parameters ---------- cov : dict Dictionary containing new covariance values for given datasets and baselines. Keys of the dictionary are tuples, with the first item being the ID (index) of the dataset, and subsequent items being the baseline indices. """ self.clear_cov_cache(cov.keys()) for key in cov: self._C[key] = cov[key]
[docs] def C_empirical(self, key): """ Calculate empirical covariance from the data (with appropriate weighting). Parameters ---------- key : tuple Tuple containing indices of dataset and baselines. The first item specifies the index (ID) of a dataset in the collection, while subsequent indices specify the baseline index, in _key2inds format. Returns ------- C_empirical : array_like Empirical covariance for the specified key. """ assert isinstance(key, tuple) key = (self.dset_idx(key[0]),) + key[1:] # Sanitize dataset name # Check cache for empirical covariance if not self._Cempirical.has_key(key): self._Cempirical[key] = utils.cov(self.x(key), self.w(key)) return self._Cempirical[key]
[docs] def I(self, key): """ Return identity covariance matrix. Parameters ---------- key : tuple Tuple containing indices of dataset and baselines. The first item specifies the index (ID) of a dataset in the collection, while subsequent indices specify the baseline index, in _key2inds format. Returns ------- I : array_like Identity covariance matrix, dimension (Nfreqs, Nfreqs). """ assert isinstance(key, tuple) key = (self.dset_idx(key[0]),) + key[1:] # Sanitize dataset name if not self._I.has_key(key): self._I[key] = np.identity(self.spw_Nfreqs) return self._I[key]
[docs] def iC(self, key): """ Return the inverse covariance matrix, C^-1. Parameters ---------- key : tuple Tuple containing indices of dataset and baselines. The first item specifies the index (ID) of a dataset in the collection, while subsequent indices specify the baseline index, in _key2inds format. Returns ------- iC : array_like Inverse covariance matrix for specified dataset and baseline. """ assert isinstance(key, tuple) key = (self.dset_idx(key[0]),) + key[1:] # Sanitize dataset name # Calculate inverse covariance if not in cache if not self._iC.has_key(key): C = self.C(key) U,S,V = np.linalg.svd(C.conj()) # conj in advance of next step # FIXME: Not sure what these are supposed to do #if self.lmin is not None: S += self.lmin # ensure invertibility #if self.lmode is not None: S += S[self.lmode-1] # FIXME: Is series of dot products quicker? self.set_iC({key:np.einsum('ij,j,jk', V.T, 1./S, U.T)}) return self._iC[key]
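The SVD construction used in iC() is algebraically a full inverse of C when C is well conditioned: if svd(C.conj()) = U S V^H, then (V^H)^T diag(1/S) U^T = C^-1. A standalone numpy sketch that checks this identity:

    import numpy as np

    np.random.seed(0)
    n = 6
    A = np.random.randn(n, n) + 1j * np.random.randn(n, n)
    C = np.dot(A, A.conj().T) + n * np.eye(n)      # Hermitian, well conditioned

    # same construction as in iC(): SVD of C.conj(), then recombine with 1/S
    U, S, V = np.linalg.svd(C.conj())              # numpy returns V^H as the third output
    iC = np.einsum('ij,j,jk', V.T, 1. / S, U.T)

    print(np.allclose(np.dot(iC, C), np.eye(n)))   # True: the einsum expression is C^-1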
[docs] def set_iC(self, d): """ Set the cached inverse covariance matrix for a given dataset and baseline to a specified value. For now, you should already have applied weights to this matrix. Parameters ---------- d : dict Dictionary containing data to insert into inverse covariance matrix cache. Keys are tuples, following the same format as the input to self.iC(). """ for k in d: self._iC[k] = d[k]
[docs] def set_R(self, R_matrix): """ Set the weighting matrix R for later use in q_hat. Parameters ---------- R_matrix : string or callable If set to "identity", sets R = I If set to "iC", sets R = C^-1 Otherwise, accepts a user-provided callable that returns a weighting matrix for a given key (in the same way that self.I and self.iC do). """ if R_matrix == "identity": self.R = self.I elif R_matrix == "iC": self.R = self.iC else: self.R = R_matrix
[docs] def set_spw(self, spw_range): """ Set the spectral window range Parameters ---------- spw_range : tuple, contains start and end of spw in channel indices used to slice the frequency array """ assert isinstance(spw_range, tuple), \ "spw_range must be fed as a len-2 integer tuple" assert isinstance(spw_range[0], (int, np.int)), \ "spw_range must be fed as len-2 integer tuple" self.spw_range = spw_range self.spw_Nfreqs = spw_range[1] - spw_range[0]
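set_spw() records the channel slice that x(), w(), delays() and the Q/G/H builders subsequently use. A short sketch, assuming ds is a PSpecData instance and the channel numbers are placeholders:

    ds.set_spw((64, 192))        # restrict analysis to channels 64..191
    print(ds.spw_Nfreqs)         # 128
    tau = ds.delays()            # delays (ns) for this spectral window only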
[docs] def q_hat(self, key1, key2, use_fft=True, taper='none'): """ Construct an unnormalized bandpower, q_hat, from a given pair of visibility vectors. Returns the following quantity: \hat{q}_a = (1/2) conj(x_1) R_1 Q^alt_a R_2 x_2 Note that the R matrix need not be set to C^-1. This is something that is set by the user in the set_R method. This is related to Equation 13 of arXiv:1502.06016. However, notice that there is a Q^alt_a instead of Q_a. The latter is defined as Q_a \equiv dC/dp_a. Since this is the derivative of the covariance with respect to the power spectrum, it contains factors of the primary beam etc. Q^alt_a strips away all of this, leaving only the barebones job of taking a Fourier transform. See HERA memo #44 for details Parameters ---------- key1, key2 : tuples or lists of tuples Tuples containing indices of dataset and baselines for the two input datavectors. If a list of tuples is provided, the baselines in the list will be combined with inverse noise weights. use_fft : bool, optional Whether to use a fast FFT summation trick to construct q_hat, or a simpler brute-force matrix multiplication. The FFT method assumes a delta-fn bin in delay space. Default: True. taper : str, optional Tapering (window) function to apply to the data. Takes the same arguments as aipy.dsp.gen_window(). Default: 'none'. Returns ------- q_hat : array_like Unnormalized bandpowers """ Rx1, Rx2 = 0, 0 # Calculate R x_1 if isinstance(key1, list): for _key in key1: Rx1 += np.dot(self.R(_key), self.x(_key)) else: Rx1 = np.dot(self.R(key1), self.x(key1)) # Calculate R x_2 if isinstance(key2, list): for _key in key2: Rx2 += np.dot(self.R(_key), self.x(_key)) else: Rx2 = np.dot(self.R(key2), self.x(key2)) # Whether to use FFT or slow direct method if use_fft: if taper != 'none': tapering_fct = aipy.dsp.gen_window(self.spw_Nfreqs, taper) Rx1 *= tapering_fct[:, None] Rx2 *= tapering_fct[:, None] _Rx1 = np.fft.fft(Rx1.conj(), axis=0) _Rx2 = np.fft.fft(Rx2.conj(), axis=0) return 0.5 * np.conj( np.fft.fftshift(_Rx1, axes=0).conj() * np.fft.fftshift(_Rx2, axes=0) ) else: # get taper if provided if taper != 'none': tapering_fct = aipy.dsp.gen_window(self.spw_Nfreqs, taper) # Slow method, used to explicitly cross-check FFT code q = [] for i in xrange(self.spw_Nfreqs): Q = self.get_Q(i, self.spw_Nfreqs) RQR = np.einsum('ab,bc,cd', self.R(key1).T.conj(), Q, self.R(key2)) x1 = self.x(key1).conj() x2 = self.x(key2) if taper != 'none': x1 = x1 * tapering_fct[:, None] x2 = x2 * tapering_fct[:, None] qi = np.sum(x1*np.dot(RQR, x2), axis=0) q.append(qi) return 0.5 * np.array(q)
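With identity weighting (R_1 = R_2 = I) the estimator above reduces to \hat{q}_a = (1/2) x_1^dagger Q_a^alt x_2, which is what the slow branch computes mode by mode. A self-contained numerical sketch of that reduced form, using the same Q_a^alt construction as get_Q() below (x1, x2 are random stand-in spectra):

    import numpy as np

    nfreq = 16
    np.random.seed(1)
    x1 = np.random.randn(nfreq) + 1j * np.random.randn(nfreq)
    x2 = np.random.randn(nfreq) + 1j * np.random.randn(nfreq)

    def Q_alt(mode, n_k):
        # delta function in delay, transformed to a frequency-space outer product
        _m = np.zeros(n_k, dtype=complex)
        _m[mode] = 1.
        m = np.fft.fft(np.fft.ifftshift(_m))
        return np.outer(m, m.conj())

    # one unnormalized bandpower per delay mode
    q = np.array([0.5 * np.dot(x1.conj(), np.dot(Q_alt(a, nfreq), x2))
                  for a in range(nfreq)])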
[docs] def get_G(self, key1, key2): """ Calculates G_ab = (1/2) Tr[R_1 Q_a R_2 Q_b], which is needed for normalizing the power spectrum (see HERA memo #44) Note that in the limit that R_1 = R_2 = C^-1, this reduces to the Fisher matrix F_ab = 1/2 Tr [C^-1 Q_a C^-1 Q_b] (arXiv:1502.06016, Eq. 17) Parameters ---------- key1, key2 : tuples or lists of tuples Tuples containing indices of dataset and baselines for the two input datavectors. If a list of tuples is provided, the baselines in the list will be combined with inverse noise weights. Returns ------- G : array_like, complex Fisher matrix, with dimensions (Nfreqs, Nfreqs). """ G = np.zeros((self.spw_Nfreqs, self.spw_Nfreqs), dtype=np.complex) R1 = self.R(key1) R2 = self.R(key2) iR1Q, iR2Q = {}, {} for ch in xrange(self.spw_Nfreqs): # this loop is nchan^3 Q = self.get_Q(ch, self.spw_Nfreqs) iR1Q[ch] = np.dot(R1, Q) # R_1 Q iR2Q[ch] = np.dot(R2, Q) # R_2 Q for i in xrange(self.spw_Nfreqs): # this loop goes as nchan^4 for j in xrange(self.spw_Nfreqs): # tr(R_2 Q_i R_1 Q_j) G[i,j] += np.einsum('ab,ba', iR1Q[i], iR2Q[j]) return G / 2.
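A useful sanity check on get_G(): with identity weighting (R_1 = R_2 = I) and Q_a = m_a m_a^dagger a pure Fourier mode, Tr[Q_a Q_b] = |m_a^dagger m_b|^2 = N^2 delta_ab, so G reduces to (N^2 / 2) times the identity. A standalone sketch of that limit:

    import numpy as np

    N = 8
    def Q_alt(mode, n_k):
        _m = np.zeros(n_k, dtype=complex)
        _m[mode] = 1.
        m = np.fft.fft(np.fft.ifftshift(_m))
        return np.outer(m, m.conj())

    G = np.zeros((N, N), dtype=complex)
    for a in range(N):
        for b in range(N):
            G[a, b] = 0.5 * np.trace(np.dot(Q_alt(a, N), Q_alt(b, N)))

    print(np.allclose(G, 0.5 * N**2 * np.eye(N)))    # True for identity weighting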
[docs] def get_H(self, key1, key2, taper='none'): """ Calculates the response matrix H of the unnormalized band powers q to the true band powers p, i.e., <q_a> = \sum_b H_{ab} p_b This is given by H_ab = (1/2) Tr[R_1 Q_a^alt R_2 Q_b] (See HERA memo #44). As currently implemented, this approximates the primary beam as frequency independent. Under this approximation, the our H_ab is defined using the equation above *except* we have Q^tapered rather than Q_b, where \overline{Q}^{tapered,beta} = e^{i 2pi eta_beta (nu_i - nu_j)} gamma(nu_i) gamma(nu_j) where gamma is the tapering function. Again, see HERA memo #44 for details. Note that in the limit that R_1 = R_2 = C^-1 and Q_a is used instead of Q_a^alt, this reduces to the Fisher matrix F_ab = 1/2 Tr [C^-1 Q_a C^-1 Q_b] (arXiv:1502.06016, Eq. 17) Parameters ---------- key1, key2 : tuples or lists of tuples Tuples containing indices of dataset and baselines for the two input datavectors. If a list of tuples is provided, the baselines in the list will be combined with inverse noise weights. taper : str, optional Tapering (window) function to apply to the data. Takes the same arguments as aipy.dsp.gen_window(). Default: 'none'. Returns ------- H : array_like, complex Dimensions (Nfreqs, Nfreqs). """ H = np.zeros((self.spw_Nfreqs, self.spw_Nfreqs), dtype=np.complex) R1 = self.R(key1) R2 = self.R(key2) if taper != 'none': tapering_fct = aipy.dsp.gen_window(self.spw_Nfreqs, taper) tapering_matrix = np.diag(tapering_fct) iR1Q_alt, iR2Q = {}, {} for ch in xrange(self.spw_Nfreqs): # this loop is nchan^3 Q_alt = self.get_Q(ch, self.spw_Nfreqs) iR1Q_alt[ch] = np.dot(R1, Q_alt) # R_1 Q_alt if taper != 'none': Q_tapered = np.dot(tapering_matrix, np.dot(Q_alt, tapering_matrix)) iR2Q[ch] = np.dot(R2, Q_tapered) # R_2 Q else: iR2Q[ch] = np.dot(R2, Q_alt) # R_2 Q for i in xrange(self.spw_Nfreqs): # this loop goes as nchan^4 for j in xrange(self.spw_Nfreqs): # tr(R_2 Q_i R_1 Q_j) H[i,j] += np.einsum('ab,ba', iR1Q_alt[i], iR2Q[j]) return H / 2.
[docs] def get_V_gaussian(self, key1, key2): """ Calculates the bandpower covariance matrix, V_ab = tr(C E_a C E_b) FIXME: Must check factor of 2 with Wick's theorem for complex vectors, and also check expression for when x_1 != x_2. Parameters ---------- key1, key2 : tuples or lists of tuples Tuples containing indices of dataset and baselines for the two input datavectors. If a list of tuples is provided, the baselines in the list will be combined with inverse noise weights. Returns ------- V : array_like, complex Bandpower covariance matrix, with dimensions (Nfreqs, Nfreqs). """ raise NotImplementedError()
[docs] def get_MW(self, G, H, mode='I'): """ Construct the normalization matrix M and window function matrix W for the power spectrum estimator. These are defined through Eqs. 14-16 of arXiv:1502.06016: \hat{p} = M \hat{q} <\hat{p}> = W p W = M H, where p is the true band power and H is the response matrix (defined above in get_H) of unnormalized bandpowers to normed bandpowers. Several choices for M are supported: 'I': Set M to be diagonal (e.g. HERA Memo #44) These choices will be supported very soon: 'G^-1': Set M = G^-1, the (pseudo)inverse response matrix. 'G^-1/2': Set M = G^-1/2, the root-inverse response matrix (using SVD). 'L^-1': Set M = L^-1, Cholesky decomposition. As written, the window functions will not be correclty normalized; it needs to be adjusted by the pspec scalar for it to be approximately correctly normalized. If the beam is being provided, this will be done in the pspec function Parameters ---------- G : array_like Denominator matrix for the bandpowers, with dimensions (Nfreqs, Nfreqs). H : array_like Response matrix for the bandpowers, with dimensions (Nfreqs, Nfreqs). mode : str, optional Definition to use for M. Must be one of the options listed above. Default: 'I'. Returns ------- M : array_like Normalization matrix, M. (If G was passed in as a dict, a dict of array_like will be returned.) W : array_like Window function matrix, W. (If G was passed in as a dict, a dict of array_like will be returned.) """ ### Next few lines commented out because (while elegant), the situation ### here where G is a distionary is not supported by the rest of the code. # Recursive case, if many G's were specified at once # if type(G) is dict: # M,W = {}, {} # for key in G: M[key], W[key] = self.get_MW(G[key], mode=mode) # return M, W # Check that mode is supported modes = ['G^-1', 'G^-1/2', 'I', 'L^-1'] assert(mode in modes) # Build M matrix according to specified mode if mode == 'G^-1': raise NotImplementedError("G^-1 mode not currently supported.") # M = np.linalg.pinv(G, rcond=1e-12) # #U,S,V = np.linalg.svd(F) # #M = np.einsum('ij,j,jk', V.T, 1./S, U.T) elif mode == 'G^-1/2': raise NotImplementedError("G^-1/2 mode not currently supported.") # U,S,V = np.linalg.svd(G) # M = np.einsum('ij,j,jk', V.T, 1./np.sqrt(S), U.T) elif mode == 'I': # This is not the M matrix as is rigorously defined in the # OQE formalism, because the power spectrum scalar is excluded # in this matrix normalization (i.e., M doesn't do the full # normalization) M = np.diag(1. / np.sum(G, axis=1)) W_norm = np.diag(1. / np.sum(H, axis=1)) W = np.dot(W_norm, H) else: raise NotImplementedError("Cholesky decomposition mode not currently supported.") # # Cholesky decomposition # order = np.arange(G.shape[0]) - np.ceil((G.shape[0]-1.)/2.) # order[order < 0] = order[order < 0] - 0.1 # # Negative integers have larger absolute value so they are sorted # # after positive integers. # order = (np.abs(order)).argsort() # if np.mod(G.shape[0], 2) == 1: # endindex = -2 # else: # endindex = -1 # order = np.hstack([order[:5], order[endindex:], order[5:endindex]]) # iorder = np.argsort(order) # G_o = np.take(np.take(G, order, axis=0), order, axis=1) # L_o = np.linalg.cholesky(G_o) # U,S,V = np.linalg.svd(L_o.conj()) # M_o = np.dot(np.transpose(V), np.dot(np.diag(1./S), np.transpose(U))) # M = np.take(np.take(M_o, iorder, axis=0), iorder, axis=1) return M, W
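For the one currently supported mode, 'I', M simply rescales each bandpower by 1 / sum_b G_ab, and W is H with its rows normalized to sum to one (prior to the pspec-scalar correction noted in the docstring). A small standalone sketch of that bookkeeping, with G and H standing in for the outputs of get_G() and get_H():

    import numpy as np

    np.random.seed(2)
    G = np.abs(np.random.randn(8, 8)) + np.eye(8)
    H = np.abs(np.random.randn(8, 8)) + np.eye(8)

    M = np.diag(1. / np.sum(G, axis=1))        # mode 'I' normalization
    W_norm = np.diag(1. / np.sum(H, axis=1))
    W = np.dot(W_norm, H)

    print(np.allclose(W.sum(axis=1), 1.))      # window functions sum to unity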
[docs] def get_Q(self, mode, n_k): """ Response of the covariance to a given bandpower, dC / dp_alpha. Assumes that Q will operate on a visibility vector in frequency space. In other words, produces a matrix Q that performs a two-sided Fourier transform and extracts a particular Fourier mode. (Computing x^t Q y is equivalent to Fourier transforming x and y separately, extracting one element of the Fourier transformed vectors, and then multiplying them.) Parameters ---------- mode : int Central wavenumber (index) of the bandpower, p_alpha. n_k : int Number of k bins that will be used in the power spectrum estimate, i.e. the length of the frequency (delay) axis. Returns ------- Q : array_like Response matrix for bandpower p_alpha. """ _m = np.zeros((n_k,), dtype=np.complex) _m[mode] = 1. # delta function at specific delay mode # FFT to transform to frequency space m = np.fft.fft(np.fft.ifftshift(_m)) Q = np.einsum('i,j', m, m.conj()) # dot it with its conjugate return Q
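Because Q is the rank-1 outer product m m^dagger, the quadratic form x^dagger Q y factorizes into a product of single Fourier components, which is exactly the property described in the docstring above. A quick numerical check:

    import numpy as np

    n_k, mode = 8, 3
    _m = np.zeros(n_k, dtype=complex)
    _m[mode] = 1.
    m = np.fft.fft(np.fft.ifftshift(_m))
    Q = np.einsum('i,j', m, m.conj())

    np.random.seed(3)
    x = np.random.randn(n_k) + 1j * np.random.randn(n_k)
    y = np.random.randn(n_k) + 1j * np.random.randn(n_k)

    lhs = np.dot(x.conj(), np.dot(Q, y))
    rhs = np.dot(x.conj(), m) * np.dot(m.conj(), y)   # one Fourier component from each vector
    print(np.allclose(lhs, rhs))                      # True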
[docs] def p_hat(self, M, q): """ Optimal estimate of bandpower p_alpha, defined as p_hat = M q_hat. Parameters ---------- M : array_like Normalization matrix, M. q : array_like Unnormalized bandpowers, \hat{q}. Returns ------- p_hat : array_like Optimal estimate of bandpower, \hat{p}. """ return np.dot(M, q)
[docs] def units(self, little_h=True): """ Return the units of the power spectrum. These are inferred from the units reported by the input visibilities (UVData objects). Parameters ---------- little_h : boolean, optional Whether to have cosmological length units be h^-1 Mpc or Mpc Default: h^-1 Mpc Returns ------- vis_units : str Units of the input visibility data. norm_units : str Units of the power spectrum normalization applied by pspec(). """ # Work out the power spectrum units if len(self.dsets) == 0: raise IndexError("No datasets have been added yet; cannot " "calculate power spectrum units.") # get visibility units vis_units = self.dsets[0].vis_units # set pspec norm units if self.primary_beam is None: norm_units = "Hz str [beam normalization not specified]" else: if little_h: h_unit = "h^-3 " else: h_unit = "" norm_units = "{}Mpc^3".format(h_unit) return vis_units, norm_units
[docs] def delays(self): """ Return an array of delays, tau, corresponding to the bins of the delay power spectrum output by pspec() using self.spw_range to specify the spectral window. Returns ------- delays : array_like Delays, tau. Units: ns. """ # Calculate the delays if len(self.dsets) == 0: raise IndexError("No datasets have been added yet; cannot " "calculate delays.") else: return utils.get_delays(self.freqs[self.spw_range[0]:self.spw_range[1]]) * 1e9 # convert to ns
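Assuming utils.get_delays follows the usual FFT convention (delays conjugate to the channel spacing, centred on zero), the values returned here should correspond to numpy's fftfreq applied to the selected spectral window; a hypothetical sketch, with ds a PSpecData instance:

    import numpy as np

    freqs = ds.freqs[ds.spw_range[0]:ds.spw_range[1]]    # Hz
    dnu = np.median(np.diff(freqs))
    tau = np.fft.fftshift(np.fft.fftfreq(len(freqs), d=dnu)) * 1e9   # ns, zero-centred
    # expected to agree with ds.delays() up to ordering conventions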
[docs] def scalar(self, pol, taper='none', little_h=True, num_steps=2000, beam=None): """ Computes the scalar function to convert a power spectrum estimate in "telescope units" to cosmological units, using self.spw_range to set spectral window. See arxiv:1304.4991 and HERA memo #27 for details. Parameters ---------- pol: str Which polarization to compute the scalar for. e.g. 'I', 'Q', 'U', 'V', 'XX', 'YY'... taper : str, optional Whether a tapering function (e.g. Blackman-Harris) is being used in the power spectrum estimation. Default: none little_h : boolean, optional Whether to have cosmological length units be h^-1 Mpc or Mpc Default: h^-1 Mpc num_steps : int, optional Number of steps to use when interpolating primary beams for numerical integral Default: 2000 beam : PSpecBeam object Option to use a manually-fed PSpecBeam object instead of using self.primary_beam. Returns ------- scalar: float [\int dnu (\Omega_PP / \Omega_P^2) ( B_PP / B_P^2 ) / (X^2 Y)]^-1 in h^-3 Mpc^3 or Mpc^3. """ # set spw_range and get freqs freqs = self.freqs[self.spw_range[0]:self.spw_range[1]] start = freqs[0] end = freqs[0] + np.median(np.diff(freqs)) * len(freqs) # calculate scalar if beam is None: scalar = self.primary_beam.compute_pspec_scalar( start, end, len(freqs), pol=pol, taper=taper, little_h=little_h, num_steps=num_steps) else: scalar = beam.compute_pspec_scalar(start, end, len(freqs), pol=pol, taper=taper, little_h=little_h, num_steps=num_steps) return scalar
[docs] def pspec(self, bls1, bls2, dsets, input_data_weight='identity', norm='I', taper='none', little_h=True, spw_ranges=None, verbose=True, history=''): """ Estimate the delay power spectrum from a pair of datasets contained in this object, using the optimal quadratic estimator of arXiv:1502.06016. In this formulation, the power spectrum is proportional to the visibility data via P = M data_{LH} E data_{RH} where E contains the data weighting and FT matrices, M is a normalization matrix, and the two separate datasets are denoted as "left-hand" and "right-hand". Each power spectrum is generated by taking a baseline (specified by an antenna-pair and polarization key) from bls1 out of dsets[0] and assigning it as data_LH, and a bl from bls2 out of the dsets[1] and assigning it as data_RH. If the bl chosen from bls1 is (ant1, ant2) and the bl chosen from bls2 is (ant3, ant4), the "baseline-pair" describing their cross multiplication is ((ant1, ant2), (ant3, ant4)). Parameters ---------- bls1, bls2 : list List of baseline groups, each group being a list of ant-pair tuples. dsets : length-2 tuple or list Contains indices of self.dsets to use in forming power spectra, where the first index is for the Left-Hand dataset and second index is used for the Right-Hand dataset (see above). input_data_weight : str, optional String specifying which weighting matrix to apply to the input data. See the options in the set_R() method for details. Default: 'identity'. norm : str, optional String specifying how to choose the normalization matrix, M. See the 'mode' argument of get_MW() for options. Default: 'I'. taper : str, optional Tapering (window) function to apply to the data. Takes the same arguments as aipy.dsp.gen_window(). Default: 'none'. little_h : boolean, optional Whether to have cosmological length units be h^-1 Mpc or Mpc Default: h^-1 Mpc spw_ranges : list of tuples, optional A list of spectral window channel ranges to select within the total bandwidth of the datasets, each of which forms an independent power spectrum estimate. Example: [(220, 320), (650, 775)]. Each tuple should contain a start and stop channel used to index the `freq_array` of each dataset. The default (None) is to use the entire band provided in each dataset. verbose : bool, optional If True, print progress, warnings and debugging info to stdout. history : str, optional history string to attach to UVPSpec object Returns ------- uvp : UVPSpec object Instance of UVPSpec that holds the output power spectrum data. Examples -------- *Example 1:* No grouping; i.e. each baseline is its own group, no brackets needed for each bl. If:: A = (1, 2); B = (2, 3); C = (3, 4); D = (4, 5); E = (5, 6); F = (6, 7) and:: bls1 = [ A, B, C ] bls2 = [ D, E, F ] then:: blpairs = [ (A, D), (B, E), (C, F) ] *Example 2:* Grouping; blpairs come in lists of blgroups, which are considered "grouped" in OQE. If:: bls1 = [ [A, B], [C, D] ] bls2 = [ [C, D], [E, F] ] then:: blpairs = [ [(A, C), (B, D)], [(C, E), (D, F)] ] *Example 3:* Mixed grouping; i.e. some blpairs are grouped, others are not. 
If:: bls1 = [ [A, B], C ] bls2 = [ [D, E], F ] then:: blpairs = [ [(A, D), (B, E)], (C, F)] """ # Validate the input data to make sure it's sensible self.validate_datasets(verbose=verbose) # Currently the "pspec normalization scalar" doesn't work if a # non-identity data weighting AND a non-trivial taper are used if taper != 'none' and input_data_weight != 'identity': raise_warning("Warning: Scalar power spectrum normalization " "doesn't work with current implementation " "if the tapering AND non-identity " "weighting matrices are both used.", verbose=verbose) # get datasets assert isinstance(dsets, (list, tuple)), "dsets must be fed as length-2 tuple of integers" assert len(dsets) == 2, "len(dsets) must be 2" assert isinstance(dsets[0], (int, np.int)) and isinstance(dsets[1], (int, np.int)), "dsets must contain integer indices" dset1 = self.dsets[self.dset_idx(dsets[0])] dset2 = self.dsets[self.dset_idx(dsets[1])] # get polarization array from zero'th dset pol_arr = map(lambda p: pyuvdata.utils.polnum2str(p), dset1.polarization_array) # assert form of bls1 and bls2 assert len(bls1) == len(bls2), "length of bls1 must equal length of bls2" for i in range(len(bls1)): if isinstance(bls1[i], tuple): assert isinstance(bls2[i], tuple), "bls1[{}] type must match bls2[{}] type".format(i, i) else: assert len(bls1[i]) == len(bls2[i]), "len(bls1[{}]) must match len(bls2[{}])".format(i, i) # construct list of baseline pairs bl_pairs = [] for i in range(len(bls1)): if isinstance(bls1[i], tuple): bl_pairs.append( (bls1[i], bls2[i]) ) elif isinstance(bls1[i], list) and len(bls1[i]) == 1: bl_pairs.append( (bls1[i][0], bls2[i][0]) ) else: bl_pairs.append(map(lambda j: (bls1[i][j] , bls2[i][j]), range(len(bls1[i])))) # validate bl-pair redundancy validate_blpairs(bl_pairs, dset1, dset2, baseline_tol=1.0) # configure spectral window selections if spw_ranges is None: spw_ranges = [(0, self.Nfreqs)] else: assert np.isclose(map(lambda t: len(t), spw_ranges), 2).all(), "spw_ranges must be fed as a list of length-2 tuples" # initialize empty lists data_array = odict() wgt_array = odict() integration_array = odict() time1 = [] time2 = [] lst1 = [] lst2 = [] spws = [] dlys = [] freqs = [] sclr_arr = np.ones((len(spw_ranges), len(pol_arr)), np.float) blp_arr = [] bls_arr = [] # Loop over spectral windows for i in range(len(spw_ranges)): # set spectral range if verbose: print( "\nSetting spectral range: {}".format(spw_ranges[i])) self.set_spw(spw_ranges[i]) # clear covariance cache self.clear_cov_cache() built_GH = False # haven't built Gv and Hv matrices in this spw loop yet # setup emtpy data arrays spw_data = [] spw_wgts = [] spw_ints = [] d = self.delays() * 1e-9 dlys.extend(d) spws.extend(np.ones_like(d, np.int) * i) freqs.extend( dset1.freq_array.flatten()[spw_ranges[i][0]:spw_ranges[i][1]] ) # Loop over polarizations for j, p in enumerate(pol_arr): pol_data = [] pol_wgts = [] pol_ints = [] # Compute scalar to convert "telescope units" to "cosmo units" if self.primary_beam is not None: scalar = self.scalar(p, taper=taper, little_h=True) else: raise_warning("Warning: self.primary_beam is not defined, " "so pspectra are not properly normalized", verbose=verbose) scalar = 1.0 sclr_arr[i, j] = scalar # Loop over baseline pairs for k, blp in enumerate(bl_pairs): # assign keys if isinstance(blp, list): raise NotImplementedError("Baseline lists bls1 and bls2" " must be lists of tuples (not lists of lists" " of tuples).\n" "Use hera_pspec.pspecdata.construct_blpairs()" " to construct appropriately grouped baseline" " 
lists.") # interpret blp as group of baseline-pairs #key1 = [(dsets[0],) + _blp[0] + (p,) for _blp in blp] #key2 = [(dsets[1],) + _blp[1] + (p,) for _blp in blp] elif isinstance(blp, tuple): # interpret blp as baseline-pair key1 = (dsets[0],) + blp[0] + (p,) key2 = (dsets[1],) + blp[1] + (p,) if verbose: print("\n(bl1, bl2) pair: {}\npol: {}".format(blp, p)) # Set covariance weighting scheme for input data if verbose: print(" Setting weight matrix for input data...") self.set_R(input_data_weight) # Build Fisher matrix if input_data_weight == 'identity' and built_GH: # in this case, all Gv are the same, so skip if already built for this spw! pass else: if verbose: print(" Building G...") print key1, "---", key2 Gv = self.get_G(key1, key2) Hv = self.get_H(key1, key2, taper=taper) built_GH = True # Calculate unnormalized bandpowers if verbose: print(" Building q_hat...") qv = self.q_hat(key1, key2, taper=taper) # Normalize power spectrum estimate if verbose: print(" Normalizing power spectrum...") Mv, Wv = self.get_MW(Gv, Hv, mode=norm) pv = self.p_hat(Mv, qv) # Multiply by scalar if self.primary_beam != None: if verbose: print(" Computing and multiplying scalar...") pv *= scalar Mv *= scalar # Get baseline keys if isinstance(blp, list): bl1 = blp[0][0] bl2 = blp[0][1] else: bl1 = blp[0] bl2 = blp[1] # append bls bls_arr.extend([bl1, bl2]) # insert pspectra pol_data.extend(pv.T) # get weights wgts1 = self.w(key1).T wgts2 = self.w(key2).T # get average of nsample across frequency axis, weighted by wgts nsamp1 = np.sum(dset1.get_nsamples(bl1)[:, self.spw_range[0]:self.spw_range[1]] * wgts1, axis=1) / np.sum(wgts1, axis=1).clip(1, np.inf) nsamp2 = np.sum(dset2.get_nsamples(bl2)[:, self.spw_range[0]:self.spw_range[1]] * wgts2, axis=1) / np.sum(wgts2, axis=1).clip(1, np.inf) # take average of nsamp1 and nsamp2 and multiply by integration time [seconds] to get total integration pol_ints.extend(np.mean([nsamp1, nsamp2], axis=0) * dset1.integration_time) # combined weight is geometric mean pol_wgts.extend(np.concatenate([wgts1[:, :, None], wgts2[:, :, None]], axis=2)) # insert time and blpair info only once if i < 1 and j < 1: # insert time info inds1 = dset1.antpair2ind(*bl1) inds2 = dset1.antpair2ind(*bl2) time1.extend(dset1.time_array[inds1]) time2.extend(dset2.time_array[inds2]) lst1.extend(dset1.lst_array[inds1]) lst2.extend(dset2.lst_array[inds2]) # insert blpair info blp_arr.extend(np.ones_like(inds1, np.int) * uvputils._antnums_to_blpair(blp)) # insert into data and wgts integrations dictionaries spw_data.append(pol_data) spw_wgts.append(pol_wgts) spw_ints.append(pol_ints) # insert into data and integration dictionaries spw_data = np.moveaxis(np.array(spw_data), 0, -1) spw_wgts = np.moveaxis(np.array(spw_wgts), 0, -1) spw_ints = np.moveaxis(np.array(spw_ints), 0, -1) data_array[i] = spw_data wgt_array[i] = spw_wgts integration_array[i] = spw_ints # fill uvp object uvp = uvpspec.UVPSpec() # fill meta-data uvp.time_1_array = np.array(time1) uvp.time_2_array = np.array(time2) uvp.time_avg_array = np.mean([uvp.time_1_array, uvp.time_2_array], axis=0) uvp.lst_1_array = np.array(lst1) uvp.lst_2_array = np.array(lst2) uvp.lst_avg_array = np.mean([np.unwrap(uvp.lst_1_array), np.unwrap(uvp.lst_2_array)], axis=0) % (2*np.pi) uvp.blpair_array = np.array(blp_arr) uvp.Nblpairs = len(np.unique(blp_arr)) uvp.Ntimes = len(np.unique(time1)) uvp.Nblpairts = len(time1) bls_arr = sorted(set(bls_arr)) uvp.bl_array = np.array(map(lambda bl: uvp.antnums_to_bl(bl), bls_arr)) antpos = dict(zip(dset1.antenna_numbers, 
dset1.antenna_positions)) uvp.bl_vecs = np.array(map(lambda bl: antpos[bl[0]] - antpos[bl[1]], bls_arr)) uvp.Nbls = len(uvp.bl_array) uvp.spw_array = np.array(spws) uvp.freq_array = np.array(freqs) uvp.dly_array = np.array(dlys) uvp.Nspws = len(np.unique(spws)) uvp.Ndlys = len(np.unique(dlys)) uvp.Nspwdlys = len(spws) uvp.Nfreqs = len(np.unique(freqs)) uvp.pol_array = np.array(map(lambda p: uvutils.polstr2num(p), pol_arr)) uvp.Npols = len(pol_arr) uvp.scalar_array = np.array(sclr_arr) uvp.channel_width = dset1.channel_width uvp.weighting = input_data_weight uvp.vis_units, uvp.norm_units = self.units(little_h=little_h) uvp.telescope_location = dset1.telescope_location uvp.history = dset1.history + dset2.history + history uvp.taper = taper uvp.norm = norm uvp.git_hash = version.git_hash if self.primary_beam is not None: # attach cosmology uvp.cosmo = self.primary_beam.cosmo # attach beam info uvp.beam_freqs = self.primary_beam.beam_freqs uvp.OmegaP, uvp.OmegaPP = self.primary_beam.get_Omegas(uvp.pol_array) if hasattr(self.primary_beam, 'filename'): uvp.beamfile = self.primary_beam.filename if hasattr(dset1.extra_keywords, 'filename'): uvp.filename1 = dset1.extra_keywords['filename'] if hasattr(dset2.extra_keywords, 'filename'): uvp.filename2 = dset2.extra_keywords['filename'] lbl1 = self.labels[self.dset_idx(dsets[0])] lbl2 = self.labels[self.dset_idx(dsets[1])] if lbl1 is not None: uvp.label1 = lbl1 if lbl2 is not None: uvp.label2 = lbl2 # fill data arrays uvp.data_array = data_array uvp.integration_array = integration_array uvp.wgt_array = wgt_array uvp.nsample_array = dict(map(lambda k: (k, np.ones_like(uvp.integration_array[k], np.float)), uvp.integration_array.keys())) # run check uvp.check() return uvp
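Putting the pieces together, a hypothetical end-to-end pspec() call is sketched below. File paths, antenna numbers and channel ranges are placeholders; the beam is omitted, so (as warned in the code) the output spectra are not absolutely normalized; the taper name is assumed to be one supported by aipy.dsp.gen_window:

    import pyuvdata
    from hera_pspec import pspecdata

    uvd1 = pyuvdata.UVData()
    uvd1.read_miriad("even.uv")          # placeholder paths
    uvd2 = pyuvdata.UVData()
    uvd2.read_miriad("odd.uv")
    ds = pspecdata.PSpecData(dsets=[uvd1, uvd2], wgts=[None, None])

    # build baseline-pair lists from a redundant group of baselines
    bls = [(24, 25), (37, 38), (52, 53)]         # placeholder antenna pairs
    bls1, bls2, blpairs = pspecdata.construct_blpairs(bls, exclude_auto_bls=True,
                                                      exclude_permutations=True)

    uvp = ds.pspec(bls1, bls2, dsets=(0, 1), input_data_weight='identity',
                   norm='I', taper='blackman-harris', spw_ranges=[(300, 400)],
                   verbose=False)
    # uvp is a UVPSpec object holding the delay power spectra and metadata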
[docs] def rephase_to_dset(self, dset_index=0, inplace=True): """ Rephase visibility data in self.dsets to the LST grid of dset[dset_index] using hera_cal.utils.lst_rephase. Each integration in all other dsets are phased to the center of the corresponding LST bin (by index) in dset[dset_index]. Will only phase if the dataset's phase type is 'drift'. Parameters ---------- dset_index : int or str Index or label of dataset in self.dset to phase other datasets to. inplace : bool, optional If True, edits data in dsets in-memory. Else, makes a copy of dsets, edits data in the copy and returns to user. Returns ------- if inplace: return new_dsets else: return None """ # run dataset validation self.validate_datasets() # assign dsets if inplace: dsets = self.dsets else: dsets = copy.deepcopy(self.dsets) # Parse dset_index dset_index = self.dset_idx(dset_index) # get LST grid we are phasing to lst_grid = [] lst_array = dsets[dset_index].lst_array.ravel() for l in lst_array: if l not in lst_grid: lst_grid.append(l) lst_grid = np.array(lst_grid) # get polarization list pol_list = dsets[dset_index].polarization_array.tolist() # iterate over dsets for i, dset in enumerate(dsets): # don't rephase dataset we are using as our LST anchor if i == dset_index: # even though not phasing this dset, must set to match all other # dsets due to phasing-check validation dset.phase_type = 'unknown' continue # skip if dataset is not drift phased if dset.phase_type != 'drift': print "skipping dataset {} b/c it isn't drift phased".format(i) # convert UVData to DataContainers. Note this doesn't make # a copy of the data (data, flgs, antpos, ants, freqs, times, lsts, pols) = hc.io.load_vis(dset, return_meta=True) # make bls dictionary bls = dict(map(lambda k: (k, antpos[k[0]] - antpos[k[1]]), data.keys())) # Get dlst array dlst = lst_grid - lsts # get telescope latitude lat = dset.telescope_location_lat_lon_alt_degrees[0] # rephase hc.utils.lst_rephase(data, bls, freqs, dlst, lat=lat) # re-insert into dataset for j, k in enumerate(data.keys()): # get blts indices of basline indices = dset.antpair2ind(*k[:2]) # get index in polarization_array for this polarization polind = pol_list.index(hc.io.polstr2num[k[-1]]) # insert into dset dset.data_array[indices, 0, :, polind] = data[k] # set phasing in UVData object to unknown b/c there isn't a single # consistent phasing for the entire data set. dset.phase_type = 'unknown' if inplace is False: return dsets
def construct_blpairs(bls, exclude_auto_bls=False, exclude_permutations=False, group=False, Nblps_per_group=1): """ Construct a list of baseline-pairs from a baseline-group. This function can be used to easily convert a single list of baselines into the input needed by PSpecData.pspec(bls1, bls2, ...). Parameters ---------- bls : list of baseline tuples, Ex. [(1, 2), (2, 3), (3, 4)] exclude_auto_bls: boolean, if True, exclude all baselines crossed with itself from the final blpairs list exclude_permutations : boolean, if True, exclude permutations and only form combinations of the bls list. For example, if bls = [1, 2, 3] (note this isn't the proper form of bls, but makes this example clearer) and exclude_permutations = False, then blpairs = [11, 12, 13, 21, 22, 23,, 31, 32, 33]. If however exclude_permutations = True, then blpairs = [11, 12, 13, 22, 23, 33]. Furthermore, if exclude_auto_bls = True then 11, 22, and 33 would additionally be excluded. group : boolean, optional if True, group each consecutive Nblps_per_group blpairs into sub-lists Nblps_per_group : integer, number of baseline-pairs to put into each sub-group Returns (bls1, bls2, blpairs) ------- bls1 : list of baseline tuples from the zeroth index of the blpair bls2 : list of baseline tuples from the first index of the blpair blpairs : list of blpair tuples """ # assert form assert isinstance(bls, list) and isinstance(bls[0], tuple), "bls must be fed as list of baseline tuples" # form blpairs w/o explicitly forming auto blpairs # however, if there are repeated bl in bls, there will be auto bls in blpairs if exclude_permutations: blpairs = list(itertools.combinations(bls, 2)) else: blpairs = list(itertools.permutations(bls, 2)) # explicitly add in auto baseline pairs blpairs.extend(zip(bls, bls)) # iterate through and eliminate all autos if desired if exclude_auto_bls: new_blpairs = [] for blp in blpairs: if blp[0] != blp[1]: new_blpairs.append(blp) blpairs = new_blpairs # create bls1 and bls2 list bls1 = map(lambda blp: blp[0], blpairs) bls2 = map(lambda blp: blp[1], blpairs) # group baseline pairs if desired if group: Nblps = len(blpairs) Ngrps = int(np.ceil(float(Nblps) / Nblps_per_group)) new_blps = [] new_bls1 = [] new_bls2 = [] for i in range(Ngrps): new_blps.append(blpairs[i*Nblps_per_group:(i+1)*Nblps_per_group]) new_bls1.append(bls1[i*Nblps_per_group:(i+1)*Nblps_per_group]) new_bls2.append(bls2[i*Nblps_per_group:(i+1)*Nblps_per_group]) bls1 = new_bls1 bls2 = new_bls2 blpairs = new_blps return bls1, bls2, blpairs def validate_blpairs(blpairs, uvd1, uvd2, baseline_tol=1.0, verbose=True): """ Validate baseline pairings in the blpair list are redundant within the specified tolerance. Parameters ---------- blpairs : list of baseline-pair tuples, Ex. [((1,2),(1,2)), ((2,3),(2,3))] See docstring of PSpecData.pspec() for details on format. 
uvd1 : pyuvdata.UVData instance containing visibility data that first bl in blpair will draw from uvd2 : pyuvdata.UVData instance containing visibility data that second bl in blpair will draw from baseline_tol : float, distance tolerance for notion of baseline "redundancy" in meters verbose : bool, if True report feedback to stdout """ # ensure uvd1 and uvd2 are UVData objects if isinstance(uvd1, pyuvdata.UVData) == False: raise TypeError("uvd1 must be a pyuvdata.UVData instance") if isinstance(uvd2, pyuvdata.UVData) == False: raise TypeError("uvd2 must be a pyuvdata.UVData instance") # get antenna position dictionary ap1, a1 = uvd1.get_ENU_antpos(pick_data_ants=True) ap2, a2 = uvd1.get_ENU_antpos(pick_data_ants=True) ap1 = dict(zip(a1, ap1)) ap2 = dict(zip(a2, ap2)) # ensure shared antenna keys match within tolerance shared = sorted(set(ap1.keys()) & set(ap2.keys())) for k in shared: assert np.linalg.norm(ap1[k] - ap2[k]) <= baseline_tol, "uvd1 and uvd2 don't agree on antenna positions within tolerance of {} m".format(baseline_tol) ap = ap1 ap.update(ap2) # iterate through baselines and check baselines crossed with each other are within tolerance for i, blg in enumerate(blpairs): if isinstance(blg, tuple): blg = [blg] for blp in blg: bl1_vec = ap[blp[0][0]] - ap[blp[0][1]] bl2_vec = ap[blp[1][0]] - ap[blp[1][1]] if np.linalg.norm(bl1_vec - bl2_vec) >= baseline_tol: raise_warning("blpair {} exceeds redundancy tolerance of {} m".format(blp, baseline_tol), verbose=verbose) def raise_warning(warning, verbose=True): '''warning function''' if verbose: print(warning)
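As a concrete illustration of construct_blpairs() above, for three baselines the different exclusion options give:

    bls = [(1, 2), (2, 3), (3, 4)]

    # all permutations plus autos: 3*2 + 3 = 9 baseline-pairs
    bls1, bls2, blpairs = construct_blpairs(bls)

    # combinations only, autos kept: 3 + 3 = 6 baseline-pairs
    bls1, bls2, blpairs = construct_blpairs(bls, exclude_permutations=True)

    # combinations only, autos removed:
    # blpairs = [((1, 2), (2, 3)), ((1, 2), (3, 4)), ((2, 3), (3, 4))]
    bls1, bls2, blpairs = construct_blpairs(bls, exclude_auto_bls=True,
                                            exclude_permutations=True)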