Source code for hera_pspec.pspecdata

import numpy as np
from pyuvdata import UVData, UVCal
import copy, operator, itertools, sys
from collections import OrderedDict as odict
import hera_cal as hc
from pyuvdata import utils as uvutils
import datetime
import time
import argparse
import ast
import glob
import warnings
import json
import uvtools.dspec as dspec
import logging
from . import uvpspec, utils, __version__, pspecbeam, container, uvpspec_utils as uvputils
from functools import lru_cache

logger = logging.getLogger(__name__)

class PSpecData:
[docs] def __init__(self, dsets=None, wgts=None, dsets_std=None, labels=None, beam=None, cals=None, cal_flag=True): """ Object to store multiple sets of UVData visibilities and perform operations such as power spectrum estimation on them. Parameters ---------- dsets : list or dict of UVData objects, optional Set of UVData objects containing the data that will be used to compute the power spectrum. If specified as a dict, the key names will be used to tag each dataset. Default: Empty list. dsets_std: list or dict of UVData objects, optional Set of UVData objects containing the standard deviations of each data point in UVData objects in dsets. If specified as a dict, the key names will be used to tag each dataset. Default: []. wgts : list or dict of UVData objects, optional Set of UVData objects containing weights for the input data. Default: None (will use the flags of each input UVData object). labels : list of str, optional An ordered list of names/labels for each dataset, if dsets was specified as a list. If None, names will not be assigned to the datasets. If dsets was specified as a dict, the keys of that dict will be used instead of this. Default: None. beam : PspecBeam object, optional PspecBeam object containing information about the primary beam Default: None. cals : list of UVCal objects, optional Calibration objects to apply to data. One per dset or one for all dsets. cal_flag : bool, optional If True, propagate flags from calibration into data """ dsets = [] if dsets is None else dsets self.clear_cache() # clear matrix cache self.dsets = []; self.wgts = []; self.labels = [] self.dsets_std = [] self.Nfreqs = None self.spw_range = None self.spw_Nfreqs = None self.spw_Ndlys = None # r_params is a dictionary that stores parameters for # parametric R matrices. self.r_params = {} self.filter_extension = (0, 0) self.cov_regularization = 0. # set data weighting to identity by default # and taper to none by default self.data_weighting = 'identity' self.taper = 'none' self.symmetric_taper = True # Set all weights to None if wgts=None if wgts is None: wgts = [None for dset in dsets] # set dsets_std to None if any are None. if not dsets_std is None and None in dsets_std: dsets_std = None # Store the input UVData objects if specified if len(dsets) > 0: self.add(dsets, wgts, dsets_std=dsets_std, labels=labels, cals=cals, cal_flag=cal_flag) # Store a primary beam self.primary_beam = beam if not hasattr(self, "_get_qalt_cached"): # Set the lru-cached get_Q_alt function, with a maxsize of Ndlys self._get_qalt_cached = lru_cache(maxsize=None)(utils.get_Q_alt) self._get_qalt_cached_tensor = lru_cache(maxsize=4)(utils.get_Q_alt_tensor)
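A minimal construction sketch (illustrative only, not part of the module; the data files and beam file are hypothetical): two UVData objects are read from disk and passed in as a labeled pair, with wgts=None so that each dataset's own flags are used as weights.

from pyuvdata import UVData
from hera_pspec import pspecbeam
from hera_pspec.pspecdata import PSpecData

uvd1, uvd2 = UVData(), UVData()
uvd1.read("even.uvh5")   # hypothetical input files
uvd2.read("odd.uvh5")

# Optional primary-beam model (hypothetical beamfits file)
beam = pspecbeam.PSpecBeamUV("HERA_beam.beamfits")

# wgts=None -> fall back on each dataset's flags
ds = PSpecData(dsets=[uvd1, uvd2], wgts=[None, None],
               labels=["even", "odd"], beam=beam)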
[docs] def add(self, dsets, wgts, labels=None, dsets_std=None, cals=None, cal_flag=True): """ Add a dataset to the collection in this PSpecData object. Parameters ---------- dsets : UVData or list or dict UVData object or list of UVData objects containing data to add to the collection. wgts : UVData or list or dict UVData object or list of UVData objects containing weights to add to the collection. Must be the same length as dsets. If a weight is set to None, the flags of the corresponding dset are used. labels : list of str An ordered list of names/labels for each dataset, if dsets was specified as a list. If dsets was specified as a dict, the keys of that dict will be used instead. dsets_std: UVData or list or dict Optional UVData object or list of UVData objects containing the standard deviations (real and imaginary) of data to add to the collection. If dsets is a dict, will assume dsets_std is a dict and if dsets is a list, will assume dsets_std is a list. cals : UVCal or list, optional UVCal objects to apply to data. cal_flag : bool, optional If True, propagate flags from calibration into data """ # Check for dicts and unpack into an ordered list if found if isinstance(dsets, dict): # Disallow labels kwarg if a dict was passed if labels is not None: raise ValueError("If 'dsets' is a dict, 'labels' cannot be " "specified.") labels = list(dsets.keys()) if wgts is None: wgts = dict([(l, None) for l in labels]) elif not isinstance(wgts, dict): raise TypeError("If 'dsets' is a dict, 'wgts' must also be " "a dict") if dsets_std is None: dsets_std = dict([(l, None) for l in labels]) elif not isinstance(dsets_std, dict): raise TypeError("If 'dsets' is a dict, 'dsets_std' must also be " "a dict") if cals is None: cals = dict([(l, None) for l in labels]) elif not isinstance(cals, dict): raise TypeError("If 'cals' is a dict, 'cals' must also be " "a dict") # Unpack dsets and wgts dicts dsets = [dsets[key] for key in labels] dsets_std = [dsets_std[key] for key in labels] wgts = [wgts[key] for key in labels] cals = [cals[key] for key in labels] # Convert input args to lists if possible if isinstance(dsets, UVData): dsets = [dsets,] if isinstance(wgts, UVData): wgts = [wgts,] if isinstance(labels, str): labels = [labels,] if isinstance(dsets_std, UVData): dsets_std = [dsets_std,] if isinstance(cals, UVCal): cals = [cals,] if wgts is None: wgts = [wgts,] if dsets_std is None: dsets_std = [dsets_std for m in range(len(dsets))] if cals is None: cals = [cals for m in range(len(dsets))] if isinstance(dsets, tuple): dsets = list(dsets) if isinstance(wgts, tuple): wgts = list(wgts) if isinstance(dsets_std, tuple): dsets_std = list(dsets_std) if isinstance(cals, tuple): cals = list(cals) # Only allow UVData or lists if not isinstance(dsets, list) or not isinstance(wgts, list)\ or not isinstance(dsets_std, list) or not isinstance(cals, list): raise TypeError("dsets, dsets_std, wgts and cals must be UVData" "UVCal, or lists of UVData or UVCal") # Make sure enough weights were specified assert len(dsets) == len(wgts), \ "The dsets and wgts lists must have equal length" assert len(dsets_std) == len(dsets), \ "The dsets and dsets_std lists must have equal length" assert len(cals) == len(dsets), \ "The dsets and cals lists must have equal length" if labels is not None: assert len(dsets) == len(labels), \ "If labels are specified, the dsets and labels lists " \ "must have equal length" # Check that everything is a UVData object for d, w, s in zip(dsets, wgts, dsets_std): if not isinstance(d, UVData): raise 
TypeError("Only UVData objects can be used as datasets.") elif not d.future_array_shapes: warnings.warn('Converting data to future_array_shapes...') d.use_future_array_shapes() if not isinstance(w, UVData) and w is not None: raise TypeError("Only UVData objects (or None) can be used as " "weights.") if not isinstance(s, UVData) and s is not None: raise TypeError("Only UVData objects (or None) can be used as " "error sets") for c in cals: if not isinstance(c, UVCal) and c is not None: raise TypeError("Only UVCal objects can be used for calibration.") # Store labels (if they were set) if self.labels is None: self.labels = [] if labels is None: labels = ["dset{:d}".format(i) for i in range(len(self.dsets), len(dsets) + len(self.dsets))] # Apply calibration if provided for dset, dset_std, cal in zip(dsets, dsets_std, cals): if cal is not None: if dset is not None: uvutils.uvcalibrate(dset, cal, inplace=True, prop_flags=cal_flag) dset.extra_keywords['calibration'] = cal.extra_keywords.get('filename', '""') if dset_std is not None: uvutils.uvcalibrate(dset_std, cal, inplace=True, prop_flags=cal_flag) dset_std.extra_keywords['calibration'] = cal.extra_keywords.get('filename', '""') # Append to list self.dsets += dsets self.wgts += wgts self.dsets_std += dsets_std self.labels += labels # Check for repeated labels, and make them unique for i, l in enumerate(self.labels): ext = 1 while ext < 1e5: if l in self.labels[:i]: l = self.labels[i] + ".{:d}".format(ext) ext += 1 else: self.labels[i] = l break # Store no. frequencies and no. times # We are still only supporting dsets with same number of times self.Nfreqs = self.dsets[0].Nfreqs self.Ntimes = self.dsets[0].Ntimes # Store the actual frequencies self.freqs = self.dsets[0].freq_array self.spw_range = (0, self.Nfreqs) self.spw_Nfreqs = self.Nfreqs self.spw_Ndlys = self.spw_Nfreqs
    def __str__(self):
        """
        Print basic info about this PSpecData object.
        """
        # Basic info
        s = "PSpecData object\n"
        s += "  %d datasets" % len(self.dsets)
        if len(self.dsets) == 0:
            return s

        # Dataset summary
        for i, d in enumerate(self.dsets):
            if self.labels[i] is None:
                s += "  dset (%d): %d bls (freqs=%d, times=%d, pols=%d)\n" \
                     % (i, d.Nbls, d.Nfreqs, d.Ntimes, d.Npols)
            else:
                s += "  dset '%s' (%d): %d bls (freqs=%d, times=%d, pols=%d)\n" \
                     % (self.labels[i], i, d.Nbls, d.Nfreqs, d.Ntimes, d.Npols)
        return s
[docs] def validate_datasets(self, verbose=True): """ Validate stored datasets and weights to make sure they are consistent with one another (e.g. have the same shape, baselines etc.). """ # check dsets and wgts have same number of elements if len(self.dsets) != len(self.wgts): raise ValueError("self.wgts does not have same len as self.dsets") if len(self.dsets_std) != len(self.dsets): raise ValueError("self.dsets_std does not have the same len as " "self.dsets") if len(self.labels) != len(self.dsets): raise ValueError("self.labels does not have same len as self.dsets") # Check if dsets are all the same shape along freq axis Nfreqs = [d.Nfreqs for d in self.dsets] channel_widths = [d.channel_width for d in self.dsets] if np.unique(Nfreqs).size > 1: raise ValueError("all dsets must have the same Nfreqs") if not np.allclose(channel_widths[0], channel_widths): raise ValueError("all dsets must have the same channel_widths") # Check shape along time axis Ntimes = [d.Ntimes for d in self.dsets] if np.unique(Ntimes).size > 1: raise ValueError("all dsets must have the same Ntimes") # raise warnings if times don't match if len(self.dsets) > 1: lst_diffs = np.array( [ np.unique(self.dsets[0].lst_array) - np.unique(dset.lst_array) for dset in self.dsets[1:]] ) if np.max(np.abs(lst_diffs)) > 0.001: raise_warning("Warning: LST bins in dsets misaligned by more than 15 seconds", verbose=verbose) # raise warning if frequencies don't match freq_diffs = np.array( [ np.unique(self.dsets[0].freq_array) - np.unique(dset.freq_array) for dset in self.dsets[1:]] ) if np.max(np.abs(freq_diffs)) > 0.001e6: raise_warning("Warning: frequency bins in dsets misaligned by more than 0.001 MHz", verbose=verbose) # Check phase type phase_types = [] for d in self.dsets: phase_types.append(d.phase_type) if np.unique(phase_types).size > 1: raise ValueError("all datasets must have the same phase type " "(i.e. 'drift', 'phased', ...)\ncurrent phase " "types are {}".format(phase_types)) # Check phase centers if phase type is phased if 'phased' in set(phase_types): phase_ra = [d.phase_center_app_ra_degrees for d in self.dsets] phase_dec = [d.phase_center_app_dec_degrees for d in self.dsets] max_diff_ra = np.max( [np.diff(d) for d in itertools.combinations(phase_ra, 2)]) max_diff_dec = np.max([np.diff(d) for d in itertools.combinations(phase_dec, 2)]) max_diff = np.sqrt(max_diff_ra**2 + max_diff_dec**2) if max_diff > 0.15: raise_warning("Warning: maximum phase-center difference " "between datasets is > 10 arcmin", verbose=verbose)
    def check_key_in_dset(self, key, dset_ind):
        """
        Check that 'key' exists in the UVData object self.dsets[dset_ind].

        Parameters
        ----------
        key : tuple
            If length 1: assumed to be a polarization number or string.
            If length 2: assumed to be an antenna-number tuple (ant1, ant2).
            If length 3: assumed to be an antenna-number-polarization tuple
            (ant1, ant2, pol).

        dset_ind : int
            The index of the dataset to be checked.

        Returns
        -------
        exists : bool
            True if the key exists, False otherwise.
        """
        #FIXME: Fix this to enable label keys
        # get iterable
        key = uvutils._get_iterable(key)
        if isinstance(key, str):
            key = (key,)

        # check key is a tuple
        if not isinstance(key, tuple) or len(key) not in (1, 2, 3):
            raise KeyError("key {} must be a length 1, 2 or 3 tuple".format(key))

        try:
            _ = self.dsets[dset_ind]._key2inds(key)
            return True
        except KeyError:
            return False
[docs] def clear_cache(self, keys=None): """ Clear stored matrix data (or some subset of it). Parameters ---------- keys : list of tuples, optional List of keys to remove from matrix cache. If None, all keys will be removed. Default: None. """ if keys is None: self._C, self._I, self._iC, self._Y, self._R = {}, {}, {}, {}, {} self._identity_G, self._identity_H, self._identity_Y = {}, {}, {} else: for k in keys: try: del(self._C[k]) except(KeyError): pass try: del(self._I[k]) except(KeyError): pass try: del(self._iC[k]) except(KeyError): pass try: del(self.r_params[k]) except(KeyError): pass try: del(self._Y[k]) except(KeyError): pass try: del(self._R[k]) except(KeyError): pass
    def dset_idx(self, dset):
        """
        Return the index of a dataset, regardless of whether it was specified
        as an integer or a string.

        Parameters
        ----------
        dset : int or str
            Index or name of a dataset belonging to this PSpecData object.

        Returns
        -------
        dset_idx : int
            Index of dataset.
        """
        # Look up dset label if it's a string
        if isinstance(dset, str):
            if dset in self.labels:
                return self.labels.index(dset)
            else:
                raise KeyError("dset '%s' not found." % dset)
        elif isinstance(dset, (int, np.integer)):
            return dset
        else:
            raise TypeError("dset must be either an int or string")
[docs] def parse_blkey(self, key): """ Parse a dataset + baseline key in the form (dataset, baseline, [pol]) into a dataset index and a baseline(pol) key, where pol is optional. Parameters ---------- key : tuple Returns ------- dset : int dataset index bl : tuple baseline (pol) key """ # type check assert isinstance(key, tuple), "key must be fed as a tuple" assert len(key) > 1, "key must have len >= 2" # get dataset index dset_idx = self.dset_idx(key[0]) key = key[1:] # get baseline bl = key[0] if isinstance(bl, (int, np.integer)): assert len(key) > 1, "baseline must be fed as a tuple" bl = tuple(key[:2]) key = key[2:] else: key = key[1:] assert isinstance(bl, tuple), "baseline must be fed as a tuple, %s" % bl # put pol into bl key if it exists if len(key) > 0: pol = key[0] assert isinstance(pol, (str, int, np.integer)), \ "pol must be fed as a str or int" bl += (key[0],) return dset_idx, bl
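For illustration (assuming a PSpecData instance `ds` with labeled datasets, as in the sketches above; antenna numbers are hypothetical), both accepted key layouts resolve to the same dataset index and baseline-pol tuple:

dset_idx, bl = ds.parse_blkey((0, (24, 25), "xx"))      # -> 0, (24, 25, "xx")
dset_idx, bl = ds.parse_blkey(("even", 24, 25, "xx"))   # dataset given by its label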
[docs] def x(self, key, include_extension=False): """ Get data for a given dataset and baseline, as specified in a standard key format. Parameters ---------- key : tuple Tuple containing dataset ID and baseline index. The first element of the tuple is the dataset index (or label), and the subsequent elements are the baseline ID. include_extension : bool (optional) default=False If True, extend spw to include filtering window extensions. Returns ------- x : array_like Array of data from the requested UVData dataset and baseline. """ dset, bl = self.parse_blkey(key) spw = slice(*self.get_spw(include_extension=include_extension)) return self.dsets[dset].get_data(bl).T[spw]
    def dx(self, key, include_extension=False):
        """
        Get the standard deviation of the data for a given dataset and
        baseline, as specified in a standard key format.

        Parameters
        ----------
        key : tuple
            Tuple containing dataset ID and baseline index. The first element
            of the tuple is the dataset index (or label), and the subsequent
            elements are the baseline ID.

        include_extension : bool (optional) default=False
            If True, extend spw to include filtering window extensions.

        Returns
        -------
        dx : array_like
            Array of standard-deviation data from the requested UVData dataset
            and baseline.
        """
        assert isinstance(key, tuple)
        dset, bl = self.parse_blkey(key)
        spw = slice(*self.get_spw(include_extension=include_extension))
        return self.dsets_std[dset].get_data(bl).T[spw]
[docs] def w(self, key, include_extension=False): """ Get weights for a given dataset and baseline, as specified in a standard key format. Parameters ---------- key : tuple Tuple containing dataset ID and baseline index. The first element of the tuple is the dataset index, and the subsequent elements are the baseline ID. include_extension : bool (optional) default=False If True, extend spw to include filtering window extensions. Returns ------- w : array_like Array of weights for the requested UVData dataset and baseline. """ dset, bl = self.parse_blkey(key) spw = slice(*self.get_spw(include_extension=include_extension)) if self.wgts[dset] is not None: return self.wgts[dset].get_data(bl).T[spw] else: # If weights were not specified, use the flags built in to the # UVData dataset object wgts = (~self.dsets[dset].get_flags(bl)).astype(float).T[spw] return wgts
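Taken together, x(), dx() and w() return spw-sliced arrays with frequency on the first axis. A short sketch (hypothetical baseline key; dx() requires that dsets_std was supplied):

key = (0, (24, 25), "xx")   # hypothetical baseline key
x = ds.x(key)               # data, shape (spw_Nfreqs, Ntimes)
w = ds.w(key)               # weights: 1 where unflagged, 0 where flagged, when wgts=None
dx = ds.dx(key)             # per-channel standard deviations, same shape as x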
    def set_C(self, cov):
        """
        Set the cached covariance matrix to a set of user-provided values.

        Parameters
        ----------
        cov : dict
            Covariance keys and ndarrays. Each key should conform to
            (dset_pair_index, blpair_int, model, time_index, conj_1, conj_2),
            e.g. ((0, 1), ((25, 37, "xx"), (25, 37, "xx")), 'empirical', None,
            False, True), while the ndarrays should have shape
            (spw_Nfreqs, spw_Nfreqs).
        """
        self.clear_cache(cov.keys())
        for key in cov:
            self._C[key] = cov[key]
    def get_spw(self, include_extension=False):
        """
        Get self.spw_range with or without the spw extension
        (self.filter_extension).

        Parameters
        ----------
        include_extension : bool
            If True, include self.filter_extension in spw_range.

        Returns
        -------
        spectral_window : tuple
            Tuple of (start_chan, end_chan).
        """
        if sum(self.filter_extension) > 0:
            # if there is a non-zero self.filter_extension, include_extension
            # is automatically set to True
            include_extension = True
        if include_extension:
            return (self.spw_range[0] - self.filter_extension[0],
                    self.spw_range[1] + self.filter_extension[1])
        else:
            return self.spw_range
[docs] def C_model(self, key, model='empirical', time_index=None, known_cov=None, include_extension=False): """ Return a covariance model having specified a key and model type. Note: Time-dependent flags that differ from frequency channel-to-channel can create spurious spectral structure. Consider factorizing the flags with self.broadcast_dset_flags() before using model='empirical'. Parameters ---------- key : tuple Tuple containing indices of dataset and baselines. The first item specifies the index (ID) of a dataset in the collection, while subsequent indices specify the baseline index, in _key2inds format. model : string, optional Type of covariance model to calculate, if not cached. Options=['empirical', 'dsets', 'autos', (other model names in known_cov)] How the covariances of the input data should be estimated. In 'dsets' mode, error bars are estimated from user-provided per baseline and per channel standard deivations. If 'empirical' is set, then error bars are estimated from the data by averaging the channel-channel covariance of each baseline over time and then applying the appropriate linear transformations to these frequency-domain covariances. If 'autos' is set, the covariances of the input data over a baseline is estimated from the autocorrelations of the two antennas over channel bandwidth and integration time. time_index : integer, compute covariance at specific time-step in dset supported if mode == 'dsets' or 'autos' known_cov : dicts of covariance matrices Covariance matrices that are imported from a outer dict instead of using data stored or calculated inside the PSpecData object. known_cov could be initialized when using PSpecData.pspec() method. See PSpecData.pspec() for more details. include_extension : bool (optional) default=False If True, extend spw to include filtering window extensions. Returns ------- C : ndarray, (spw_Nfreqs, spw_Nfreqs) Covariance model for the specified key. """ # type check assert isinstance(key, tuple), "key must be fed as a tuple" assert isinstance(model, str), "model must be a string" # parse key dset, bl = self.parse_blkey(key) if model == 'empirical': # add model to key Ckey = ((dset, dset), (bl,bl), ) + (model, None, False, True,) else: assert isinstance(time_index, int), "time_index must be integer if cov-model=={}".format(model) # add model to key Ckey = ((dset, dset), (bl,bl), ) + (model, time_index, False, True,) # Check if Ckey exists in known_cov. If so, just update self._C[Ckey] with known_cov. if known_cov is not None: if Ckey in known_cov.keys(): spw = slice(*self.get_spw(include_extension=include_extension)) covariance = known_cov[Ckey][spw, spw] self.set_C({Ckey: covariance}) # check cache if Ckey not in self._C: # calculate covariance model if model == 'empirical': self.set_C({Ckey: utils.cov(self.x(key, include_extension=include_extension), self.w(key, include_extension=include_extension))}) elif model == 'dsets': self.set_C({Ckey: np.diag( np.abs(self.w(key, include_extension=include_extension)[:,time_index] * self.dx(key, include_extension=include_extension)[:,time_index]) ** 2. )}) elif model == 'autos': spw_range = self.get_spw(include_extension=include_extension) self.set_C({Ckey: np.diag(utils.variance_from_auto_correlations(self.dsets[dset], bl, spw_range, time_index))}) else: raise ValueError("didn't recognize Ckey {}".format(Ckey)) return self._C[Ckey]
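A sketch of the two common covariance models (same hypothetical key as above): the empirical model averages the channel-channel covariance over time, while the 'autos' and 'dsets' models are evaluated at a single time sample.

C_emp = ds.C_model(key, model="empirical")               # (spw_Nfreqs, spw_Nfreqs), time-averaged
C_auto0 = ds.C_model(key, model="autos", time_index=0)   # diagonal, built from autocorrelations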
[docs] def cross_covar_model(self, key1, key2, model='empirical', time_index=None, conj_1=False, conj_2=True, known_cov=None, include_extension=False): """ Return a covariance model having specified a key and model type. Note: Time-dependent flags that differ from frequency channel-to-channel can create spurious spectral structure. Consider factorizing the flags with self.broadcast_dset_flags() before using model='time_average'. Parameters ---------- key1, key2 : tuples Tuples containing indices of dataset and baselines. The first item specifies the index (ID) of a dataset in the collection, while subsequent indices specify the baseline index, in _key2inds format. model : string, optional Type of covariance model to calculate, if not cached. Options=['empirical', 'dsets', 'autos', (other model names in known_cov)] How the covariances of the input data should be estimated. In 'dsets' mode, error bars are estimated from user-provided per baseline and per channel standard deivations. If 'empirical' is set, then error bars are estimated from the data by averaging the channel-channel covariance of each baseline over time and then applying the appropriate linear transformations to these frequency-domain covariances. If 'autos' is set, the covariances of the input data over a baseline is estimated from the autocorrelations of the two antennas over channel bandwidth and integration time. time_index : integer, compute covariance at specific time-step conj_1 : boolean, optional Whether to conjugate first copy of data in covar or not. Default: False conj_2 : boolean, optional Whether to conjugate second copy of data in covar or not. Default: True known_cov : dicts of covariance matrices Covariance matrices that are imported from a outer dict instead of using data stored or calculated inside the PSpecData object. known_cov could be initialized when using PSpecData.pspec() method. See PSpecData.pspec() for more details. include_extension : bool (optional) default=False If True, extend spw to include filtering window extensions. Returns ------- cross_covar : ndarray, (spw_Nfreqs, spw_Nfreqs) Cross covariance model for the specified key. """ # type check assert isinstance(key1, tuple), "key1 must be fed as a tuple" assert isinstance(key2, tuple), "key2 must be fed as a tuple" assert isinstance(model, str), "model must be a string" # parse key dset1, bl1 = self.parse_blkey(key1) dset2, bl2 = self.parse_blkey(key2) covar = None if model == 'empirical': covar = utils.cov(self.x(key1, include_extension=include_extension), self.w(key1, include_extension=include_extension), self.x(key2, include_extension=include_extension), self.w(key2, include_extension=include_extension), conj_1=conj_1, conj_2=conj_2) if model in ['dsets','autos']: covar = np.zeros((np.diff(self.get_spw(include_extension=include_extension))[0], np.diff(self.get_spw(include_extension=include_extension))[0]), dtype=np.float64) # Check if model exists in known_cov. If so, just overwrite covar with known_cov. if known_cov is not None: Ckey = ((dset1, dset2), (bl1,bl2), ) + (model, time_index, conj_1, conj_2,) if Ckey in known_cov.keys(): spw = slice(*self.get_spw(include_extension=include_extension)) covar = known_cov[Ckey][spw, spw] if covar is None: raise ValueError("didn't recognize model {}".format(model)) return covar
[docs] def I(self, key): """ Return identity covariance matrix. Parameters ---------- key : tuple Tuple containing indices of dataset and baselines. The first item specifies the index (ID) of a dataset in the collection, while subsequent indices specify the baseline index, in _key2inds format. Returns ------- I : array_like Identity covariance matrix, dimension (Nfreqs, Nfreqs). """ assert isinstance(key, tuple) # parse key dset, bl = self.parse_blkey(key) key = (dset,) + (bl,) if key not in self._I: self._I[key] = np.identity(self.spw_Nfreqs + np.sum(self.filter_extension)) return self._I[key]
[docs] def iC(self, key, model='empirical', time_index=None): """ Return the inverse covariance matrix, C^-1. Parameters ---------- key : tuple Tuple containing indices of dataset and baselines. The first item specifies the index (ID) of a dataset in the collection, while subsequent indices specify the baseline index, in _key2inds format. model : string, optional Type of covariance model to calculate, if not cached. Options=['empirical', 'dsets', 'autos'] How the covariances of the input data should be estimated. In 'dsets' mode, error bars are estimated from user-provided per baseline and per channel standard deivations. If 'empirical' is set, then error bars are estimated from the data by averaging the channel-channel covariance of each baseline over time and then applying the appropriate linear transformations to these frequency-domain covariances. If 'autos' is set, the covariances of the input data over a baseline is estimated from the autocorrelations of the two antennas over channel bandwidth and integration time. time_index : integer, compute covariance at specific time-step Returns ------- iC : array_like Inverse covariance matrix for specified dataset and baseline. """ assert isinstance(key, tuple) # parse key dset, bl = self.parse_blkey(key) key = (dset,) + (bl,) Ckey = ((dset, dset), (bl,bl), ) + (model, time_index, False, True,) # Calculate inverse covariance if not in cache if Ckey not in self._iC: C = self.C_model(key, model=model, time_index=time_index) #U,S,V = np.linalg.svd(C.conj()) # conj in advance of next step if np.linalg.cond(C) >= 1e9: warnings.warn("Poorly conditioned covariance. Computing Pseudo-Inverse") ic = np.linalg.pinv(C) else: ic = np.linalg.inv(C) # FIXME: Not sure what these are supposed to do #if self.lmin is not None: S += self.lmin # ensure invertibility #if self.lmode is not None: S += S[self.lmode-1] # FIXME: Is series of dot products quicker? self.set_iC({Ckey:ic}) return self._iC[Ckey]
[docs] def Y(self, key): """ Return the weighting (diagonal) matrix, Y. This matrix is calculated by taking the logical AND of flags across all times given the dset-baseline-pol specification in 'key', converted into a float, and inserted along the diagonal of an spw_Nfreqs x spw_Nfreqs matrix. The logical AND step implies that all time-dependent flagging patterns are automatically broadcasted across all times. This broadcasting follows the principle that, for each freq channel, if at least a single time is unflagged, then the channel is treated as unflagged for all times. Power spectra from certain times, however, can be given zero weight by setting the nsample array to be zero at those times (see self.broadcast_dset_flags). Parameters ---------- key : tuple Tuple containing indices of dataset and baselines. The first item specifies the index (ID) of a dataset in the collection, while subsequent indices specify the baseline index, in _key2inds format. Returns ------- Y : array_like spw_Nfreqs x spw_Nfreqs diagonal matrix holding AND of flags across all times for each freq channel. """ assert isinstance(key, tuple) # parse key dset, bl = self.parse_blkey(key) key = (dset,) + (bl,) if key not in self._Y: self._Y[key] = np.diag(np.max(self.w(key), axis=1)) if not np.all(np.isclose(self._Y[key], 0.0) \ + np.isclose(self._Y[key], 1.0)): raise NotImplementedError("Non-binary weights not currently implmented") return self._Y[key]
    def set_iC(self, d):
        """
        Set the cached inverse covariance matrix for a given dataset and
        baseline to a specified value. For now, you should already have
        applied weights to this matrix.

        Parameters
        ----------
        d : dict
            Dictionary containing data to insert into the inverse covariance
            matrix cache. Keys are tuples, following the same format as the
            input to self.iC().
        """
        for k in d:
            self._iC[k] = d[k]
[docs] def set_R(self, d): """ Set the data-weighting matrix for a given dataset and baseline to a specified value for later use in q_hat. Parameters ---------- d : dict Dictionary containing data to insert into data-weighting R matrix cache. Keys are tuples with the following form `key = (dset_index, bl_ant_pair_pol_tuple, data_weighting, taper)` Example: `(0, (37, 38, 'xx'), 'bh')` If data_weight == 'dayenu' then additional elements are appended: `key + (filter_extension, spw_Nfreqs, symmetric_taper)` """ for k in d: self._R[k] = d[k]
[docs] def R(self, key): """ Return the data-weighting matrix R, which is a product of data covariance matrix (I or C^-1), diagonal flag matrix (Y) and diagonal tapering matrix (T): R = sqrt(T^t) sqrt(Y^t) K sqrt(Y) sqrt(T) where T is a diagonal matrix holding the taper and Y is a diagonal matrix holding flag weights. The K matrix comes from either `I` or `iC` or a `dayenu` depending on self.data_weighting, T is informed by self.taper and Y is taken from self.Y(). Right now, the data covariance can be identity ('I'), C^-1 ('iC'), or dayenu weighting 'dayenu'. Parameters ---------- key : tuple Tuple containing indices of dataset and baselines. The first item specifies the index (ID) of a dataset in the collection, while subsequent indices specify the baseline index, in _key2inds format. """ # type checks assert isinstance(key, tuple) dset, bl = self.parse_blkey(key) key = (dset,) + (bl,) # Only add to Rkey if a particular mode is enabled # If you do add to this, you need to specify this in self.set_R docstring! Rkey = key + (self.data_weighting,) + (self.taper,) if self.data_weighting == 'dayenu': # add extra dayenu params Rkey = Rkey + tuple(self.filter_extension,) + (self.spw_Nfreqs,) \ + (self.symmetric_taper,) if Rkey not in self._R: # form sqrt(taper) matrix if self.taper == 'none': sqrtT = np.ones(self.spw_Nfreqs).reshape(1, -1) else: sqrtT = np.sqrt(dspec.gen_window(self.taper, self.spw_Nfreqs)).reshape(1, -1) # get flag weight vector: straight multiplication of vectors # mimics matrix multiplication sqrtY = np.sqrt(self.Y(key).diagonal().reshape(1, -1)) # replace possible nans with zero (when something dips negative # in sqrt for some reason) sqrtT[np.isnan(sqrtT)] = 0.0 sqrtY[np.isnan(sqrtY)] = 0.0 fext = self.filter_extension #if we want to use a full-band filter, set the R-matrix to filter and then truncate. tmat = np.zeros((self.spw_Nfreqs, self.spw_Nfreqs+np.sum(fext)),dtype=complex) tmat[:,fext[0]:fext[0] + self.spw_Nfreqs] = np.identity(self.spw_Nfreqs,dtype=complex) # form R matrix if self.data_weighting == 'identity': if self.symmetric_taper: self._R[Rkey] = sqrtT.T * sqrtY.T * self.I(key) * sqrtY * sqrtT else: self._R[Rkey] = sqrtT.T ** 2. * np.dot(tmat, sqrtY.T * self.I(key) * sqrtY) elif self.data_weighting == 'iC': if self.symmetric_taper: self._R[Rkey] = sqrtT.T * sqrtY.T * self.iC(key) * sqrtY * sqrtT else: self._R[Rkey] = sqrtT.T ** 2. * np.dot(tmat, sqrtY.T * self.iC(key) * sqrtY ) elif self.data_weighting == 'dayenu': r_param_key = (self.data_weighting,) + key if not r_param_key in self.r_params: raise ValueError("r_param not set for %s!"%str(r_param_key)) r_params = self.r_params[r_param_key] if not 'filter_centers' in r_params or\ not 'filter_half_widths' in r_params or\ not 'filter_factors' in r_params: raise ValueError("filtering parameters not specified!") #This line retrieves a the psuedo-inverse of a lazy covariance #matrix given by dspec.dayenu_mat_inv. # Note that we multiply sqrtY inside of the pinv #to apply flagging weights before taking psuedo inverse. if self.symmetric_taper: self._R[Rkey] = sqrtT.T * np.linalg.pinv(sqrtY.T * \ dspec.dayenu_mat_inv(x=self.freqs[self.spw_range[0]-fext[0]:self.spw_range[1]+fext[1]], filter_centers=r_params['filter_centers'], filter_half_widths=r_params['filter_half_widths'], filter_factors=r_params['filter_factors']) * sqrtY) * sqrtT else: self._R[Rkey] = sqrtT.T ** 2. 
* np.dot(tmat, np.linalg.pinv(sqrtY.T * \ dspec.dayenu_mat_inv(x=self.freqs[self.spw_range[0]-fext[0]:self.spw_range[1]+fext[1]], filter_centers=r_params['filter_centers'], filter_half_widths=r_params['filter_half_widths'], filter_factors=r_params['filter_factors']) * sqrtY)) return self._R[Rkey]
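A sketch of how the weighting state feeds into R (the 'bh' taper string follows the example given in the set_R docstring):

ds.set_weighting("identity")   # or "iC" / "dayenu"
ds.set_taper("bh")             # Blackman-Harris taper via uvtools.dspec.gen_window
R = ds.R(key)                  # square (spw_Nfreqs x spw_Nfreqs) when no filter extension is set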
    def set_symmetric_taper(self, use_symmetric_taper):
        """
        Set the symmetric-taper parameter.

        If True, the square R matrix will be computed as R = sqrtT K sqrtT,
        where sqrtT is a diagonal matrix holding the square root of the taper.
        This is only possible when K is a square matrix (no filter extensions).
        If False, the R matrix will implement the taper as R = sqrtT ** 2 K.

        Parameters
        ----------
        use_symmetric_taper : bool
            Whether to use a symmetric taper.
        """
        if use_symmetric_taper and (self.filter_extension[0] > 0 or self.filter_extension[1] > 0):
            raise ValueError("You cannot use a symmetric taper when there are "
                             "nonzero filter extensions.")
        else:
            self.symmetric_taper = use_symmetric_taper
    def set_filter_extension(self, filter_extension):
        """
        Set extensions to the filtering matrix.

        Parameters
        ----------
        filter_extension : 2-tuple or 2-list of int
            Specify how many channels below/above spw_min/max the filter will
            be applied to the data. filter_extension will be clipped so that
            it does not extend beyond the data range.
        """
        if self.symmetric_taper and not filter_extension[0] == 0 and not filter_extension[1] == 0:
            raise_warning("You cannot set filter extensions greater than zero when "
                          "symmetric_taper==True! Setting symmetric_taper==False!")
            self.symmetric_taper = False
        assert isinstance(filter_extension, (list, tuple)), "filter_extension must be a tuple or list"
        assert len(filter_extension) == 2, "filter_extension must be length 2"
        assert isinstance(filter_extension[0], int) and \
               isinstance(filter_extension[1], int) and \
               filter_extension[0] >= 0 and \
               filter_extension[1] >= 0, "filter_extension must contain only non-negative integers"
        filter_extension = list(filter_extension)
        if filter_extension[0] > self.spw_range[0]:
            warnings.warn("filter_extension[0] exceeds data spw_range. Defaulting to spw_range[0]!")
        if filter_extension[1] > self.Nfreqs - self.spw_range[1]:
            warnings.warn("filter_extension[1] exceeds channels between spw_range[1] and Nfreqs. "
                          "Defaulting to Nfreqs-spw_range[1]!")
        # clip extensions so they do not extend beyond the data range
        filter_extension[0] = np.min([self.spw_range[0], filter_extension[0]])
        filter_extension[1] = np.min([self.Nfreqs - self.spw_range[1], filter_extension[1]])
        self.filter_extension = tuple(filter_extension)
    def set_weighting(self, data_weighting):
        """
        Set the data-weighting type.

        Parameters
        ----------
        data_weighting : str
            Type of data weighting. Options=['identity', 'iC', 'dayenu'].
        """
        self.data_weighting = data_weighting
[docs] def set_r_param(self, key, r_params): """ Set the weighting parameters for baseline at (dset,bl, [pol]) Parameters ---------- key: tuple Key in the format: (dset, bl, [pol]) `dset` is the index of the dataset, `bl` is a 2-tuple, `pol` is a float or string specifying polarization. r_params: dict Dict containing parameters for weighting matrix. Proper fields and formats depend on the mode of data_weighting. For `data_weighting` set to `dayenu`, this is a dictionary with the following fields: `filter_centers`, list of floats (or float) specifying the (delay) channel numbers at which to center filtering windows. Can specify fractional channel number. `filter_half_widths`, list of floats (or float) specifying the width of each filter window in (delay) channel numbers. Can specify fractional channel number. `filter_factors`, list of floats (or float) specifying how much power within each filter window is to be suppressed. Absence of an `r_params` dictionary will result in an error. """ key = self.parse_blkey(key) key = (self.data_weighting,) + key self.r_params[key] = r_params
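A hedged sketch of the 'dayenu' weighting path: the r_params values below are placeholders (see the field descriptions above for their meaning and units), and set_r_param is called after set_weighting so the stored key is tagged with the current weighting mode.

ds.set_weighting("dayenu")
ds.set_symmetric_taper(False)        # symmetric taper is incompatible with filter extensions
ds.set_filter_extension((2, 2))      # extend the filter 2 channels past each spw edge
ds.set_r_param(key, {"filter_centers": [0.],            # placeholder values
                     "filter_half_widths": [100e-9],
                     "filter_factors": [1e-9]})
R_dayenu = ds.R(key)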
    def set_taper(self, taper):
        """
        Set the data-tapering type.

        Parameters
        ----------
        taper : str
            Type of data tapering. See uvtools.dspec.gen_window for options.
        """
        self.taper = taper
[docs] def set_spw(self, spw_range, ndlys=None): """ Set the spectral window range. Parameters ---------- spw_range : tuple, contains start and end of spw in channel indices used to slice the frequency array ndlys : integer Number of delay bins. Default: None, sets number of delay bins equal to the number of frequency channels in the spw. """ assert isinstance(spw_range, tuple), \ "spw_range must be fed as a len-2 integer tuple" assert isinstance(spw_range[0], (int, np.integer)), \ "spw_range must be fed as len-2 integer tuple" self.spw_range = spw_range self.spw_Nfreqs = spw_range[1] - spw_range[0] self.set_Ndlys(ndlys=ndlys)
[docs] def set_Ndlys(self, ndlys=None): """ Set the number of delay bins used. Parameters ---------- ndlys : integer Number of delay bins. Default: None, sets number of delay bins equal to the number of frequency channels in the current spw """ if ndlys is None: self.spw_Ndlys = self.spw_Nfreqs else: # Check that one is not trying to estimate more delay channels than there are frequencies if self.spw_Nfreqs < ndlys: raise ValueError("Cannot estimate more delays than there are frequency channels") self.spw_Ndlys = ndlys # Set the lru-cached get_Q_alt function, with a maxsize of Ndlys self._get_qalt_cached = lru_cache(maxsize=int(self.spw_Ndlys))(utils.get_Q_alt)
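A short sketch (channel numbers are illustrative): restrict the analysis to one spectral window and, optionally, estimate fewer delay modes than channels.

ds.set_spw((175, 335))   # 160-channel spectral window; Ndlys defaults to spw_Nfreqs
ds.set_Ndlys(80)         # optional: cannot exceed spw_Nfreqs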
[docs] def cov_q_hat(self, key1, key2, model='empirical', exact_norm=False, pol=False, time_indices=None): """ Compute the un-normalized covariance matrix for q_hat for a given pair of visibility vectors. Returns the following matrix: Cov(\hat{q}_a,\hat{q}_b) !!!Only supports covariance between same power-spectrum estimates!!! (covariance between pair of baselines with the same pair of baselines) !!!Assumes that both baselines used in power-spectrum estimate !!!have independent noise relizations!!! #updated to be a multi-time wrapper to get_unnormed_V Parameters ---------- key1, key2 : tuples or lists of tuples Tuples containing indices of dataset and baselines for the two input datavectors. If a list of tuples is provided, the baselines in the list will be combined with inverse noise weights. exact_norm : boolean, optional Exact normalization (see HERA memo #44, Eq. 11 and documentation of q_hat for details). pol : str/int/bool, optional Polarization parameter to be used for extracting the correct beam. Used only if exact_norm is True. model : string, optional Type of covariance model to calculate, if not cached. Options=['empirical', 'dsets', 'autos'] How the covariances of the input data should be estimated. In 'dsets' mode, error bars are estimated from user-provided per baseline and per channel standard deivations. If 'empirical' is set, then error bars are estimated from the data by averaging the channel-channel covariance of each baseline over time and then applying the appropriate linear transformations to these frequency-domain covariances. If 'autos' is set, the covariances of the input data over a baseline is estimated from the autocorrelations of the two antennas over channel bandwidth and integration time. time_indices: list of indices of times to include or just a single time. default is None -> compute covariance for all times. Returns ------- cov_q_hat: array_like Matrix with covariances between un-normalized band powers (Ntimes, Nfreqs, Nfreqs) """ # type check if time_indices is None: time_indices = [tind for tind in range(self.Ntimes)] elif isinstance(time_indices, (int, np.integer)): time_indices = [time_indices] if not isinstance(time_indices, list): raise ValueError("time_indices must be an integer or list of integers.") if isinstance(key1,list): assert isinstance(key2, list), "key1 is a list, key2 must be a list" assert len(key2) == len(key1), "key1 length must equal key2 length" if isinstance(key2,list): assert isinstance(key1, list), "key2 is a list, key1 must be a list" #check time_indices for tind in time_indices: if not (tind >= 0 and tind <= self.Ntimes): raise ValueError("Invalid time index provided.") if not isinstance(key1,list): key1 = [key1] if not isinstance(key2,list): key2 = [key2] output = np.zeros((len(time_indices), self.spw_Ndlys, self.spw_Ndlys), dtype=complex) for k1, k2 in zip(key1, key2): if model == 'dsets': output+=1./np.asarray([self.get_unnormed_V(k1, k2, model=model, exact_norm=exact_norm, pol=pol, time_index=t)\ for t in time_indices]) elif model == 'empirical': cm = self.get_unnormed_V(k1, k2, model=model, exact_norm=exact_norm, pol=pol) output+=1./np.asarray([cm for m in range(len(time_indices))]) return float(len(key1)) / output
[docs] def q_hat(self, key1, key2, allow_fft=False, exact_norm=False, pol=False): """ If exact_norm is False: Construct an unnormalized bandpower, q_hat, from a given pair of visibility vectors. Returns the following quantity: \hat{q}_a = (1/2) conj(x_1) R_1 Q^alt_a R_2 x_2 Note that the R matrix need not be set to C^-1. This is something that is set by the user in the set_R method. This is related to Equation 13 of arXiv:1502.06016. However, notice that there is a Q^alt_a instead of Q_a. The latter is defined as Q_a \equiv dC/dp_a. Since this is the derivative of the covariance with respect to the power spectrum, it contains factors of the primary beam etc. Q^alt_a strips away all of this, leaving only the barebones job of taking a Fourier transform. See HERA memo #44 for details. This function uses the state of self.data_weighting and self.taper in constructing q_hat. See PSpecData.pspec for details. If exact_norm is True: Takes beam factors into account (Eq. 14 in HERA memo #44) Parameters ---------- key1, key2: tuples or lists of tuples Tuples containing indices of dataset and baselines for the two input datavectors for each power spectrum estimate. q_a formed from key1, key2 If a list of tuples is provided, the baselines in the list will be combined with inverse noise weights. allow_fft : bool, optional Whether to use a fast FFT summation trick to construct q_hat, or a simpler brute-force matrix multiplication. The FFT method assumes a delta-fn bin in delay space. It also only works if the number of delay bins is equal to the number of frequencies. Default: False. exact_norm: bool, optional If True, beam and spectral window factors are taken in the computation of Q_matrix (dC/dp = Q, and not Q_alt) (HERA memo #44, Eq. 11). Q matrix, for each delay mode, is weighted by the integral of beam over theta,phi. pol: str/int/bool, optional Used only if exact_norm is True. This argument is passed to get_integral_beam to extract the requested beam polarization. Default is the first polarization passed to pspec. Returns ------- q_hat : array_like Unnormalized/normalized bandpowers """ Rx1, Rx2 = 0.0, 0.0 R1, R2 = 0.0, 0.0 # Calculate R x_1 if isinstance(key1, list): for _key in key1: Rx1 += np.dot(self.R(_key), self.x(_key)) R1 += self.R(_key) else: Rx1 = np.dot(self.R(key1), self.x(key1)) R1 = self.R(key1) # Calculate R x_2 if isinstance(key2, list): for _key in key2: Rx2 += np.dot(self.R(_key), self.x(_key)) R2 += self.R(_key) else: Rx2 = np.dot(self.R(key2), self.x(key2)) R2 = self.R(key2) # The set of operations for exact_norm == True are drawn from Equations # 11(a) and 11(b) from HERA memo #44. We are incorporating the # multiplicatives to the exponentials, and sticking to quantities in # their physical units. if exact_norm and allow_fft: #exact_norm approach is meant to enable non-uniform binnning as well, where FFT is not #applicable. As of now, we are using uniform binning. raise NotImplementedError("Exact normalization does not support FFT approach at present") elif exact_norm and not(allow_fft): q = [] del_tau = np.median(np.diff(self.delays()))*1e-9 #Get del_eta in Eq.11(a) (HERA memo #44) (seconds) integral_beam = self.get_integral_beam(pol) #Integral of beam in Eq.11(a) (HERA memo #44) for i in range(self.spw_Ndlys): # Ideally, del_tau and integral_beam should be part of get_Q. We use them here to # avoid their repeated computation for each delay mode. 
Q = del_tau * self.get_Q_alt(i) * integral_beam QRx2 = np.dot(Q, Rx2) # Square and sum over columns qi = 0.5 * np.einsum('i...,i...->...', Rx1.conj(), QRx2) q.append(qi) q = np.asarray(q) #(Ndlys X Ntime) return q # use FFT if possible and allowed elif allow_fft and (self.spw_Nfreqs == self.spw_Ndlys): _Rx1 = np.fft.fft(Rx1, axis=0) _Rx2 = np.fft.fft(Rx2, axis=0) return 0.5 * np.fft.fftshift(_Rx1, axes=0).conj() \ * np.fft.fftshift(_Rx2, axes=0) else: Q = self.get_Q_alt_tensor() QRx2 = np.dot(Q, Rx2) q = np.einsum('i...,ji...->j...', Rx1.conj(), QRx2) return 0.5 * np.array(q)
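An illustrative call forming unnormalized bandpowers from one baseline in each dataset (keys are hypothetical); the FFT path is only available when spw_Ndlys equals spw_Nfreqs.

key1 = (0, (24, 25), "xx")
key2 = (1, (24, 25), "xx")
q = ds.q_hat(key1, key2)                      # shape (spw_Ndlys, Ntimes)
q_fft = ds.q_hat(key1, key2, allow_fft=True)  # FFT shortcut, spw_Ndlys == spw_Nfreqs only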
[docs] def get_G(self, key1, key2, exact_norm=False, pol=False): """ Calculates G_ab = (1/2) Tr[R_1 Q^alt_a R_2 Q^alt_b], which is needed for normalizing the power spectrum (see HERA memo #44). Note that in the limit that R_1 = R_2 = C^-1, this reduces to the Fisher matrix F_ab = 1/2 Tr [C^-1 Q^alt_a C^-1 Q^alt_b] (arXiv:1502.06016, Eq. 17) Parameters ---------- key1, key2 : tuples or lists of tuples Tuples containing indices of dataset and baselines for the two input datavectors. If a list of tuples is provided, the baselines in the list will be combined with inverse noise weights. exact_norm : boolean, optional Exact normalization (see HERA memo #44, Eq. 11 and documentation of q_hat for details). pol : str/int/bool, optional Polarization parameter to be used for extracting the correct beam. Used only if exact_norm is True. Returns ------- G : array_like, complex Fisher matrix, with dimensions (Nfreqs, Nfreqs). """ if self.spw_Ndlys == None: raise ValueError("Number of delay bins should have been set" "by now! Cannot be equal to None") G = np.zeros((self.spw_Ndlys, self.spw_Ndlys), dtype=complex) R1 = self.R(key1) R2 = self.R(key2) iR1Q1, iR2Q2 = {}, {} if (exact_norm): integral_beam = self.get_integral_beam(pol) del_tau = np.median(np.diff(self.delays()))*1e-9 if exact_norm: qnorm = del_tau * integral_beam else: qnorm = 1. for ch in range(self.spw_Ndlys): #G is given by Tr[E^\alpha C,\beta] #where E^\alpha = R_1^\dagger Q^\apha R_2 #C,\beta = Q2 and Q^\alpha = Q1 #Note that we conjugate transpose R #because we want to E^\alpha to #give the absolute value squared of z = m_\alpha \dot R @ x #where m_alpha takes the FT from frequency to the \alpha fourier mode. #Q is essentially m_\alpha^\dagger m # so we need to sandwhich it between R_1^\dagger and R_2 Q1 = self.get_Q_alt(ch) * qnorm Q2 = self.get_Q_alt(ch, include_extension=True) * qnorm iR1Q1[ch] = np.dot(np.conj(R1).T, Q1) # R_1 Q iR2Q2[ch] = np.dot(R2, Q2) # R_2 Q for i in range(self.spw_Ndlys): for j in range(self.spw_Ndlys): # tr(R_2 Q_i R_1 Q_j) G[i,j] = np.einsum('ab,ba', iR1Q1[i], iR2Q2[j]) # check if all zeros, in which case turn into identity if np.count_nonzero(G) == 0: G = np.eye(self.spw_Ndlys) return G / 2.
[docs] def get_H(self, key1, key2, sampling=False, exact_norm=False, pol=False): """ Calculates the response matrix H of the unnormalized band powers q to the true band powers p, i.e., <q_a> = \sum_b H_{ab} p_b This is given by H_ab = (1/2) Tr[R_1 Q_a^alt R_2 Q_b] (See HERA memo #44). As currently implemented, this approximates the primary beam as frequency independent. The sampling option determines whether one is assuming that the output points are integrals over k bins or samples at specific k values. The effect is to add a dampening of widely separated frequency correlations with the addition of the term sinc(pi \Delta eta (nu_i - nu_j)) Note that numpy uses the engineering definition of sinc, where sinc(x) = sin(pi x) / (pi x), whereas in the line above and in all of our documentation, we use the physicist definition where sinc(x) = sin(x) / x Note that in the limit that R_1 = R_2 = C^-1 and Q_a is used instead of Q_a^alt, this reduces to the Fisher matrix F_ab = 1/2 Tr [C^-1 Q_a C^-1 Q_b] (arXiv:1502.06016, Eq. 17) This function uses the state of self.taper in constructing H. See PSpecData.pspec for details. Parameters ---------- key1, key2 : tuples or lists of tuples Tuples containing indices of dataset and baselines for the two input datavectors. If a list of tuples is provided, the baselines in the list will be combined with inverse noise weights. sampling : boolean, optional Whether to sample the power spectrum or to assume integrated bands over wide delay bins. Default: False exact_norm : boolean, optional Exact normalization (see HERA memo #44, Eq. 11 and documentation of q_hat for details). pol : str/int/bool, optional Polarization parameter to be used for extracting the correct beam. Used only if exact_norm is True. Returns ------- H : array_like, complex Dimensions (Nfreqs, Nfreqs). """ if self.spw_Ndlys == None: raise ValueError("Number of delay bins should have been set" "by now! Cannot be equal to None.") H = np.zeros((self.spw_Ndlys, self.spw_Ndlys), dtype=complex) R1 = self.R(key1) R2 = self.R(key2) if not sampling: nfreq=np.sum(self.filter_extension) + self.spw_Nfreqs sinc_matrix = np.zeros((nfreq, nfreq)) for i in range(nfreq): for j in range(nfreq): sinc_matrix[i,j] = float(i - j) sinc_matrix = np.sinc(sinc_matrix / float(nfreq)) iR1Q1, iR2Q2 = {}, {} if (exact_norm): integral_beam = self.get_integral_beam(pol) del_tau = np.median(np.diff(self.delays()))*1e-9 if exact_norm: qnorm = del_tau * integral_beam else: qnorm = 1. for ch in range(self.spw_Ndlys): Q1 = self.get_Q_alt(ch) * qnorm Q2 = self.get_Q_alt(ch, include_extension=True) * qnorm if not sampling: Q2 *= sinc_matrix #H is given by Tr([E^\alpha C,\beta]) #where E^\alpha = R_1^\dagger Q^\apha R_2 #C,\beta = Q2 and Q^\alpha = Q1 #Note that we conjugate transpose R #because we want to E^\alpha to #give the absolute value squared of z = m_\alpha \dot R @ x #where m_alpha takes the FT from frequency to the \alpha fourier mode. #Q is essentially m_\alpha^\dagger m # so we need to sandwhich it between R_1^\dagger and R_2 iR1Q1[ch] = np.dot(np.conj(R1).T, Q1) # R_1 Q_alt iR2Q2[ch] = np.dot(R2, Q2) # R_2 Q for i in range(self.spw_Ndlys): # this loop goes as nchan^4 for j in range(self.spw_Ndlys): # tr(R_2 Q_i R_1 Q_j) H[i,j] = np.einsum('ab,ba', iR1Q1[i], iR2Q2[j]) # check if all zeros, in which case turn into identity if np.count_nonzero(H) == 0: H = np.eye(self.spw_Ndlys) return H / 2.
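G and H feed the power-spectrum normalization. As a sketch of the relation <q> = H p stated above (using `q` from the q_hat sketch), one simple unwindowed normalization choice is p_hat = H^-1 q; this is only meant to illustrate the roles of the matrices, not the normalization actually applied by the full pipeline.

G = ds.get_G(key1, key2)                   # (spw_Ndlys, spw_Ndlys)
H = ds.get_H(key1, key2, sampling=False)
p_hat = np.linalg.solve(H, q)              # illustrative unwindowed normalization of q_hat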
[docs] def get_unnormed_E(self, key1, key2, exact_norm=False, pol=False): """ Calculates a series of unnormalized E matrices, such that q_a = x_1^* E^{12,a} x_2 so that E^{12,a} = (1/2) R_1 Q^a R_2. In principle, this could be used to actually estimate q-hat. In other words, we could call this function to get E, and then sandwich it with two data vectors to get q_a. However, this should be slower than the implementation in q_hat. So this should only be used as a helper method for methods such as get_unnormed_V. Note for the future: There may be advantages to doing the matrix multiplications separately Parameters ---------- key1, key2 : tuples or lists of tuples Tuples containing indices of dataset and baselines for the two input datavectors. If a list of tuples is provided, the baselines in the list will be combined with inverse noise weights. exact_norm : boolean, optional Exact normalization (see HERA memo #44, Eq. 11 and documentation of q_hat for details). pol : str/int/bool, optional Polarization parameter to be used for extracting the correct beam. Used only if exact_norm is True. Returns ------- E : array_like, complex Set of E matrices, with dimensions (Ndlys, Nfreqs, Nfreqs). """ if self.spw_Ndlys == None: raise ValueError("Number of delay bins should have been set" "by now! Cannot be equal to None") nfreq = self.spw_Nfreqs + np.sum(self.filter_extension) E_matrices = np.zeros((self.spw_Ndlys, nfreq, nfreq), dtype=complex) R1 = self.R(key1) R2 = self.R(key2) if (exact_norm): integral_beam = self.get_integral_beam(pol) del_tau = np.median(np.diff(self.delays()))*1e-9 for dly_idx in range(self.spw_Ndlys): if exact_norm: QR2 = del_tau * integral_beam * np.dot(self.get_Q_alt(dly_idx), R2) else: QR2 = np.dot(self.get_Q_alt(dly_idx), R2) E_matrices[dly_idx] = np.dot(np.conj(R1).T, QR2) return 0.5 * E_matrices
[docs] def get_unnormed_V(self, key1, key2, model='empirical', exact_norm=False, pol=False, time_index=None): """ Calculates the covariance matrix for unnormed bandpowers (i.e., the q vectors). If the data were real and x_1 = x_2, the expression would be .. math :: V_ab = 2 tr(C E_a C E_b), where E_a = (1/2) R Q^a R When the data are complex, the expression becomes considerably more complicated. Define .. math :: E^{12,a} = (1/2) R_1 Q^a R_2 C^1 = <x1 x1^\dagger> - <x1><x1^\dagger> C^2 = <x2 x2^\dagger> - <x2><x2^\dagger> P^{12} = <x1 x2> - <x1><x2> S^{12} = <x1^* x2^*> - <x1^*> <x2^*> Then .. math :: V_ab = tr(E^{12,a} C^2 E^{21,b} C^1) + tr(E^{12,a} P^{21} E^{12,b *} S^{21}) Note that .. math :: E^{12,a}_{ij}.conj = E^{21,a}_{ji} This function estimates C^1, C^2, P^{12}, and S^{12} empirically by default. (So while the pointy brackets <...> should in principle be ensemble averages, in practice the code performs averages in time.) Empirical covariance estimates are in principle a little risky, as they can potentially induce signal loss. This is probably ok if we are just looking intending to look at V. It is most dangerous when C_emp^-1 is applied to the data. The application of using this to form do a V^-1/2 decorrelation is probably medium risk. But this has yet to be proven, and results coming from V^-1/2 should be interpreted with caution. Note for future: Although the V matrix should be Hermitian by construction, in practice there are precision issues and the Hermiticity is violated at ~ 1 part in 10^15. (Which is ~the expected roundoff error). If something messes up, it may be worth investigating this more. Note for the future: If this ends up too slow, Cholesky tricks can be employed to speed up the computation by a factor of a few. Parameters ---------- key1, key2 : tuples or lists of tuples Tuples containing indices of dataset and baselines for the two input datavectors. If a list of tuples is provided, the baselines in the list will be combined with inverse noise weights. exact_norm : boolean, optional Exact normalization (see HERA memo #44, Eq. 11 and documentation of q_hat for details). pol : str/int/bool, optional Polarization parameter to be used for extracting the correct beam. Used only if exact_norm is True. model : string, optional Type of covariance model to calculate, if not cached. Options=['empirical', 'dsets', 'autos'] How the covariances of the input data should be estimated. In 'dsets' mode, error bars are estimated from user-provided per baseline and per channel standard deivations. If 'empirical' is set, then error bars are estimated from the data by averaging the channel-channel covariance of each baseline over time and then applying the appropriate linear transformations to these frequency-domain covariances. If 'autos' is set, the covariances of the input data over a baseline is estimated from the autocorrelations of the two antennas over channel bandwidth and integration time. time_index : int, optional Compute covariance at specific time-step. Default: None. Returns ------- V : array_like, complex Bandpower covariance matrix, with dimensions (Ndlys, Ndlys). 
""" # Collect all the relevant pieces E_matrices = self.get_unnormed_E(key1, key2, exact_norm=exact_norm, pol=pol) C1 = self.C_model(key1, model=model, time_index=time_index) C2 = self.C_model(key2, model=model, time_index=time_index) P21 = self.cross_covar_model(key2, key1, model=model, conj_1=False, conj_2=False, time_index=time_index) S21 = self.cross_covar_model(key2, key1, model=model, conj_1=True, conj_2=True, time_index=time_index) E21C1 = np.dot(np.transpose(E_matrices.conj(), (0,2,1)), C1) E12C2 = np.dot(E_matrices, C2) auto_term = np.einsum('aij,bji', E12C2, E21C1) E12starS21 = np.dot(E_matrices.conj(), S21) E12P21 = np.dot(E_matrices, P21) cross_term = np.einsum('aij,bji', E12P21, E12starS21) return auto_term + cross_term
[docs] def get_analytic_covariance(self, key1, key2, M=None, exact_norm=False, pol=False, model='empirical', known_cov=None): """ Calculates the auto-covariance matrix for both the real and imaginary parts of bandpowers (i.e., the q vectors and the p vectors). Define: Real part of q_a = (1/2) (q_a + q_a^*) Imaginary part of q_a = (1/2i) (q_a - q_a^\dagger) Real part of p_a = (1/2) (p_a + p_a^\dagger) Imaginary part of p_a = (1/2i) (p_a - p_a^\dagger) .. math :: E^{12,a} = (1/2) R_1 Q^a R_2 C^{12} = <x1 x2^\dagger> - <x1><x2^\dagger> P^{12} = <x1 x2> - <x1><x2> S^{12} = <x1^* x2^*> - <x1^*> <x2^*> p_a = M_{ab} q_b Then: The variance of (1/2) (q_a + q_a^\dagger): .. math :: (1/4){ (<q_a q_a> - <q_a><q_a>) + 2(<q_a q_a^\dagger> - <q_a><q_a^\dagger>) + (<q_a^\dagger q_a^\dagger> - <q_a^\dagger><q_a^\dagger>) } The variance of (1/2i) (q_a - q_a^\dagger): .. math :: (-1/4){ (<q_a q_a> - <q_a><q_a>) - 2(<q_a q_a^\dagger> - <q_a><q_a^\dagger>) + (<q_a^\dagger q_a^\dagger> - <q_a^\dagger><q_a^\dagger>) } The variance of (1/2) (p_a + p_a^\dagger): .. math :: (1/4) { M_{ab} M_{ac} (<q_b q_c> - <q_b><q_c>) + M_{ab} M_{ac}^* (<q_b q_c^\dagger> - <q_b><q_c^\dagger>) + M_{ab}^* M_{ac} (<q_b^\dagger q_c> - <q_b^\dagger><q_c>) + M_{ab}^* M_{ac}^* (<q_b^\dagger q_c^\dagger> - <q_b^\dagger><q_c^\dagger>) } The variance of (1/2i) (p_a - p_a^\dagger): .. math :: (-1/4) { M_{ab} M_{ac} (<q_b q_c> - <q_b><q_c>) - M_{ab} M_{ac}^* (<q_b q_c^\dagger> - <q_b><q_c^\dagger>) - M_{ab}^* M_{ac} (<q_b^\dagger q_c> - <q_b^\dagger><q_c>) + M_{ab}^* M_{ac}^* (<q_b^\dagger q_c^\dagger> - <q_b^\dagger><q_c^\dagger>) } where .. math :: <q_a q_b> - <q_a><q_b> = tr(E^{12,a} C^{21} E^{12,b} C^{21}) + tr(E^{12,a} P^{22} E^{21,b*} S^{11}) <q_a q_b^\dagger> - <q_a><q_b^\dagger> = tr(E^{12,a} C^{22} E^{21,b} C^{11}) + tr(E^{12,a} P^{21} E^{12,b *} S^{21}) <q_a^\dagger q_b^\dagger> - <q_a^\dagger><q_b^\dagger> = tr(E^{21,a} C^{12} E^{21,b} C^{12}) + tr(E^{21,a} P^{11} E^{12,b *} S^{22}) Note that .. math :: E^{12,a}_{ij}.conj = E^{21,a}_{ji} This function estimates C^1, C^2, P^{12}, and S^{12} empirically by default. (So while the pointy brackets <...> should in principle be ensemble averages, in practice the code performs averages in time.) Note: Time-dependent flags that differ from frequency channel-to-channel can create spurious spectral structure. Consider factorizing the flags with self.broadcast_dset_flags() before using model='time_average' Parameters ---------- key1, key2 : tuples or lists of tuples Tuples containing indices of dataset and baselines for the two input datavectors. If a list of tuples is provided, the baselines in the list will be combined with inverse noise weights. M : array_like Normalization matrix, M. Ntimes x Ndlys x Ndlys exact_norm : boolean If True, beam and spectral window factors are taken in the computation of Q_matrix (dC/dp = Q, and not Q_alt) (HERA memo #44, Eq. 11). Q matrix, for each delay mode, is weighted by the integral of beam over theta,phi. Therefore the output power spectra is, by construction, normalized. If True, it returns normalized power spectrum, except for X2Y term. If False, Q_alt is used (HERA memo #44, Eq. 16), and the power spectrum is normalized separately. pol : str/int/bool, optional Polarization parameter to be used for extracting the correct beam. Used only if exact_norm is True. model : string, optional Type of covariance model to use. if not cached. Options=['empirical', 'dsets', 'autos', 'foreground_dependent', (other model names in known_cov)]. 
In `dsets` mode, error bars are estimated from user-provided per baseline and per channel standard deivations. In `empirical` mode, error bars are estimated from the data by averaging the channel-channel covariance of each baseline over time and then applying the appropriate linear transformations to these frequency-domain covariances. In `autos` mode, the covariances of the input data over a baseline is estimated from the autocorrelations of the two antennas forming the baseline across channel bandwidth and integration time. In `foreground_dependent` mode, it involves using auto-correlation amplitudes to model the input noise covariance and visibility outer products to model the input systematics covariance. When model is chosen as `autos` or `dsets`, only C^{11} and C^{22} are accepted as non-zero values, and the two matrices are also expected to be diagonal, thus only <q_a q_b^\dagger> - <q_a><q_b^\dagger> = tr[ E^{12,a} C^{22} E^{21,b} C^{11} ] exists in the covariance terms of q vectors. When model is chosen as `foreground_dependent`, we further include the signal-noise coupling term besides the noise in the output covariance. Still only <q_a q_b^\dagger> - <q_a><q_b^\dagger> is non-zero, while it takes a form of tr[ E^{12,a} Cn^{22} E^{21,b} Cn^{11} + E^{12,a} Cs^{22} E^{21,b} Cn^{11} + E^{12,a} Cn^{22} E^{21,b} Cs^{11} ], where Cn is just Cautos, the input noise covariance estimated by the auto-correlation amplitudes (by calling C_model(model='autos')), and Cs uses the outer product of input visibilities to model the covariance on systematics. To construct a symmetric and unbiased covariance matrix, we choose Cs^{11}_{ij} = Cs^{22}_{ij} = 1/2 * [ x1_i x2_j^{*} + x2_i x1_j^{*} ], which preserves the property Cs_{ij}^* = Cs_{ji}. known_cov : dicts of covariance matrices Covariance matrices that are not calculated internally from data. Returns ------- V : array_like, complex Bandpower covariance, with dimension (Ntimes, spw_Ndlys, spw_Ndlys). """ # Collect all the relevant pieces if M.ndim == 2: M = np.asarray([M for time in range(self.Ntimes)]) # M has a shape of (Ntimes, spw_Ndlys,spw_Ndlys) E_matrices = self.get_unnormed_E(key1, key2, exact_norm=exact_norm, pol=pol) # E_matrices has a shape of (spw_Ndlys, spw_Nfreqs, spw_Nfreqs) # using numpy.einsum_path to speed up the array products with numpy.einsum einstein_path_0 = np.einsum_path('bij, cji->bc', E_matrices, E_matrices, optimize='optimal')[0] einstein_path_1 = np.einsum_path('bi, ci,i->bc', E_matrices[:,:,0], E_matrices[:,:,0],E_matrices[0,:,0], optimize='optimal')[0] einstein_path_2 = np.einsum_path('ab,cd,bd->ac', M[0], M[0], M[0], optimize='optimal')[0] # check if the covariance matrix is uniform along the time axis. If so, we just calculate the result for one timestamp and duplicate its copies # along the time axis. check_uniform_input = False if model != 'foreground_dependent': # When model is 'foreground_dependent', since we are processing the outer products of visibilities from different times, # we are expected to have time-dependent inputs, thus check_uniform_input is always set to be False here. 
C11_first = self.C_model(key1, model=model, known_cov=known_cov, time_index=0) C11_last = self.C_model(key1, model=model, known_cov=known_cov, time_index=self.dsets[0].Ntimes-1) if np.isclose(C11_first, C11_last).all(): check_uniform_input = True cov_q_real, cov_q_imag, cov_p_real, cov_p_imag = [], [], [], [] for time_index in range(self.dsets[0].Ntimes): if model in ['dsets','autos']: # calculate <q_a q_b^\dagger> - <q_a><q_b^\dagger> = tr[ E^{12,a} C^{22} E^{21,b} C^{11} ] # We have used tr[A D_1 B D_2] = \sum_{ijkm} A_{ij} d_{1j} \delta_{jk} B_{km} d_{2m} \delta_{mi} = \sum_{ik} [A_{ik}*d_{1k}] * [B_{ki}*d_{2i}] # to simplify the computation. C11 = self.C_model(key1, model=model, known_cov=known_cov, time_index=time_index) C22 = self.C_model(key2, model=model, known_cov=known_cov, time_index=time_index) E21C11 = np.multiply(np.transpose(E_matrices.conj(), (0,2,1)), np.diag(C11)) E12C22 = np.multiply(E_matrices, np.diag(C22)) # Get q_q, q_qdagger, qdagger_qdagger q_q, qdagger_qdagger = 0.+1.j*0, 0.+1.j*0 q_qdagger = np.einsum('bij, cji->bc', E12C22, E21C11, optimize=einstein_path_0) elif model == 'foreground_dependent': # calculate tr[ E^{12,b} Cautos^{22} E^{21,c} Cautos^{11} + # E^{12,b} Cs E^{21,c} Cautos^{11} + # E^{12,b} Cautos^{22} E^{21,c} Cs ], # and we take Cs_{ij} = 1/2 * [ x1_i x2_j^{*} + x2_i x1_j^{*} ]. # For terms like E^{12,b} Cs E^{21,c} Cautos^{11}, # we have used tr[A u u*^t B D_2] = \sum_{ijkm} A_{ij} u_j u*_k B_{km} D_{2mi} \\ # = \sum_{i} [ \sum_j A_{ij} u_j ] * [\sum_k u*_k B_{ki} ] * d_{2i} # to simplify the computation. C11_autos = self.C_model(key1, model='autos', known_cov=known_cov, time_index=time_index) C22_autos = self.C_model(key2, model='autos', known_cov=known_cov, time_index=time_index) E21C11_autos = np.multiply(np.transpose(E_matrices.conj(), (0,2,1)), np.diag(C11_autos)) E12C22_autos = np.multiply(E_matrices, np.diag(C22_autos)) # Get q_q, q_qdagger, qdagger_qdagger q_q, qdagger_qdagger = 0.+1.j*0, 0.+1.j*0 q_qdagger = np.einsum('bij, cji->bc', E12C22_autos, E21C11_autos, optimize=einstein_path_0) x1 = self.w(key1)[:,time_index] * self.x(key1)[:,time_index] x2 = self.w(key2)[:,time_index] * self.x(key2)[:,time_index] E12_x1 = np.dot(E_matrices, x1) E12_x2 = np.dot(E_matrices, x2) x2star_E21 = E12_x2.conj() x1star_E21 = E12_x1.conj() x1star_E12 = np.dot(np.transpose(E_matrices,(0,2,1)), x1.conj()) x2star_E12 = np.dot(np.transpose(E_matrices,(0,2,1)), x2.conj()) E21_x1 = x1star_E12.conj() E21_x2 = x2star_E12.conj() SN_cov = np.einsum('bi,ci,i->bc', E12_x1, x2star_E21, np.diag(C11_autos), optimize=einstein_path_1)/2. + np.einsum('bi,ci,i->bc', E12_x2, x1star_E21, np.diag(C11_autos), optimize=einstein_path_1)/2.\ + np.einsum('bi,ci,i->bc', x2star_E12, E21_x1, np.diag(C22_autos), optimize=einstein_path_1)/2. + np.einsum('bi,ci,i->bc', x1star_E12, E21_x2, np.diag(C22_autos), optimize=einstein_path_1)/2. # Apply zero clipping on the columns and rows containing negative diagonal elements SN_cov[np.real(np.diag(SN_cov))<=0., :] = 0. + 1.j*0 SN_cov[:, np.real(np.diag(SN_cov))<=0.,] = 0. 
+ 1.j*0 q_qdagger += SN_cov else: # for general case (which is the slowest without simplification) C11 = self.C_model(key1, model=model, known_cov=known_cov, time_index=time_index) C22 = self.C_model(key2, model=model, known_cov=known_cov, time_index=time_index) C21 = self.cross_covar_model(key2, key1, model=model, conj_1=False, conj_2=True, known_cov=known_cov, time_index=time_index) C12 = self.cross_covar_model(key1, key2, model=model, conj_1=False, conj_2=True, known_cov=known_cov, time_index=time_index) P11 = self.cross_covar_model(key1, key1, model=model, conj_1=False, conj_2=False, known_cov=known_cov, time_index=time_index) S11 = self.cross_covar_model(key1, key1, model=model, conj_1=True, conj_2=True, known_cov=known_cov, time_index=time_index) P22 = self.cross_covar_model(key2, key2, model=model, conj_1=False, conj_2=False, known_cov=known_cov, time_index=time_index) S22 = self.cross_covar_model(key2, key2, model=model, conj_1=True, conj_2=True, known_cov=known_cov, time_index=time_index) P21 = self.cross_covar_model(key2, key1, model=model, conj_1=False, conj_2=False, known_cov=known_cov, time_index=time_index) S21 = self.cross_covar_model(key2, key1, model=model, conj_1=True, conj_2=True, known_cov=known_cov, time_index=time_index) # Get q_q, q_qdagger, qdagger_qdagger if np.isclose(P22, 0).all() or np.isclose(S11,0).all(): q_q = 0.+1.j*0 else: E12P22 = np.matmul(E_matrices, P22) E21starS11 = np.matmul(np.transpose(E_matrices, (0,2,1)), S11) q_q = np.einsum('bij, cji->bc', E12P22, E21starS11, optimize=einstein_path_0) if np.isclose(C21, 0).all(): q_q += 0.+1.j*0 else: E12C21 = np.matmul(E_matrices, C21) q_q += np.einsum('bij, cji->bc', E12C21, E12C21, optimize=einstein_path_0) E21C11 = np.matmul(np.transpose(E_matrices.conj(), (0,2,1)), C11) E12C22 = np.matmul(E_matrices, C22) q_qdagger = np.einsum('bij, cji->bc', E12C22, E21C11, optimize=einstein_path_0) if np.isclose(P21, 0).all() or np.isclose(S21,0).all(): q_qdagger += 0.+1.j*0 else: E12P21 = np.matmul(E_matrices, P21) E12starS21 = np.matmul(E_matrices.conj(), S21) q_qdagger += np.einsum('bij, cji->bc', E12P21, E12starS21, optimize=einstein_path_0) if np.isclose(C12, 0).all(): qdagger_qdagger = 0.+1.j*0 else: E21C12 = np.matmul(np.transpose(E_matrices.conj(), (0,2,1)), C12) qdagger_qdagger = np.einsum('bij, cji->bc', E21C12, E21C12, optimize=einstein_path_0) if np.isclose(P11, 0).all() or np.isclose(S22,0).all(): qdagger_qdagger += 0.+1.j*0 else: E21P11 = np.matmul(np.transpose(E_matrices.conj(), (0,2,1)), P11) E12starS22 = np.matmul(E_matrices.conj(), S22) qdagger_qdagger += np.einsum('bij, cji->bc', E21P11, E12starS22, optimize=einstein_path_0) cov_q_real_temp = (q_q + qdagger_qdagger + q_qdagger + q_qdagger.conj() ) / 4. cov_q_imag_temp = -(q_q + qdagger_qdagger - q_qdagger - q_qdagger.conj() ) / 4. m = M[time_index] # calculate \sum_{bd} [ M_{ab} M_{cd} (<q_b q_d> - <q_b><q_d>) ] if np.isclose([q_q], 0).all(): MMq_q = np.zeros((E_matrices.shape[0],E_matrices.shape[0])).astype(np.complex128) else: assert np.shape(q_q) == np.shape(m), "covariance matrix and normalization matrix has different shapes." 
MMq_q = np.einsum('ab,cd,bd->ac', m, m, q_q, optimize=einstein_path_2) # calculate \sum_{bd} [ M_{ab} M_{cd}^* (<q_b q_d^\dagger> - <q_b><q_d^\dagger>) ] # and \sum_{bd} [ M_{ab}^* M_{cd} (<q_b^\dagger q_d> - <q_b^\dagger><q_d>) ] if np.isclose([q_qdagger], 0).all(): MM_q_qdagger = 0.+1.j*0 M_Mq_qdagger_ = 0.+1.j*0 else: assert np.shape(q_qdagger) == np.shape(m), "covariance matrix and normalization matrix has different shapes." MM_q_qdagger = np.einsum('ab,cd,bd->ac', m, m.conj(), q_qdagger, optimize=einstein_path_2) M_Mq_qdagger_ = np.einsum('ab,cd,bd->ac', m.conj(), m, q_qdagger.conj(), optimize=einstein_path_2) # calculate \sum_{bd} [ M_{ab}^* M_{cd}^* (<q_b^\dagger q_d^\dagger> - <q_b^\dagger><q_d^\dagger>) ] if np.isclose([qdagger_qdagger], 0).all(): M_M_qdagger_qdagger = 0.+1.j*0 else: assert np.shape(qdagger_qdagger) == np.shape(m), "covariance matrix and normalization matrix has different shapes." M_M_qdagger_qdagger = np.einsum('ab,cd,bd->ac', m.conj(), m.conj(), qdagger_qdagger, optimize=einstein_path_2) cov_p_real_temp = ( MMq_q + MM_q_qdagger + M_Mq_qdagger_ + M_M_qdagger_qdagger)/ 4. cov_p_imag_temp = -( MMq_q - MM_q_qdagger - M_Mq_qdagger_ + M_M_qdagger_qdagger)/ 4. # cov_p_real_temp has a shaoe of (spw_Ndlys, spw_Ndlys) if check_uniform_input: # if the covariance matrix is uniform along the time axis, we just calculate the result for one timestamp and duplicate its copies # along the time axis. cov_q_real.extend([cov_q_real_temp]*self.dsets[0].Ntimes) cov_q_imag.extend([cov_q_imag_temp]*self.dsets[0].Ntimes) cov_p_real.extend([cov_p_real_temp]*self.dsets[0].Ntimes) cov_p_imag.extend([cov_p_imag_temp]*self.dsets[0].Ntimes) warnings.warn("Producing time-uniform covariance matrices between bandpowers.") break else: cov_q_real.append(cov_q_real_temp) cov_q_imag.append(cov_q_imag_temp) cov_p_real.append(cov_p_real_temp) cov_p_imag.append(cov_p_imag_temp) cov_q_real = np.asarray(cov_q_real) cov_q_imag = np.asarray(cov_q_imag) cov_p_real = np.asarray(cov_p_real) cov_p_imag = np.asarray(cov_p_imag) # (Ntimes, spw_Ndlys, spw_Ndlys) return cov_q_real, cov_q_imag, cov_p_real, cov_p_imag
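# Two example sketches for the machinery above. First, the 'bij, cji->bc'
# contractions used throughout get_analytic_covariance (and the analogous
# 'aij,bji' contraction in get_unnormed_V) are stacks of traces,
# result[b, c] = tr(A_b B_c); a minimal numpy check with random stand-in arrays:
def _example_trace_contraction(n_dlys=3, n_freqs=5, seed=0):
    rng = np.random.default_rng(seed)
    shape = (n_dlys, n_freqs, n_freqs)
    A = rng.normal(size=shape) + 1j * rng.normal(size=shape)
    B = rng.normal(size=shape) + 1j * rng.normal(size=shape)
    fast = np.einsum('bij, cji->bc', A, B)
    slow = np.array([[np.trace(A[b] @ B[c]) for c in range(n_dlys)]
                     for b in range(n_dlys)])
    assert np.allclose(fast, slow)
    return fast

# Second, a hypothetical end-to-end call, assuming a populated PSpecData `ds`
# and two valid (dset, baseline, pol) keys; the keys and the covariance model
# chosen below are illustrative only:
def _example_analytic_covariance(ds, key1=(0, (24, 25), 'xx'), key2=(1, (24, 25), 'xx')):
    Gv = ds.get_G(key1, key2)
    Hv = ds.get_H(key1, key2)
    Mv, _Wv = ds.get_MW(Gv, Hv, mode='I')
    # returns (cov_q_real, cov_q_imag, cov_p_real, cov_p_imag), each of
    # shape (Ntimes, spw_Ndlys, spw_Ndlys)
    return ds.get_analytic_covariance(key1, key2, M=Mv, model='autos')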
[docs] def get_MW(self, G, H, mode='I', band_covar=None, exact_norm=False, rcond=1e-15): """ Construct the normalization matrix M and window function matrix W for the power spectrum estimator. These are defined through Eqs. 14-16 of arXiv:1502.06016: \hat{p} = M \hat{q} <\hat{p}> = W p W = M H, where p is the true band power and H is the response matrix (defined above in get_H) of unnormalized bandpowers to normed bandpowers. Several choices for M are supported: 'I': Set M to be diagonal (e.g. HERA Memo #44) 'H^-1': Set M = H^-1, the (pseudo)inverse response matrix. 'V^-1/2': Set M = V^-1/2, the root-inverse response matrix (using SVD). These choices will be supported very soon: 'L^-1': Set M = L^-1, Cholesky decomposition. As written, the window functions will not be correclty normalized; it needs to be adjusted by the pspec scalar for it to be approximately correctly normalized. If the beam is being provided, this will be done in the pspec function. Parameters ---------- G : array_like Denominator matrix for the bandpowers, with dimensions (Nfreqs, Nfreqs). H : array_like Response matrix for the bandpowers, with dimensions (Nfreqs, Nfreqs). mode : str, optional Definition to use for M. Must be one of the options listed above. Default: 'I'. band_covar : array_like, optional Covariance matrix of the unnormalized bandpowers (i.e., q). Used only if requesting the V^-1/2 normalization. Use get_unnormed_V to get the covariance to put in here, or provide your own array. Default: None exact_norm : boolean, optional Exact normalization (see HERA memo #44, Eq. 11 and documentation of q_hat for details). Currently, this is supported only for mode I rcond : float, optional rcond parameter of np.linalg.pinv for truncating near-zero eigenvalues Returns ------- M : array_like Normalization matrix, M. (If G was passed in as a dict, a dict of array_like will be returned.) W : array_like Window function matrix, W. (If G was passed in as a dict, a dict of array_like will be returned.) """ ### Next few lines commented out because (while elegant), the situation ### here where G is a distionary is not supported by the rest of the code. 
# Recursive case, if many G's were specified at once # if type(G) is dict: # M,W = {}, {} # for key in G: M[key], W[key] = self.get_MW(G[key], mode=mode) # return M, W # Check that mode is supported modes = ['H^-1', 'V^-1/2', 'I', 'L^-1'] assert (mode in modes) if mode != 'I' and exact_norm is True: raise NotImplementedError("Exact norm is not supported for non-I modes") # Build M matrix according to specified mode if mode == 'H^-1': try: M = np.linalg.inv(H) except np.linalg.LinAlgError as err: if 'Singular matrix' in str(err): M = np.linalg.pinv(H, rcond=rcond) raise_warning("Warning: Window function matrix is singular " "and cannot be inverted, so using " " pseudoinverse instead.") else: raise np.linalg.LinAlgError("Linear algebra error with H matrix " "during MW computation.") W = np.dot(M, H) W_norm = np.sum(W, axis=1) W = (W.T / W_norm).T elif mode == 'V^-1/2': if np.sum(band_covar) == None: raise ValueError("Covariance not supplied for V^-1/2 normalization") # First find the eigenvectors and eigenvalues of the unnormalizd covariance # Then use it to compute V^-1/2 eigvals, eigvects = np.linalg.eigh(band_covar) nonpos_eigvals = eigvals <= 1e-20 if (nonpos_eigvals).any(): raise_warning("At least one non-positive eigenvalue for the " "unnormed bandpower covariance matrix.") # truncate them eigvals = eigvals[~nonpos_eigvals] eigvects = eigvects[:, ~nonpos_eigvals] V_minus_half = np.dot(eigvects, np.dot(np.diag(1./np.sqrt(eigvals)), eigvects.T)) W_norm = np.diag(1. / np.sum(np.dot(V_minus_half, H), axis=1)) M = np.dot(W_norm, V_minus_half) W = np.dot(M, H) elif mode == 'I': # This is not the M matrix as is rigorously defined in the # OQE formalism, because the power spectrum scalar is excluded # in this matrix normalization (i.e., M doesn't do the full # normalization) M = np.diag(1. / np.sum(G, axis=1)) W_norm = np.diag(1. / np.sum(H, axis=1)) W = np.dot(W_norm, H) else: raise NotImplementedError("Cholesky decomposition mode not currently supported.") # # Cholesky decomposition # order = np.arange(G.shape[0]) - np.ceil((G.shape[0]-1.)/2.) # order[order < 0] = order[order < 0] - 0.1 # # Negative integers have larger absolute value so they are sorted # # after positive integers. # order = (np.abs(order)).argsort() # if np.mod(G.shape[0], 2) == 1: # endindex = -2 # else: # endindex = -1 # order = np.hstack([order[:5], order[endindex:], order[5:endindex]]) # iorder = np.argsort(order) # G_o = np.take(np.take(G, order, axis=0), order, axis=1) # L_o = np.linalg.cholesky(G_o) # U,S,V = np.linalg.svd(L_o.conj()) # M_o = np.dot(np.transpose(V), np.dot(np.diag(1./S), np.transpose(U))) # M = np.take(np.take(M_o, iorder, axis=0), iorder, axis=1) return M, W
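# Example sketch of the mode='I' branch above, using small random symmetric
# matrices as stand-ins for the true G and H: M is diagonal and, by
# construction, each row of the window-function matrix W sums to unity.
def _example_mode_I_normalization(n=4, seed=0):
    rng = np.random.default_rng(seed)
    G = rng.random((n, n)); G = 0.5 * (G + G.T)
    H = rng.random((n, n)); H = 0.5 * (H + H.T)
    M = np.diag(1. / np.sum(G, axis=1))
    W = np.dot(np.diag(1. / np.sum(H, axis=1)), H)
    assert np.allclose(np.sum(W, axis=1), 1.0)
    return M, W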
[docs] def get_Q_alt(self, mode: int, allow_fft=True, include_extension=False):
        """
        Response of the covariance to a given bandpower, dC / dp_alpha,
        EXCEPT without the primary beam factors. This is Q_alt as defined
        in HERA memo #44, so it's not dC / dp_alpha, strictly, but is just
        the part that does the Fourier transforms.

        Assumes that Q will operate on a visibility vector in frequency space.
        In the limit that self.spw_Ndlys equals self.spw_Nfreqs, this will
        produce a matrix Q that performs a two-sided FFT and extracts a
        particular Fourier mode.

        (Computing x^t Q y is equivalent to Fourier transforming x and y
        separately, extracting one element of the Fourier transformed vectors,
        and then multiplying them.)

        When self.spw_Ndlys < self.spw_Nfreqs, the effect is similar except
        the delay bins need not be in the locations usually mandated by the
        FFT algorithm.

        Parameters
        ----------
        mode : int
            Central wavenumber (index) of the bandpower, p_alpha.

        allow_fft : boolean, optional
            If set to True, allows a shortcut FFT method when the number of
            delay bins equals the number of frequency channels. Default: True

        include_extension : boolean, optional
            If True, return a matrix that is spw_Nfreqs x spw_Nfreqs
            (required if using \partial C_{ij} / \partial p_\alpha, since
            C_{ij} is spw_Nfreqs x spw_Nfreqs).

        Returns
        -------
        Q : array_like
            Response matrix for bandpower p_alpha.
        """
        if self.spw_Ndlys is None:
            self.set_Ndlys()

        return self._get_qalt_cached(
            mode=mode,
            n_delays=self.spw_Ndlys,
            n_freqs=self.spw_Nfreqs,
            allow_fft=allow_fft,
            n_extend=np.sum(self.filter_extension) if include_extension else 0.0,
            phase_correction=self.filter_extension[0] if include_extension else 0.0
        )
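# Example sketch of the "extracts a particular Fourier mode" statement above,
# for the sampled case where the number of delay bins equals the number of
# frequency channels. The phase convention is chosen here to match get_Q below;
# only the structure is being illustrated.
def _example_qalt_mode_extraction(n_freqs=8, mode=3, seed=0):
    rng = np.random.default_rng(seed)
    x = rng.normal(size=n_freqs) + 1j * rng.normal(size=n_freqs)
    y = rng.normal(size=n_freqs) + 1j * rng.normal(size=n_freqs)
    eta = np.exp(-2j * np.pi * mode * np.arange(n_freqs) / n_freqs)
    Q = np.einsum('i,j', eta.conj(), eta)  # rank-1 outer product
    lhs = x @ Q @ y
    # x^t Q y picks out one Fourier component of x and of y and multiplies them
    rhs = n_freqs * np.fft.ifft(x)[mode] * np.fft.fft(y)[mode]
    assert np.isclose(lhs, rhs)
    return Q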
[docs] def get_Q_alt_tensor(self, allow_fft=True, include_extension=False):
        """
        Gets the (Ndly, Nfreq, Nfreq) tensor of Q_alt matrices.

        Parameters
        ----------
        allow_fft : boolean, optional
            If set to True, allows a shortcut FFT method when the number of
            delay bins equals the number of frequency channels. Default: True

        include_extension : boolean, optional
            If True, return matrices that are spw_Nfreqs x spw_Nfreqs
            (required if using \partial C_{ij} / \partial p_\alpha, since
            C_{ij} is spw_Nfreqs x spw_Nfreqs).

        Returns
        -------
        Q : array_like
            Stack of response matrices, one per bandpower p_alpha, with shape
            (spw_Ndlys, spw_Nfreqs, spw_Nfreqs).
        """
        if self.spw_Ndlys is None:
            self.set_Ndlys()

        return self._get_qalt_cached_tensor(
            n_delays=self.spw_Ndlys,
            n_freqs=self.spw_Nfreqs,
            allow_fft=allow_fft,
            n_extend=np.sum(self.filter_extension) if include_extension else 0.0,
            phase_correction=self.filter_extension[0] if include_extension else 0.0
        )
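# Example sketch: stacking get_Q_alt over all delay modes is expected to match
# the tensor returned above (assuming the same mode ordering); `ds` is a
# populated PSpecData with spw_Ndlys and spw_Nfreqs already set.
def _example_qalt_stack(ds):
    stack = np.array([ds.get_Q_alt(mode) for mode in range(ds.spw_Ndlys)])
    return stack  # shape (spw_Ndlys, spw_Nfreqs, spw_Nfreqs)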
[docs] def get_integral_beam(self, pol=False): """ Computes the integral containing the spectral beam and tapering function in Q_alpha(i,j). Parameters ---------- pol : str/int/bool, optional Which beam polarization to use. If the specified polarization doesn't exist, a uniform isotropic beam (with integral 4pi for all frequencies) is assumed. Default: False (uniform beam). Return ------- integral_beam : array_like integral containing the spectral beam and tapering. """ nu = self.freqs[self.spw_range[0]:self.spw_range[1]] # in Hz try: # Get beam response in (frequency, pixel), beam area(freq) and # Nside, used in computing dtheta beam_res, beam_omega, N = \ self.primary_beam.beam_normalized_response(pol, nu) prod = 1. / beam_omega beam_prod = beam_res * prod[:, np.newaxis] # beam_prod has omega subsumed, but taper is still part of R matrix # The nside term is dtheta^2, where dtheta is the resolution in # healpix map integral_beam = np.pi/(3.*N*N) * np.dot(beam_prod, beam_prod.T) except(AttributeError): warnings.warn("The beam response could not be calculated. " "PS will not be normalized!") integral_beam = np.ones((len(nu), len(nu))) return integral_beam
[docs] def get_Q(self, mode):
        """
        Computes Q_alt(i,j), which is the exponential part of the response
        of the data covariance to the bandpower (dC/dP_alpha).

        Note: This function is not being used right now, since get_Q_alt and
        get_Q are essentially the same functions. However, since we want to
        attempt non-uniform bins, we do intend to use get_Q (which uses
        physical units, and hence does not require uniformly spaced data).

        Parameters
        ----------
        mode : int
            Central wavenumber (index) of the bandpower, p_alpha.

        Returns
        -------
        Q_alt : array_like
            Exponential part of Q (HERA memo #44, Eq. 11).
        """
        if self.spw_Ndlys is None:
            self.set_Ndlys()
        if mode >= self.spw_Ndlys:
            raise IndexError("Cannot compute Q matrix for a mode outside "
                             "of allowed range of delay modes.")

        tau = self.delays()[int(mode)] * 1.0e-9  # delay in seconds
        nu = self.freqs[self.spw_range[0]:self.spw_range[1]]  # in Hz

        eta_int = np.exp(-2j * np.pi * tau * nu)  # exponential part
        Q_alt = np.einsum('i,j', eta_int.conj(), eta_int)  # dot with conjugate
        return Q_alt
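# Example sketch: the matrix built above is a Hermitian, rank-1 outer product
# of a single delay phasor with itself. The toy band and delay below are
# placeholders, not tied to any real dataset.
def _example_Q_structure(n_freqs=16):
    nu = np.linspace(100e6, 110e6, n_freqs)   # hypothetical band, Hz
    tau = 200e-9                              # hypothetical delay, s
    eta = np.exp(-2j * np.pi * tau * nu)
    Q = np.einsum('i,j', eta.conj(), eta)
    assert np.allclose(Q, Q.conj().T)         # Hermitian
    assert np.linalg.matrix_rank(Q) == 1      # rank-1
    return Q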
[docs] def p_hat(self, M, q): """ Optimal estimate of bandpower p_alpha, defined as p_hat = M q_hat. Parameters ---------- M : array_like Normalization matrix, M. q : array_like Unnormalized bandpowers, \hat{q}. Returns ------- p_hat : array_like Optimal estimate of bandpower, \hat{p}. """ return np.dot(M, q)
[docs] def cov_p_hat(self, M, q_cov): """ Covariance estimate between two different band powers p_alpha and p_beta given by M_{alpha i} M^*_{beta,j} C_q^{ij} where C_q^{ij} is the q-covariance. Parameters ---------- M : array_like Normalization matrix, M. q_cov : array_like covariance between bandpowers in q_alpha and q_beta """ p_cov = np.zeros_like(q_cov) for tnum in range(len(p_cov)): p_cov[tnum] = np.einsum('ab,cd,bd->ac', M, M, q_cov[tnum]) return p_cov
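# Example sketch: for a real normalization matrix M, the 'ab,cd,bd->ac'
# contraction used above reduces to the congruence transform M C_q M^T applied
# per time sample; a quick numpy check with random stand-ins:
def _example_cov_p_hat_identity(n_dlys=4, seed=0):
    rng = np.random.default_rng(seed)
    M = rng.normal(size=(n_dlys, n_dlys))
    C_q = rng.normal(size=(n_dlys, n_dlys))
    direct = np.einsum('ab,cd,bd->ac', M, M, C_q)
    assert np.allclose(direct, M @ C_q @ M.T)
    return direct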
[docs] def broadcast_dset_flags(self, spw_ranges=None, time_thresh=0.2, unflag=False): """ For each dataset in self.dset, update the flag_array such that the flagging patterns are time-independent for each baseline given a selection for spectral windows. For each frequency pixel in a selected spw, if the fraction of flagged times exceeds time_thresh, then all times are flagged. If it does not, the specific integrations which hold flags in the spw are flagged across all frequencies in the spw. Additionally, one can also unflag the flag_array entirely if desired. Note: although technically allowed, this function may give unexpected results if multiple spectral windows in spw_ranges have frequency overlap. Note: it is generally not recommended to set time_thresh > 0.5, which could lead to substantial amounts of data being flagged. Parameters ---------- spw_ranges : list of tuples list of len-2 spectral window tuples, specifying the start (inclusive) and stop (exclusive) index of the frequency array for each spw. Default is to use the whole band. time_thresh : float Fractional threshold of flagged pixels across time needed to flag all times per freq channel. It is not recommend to set this greater than 0.5. unflag : bool If True, unflag all data in the spectral window. """ # validate datasets self.validate_datasets() # clear matrix cache (which may be holding weight matrices Y) self.clear_cache() # spw type check if spw_ranges is None: spw_ranges = [(0, self.Nfreqs)] assert isinstance(spw_ranges, list), \ "spw_ranges must be fed as a list of tuples" # iterate over datasets for dset in self.dsets: # iterate over spw ranges for spw in spw_ranges: self.set_spw(spw) # unflag if unflag: # unflag for all times dset.flag_array[:, self.spw_range[0]:self.spw_range[1], :] = False continue # enact time threshold on flag waterfalls # iterate over polarizations for i in range(dset.Npols): # iterate over unique baselines ubl = np.unique(dset.baseline_array) for bl in ubl: # get baseline-times indices bl_inds = np.where(np.in1d(dset.baseline_array, bl))[0] # get flag waterfall flags = dset.flag_array[bl_inds, :, i].copy() Ntimes = float(flags.shape[0]) Nfreqs = float(flags.shape[1]) # get time- and freq-continguous flags freq_contig_flgs = np.sum(flags, axis=1) / Nfreqs > 0.999999 Ntimes_noncontig = np.sum(~freq_contig_flgs, dtype=float) # get freq channels where non-contiguous flags exceed threshold exceeds_thresh = np.sum(flags[~freq_contig_flgs], axis=0, dtype=float) / Ntimes_noncontig > time_thresh # flag channels for all times that exceed time_thresh dset.flag_array[bl_inds, np.where(exceeds_thresh)[0][:, None], i] = True # for pixels that have flags but didn't meet broadcasting limit # flag the integration within the spw flags[:, np.where(exceeds_thresh)[0]] = False flag_ints = np.max(flags[:, self.spw_range[0]:self.spw_range[1]], axis=1) dset.flag_array[bl_inds[flag_ints], self.spw_range[0]:self.spw_range[1], i] = True
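# Example sketch of the time-threshold rule above, applied to a toy flag
# waterfall (Ntimes x Nfreqs booleans). Fully flagged integrations are excluded
# when computing the per-channel flag fraction, as in the loop above. This is a
# simplified single-spw stand-in, not the full per-baseline/per-pol bookkeeping.
def _example_flag_broadcast(flags, time_thresh=0.2):
    flags = np.array(flags, dtype=bool)
    n_times, n_freqs = flags.shape
    fully_flagged_times = np.sum(flags, axis=1) / float(n_freqs) > 0.999999
    n_good = np.sum(~fully_flagged_times, dtype=float)
    if n_good == 0:
        return np.ones_like(flags, dtype=bool)
    frac = np.sum(flags[~fully_flagged_times], axis=0, dtype=float) / n_good
    exceeds = frac > time_thresh
    out = flags.copy()
    out[:, exceeds] = True                   # flag those channels at all times
    residual = flags.copy()
    residual[:, exceeds] = False
    out[np.any(residual, axis=1), :] = True  # broadcast leftover flags over the band
    return out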
[docs] def units(self, little_h=True):
        """
        Return the units of the power spectrum. These are inferred from the
        units reported by the input visibilities (UVData objects).

        Parameters
        ----------
        little_h : boolean, optional
            Whether to have cosmological length units be h^-1 Mpc or Mpc.
            Default: h^-1 Mpc

        Returns
        -------
        vis_units : str
            Units of the input visibilities.

        norm_units : str
            Units of the power spectrum normalization returned by pspec().
        """
        # Work out the power spectrum units
        if len(self.dsets) == 0:
            raise IndexError("No datasets have been added yet; cannot "
                             "calculate power spectrum units.")

        # get visibility units
        vis_units = self.dsets[0].vis_units

        # set pspec norm units
        if self.primary_beam is None:
            norm_units = "Hz str [beam normalization not specified]"
        else:
            if little_h:
                h_unit = "h^-3 "
            else:
                h_unit = ""
            norm_units = "{}Mpc^3".format(h_unit)

        return vis_units, norm_units
[docs] def delays(self): """ Return an array of delays, tau, corresponding to the bins of the delay power spectrum output by pspec() using self.spw_range to specify the spectral window. Returns ------- delays : array_like Delays, tau. Units: ns. """ # Calculate the delays if len(self.dsets) == 0: raise IndexError("No datasets have been added yet; cannot " "calculate delays.") else: return utils.get_delays(self.freqs[self.spw_range[0]:self.spw_range[1]], n_dlys=self.spw_Ndlys) * 1e9 # convert to ns
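# Example sketch: for uniform channelization and n_dlys equal to the number of
# channels, one common convention for the delay grid is the shifted FFT dual of
# the frequency axis, converted to ns. Whether utils.get_delays orders its modes
# this way is an assumption here, so treat this helper as illustrative only.
def _example_delay_grid(n_freqs=64, df_hz=100e3):
    return np.fft.fftshift(np.fft.fftfreq(n_freqs, d=df_hz)) * 1e9  # delays in ns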
[docs] def scalar(self, polpair, little_h=True, num_steps=2000, beam=None,
               taper_override='no_override', exact_norm=False):
        """
        Computes the scalar function to convert a power spectrum estimate
        in "telescope units" to cosmological units, using self.spw_range to
        set the spectral window.

        See arxiv:1304.4991 and HERA memo #27 for details.

        This function uses the state of self.taper in constructing scalar.
        See PSpecData.pspec for details.

        Parameters
        ----------
        polpair: tuple, int, or str
            Which pair of polarizations to compute the beam scalar for, e.g.
            ('pI', 'pI') or ('XX', 'YY'). If string, will assume that the
            specified polarization is to be cross-correlated with itself,
            e.g. 'XX' implies ('XX', 'XX').

        little_h : boolean, optional
            Whether to have cosmological length units be h^-1 Mpc or Mpc.
            Default: h^-1 Mpc

        num_steps : int, optional
            Number of steps to use when interpolating primary beams for
            numerical integral. Default: 2000

        beam : PSpecBeam object
            Option to use a manually-fed PSpecBeam object instead of using
            self.primary_beam.

        taper_override : str, optional
            Option to override the taper chosen in self.taper (does not
            overwrite self.taper; just applies to this function).
            Default: no_override

        exact_norm : boolean, optional
            If True, scalar would just be the X2Y term, as the beam and
            spectral terms are taken into account while constructing the
            power spectrum.

        Returns
        -------
        scalar: float
            [\int dnu (\Omega_PP / \Omega_P^2) ( B_PP / B_P^2 ) / (X^2 Y)]^-1
            in h^-3 Mpc^3 or Mpc^3.
        """
        # make sure polarizations are the same
        if isinstance(polpair, (int, np.integer)):
            polpair = uvputils.polpair_int2tuple(polpair)
        if isinstance(polpair, str):
            polpair = (polpair, polpair)
        if polpair[0] != polpair[1]:
            raise NotImplementedError(
                    "Polarizations don't match. Beam scalar can only be "
                    "calculated for auto-polarization pairs at the moment.")
        pol = polpair[0]

        # set spw_range and get freqs
        freqs = self.freqs[self.spw_range[0]:self.spw_range[1]]
        start = freqs[0]
        end = freqs[0] + np.median(np.diff(freqs)) * len(freqs)

        # Override the taper if desired
        if taper_override == 'no_override':
            taper = self.taper
        else:
            taper = taper_override

        # calculate scalar
        if beam is None:
            scalar = self.primary_beam.compute_pspec_scalar(
                                        start, end, len(freqs), pol=pol,
                                        taper=taper, little_h=little_h,
                                        num_steps=num_steps,
                                        exact_norm=exact_norm)
        else:
            scalar = beam.compute_pspec_scalar(start, end, len(freqs), pol=pol,
                                               taper=taper, little_h=little_h,
                                               num_steps=num_steps,
                                               exact_norm=exact_norm)
        return scalar
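# Example sketch of a typical call, assuming `ds` has a primary_beam attached
# and that the beam supports the pseudo-Stokes I polarization; the polarization
# pair shown is hypothetical.
def _example_scalar(ds):
    return ds.scalar(('pI', 'pI'), little_h=True, num_steps=2000)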
[docs] def scalar_delay_adjustment(self, key1=None, key2=None, sampling=False, Gv=None, Hv=None): """ Computes an adjustment factor for the pspec scalar. There are two reasons why this might be needed: 1) When the number of delay bins is not equal to the number of frequency channels. This adjustment is necessary because \sum_gamma tr[Q^alt_alpha Q^alt_gamma] = N_freq**2 is something that is true only when N_freqs = N_dlys. If the data weighting is equal to "identity", then the result is independent of alpha, but is no longer given by N_freq**2. (Nor is it just N_dlys**2!) If the data weighting is not equal to "identity" then we generally need a separate scalar adjustment for each alpha. 2) Even when the number of delay bins is equal to the number of frequency channels, there is an extra adjustment necessary to account for tapering functions. The reason for this is that our current code accounts for the tapering function in the normalization matrix M *and* accounts for it again in the pspec scalar. The adjustment provided by this function essentially cancels out one of these extra copies. This function uses the state of self.taper in constructing adjustment. See PSpecData.pspec for details. Parameters ---------- key1, key2 : tuples or lists of tuples, optional Tuples containing indices of dataset and baselines for the two input datavectors. If a list of tuples is provided, the baselines in the list will be combined with inverse noise weights. If Gv and Hv are specified, these arguments will be ignored. Default: None. sampling : boolean, optional Whether to sample the power spectrum or to assume integrated bands over wide delay bins. Default: False Gv, Hv : array_like, optional If specified, use these arrays instead of calling self.get_G() and self.get_H(). Using precomputed Gv and Hv will speed up this function significantly. Default: None. Returns ------- adjustment : float if the data_weighting is 'identity' 1d array of floats with length spw_Ndlys otherwise. """ if Gv is None: Gv = self.get_G(key1, key2) if Hv is None: Hv = self.get_H(key1, key2, sampling) # get ratio summed_G = np.sum(Gv, axis=1) summed_H = np.sum(Hv, axis=1) ratio = summed_H.real / summed_G.real # fill infs and nans from zeros in summed_G ratio[np.isnan(ratio)] = 1.0 ratio[np.isinf(ratio)] = 1.0 ## XXX: Adjustments like this are hacky and wouldn't be necessary ## if we deprecate the incorrectly normalized ## Q and M matrix definitions. #In the future, we need to do our normalizations properly and #stop introducing arbitrary normalization factors. #if the input identity weighting is diagonal, then the #adjustment factor is independent of alpha. # get mean ratio. if self.data_weighting == 'identity': mean_ratio = np.mean(ratio) scatter = np.abs(ratio - mean_ratio) if (scatter > 10**-4 * mean_ratio).any(): raise ValueError("The normalization scalar is band-dependent!") adjustment = self.spw_Ndlys / (self.spw_Nfreqs * mean_ratio) #otherwise, the adjustment factor is dependent on alpha. else: adjustment = self.spw_Ndlys / (self.spw_Nfreqs * ratio) if self.taper != 'none': tapering_fct = dspec.gen_window(self.taper, self.spw_Nfreqs) adjustment *= np.mean(tapering_fct**2) return adjustment
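# Example sketch: reusing precomputed G and H (as recommended in the docstring
# above) avoids recomputing them inside scalar_delay_adjustment. `ds`, `key1`
# and `key2` are assumed to exist and be valid keys.
def _example_delay_adjustment(ds, key1, key2):
    Gv = ds.get_G(key1, key2)
    Hv = ds.get_H(key1, key2, sampling=False)
    return ds.scalar_delay_adjustment(Gv=Gv, Hv=Hv)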
[docs] def validate_pol(self, dsets, pol_pair): """ Validate polarization and returns the index of the datasets so that the polarization pair is consistent with the UVData objects. Parameters ---------- dsets : length-2 list or length-2 tuple of integers or str Contains indices of self.dsets to use in forming power spectra, where the first index is for the Left-Hand dataset and second index is used for the Right-Hand dataset (see above). pol_pair : length-2 tuple of integers or strings Contains polarization pair which will be used in estiamting the power spectrum e,g (-5, -5) or ('xy', 'xy'). Only auto-polarization pairs are implemented for the time being. Returns ------- valid : boolean True if the UVData objects polarizations are consistent with the pol_pair (user specified polarizations) else False. """ err_msg = "polarization must be fed as len-2 tuple of strings or ints" assert isinstance(pol_pair, tuple), err_msg # take x_orientation from first dset x_orientation = self.dsets[0].x_orientation # convert elements to integers if fed as strings if isinstance(pol_pair[0], str): pol_pair = (uvutils.polstr2num(pol_pair[0], x_orientation=x_orientation), pol_pair[1]) if isinstance(pol_pair[1], str): pol_pair = (pol_pair[0], uvutils.polstr2num(pol_pair[1], x_orientation=x_orientation)) assert isinstance(pol_pair[0], (int, np.integer)), err_msg assert isinstance(pol_pair[1], (int, np.integer)), err_msg #if pol_pair[0] != pol_pair[1]: # raise NotImplementedError("Only auto/equal polarizations are implement at the moment.") dset_ind1 = self.dset_idx(dsets[0]) dset_ind2 = self.dset_idx(dsets[1]) dset1 = self.dsets[dset_ind1] # first UVData object dset2 = self.dsets[dset_ind2] # second UVData object valid = True if pol_pair[0] not in dset1.polarization_array: print("dset {} does not contain data for polarization {}".format(dset_ind1, pol_pair[0])) valid = False if pol_pair[1] not in dset2.polarization_array: print("dset {} does not contain data for polarization {}".format(dset_ind2, pol_pair[1])) valid = False return valid
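# Example sketch: the string-to-integer polarization conversion used above
# relies on pyuvdata's polstr2num; e.g. 'xx' maps to the AIPS-convention
# integer -5. The pair below is illustrative.
def _example_polpair_to_int(pol_pair=('xx', 'xx')):
    return tuple(uvutils.polstr2num(p) for p in pol_pair)  # -> (-5, -5)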
[docs] def pspec(self, bls1, bls2, dsets, pols, n_dlys=None, input_data_weight='identity', norm='I', taper='none', sampling=False, little_h=True, spw_ranges=None, symmetric_taper=True, baseline_tol=1.0, store_cov=False, store_cov_diag=False, return_q=False, store_window=True, exact_windows=False, ftbeam=None, verbose=True, filter_extensions=None, exact_norm=False, history='', r_params=None, cov_model='empirical', known_cov=None, allow_fft=False): """ Estimate the delay power spectrum from a pair of datasets contained in this object, using the optimal quadratic estimator of arXiv:1502.06016. In this formulation, the power spectrum is proportional to the visibility data via P = M data_{LH} E data_{RH} where E contains the data weighting and FT matrices, M is a normalization matrix, and the two separate datasets are denoted as "left-hand" and "right-hand". Each power spectrum is generated by taking a baseline (specified by an antenna-pair and polarization key) from bls1 out of dsets[0] and assigning it as data_LH, and a bl from bls2 out of the dsets[1] and assigning it as data_RH. If the bl chosen from bls1 is (ant1, ant2) and the bl chosen from bls2 is (ant3, ant4), the "baseline-pair" describing their cross multiplication is ((ant1, ant2), (ant3, ant4)). Parameters ---------- bls1, bls2 : list List of baseline groups, each group being a list of ant-pair tuples. dsets : length-2 tuple or list Contains indices of self.dsets to use in forming power spectra, where the first index is for the Left-Hand dataset and second index is used for the Right-Hand dataset (see above). pols : tuple or list of tuple Contains polarization pairs to use in forming power spectra e.g. ('XX','XX') or [('XX','XX'),('pI','pI')] or a list of polarization pairs. Individual strings are also supported, and will be expanded into a matching pair of polarizations, e.g. 'xx' becomes ('xx', 'xx'). If a primary_beam is specified, only equal-polarization pairs can be cross-correlated, as the beam scalar normalization is only implemented in this case. To obtain unnormalized spectra for pairs of different polarizations, set the primary_beam to None. n_dlys : list of integer, optional The number of delay bins to use. The order in the list corresponds to the order in spw_ranges. Default: None, which then sets n_dlys = number of frequencies. input_data_weight : str, optional String specifying which weighting matrix to apply to the input data. See the options in the set_R() method for details. Default: 'identity'. norm : str, optional String specifying how to choose the normalization matrix, M. See the 'mode' argument of get_MW() for options. Default: 'I'. taper : str, optional Tapering (window) function to apply to the data. Takes the same arguments as uvtools.dspec.gen_window(). Default: 'none'. sampling : boolean, optional Whether output pspec values are samples at various delay bins or are integrated bandpowers over delay bins. Default: False little_h : boolean, optional Whether to have cosmological length units be h^-1 Mpc or Mpc Default: h^-1 Mpc spw_ranges : list of tuples, optional A list of spectral window channel ranges to select within the total bandwidth of the datasets, each of which forms an independent power spectrum estimate. Example: [(220, 320), (650, 775)]. Each tuple should contain a start (inclusive) and stop (exclusive) channel used to index the `freq_array` of each dataset. The default (None) is to use the entire band provided in each dataset. 
symmetric_taper : bool, optional Specify if taper should be applied symmetrically to K-matrix (if true) or on the left (if False). Default: True baseline_tol : float, optional Distance tolerance for notion of baseline "redundancy" in meters. Default: 1.0. store_cov : bool, optional If True, calculate an analytic covariance between bandpowers given an input visibility noise model, and store the output in the UVPSpec object. store_cov_diag : bool, optional If True, store the square root of the diagonal of the output covariance matrix calculated by using get_analytic_covariance(). The error bars will be stored in the form of: `sqrt(diag(cov_array_real)) + 1.j*sqrt(diag(cov_array_imag))`. It's a way to save the disk space since the whole cov_array data with a size of Ndlys x Ndlys x Ntimes x Nblpairs x Nspws is too large. return_q : bool, optional If True, return the results (delay spectra and covariance matrices) for the unnormalized bandpowers in the UVPSpec object. store_window : bool, optional If True, store the window function of the bandpowers. Default: True exact_windows : bool, optional If True, compute exact window functions and sets store_window=True. Default: False ftbeam : str or FTBeam, optional Definition of the beam Fourier transform to be used. Options include; - Root name of the file to use, without the polarisation Ex : FT_beam_HERA_dipole (+ path) - '' for computation from beam simulations (slow) - FTBeam object. Make sure polarisations and bandwidths are consistent with the data set. cov_model : string, optional Type of covariance model to calculate, if not cached. Options=['empirical', 'dsets', 'autos', 'foreground_dependent', (other model names in known_cov)] In 'dsets' mode, error bars are estimated from user-provided per baseline and per channel standard deivations. In 'empirical' mode, error bars are estimated from the data by averaging the channel-channel covariance of each baseline over time and then applying the appropriate linear transformations to these frequency-domain covariances. In 'autos' mode, the covariances of the input data over a baseline is estimated from the autocorrelations of the two antennas forming the baseline across channel bandwidth and integration time. In 'foreground_dependent' mode, it involves using auto-correlation amplitudes to model the input noise covariance and visibility outer products to model the input systematics covariance. For more details see ds.get_analytic_covariance(). known_cov : dicts of input covariance matrices `known_cov` has a type {Ckey:covariance}, which is the same as ds._C. The matrices stored in known_cov are constructed outside the PSpecData object, unlike those in ds._C which are constructed internally. The Ckey should conform to: `(dset_pair_index, blpair_int, model, time_index, conj_1, conj_2)`, e.g. `((0, 1), ((25,37,"xx"), (25, 37, "xx")), 'empirical', False, True)`, while covariance are ndarrays with shape (Nfreqs, Nfreqs). Also see PSpecData.set_C() for more details. verbose : bool, optional If True, print progress, warnings and debugging info to stdout. filter_extensions : list of 2-tuple or 2-list, optional Set number of channels to extend filtering width. exact_norm : bool, optional If True, estimates power spectrum using Q instead of Q_alt (HERA memo #44). The default options is False. Beware that computing a power spectrum when exact_norm is set to False runs two times faster than setting it to True. 
history : str, optional History string to attach to UVPSpec object r_params: dictionary with parameters for weighting matrix. Proper fields and formats depend on the mode of data_weighting. For `data_weighting` set to 'dayenu', `r_params` should be a dict with the following fields: `filter_centers`, a list of floats (or float) specifying the (delay) channel numbers at which to center filtering windows. Can specify fractional channel number. `filter_half_widths`, a list of floats (or float) specifying the width of each filter window in (delay) channel numbers. Can specify fractional channel number. `filter_factors`, list of floats (or float) specifying how much power within each filter window is to be suppressed. Absence of an `r_params` dictionary will result in an error. allow_fft : bool, optional Use an fft to compute q-hat. Default is False. Returns ------- uvp : UVPSpec object Instance of UVPSpec that holds the normalized output power spectrum data. Examples -------- *Example 1:* No grouping; i.e. each baseline is its own group, no brackets needed for each bl. If:: A = (1, 2); B = (2, 3); C = (3, 4); D = (4, 5); E = (5, 6); F = (6, 7) and:: bls1 = [ A, B, C ] bls2 = [ D, E, F ] then:: blpairs = [ (A, D), (B, E), (C, F) ] *Example 2:* Grouping; blpairs come in lists of blgroups, which are considered "grouped" in OQE. If:: bls1 = [ [A, B], [C, D] ] bls2 = [ [C, D], [E, F] ] then:: blpairs = [ [(A, C), (B, D)], [(C, E), (D, F)] ] *Example 3:* Mixed grouping; i.e. some blpairs are grouped, others are not. If:: bls1 = [ [A, B], C ] bls2 = [ [D, E], F ] then:: blpairs = [ [(A, D), (B, E)], (C, F)] """ # set taper and data weighting self.set_taper(taper) self.set_symmetric_taper(symmetric_taper) self.set_weighting(input_data_weight) # Validate the input data to make sure it's sensible self.validate_datasets(verbose=verbose) # Currently the "pspec normalization scalar" doesn't work if a # non-identity data weighting AND a non-trivial taper are used if taper != 'none' and input_data_weight != 'identity': raise_warning("Warning: Scalar power spectrum normalization " "doesn't work with current implementation " "if the tapering AND non-identity " "weighting matrices are both used.", verbose=verbose) # get datasets assert isinstance(dsets, (list, tuple)), \ "dsets must be fed as length-2 tuple of integers" assert len(dsets) == 2, "len(dsets) must be 2" assert isinstance(dsets[0], (int, np.integer)) \ and isinstance(dsets[1], (int, np.integer)), \ "dsets must contain integer indices" dset1 = self.dsets[self.dset_idx(dsets[0])] dset2 = self.dsets[self.dset_idx(dsets[1])] # assert form of bls1 and bls2 assert isinstance(bls1, list), \ "bls1 and bls2 must be fed as a list of antpair tuples" assert isinstance(bls2, list), \ "bls1 and bls2 must be fed as a list of antpair tuples" assert len(bls1) == len(bls2) and len(bls1) > 0, \ "length of bls1 must equal length of bls2 and be > 0" for i in range(len(bls1)): if isinstance(bls1[i], tuple): assert isinstance(bls2[i], tuple), \ "bls1[{}] type must match bls2[{}] type".format(i, i) else: assert len(bls1[i]) == len(bls2[i]), \ "len(bls1[{}]) must match len(bls2[{}])".format(i, i) # construct list of baseline pairs bl_pairs = [] for i in range(len(bls1)): if isinstance(bls1[i], tuple): bl_pairs.append( (bls1[i], bls2[i]) ) elif isinstance(bls1[i], list) and len(bls1[i]) == 1: bl_pairs.append( (bls1[i][0], bls2[i][0]) ) else: bl_pairs.append( [ (bls1[i][j], bls2[i][j]) for j in range(len(bls1[i])) ] ) # validate bl-pair redundancy validate_blpairs(bl_pairs, 
dset1, dset2, baseline_tol=baseline_tol) # configure spectral window selections if spw_ranges is None: spw_ranges = [(0, self.Nfreqs)] if isinstance(spw_ranges, tuple): spw_ranges = [spw_ranges,] if filter_extensions is None: filter_extensions = [(0, 0) for m in range(len(spw_ranges))] # convert to list if only a tuple was given if isinstance(filter_extensions, tuple): filter_extensions = [filter_extensions,] assert len(spw_ranges) == len(filter_extensions), "must provide same number of spw_ranges as filter_extensions" # Check that spw_ranges is list of len-2 tuples assert np.isclose([len(t) for t in spw_ranges], 2).all(), \ "spw_ranges must be fed as a list of length-2 tuples" # if using default setting of number of delay bins equal to number # of frequency channels if n_dlys is None: n_dlys = [None for i in range(len(spw_ranges))] elif isinstance(n_dlys, (int, np.integer)): n_dlys = [n_dlys] # if using the whole band in the dataset, then there should just be # one n_dly parameter specified if spw_ranges is None and n_dlys != None: assert len(n_dlys) == 1, \ "Only one spw, so cannot specify more than one n_dly value" # assert that the same number of ndlys has been specified as the # number of spws assert len(spw_ranges) == len(n_dlys), \ "Need to specify number of delay bins for each spw" if store_cov_diag and store_cov: store_cov = False # Only store diagnonal parts of the cov_array to save the disk space if store_cov_diag==True, # no matter what the initial choice for store_cov. if exact_windows and not store_window: warnings.warn('exact_windows is True... setting store_window to True.') store_window = True # setup polarization selection if isinstance(pols, (tuple, str)): pols = [pols] # convert all polarizations to integers if fed as strings _pols = [] for p in pols: if isinstance(p, str): # Convert string to pol-integer pair p = (uvutils.polstr2num(p, x_orientation=self.dsets[0].x_orientation), uvutils.polstr2num(p, x_orientation=self.dsets[0].x_orientation)) if isinstance(p[0], str): p = (uvutils.polstr2num(p[0], x_orientation=self.dsets[0].x_orientation), p[1]) if isinstance(p[1], str): p = (p[0], uvutils.polstr2num(p[1], x_orientation=self.dsets[0].x_orientation)) _pols.append(p) pols = _pols # initialize empty lists # We should really pre-allocate arrays instead. 
data_array = odict() wgt_array = odict() integration_array = odict() cov_array_real = odict() cov_array_imag = odict() stats_array_cov_model = odict() window_function_array = odict() time1 = [] time2 = [] lst1 = [] lst2 = [] dly_spws = [] freq_spws = [] dlys = [] freqs = [] sclr_arr = [] blp_arr = [] bls_arr = [] ndone = 0 ntodo = len(bls1) * len(pols) * len(spw_ranges) nblps = len(bl_pairs) npols = len(pols) nspws = len(spw_ranges) nchunks = 500 nper_chunk = max(ntodo // nchunks, 1) # Loop over spectral windows for i in range(len(spw_ranges)): # set spectral range logger.info(f"\nSetting spectral range: {spw_ranges[i]}") self.set_spw(spw_ranges[i], ndlys=n_dlys[i]) self.set_filter_extension(filter_extensions[i]) # clear covariance cache self.clear_cache() # setup empty data arrays spw_data = [] spw_wgts = [] spw_ints = [] spw_scalar = [] spw_polpair = [] spw_cov_real = [] spw_cov_imag = [] spw_stats_array_cov_model = [] spw_window_function = [] d = self.delays() * 1e-9 f = dset1.freq_array.flatten()[spw_ranges[i][0]:spw_ranges[i][1]] dlys.extend(d) dly_spws.extend(np.ones_like(d, np.int16) * i) freq_spws.extend(np.ones_like(f, np.int16) * i) freqs.extend(f) # Loop over polarizations for j, p in enumerate(pols): p_str = tuple([uvutils.polnum2str(_p) for _p in p]) logger.info(f"\nUsing polarization pair: {p_str}") # validating polarization pair on UVData objects valid = self.validate_pol(dsets, tuple(p)) if not valid: # Polarization pair is invalid; skip warnings.warn( f"Polarization pair: {p_str} failed the validation test, continuing..." ) continue spw_polpair.append( uvputils.polpair_tuple2int(p) ) pol_data = [] pol_wgts = [] pol_ints = [] pol_cov_real = [] pol_cov_imag = [] pol_stats_array_cov_model = [] pol_window_function = [] # Compute scalar to convert "telescope units" to "cosmo units" if self.primary_beam is not None: # Raise error if cross-pol is requested if (p[0] != p[1]): raise NotImplementedError( "Visibilities with different polarizations can only " "be cross-correlated if primary_beam = None. 
Cannot " "compute beam scalar for mixed polarizations.") # using zero'th indexed polarization, as cross-polarized # beams are not yet implemented if norm == 'H^-1': # If using decorrelation, the H^-1 normalization # already deals with the taper, so we need to override # the taper when computing the scalar scalar = self.scalar(p, little_h=little_h, taper_override='none', exact_norm=exact_norm) else: scalar = self.scalar(p, little_h=little_h, exact_norm=exact_norm) else: raise_warning("Warning: self.primary_beam is not defined, " "so pspectra are not properly normalized", verbose=verbose) scalar = 1.0 pol = (p[0]) # used in get_integral_beam function to specify the correct polarization for the beam spw_scalar.append(scalar) # Loop over baseline pairs for k, blp in enumerate(bl_pairs): # assign keys if isinstance(blp, list): # interpet blp as group of baseline-pairs raise NotImplementedError("Baseline lists bls1 and bls2" " must be lists of tuples (not lists of lists" " of tuples).\n" "Use hera_pspec.pspecdata.construct_blpairs()" " to construct appropriately grouped baseline" " lists.") #key1 = [(dsets[0],) + _blp[0] + (p[0],) for _blp in blp] #key2 = [(dsets[1],) + _blp[1] + (p[1],) for _blp in blp] elif isinstance(blp, tuple): # interpret blp as baseline-pair key1 = (dsets[0],) + blp[0] + (p_str[0],) key2 = (dsets[1],) + blp[1] + (p_str[1],) ndone += 1 if ndone % nper_chunk == 0: logger.info( f"[{100*ndone/ntodo:5.2f}%] blp {k+1}/{nblps} | " f"pol {j+1}/{npols} | spw {i+1}/{nspws}" ) # Check that number of non-zero weight chans >= n_dlys key1_dof = np.sum(~np.isclose(self.Y(key1).diagonal(), 0.0)) key2_dof = np.sum(~np.isclose(self.Y(key2).diagonal(), 0.0)) if key1_dof - np.sum(self.filter_extension) < self.spw_Ndlys\ or key2_dof - np.sum(self.filter_extension) < self.spw_Ndlys: if verbose: print("WARNING: Number of unflagged chans for key1 " "and/or key2 < n_dlys\n which may lead to " "normalization instabilities.") #if using inverse sinc weighting, set r_params if input_data_weight == 'dayenu': key1 = (dsets[0],) + blp[0] + (p_str[0],) key2 = (dsets[1],) + blp[1] + (p_str[1],) if not key1 in r_params: raise ValueError("No r_param dictionary supplied" " for baseline %s"%(str(key1))) if not key2 in r_params: raise ValueError("No r_param dictionary supplied" " for baseline %s"%(str(key2))) self.set_r_param(key1, r_params[key1]) self.set_r_param(key2, r_params[key2]) # Build Fisher matrix if input_data_weight == 'identity': # in this case, all Gv and Hv differ only by flagging pattern # so check if we've already computed this # First: get flag weighting matrices given key1 & key2 Y = np.vstack([self.Y(key1).diagonal(), self.Y(key2).diagonal()]) # Second: check cache for Y matches = [np.isclose(Y, y).all() for y in self._identity_Y.values()] if True in matches: # This Y exists, so pick appropriate G and H and continue match = list(self._identity_Y.keys())[matches.index(True)] Gv = self._identity_G[match] Hv = self._identity_H[match] else: # This Y doesn't exist, so compute it if nper_chunk == 1: logger.info(" Building G...") Gv = self.get_G(key1, key2, exact_norm=exact_norm, pol = pol) Hv = self.get_H(key1, key2, sampling=sampling, exact_norm=exact_norm, pol = pol) # cache it self._identity_Y[(key1, key2)] = Y self._identity_G[(key1, key2)] = Gv self._identity_H[(key1, key2)] = Hv else: # for non identity weighting (i.e. 
iC weighting) # Gv and Hv are always different, so compute them if nper_chunk == 1: logger.info(" Building G...") Gv = self.get_G(key1, key2, exact_norm=exact_norm, pol = pol) Hv = self.get_H(key1, key2, sampling=sampling, exact_norm=exact_norm, pol = pol) # Calculate unnormalized bandpowers if nper_chunk == 1: logger.info(" Building q_hat...") qv = self.q_hat(key1, key2, exact_norm=exact_norm, pol=pol, allow_fft=allow_fft) if nper_chunk == 1: logger.info(" Normalizing power spectrum...") if norm == 'V^-1/2': V_mat = self.get_unnormed_V(key1, key2, exact_norm=exact_norm, pol = pol) Mv, Wv = self.get_MW(Gv, Hv, mode=norm, band_covar=V_mat, exact_norm=exact_norm) else: Mv, Wv = self.get_MW(Gv, Hv, mode=norm, exact_norm=exact_norm) pv = self.p_hat(Mv, qv) # Multiply by scalar if self.primary_beam != None: if nper_chunk == 1: logger.info(" Computing and multiplying scalar...") pv *= scalar # Wide bin adjustment of scalar, which is only needed for # the diagonal norm matrix mode (i.e., norm = 'I') if norm == 'I' and not(exact_norm): sa = self.scalar_delay_adjustment(Gv=Gv, Hv=Hv) if isinstance(sa, float): pv *= sa else: pv = np.atleast_2d(sa).T * pv #Generate the covariance matrix if error bars provided if store_cov or store_cov_diag: if nper_chunk == 1: logger.info(" Building q_hat covariance...") cov_q_real, cov_q_imag, cov_real, cov_imag \ = self.get_analytic_covariance(key1, key2, Mv, exact_norm=exact_norm, pol=pol, model=cov_model, known_cov=known_cov, ) if self.primary_beam != None: cov_real = cov_real * (scalar)**2. cov_imag = cov_imag * (scalar)**2. if norm == 'I' and not(exact_norm): if isinstance(sa, float): cov_real = cov_real * (sa)**2. cov_imag = cov_imag * (sa)**2. else: cov_real = cov_real * np.outer(sa, sa)[None] cov_imag = cov_imag * np.outer(sa, sa)[None] if not return_q: if store_cov: pol_cov_real.extend(np.real(cov_real).astype(np.float64)) pol_cov_imag.extend(np.real(cov_imag).astype(np.float64)) if store_cov_diag: stats = np.sqrt(np.diagonal(np.real(cov_real), axis1=1, axis2=2)) + 1.j*np.sqrt(np.diagonal(np.real(cov_imag), axis1=1, axis2=2)) pol_stats_array_cov_model.extend(stats) else: if store_cov: pol_cov_real.extend(np.real(cov_q_real).astype(np.float64)) pol_cov_imag.extend(np.real(cov_q_imag).astype(np.float64)) if store_cov_diag: stats = np.sqrt(np.diagonal(np.real(cov_q_real), axis1=1, axis2=2)) + 1.j*np.sqrt(np.diagonal(np.real(cov_q_imag), axis1=1, axis2=2)) pol_stats_array_cov_model.extend(stats) # store the window_function if store_window: pol_window_function.extend(np.repeat(Wv[np.newaxis,:,:], qv.shape[1], axis=0).astype(np.float64)) # Get baseline keys if isinstance(blp, list): bl1 = blp[0][0] bl2 = blp[0][1] else: bl1 = blp[0] bl2 = blp[1] # append bls bls_arr.extend([bl1, bl2]) # insert pspectra if not return_q: pol_data.extend(pv.T) else: pol_data.extend(qv.T) # get weights wgts1 = self.w(key1).T wgts2 = self.w(key2).T # get avg of nsample across frequency axis, weighted by wgts nsamp1 = np.sum(dset1.get_nsamples(bl1 + (p[0],))[:, slice(*self.get_spw())] * wgts1, axis=1) \ / np.sum(wgts1, axis=1).clip(1, np.inf) nsamp2 = np.sum(dset2.get_nsamples(bl2 + (p[1],))[:, slice(*self.get_spw())] * wgts2, axis=1) \ / np.sum(wgts2, axis=1).clip(1, np.inf) # get integ1 blts1 = dset1.antpair2ind(bl1, ordered=False) integ1 = dset1.integration_time[blts1] * nsamp1 # get integ2 blts2 = dset2.antpair2ind(bl2, ordered=False) integ2 = dset2.integration_time[blts2] * nsamp2 # take inverse avg of integ1 and integ2 to get total integ # inverse avg is done b/c integ ~ 
1/noise_var # and due to non-linear operation of V_1 * V_2 pol_ints.extend(1./np.mean([1./integ1, 1./integ2], axis=0)) # combined weight is geometric mean pol_wgts.extend(np.concatenate([wgts1[:, :, None], wgts2[:, :, None]], axis=2)) # insert time and blpair info only once per blpair if i < 1 and j < 1: # insert time info inds1 = dset1.antpair2ind(bl1, ordered=False) inds2 = dset2.antpair2ind(bl2, ordered=False) time1.extend(dset1.time_array[inds1]) time2.extend(dset2.time_array[inds2]) lst1.extend(dset1.lst_array[inds1]) lst2.extend(dset2.lst_array[inds2]) # insert blpair info blp_arr.extend(np.ones_like(inds1, int) \ * uvputils._antnums_to_blpair(blp)) # insert into data and wgts integrations dictionaries spw_data.append(pol_data) spw_wgts.append(pol_wgts) spw_ints.append(pol_ints) spw_stats_array_cov_model.append(pol_stats_array_cov_model) spw_cov_real.append(pol_cov_real) spw_cov_imag.append(pol_cov_imag) spw_window_function.append(pol_window_function) # insert into data and integration dictionaries spw_data = np.moveaxis(np.array(spw_data), 0, -1) spw_wgts = np.moveaxis(np.array(spw_wgts), 0, -1) spw_ints = np.moveaxis(np.array(spw_ints), 0, -1) spw_stats_array_cov_model = np.moveaxis(np.array(spw_stats_array_cov_model), 0, -1) if store_cov: spw_cov_real = np.moveaxis(np.array(spw_cov_real), 0, -1) spw_cov_imag = np.moveaxis(np.array(spw_cov_imag), 0, -1) if store_window: spw_window_function = np.moveaxis(np.array(spw_window_function), 0, -1) data_array[i] = spw_data stats_array_cov_model[i] = spw_stats_array_cov_model if store_cov: cov_array_real[i] = spw_cov_real cov_array_imag[i] = spw_cov_imag if store_window: window_function_array[i] = spw_window_function wgt_array[i] = spw_wgts integration_array[i] = spw_ints sclr_arr.append(spw_scalar) # raise error if none of pols are consistent with the UVData objects if len(spw_polpair) == 0: raise ValueError("None of the specified polarization pairs " "match that of the UVData objects") self.set_filter_extension((0, 0)) # set filter_extension to be zero when ending the loop # fill uvp object uvp = uvpspec.UVPSpec() uvp.symmetric_taper=symmetric_taper # fill meta-data uvp.time_1_array = np.array(time1) uvp.time_2_array = np.array(time2) uvp.time_avg_array = np.mean([uvp.time_1_array, uvp.time_2_array], axis=0) uvp.lst_1_array = np.array(lst1) uvp.lst_2_array = np.array(lst2) uvp.lst_avg_array = np.mean([np.unwrap(uvp.lst_1_array), np.unwrap(uvp.lst_2_array)], axis=0) \ % (2*np.pi) uvp.blpair_array = np.array(blp_arr) uvp.Nblpairs = len(np.unique(blp_arr)) # Ntimes in a uvpspec object now means the total number of times. # In all possible datasets that could be produced by pspecdata this # is equal to Ntpairs or 2 x Ntpairs if we interleave # but we have given upspec the capability to have this # not necessarily be the same. # Ntimes could still be the number of "average times" which might be useful for noise purposes. 
uvp.Ntimes = len(np.unique(np.hstack([uvp.time_1_array, uvp.time_2_array]))) uvp.Nbltpairs = len(set([(blp, t1, t2) for blp, t1, t2 in zip(uvp.blpair_array, uvp.time_1_array, uvp.time_2_array)])) uvp.Ntpairs = len(set([(t1, t2) for t1, t2 in zip(uvp.time_1_array, uvp.time_2_array)])) bls_arr = sorted(set(bls_arr)) uvp.bl_array = np.array([uvp.antnums_to_bl(bl) for bl in bls_arr]) antpos = dict(zip(dset1.antenna_numbers, dset1.antenna_positions)) uvp.bl_vecs = np.array([antpos[bl[0]] - antpos[bl[1]] for bl in bls_arr]) uvp.Nbls = len(uvp.bl_array) uvp.spw_dly_array = np.array(dly_spws) uvp.spw_freq_array = np.array(freq_spws) uvp.Nspws = len(np.unique(dly_spws)) uvp.spw_array = np.arange(uvp.Nspws, dtype=np.int16) uvp.freq_array = np.array(freqs) uvp.dly_array = np.array(dlys) uvp.Ndlys = len(np.unique(dlys)) uvp.Nspwdlys = len(uvp.spw_dly_array) uvp.Nspwfreqs = len(uvp.spw_freq_array) uvp.Nfreqs = len(np.unique(freqs)) uvp.polpair_array = np.array(spw_polpair, int) uvp.Npols = len(spw_polpair) uvp.scalar_array = np.array(sclr_arr) uvp.channel_width = np.array(dset1.channel_width) # all dsets validated to agree uvp.exact_windows = False uvp.weighting = input_data_weight uvp.vis_units, uvp.norm_units = self.units(little_h=little_h) uvp.telescope_location = dset1.telescope_location filename1 = json.loads(dset1.extra_keywords.get('filename', '""')) cal1 = json.loads(dset1.extra_keywords.get('calibration', '""')) filename2 = json.loads(dset2.extra_keywords.get('filename', '""')) cal2 = json.loads(dset2.extra_keywords.get('calibration', '""')) label1 = self.labels[self.dset_idx(dsets[0])] label2 = self.labels[self.dset_idx(dsets[1])] uvp.labels = sorted(set([label1, label2])) uvp.label_1_array = np.ones((uvp.Nspws, uvp.Nbltpairs, uvp.Npols), int) \ * uvp.labels.index(label1) uvp.label_2_array = np.ones((uvp.Nspws, uvp.Nbltpairs, uvp.Npols), int) \ * uvp.labels.index(label2) uvp.labels = np.array(uvp.labels, str) uvp.r_params = uvputils.compress_r_params(r_params) uvp.taper = taper if not return_q: uvp.norm = norm else: uvp.norm = 'Unnormalized' # save version of hera_pspec with backward compatibility uvp.history = "UVPSpec written on {} with hera_pspec git hash {}\n{}\n" \ "dataset1: filename: {}, label: {}, cal: {}, history:\n{}\n{}\n" \ "dataset2: filename: {}, label: {}, cal: {}, history:\n{}\n{}\n" \ "".format(datetime.datetime.utcnow(), __version__, '-'*20, filename1, label1, cal1, dset1.history, '-'*20, filename2, label2, cal2, dset2.history, '-'*20) if self.primary_beam is not None: # attach cosmology uvp.cosmo = self.primary_beam.cosmo # attach beam info uvp.beam_freqs = self.primary_beam.beam_freqs uvp.OmegaP, uvp.OmegaPP = \ self.primary_beam.get_Omegas(uvp.polpair_array) if hasattr(self.primary_beam, 'filename'): uvp.beamfile = self.primary_beam.filename # fill data arrays uvp.data_array = data_array uvp.integration_array = integration_array uvp.wgt_array = wgt_array uvp.nsample_array = dict( [ (k, np.ones_like(uvp.integration_array[k], float)) for k in uvp.integration_array.keys() ] ) # covariance if store_cov: uvp.cov_array_real = cov_array_real uvp.cov_array_imag = cov_array_imag uvp.cov_model = cov_model if store_cov_diag: uvp.stats_array = odict() uvp.stats_array[cov_model+"_diag"] = stats_array_cov_model # window functions if store_window: if exact_windows: # compute and store exact window functions uvp.get_exact_window_functions(ftbeam=ftbeam, verbose=verbose, x_orientation=self.dsets[0].x_orientation, inplace=True) else: uvp.window_function_array = window_function_array # run 
check uvp.check() return uvp
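    # ------------------------------------------------------------------
    # Minimal usage sketch for the pspec() call defined above. The file
    # names, baselines, spectral window and polarization pair are
    # illustrative placeholders (nothing here ships with hera_pspec);
    # the call pattern mirrors the one used by pspec_run() further down.
    #
    #   from pyuvdata import UVData
    #   from hera_pspec import pspecdata, pspecbeam
    #
    #   uvd1, uvd2 = UVData(), UVData()
    #   uvd1.read("even.LST.uvh5")                       # hypothetical file
    #   uvd2.read("odd.LST.uvh5")                        # hypothetical file
    #   beam = pspecbeam.PSpecBeamUV("HERA_beam.fits")   # hypothetical beamfits
    #
    #   ds = pspecdata.PSpecData(dsets=[uvd1, uvd2], wgts=[None, None], beam=beam)
    #   uvp = ds.pspec([(24, 25)], [(24, 25)], (0, 1), [("xx", "xx")],
    #                  spw_ranges=[(300, 400)], input_data_weight="identity",
    #                  norm="I", taper="blackman-harris", verbose=False)
    # ------------------------------------------------------------------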
[docs]    def rephase_to_dset(self, dset_index=0, inplace=True):
        """
        Rephase visibility data in self.dsets to the LST grid of
        dset[dset_index] using hera_cal.utils.lst_rephase.

        Each integration in all other dsets is phased to the center of the
        corresponding LST bin (by index) in dset[dset_index].

        Will only phase if the dataset's phase type is 'drift'. This is because
        the rephasing algorithm assumes the data are drift-phased when applying
        the phasor term.

        Note that PSpecData.Jy_to_mK() must be run *after* rephase_to_dset(),
        if one intends to use the former capability at any point.

        Parameters
        ----------
        dset_index : int or str
            Index or label of the dataset in self.dsets to phase the other
            datasets to.

        inplace : bool, optional
            If True, edits data in dsets in-memory. Else, makes a copy of
            dsets, edits data in the copy and returns it to the user.

        Returns
        -------
        list of UVData, or None
            If inplace is False, return the rephased copies of the datasets;
            otherwise return None.
        """
        # run dataset validation
        self.validate_datasets()

        # assign dsets
        if inplace:
            dsets = self.dsets
        else:
            dsets = copy.deepcopy(self.dsets)

        # Parse dset_index
        dset_index = self.dset_idx(dset_index)

        # get LST grid we are phasing to
        lst_grid = []
        lst_array = dsets[dset_index].lst_array.ravel()
        for l in lst_array:
            if l not in lst_grid:
                lst_grid.append(l)
        lst_grid = np.array(lst_grid)

        # get polarization list
        pol_list = dsets[dset_index].polarization_array.tolist()

        # iterate over dsets
        for i, dset in enumerate(dsets):
            # don't rephase the dataset we are using as our LST anchor
            if i == dset_index:
                # even though not phasing this dset, must set to match all
                # other dsets due to phasing-check validation
                dset.phase_type = 'unknown'
                continue

            # warn if dataset is not drift phased
            if dset.phase_type != 'drift':
                warnings.warn(f"Skipping dataset {i} b/c it isn't drift phased")

            # convert UVData to DataContainers. Note this doesn't make
            # a copy of the data
            (data, flgs, antpos, ants, freqs, times, lsts,
             pols) = hc.io.load_vis(dset, return_meta=True)

            # make bls dictionary
            bls = dict([(k, antpos[k[0]] - antpos[k[1]]) for k in data.keys()])

            # Get dlst array
            dlst = lst_grid - lsts

            # get telescope latitude
            lat = dset.telescope_location_lat_lon_alt_degrees[0]

            # rephase
            hc.utils.lst_rephase(data, bls, freqs, dlst, lat=lat)

            # re-insert into dataset
            for j, k in enumerate(data.keys()):
                # get blts indices of baseline
                indices = dset.antpair2ind(k[:2], ordered=False)

                # get index in polarization_array for this polarization
                polind = pol_list.index(uvutils.polstr2num(k[-1], x_orientation=self.dsets[0].x_orientation))

                # insert into dset
                dset.data_array[indices, :, polind] = data[k]

            # set phasing in the UVData object to unknown b/c there isn't a
            # single consistent phasing for the entire data set.
            dset.phase_type = 'unknown'

        if inplace is False:
            return dsets
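    # Ordering sketch for a PSpecData instance `ds` (assumed to be built as
    # in the example above): per the docstrings, rephasing must happen
    # before the Jy -> mK conversion.
    #
    #   ds.rephase_to_dset(dset_index=0)   # phase all other dsets to dset 0's LST grid
    #   ds.Jy_to_mK()                      # unit conversion only *after* rephasing
    #
    #   # or, to leave self.dsets untouched and work on rephased copies:
    #   new_dsets = ds.rephase_to_dset(dset_index=0, inplace=False)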
[docs]    def Jy_to_mK(self, beam=None):
        """
        Convert internal datasets from a Jy-scale to mK scale using a primary
        beam model if available. Note that if you intend to rephase_to_dset(),
        the Jy to mK conversion must be done *after* that step.

        Parameters
        ----------
        beam : PSpecBeam object
            Beam object.
        """
        # get all unique polarizations of all the datasets
        pols = set(np.ravel([dset.polarization_array for dset in self.dsets]))

        # assign beam
        if beam is None:
            beam = self.primary_beam
        else:
            if self.primary_beam is not None:
                warnings.warn(
                    "Feeding a beam model when self.primary_beam already exists..."
                )

        # Check beam is not None
        assert beam is not None, \
            "Cannot convert Jy --> mK b/c beam object is not defined..."

        # assert type of beam
        assert isinstance(beam, pspecbeam.PSpecBeamBase), \
            "beam model must be a subclass of pspecbeam.PSpecBeamBase"

        # iterate over all pols and get conversion factors
        factors = {}
        for p in pols:
            factors[p] = beam.Jy_to_mK(self.freqs, pol=p)

        # iterate over datasets and apply factor
        for i, dset in enumerate(self.dsets):
            # check dset vis units
            if dset.vis_units.upper() != 'JY':
                warnings.warn(
                    f"Cannot convert dset {i} Jy -> mK because vis_units = {dset.vis_units}"
                )
                continue
            for j, p in enumerate(dset.polarization_array):
                dset.data_array[:, :, j] *= factors[p][None, :]
            dset.vis_units = 'mK'
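    # Sketch of Jy_to_mK() with an explicitly supplied beam (the file name is
    # a placeholder). Datasets whose vis_units are not 'Jy' are left unchanged
    # and only trigger a warning.
    #
    #   beam = pspecbeam.PSpecBeamUV("HERA_beam.fits")   # hypothetical beamfits
    #   ds.Jy_to_mK(beam=beam)
    #   print([d.vis_units for d in ds.dsets])           # e.g. ['mK', 'mK']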
[docs]    def trim_dset_lsts(self, lst_tol=6):
        """
        Assuming all datasets in self.dsets are locked to the same LST grid
        (but each may have a constant offset), trim LSTs from each dset that
        aren't found in all other dsets (within some decimal tolerance
        specified by lst_tol).

        Warning: this edits the data in dsets in-place, and is not reversible.

        Parameters
        ----------
        lst_tol : float
            Decimal tolerance [radians] for comparing float-valued LST bins.
        """
        # ensure each dset has the same dLST within tolerance / Ntimes
        dlst = np.median(np.diff(np.unique(self.dsets[0].lst_array)))
        for dset in self.dsets:
            _dlst = np.median(np.diff(np.unique(dset.lst_array)))
            if not np.isclose(dlst, _dlst, atol=10**(-lst_tol) / dset.Ntimes):
                raise ValueError("Not all datasets in self.dsets are on the "
                                 "same LST grid, cannot LST trim.")

        # get the lst array of each dataset, turn into strings and collect
        # the common LSTs
        lst_arrs = []
        common_lsts = set()
        for i, dset in enumerate(self.dsets):
            lsts = ["{lst:0.{tol}f}".format(lst=l, tol=lst_tol) for l in dset.lst_array]
            lst_arrs.append(lsts)
            if i == 0:
                common_lsts = common_lsts.union(set(lsts))
            else:
                common_lsts = common_lsts.intersection(set(lsts))

        # iterate through dsets and trim off integrations whose lst isn't
        # in common_lsts
        for i, dset in enumerate(self.dsets):
            trim_inds = np.array([l not in common_lsts for l in lst_arrs[i]])
            if np.any(trim_inds):
                self.dsets[i].select(times=dset.time_array[~trim_inds])
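    # Usage sketch for trim_dset_lsts(): lst_tol is the number of decimal
    # places (in radians) to which LST bins must agree in order to be kept.
    # The selection is applied to self.dsets in place and is not reversible.
    #
    #   ds.trim_dset_lsts(lst_tol=6)
    #   print([d.Ntimes for d in ds.dsets])   # every dset now spans only the common LSTs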
def pspec_run(dsets, filename, dsets_std=None, cals=None, cal_flag=True, groupname=None, dset_labels=None, dset_pairs=None, psname_ext=None, spw_ranges=None, n_dlys=None, pol_pairs=None, blpairs=None, input_data_weight='identity', norm='I', taper='none', sampling=False, exclude_auto_bls=False, exclude_cross_bls=False, exclude_permutations=True, Nblps_per_group=None, bl_len_range=(0, 1e10), bl_deg_range=(0, 180), bl_error_tol=1.0, store_window=True, exact_windows=False, ftbeam=None, beam=None, cosmo=None, interleave_times=False, rephase_to_dset=None, trim_dset_lsts=False, broadcast_dset_flags=True, time_thresh=0.2, Jy2mK=False, overwrite=True, symmetric_taper=True, file_type='miriad', verbose=True, exact_norm=False, store_cov=False, store_cov_diag=False, filter_extensions=None, history='', r_params=None, tsleep=0.1, maxiter=1, return_q=False, known_cov=None, cov_model='empirical', include_autocorrs=False, include_crosscorrs=True, xant_flag_thresh=0.95, allow_fft=False): """ Create a PSpecData object, run OQE delay spectrum estimation and write results to a PSpecContainer object. Warning: if dsets is a list of UVData objects, they might be edited in place! Parameters ---------- dsets : list Contains UVData objects or string filepaths to UVData-compatible files filename : str Output filepath for HDF5 PSpecContainer object groupname : str Groupname of the subdirectory in the HDF5 container to store the UVPSpec objects in. Default is a concatenation the dset_labels. dsets_std : list Contains UVData objects or string filepaths to miriad files. Default is none. cals : list List of UVCal objects or calfits filepaths. Default is None. cal_flag : bool If True, use flags in calibration to flag data. dset_labels : list List of strings to label the input datasets. These labels form the psname of each UVPSpec object. Default is "dset0_x_dset1" where 0 and 1 are replaced with the dset index in dsets. Note: it is not advised to put underscores in the dset label names, as some downstream functions use this as a special character. dset_pairs : list of len-2 integer tuples List of tuples specifying the dset pairs to use in OQE estimation. Default is to form all N_choose_2 pairs from input dsets. psname_ext : string A string extension for the psname in the PSpecContainer object. Example: 'group/psname{}'.format(psname_ext) spw_ranges : list of len-2 integer tuples List of tuples specifying the spectral window range. See PSpecData.pspec() for details. Default is the entire band. n_dlys : list List of integers denoting number of delays to use per spectral window. Same length as spw_ranges. pol_pairs : list of len-2 tuples List of string or integer tuples specifying the polarization pairs to use in OQE with each dataset pair in dset_pairs. Default is to get all unique pols in the datasets and to form all auto-pol pairs. See PSpecData.pspec() for details. blpairs : list of tuples List of tuples specifying the desired baseline pairs to use in OQE. Ex. [((1, 2), (3, 4)), ((1, 2), (5, 6)), ...] The first bl in a tuple is drawn from zeroth index of a tuple in dset_pairs, while the second bl is drawn from the first index. See pspecdata.construct_blpairs for details. If None, the default behavior is to use the antenna positions in each UVData object to construct lists of redundant baseline groups to to take all cross-multiplies in each redundant baseline group. input_data_weight : string Data weighting to use in OQE. See PSpecData.pspec for details. Default: 'identity' norm : string Normalization scheme to use in OQE. 
See PSpecData.pspec for details. Default: 'I' taper : string Tapering to apply to data in OQE. See PSpecData.pspec for details. Default: 'none' sampling : boolean, optional Whether output pspec values are samples at various delay bins or are integrated bandpowers over delay bins. Default: False exclude_auto_bls : boolean If blpairs is None, redundant baseline groups will be formed and all cross-multiplies will be constructed. In doing so, if exclude_auto_bls is True, eliminate all instances of a bl crossed with itself. Default: False exclude_cross_bls : boolean If True and if blpairs is None, exclude all bls crossed with a different baseline. Note if this and exclude_auto_bls are True then no blpairs will exist. Default: False exclude_permutations : boolean If blpairs is None, redundant baseline groups will be formed and all cross-multiplies will be constructed. In doing so, if exclude_permutations is True, eliminates instances of (bl_B, bl_A) if (bl_A, bl_B) also exists. Default: True Nblps_per_group : integer If blpairs is None, group blpairs into sub-groups of baseline-pairs of this size. See utils.calc_blpair_reds() for details. Default: None bl_len_range : len-2 float tuple A tuple containing the minimum and maximum baseline length to use in utils.calc_blpair_reds call. Only used if blpairs is None. bl_deg_range : len-2 float tuple A tuple containing the min and max baseline angle (ENU frame in degrees) to use in utils.calc_blpair_reds. Total range is between 0 and 180 degrees. bl_error_tol : float, optional Baseline vector error tolerance when constructing redundant groups. Default: 1.0. store_window : bool If True, store computed window functions (warning, these can be large!) in UVPSpec objects. exact_windows : bool, optional If True, compute exact window functions and sets store_window=True. Default: False ftbeam : str or FTBeam, optional Definition of the beam Fourier transform to be used. Options include; - Root name of the file to use, without the polarisation Ex : FT_beam_HERA_dipole (+ path) - '' for computation from beam simulations (slow) - FTBeam object. Make sure polarisation and bandwidth are compatible with the dataset. beam : PSpecBeam object, UVBeam object or string Beam model to use in OQE. Can be a PSpecBeam object or a filepath to a beamfits healpix map (see UVBeam) cosmo : conversions.Cosmo_Conversions object A Cosmo_Conversions object to use as the cosmology when normalizing the power spectra. Default is a Planck cosmology. See conversions.Cosmo_Conversions for details. interleave_times : bool Only applicable if Ndsets == 1. If True, copy dset[0] into a dset[1] slot and interleave their time arrays. This updates dset_pairs to [(0, 1)]. rephase_to_dset : integer Integer index of the anchor dataset when rephasing all other datasets. This adds a phasor correction to all others dataset to phase the visibility data to the LST-grid of this dataset. Default behavior is no rephasing. trim_dset_lsts : boolean If True, look for constant offset in LST between all dsets, and trim non-overlapping LSTs. broadcast_dset_flags : boolean If True, broadcast dset flags across time using fractional time_thresh. time_thresh : float Fractional flagging threshold, above which a broadcast of flags across time is triggered (if broadcast_dset_flags == True). This is done independently for each baseline's visibility waterfall. Jy2mK : boolean If True, use the beam model provided to convert the units of each dataset from Jy to milli-Kelvin. 
If the visibility data are not in Jy, this correction is not applied. exact_norm : bool, optional If True, estimates power spectrum using Q instead of Q_alt (HERA memo #44), where q = R_1 x_1 Q R_2 x_2. The default options is False. Beware that turning this True would take ~ 7 sec for computing power spectrum for 100 channels per time sample per baseline. store_cov : boolean, optional If True, solve for covariance between bandpowers and store in output UVPSpec object. store_cov_diag : bool, optional If True, store the square root of the diagonal of the output covariance matrix calculated by using get_analytic_covariance(). The error bars will be stored in the form of: sqrt(diag(cov_array_real)) + 1.j*sqrt(diag(cov_array_imag)). It's a way to save the disk space since the whole cov_array data with a size of Ndlys x Ndlys x Ntimes x Nblpairs x Nspws is too large. return_q : bool, optional If True, return the results (delay spectra and covariance matrices) for the unnormalized bandpowers in the separate UVPSpec object. known_cov : dicts of input covariance matrices known_cov has the type {Ckey:covariance}, which is the same with ds._C. The matrices stored in known_cov must be constructed externally, different from those in ds._C which are constructed internally. return_q : bool, optional If True, return the results (delay spectra and covariance matrices) for the unnormalized bandpowers in the separate UVPSpec object. known_cov : dicts of input covariance matrices known_cov has the type {Ckey:covariance}, which is the same with ds._C. The matrices stored in known_cov must be constructed externally, different from those in ds._C which are constructed internally. filter_extensions : list of 2-tuple or 2-list, optional Set number of channels to extend filtering width. overwrite : boolean If True, overwrite outputs if they exist on disk. symmetric_taper : bool, optional speicfy if taper should be applied symmetrically to K-matrix (if true) or on the left (if False). default is True file_type : str, optional If dsets passed as a list of filenames, specify which file format the files use. Default: 'miriad'. verbose : boolean If True, report feedback to standard output. history : str String to add to history of each UVPSpec object. tsleep : float, optional Time to wait in seconds after each attempt at opening the container file. maxiter : int, optional Maximum number of attempts to open container file (useful for concurrent access when file may be locked temporarily by other processes). cov_model : string, optional Type of covariance model to calculate, if not cached. Options=['empirical', 'dsets', 'autos', 'foreground_dependent', (other model names in known_cov)] In 'dsets' mode, error bars are estimated from user-provided per baseline and per channel standard deivations. In 'empirical' mode, error bars are estimated from the data by averaging the channel-channel covariance of each baseline over time and then applying the appropriate linear transformations to these frequency-domain covariances. In 'autos' mode, the covariances of the input data over a baseline is estimated from the autocorrelations of the two antennas forming the baseline across channel bandwidth and integration time. In 'foreground_dependent' mode, it involves using auto-correlation amplitudes to model the input noise covariance and visibility outer products to model the input systematics covariance. For more details see ds.get_analytic_covariance(). 
Note: if dsets are str and cov_model is autos or fg_dependent, will also load auto correlations. r_params: dict, optional Dictionary with parameters for weighting matrix. Required fields and formats depend on the mode of `data_weighting`. Default: None. - `sinc_downweight` fields: - `filter_centers`: list of floats (or float) specifying the (delay) channel numbers at which to center filtering windows. Can specify fractional channel number. - `filter_half_widths`: list of floats (or float) specifying the width of each filter window in (delay) channel numbers. Can specify fractional channel number. - `filter_factors`: list of floats (or float) specifying how much power within each filter window is to be suppressed. include_autocorrs : bool, optional If True, include power spectra of autocorrelation visibilities. Default is False. include_crosscorrs: bool, optional If True, include power spectra from crosscorrelation visibilities. Default is True. xant_flag_thresh : float, optional fraction of waterfall that needs to be flagged for entire baseline to be considered flagged and excluded from data. Default is 0.95 allow_fft : bool, optional Use an fft to compute q-hat. Default is False. Returns ------- ds : PSpecData object The PSpecData object used for OQE of power spectrum, with cached weighting matrices. """ # type check assert isinstance(dsets, (list, tuple, np.ndarray)), \ "dsets must be fed as a list of dataset string paths or UVData objects." # parse psname if psname_ext is not None: assert isinstance(psname_ext, str) else: psname_ext = '' # polarizations check if pol_pairs is not None: pols = sorted(set(np.ravel(pol_pairs))) else: pols = None # baselines check if blpairs is not None: err_msg = "blpairs must be fed as a list of baseline-pair tuples, Ex: [((1, 2), (3, 4)), ...]" assert isinstance(blpairs, list), err_msg assert np.all([isinstance(blp, tuple) for blp in blpairs]), err_msg bls1 = [blp[0] for blp in blpairs] bls2 = [blp[1] for blp in blpairs] bls = sorted(set(bls1 + bls2)) else: # get redundant baseline groups bls = None # check cov_model if cov_model in ["autos", "foreground_dependent"] and bls is not None: # include autos if cov_model is autos bls += [(ant, ant) for ant in np.unique(utils.flatten(bls))] # Construct dataset pairs to operate on Ndsets = len(dsets) if dset_pairs is None: if len(dsets) > 1: dset_pairs = list(itertools.combinations(range(Ndsets), 2)) else: dset_pairs = [(0, 0)] if dset_labels is None: dset_labels = ["dset{}".format(i) for i in range(Ndsets)] else: assert not np.any(['_' in dl for dl in dset_labels]), \ "cannot accept underscores in input dset_labels: {}".format(dset_labels) # if dsets are not UVData, assume they are filepaths or list of filepaths if not isinstance(dsets[0], UVData): try: # load data into UVData objects if fed as list of strings t0 = time.time() dsets = _load_dsets(dsets, bls=bls, pols=pols, file_type=file_type, verbose=verbose) utils.log("Loaded data in %1.1f sec." % (time.time() - t0), lvl=1, verbose=verbose) except ValueError: # at least one of the dset loads failed due to no data being present utils.log("One of the dset loads failed due to no data overlap given " "the bls and pols selection", verbose=verbose) return None assert np.all([isinstance(d, UVData) for d in dsets]), \ "dsets must be fed as a list of dataset string paths or UVData objects." 
# check dsets_std input if dsets_std is not None: err_msg = "input dsets_std must be a list of UVData objects or " \ "filepaths to miriad files" assert isinstance(dsets_std,(list, tuple, np.ndarray)), err_msg assert len(dsets_std) == Ndsets, "len(dsets_std) must equal len(dsets)" # load data if not UVData if not isinstance(dsets_std[0], UVData): try: # load data into UVData objects if fed as list of strings t0 = time.time() dsets_std = _load_dsets(dsets_std, bls=bls, pols=pols, file_type=file_type, verbose=verbose) utils.log("Loaded data in %1.1f sec." % (time.time() - t0), lvl=1, verbose=verbose) except ValueError: # at least one of the dsets_std loads failed due to no data # being present utils.log("One of the dsets_std loads failed due to no data overlap given " "the bls and pols selection", verbose=verbose) return None assert np.all([isinstance(d, UVData) for d in dsets_std]), err_msg # Check that the given spw ranges are valid, before doing anything too hefty. if spw_ranges is not None: for spw in spw_ranges: assert len(spw) == 2, f"spw_ranges must be a list of tuples of length 2. Got {spw}" assert 0 <= spw[0] < spw[1], f"spw_ranges must be a list of tuples of length 2, with both elements being non-negative integers of increasing value. Got {spw}" for dset in dsets: assert spw[1] <= dset.Nfreqs, f"spw_range of {spw} out of range for dset {dset.filename[0]} with the second element being less than the number of frequencies in the data" utils.log(f"Using spw_range: {np.squeeze(dsets[0].freq_array)[spw[0]] /1e6} - {np.squeeze(dsets[0].freq_array)[spw[1] - 1]/1e6} MHz", verbose=verbose) # read calibration if provided (calfits partial IO not yet supported) if cals is not None: if not isinstance(cals, (list, tuple)): cals = [cals for d in dsets] if not isinstance(cals[0], UVCal): t0 = time.time() cals = _load_cals(cals, verbose=verbose) utils.log("Loaded calibration in %1.1f sec." 
% (time.time() - t0), lvl=1, verbose=verbose) err_msg = "cals must be a list of UVCal, filepaths, or list of filepaths" assert np.all([isinstance(c, UVCal) for c in cals]), err_msg # configure polarization if pol_pairs is None: unique_pols = np.unique(np.hstack([d.polarization_array for d in dsets])) unique_pols = [uvutils.polnum2str(up) for up in unique_pols] pol_pairs = [(up, up) for up in unique_pols] assert len(pol_pairs) > 0, "no pol_pairs specified" # load beam if isinstance(beam, str): beam = pspecbeam.PSpecBeamUV(beam, cosmo=cosmo) # beam and cosmology check if beam is not None: assert isinstance(beam, pspecbeam.PSpecBeamBase) if cosmo is not None: beam.cosmo = cosmo # package into PSpecData ds = PSpecData(dsets=dsets, wgts=[None for d in dsets], labels=dset_labels, dsets_std=dsets_std, beam=beam, cals=cals, cal_flag=cal_flag) # erase calibration as they are no longer needed del cals # trim dset LSTs if trim_dset_lsts: ds.trim_dset_lsts() # interleave times if interleave_times: if len(ds.dsets) != 1: raise ValueError("interleave_times only applicable for Ndsets == 1") Ntimes = ds.dsets[0].Ntimes # get smallest Ntimes Ntimes -= Ntimes % 2 # make it an even number # update dsets ds.dsets.append(ds.dsets[0].select(times=np.unique(ds.dsets[0].time_array)[1:Ntimes:2], inplace=False)) ds.dsets[0].select(times=np.unique(ds.dsets[0].time_array)[0:Ntimes:2], inplace=True) ds.labels.append("dset1") # update dsets_std if ds.dsets_std[0] is None: ds.dsets_std.append(None) else: ds.dsets_std.append(ds.dsets_std[0].select(times=np.unique(ds.dsets_std[0].time_array)[1:Ntimes:2], inplace=False)) ds.dsets_std[0].select(times=np.unique(ds.dsets_std[0].time_array)[0:Ntimes:2], inplace=True) # wgts is currently always None ds.wgts.append(None) dset_pairs = [(0, 1)] dsets = ds.dsets dsets_std = ds.dsets_std wgts = ds.wgts dset_labels = ds.labels # rephase if desired if rephase_to_dset is not None: ds.rephase_to_dset(rephase_to_dset) # broadcast flags if broadcast_dset_flags: ds.broadcast_dset_flags(time_thresh=time_thresh, spw_ranges=spw_ranges) # perform Jy to mK conversion if desired if Jy2mK: ds.Jy_to_mK() # Print warning if auto_bls is set to exclude correlations of the # same baseline with itself, because this may cause a bias if one # is already cross-correlating different times to avoid noise bias. # See issue #160 on hera_pspec repo if exclude_auto_bls: raise_warning("Skipping the cross-multiplications of a baseline " "with itself may cause a bias if one is already " "cross-correlating different times to avoid the " "noise bias. Please see hera_pspec github issue 160 " "to make sure you know what you are doing! 
" "https://github.com/HERA-Team/hera_pspec/issues/160", verbose=verbose) # check dset pair type err_msg = "dset_pairs must be fed as a list of len-2 integer tuples" assert isinstance(dset_pairs, list), err_msg assert np.all([isinstance(d, tuple) for d in dset_pairs]), err_msg # Get baseline-pairs to use for each dataset pair bls1_list, bls2_list = [], [] for i, dsetp in enumerate(dset_pairs): # get bls if blpairs not fed if blpairs is None: (bls1, bls2, blps, xants1, xants2) = utils.calc_blpair_reds( dsets[dsetp[0]], dsets[dsetp[1]], filter_blpairs=True, exclude_auto_bls=exclude_auto_bls, exclude_cross_bls=exclude_cross_bls, exclude_permutations=exclude_permutations, Nblps_per_group=Nblps_per_group, bl_len_range=bl_len_range, bl_deg_range=bl_deg_range, include_autocorrs=include_autocorrs, include_crosscorrs=include_crosscorrs, bl_tol=bl_error_tol, xant_flag_thresh=xant_flag_thresh) bls1_list.append(bls1) bls2_list.append(bls2) # ensure fed blpairs exist in each of the datasets else: dset1_bls = dsets[dsetp[0]].get_antpairs() dset2_bls = dsets[dsetp[1]].get_antpairs() _bls1 = [] _bls2 = [] for _bl1, _bl2 in zip(bls1, bls2): if (_bl1 in dset1_bls or _bl1[::-1] in dset1_bls) \ and (_bl2 in dset2_bls or _bl2[::-1] in dset2_bls): _bls1.append(_bl1) _bls2.append(_bl2) bls1_list.append(_bls1) bls2_list.append(_bls2) # Open PSpecContainer to store all output in logger.info(f"Opening {filename} in transactional mode") psc = container.PSpecContainer(filename, mode='rw', keep_open=False, tsleep=tsleep, maxiter=maxiter) # assign group name if groupname is None: groupname = '_'.join(dset_labels) # Loop over dataset combinations for i, dset_idxs in enumerate(dset_pairs): # check bls lists aren't empty if len(bls1_list[i]) == 0 or len(bls2_list[i]) == 0: continue # Run OQE uvp = ds.pspec(bls1_list[i], bls2_list[i], dset_idxs, pol_pairs, symmetric_taper=symmetric_taper, spw_ranges=spw_ranges, n_dlys=n_dlys, r_params=r_params, store_cov=store_cov, store_cov_diag=store_cov_diag, input_data_weight=input_data_weight, exact_norm=exact_norm, sampling=sampling, return_q=return_q, cov_model=cov_model, known_cov=known_cov, norm=norm, taper=taper, history=history, verbose=verbose, filter_extensions=filter_extensions, store_window=store_window, exact_windows=exact_windows, ftbeam=ftbeam) # Store output psname = f'{dset_labels[dset_idxs[0]]}_x_{dset_labels[dset_idxs[1]]}{psname_ext}' # write in transactional mode logger.info(f"Storing {psname}") psc.set_pspec(group=groupname, psname=psname, pspec=uvp, overwrite=overwrite) return ds def get_pspec_run_argparser(): a = argparse.ArgumentParser(description="argument parser for pspecdata.pspec_run()") def list_of_int_tuples(v): """Format for parsing lists of integer pairs for different OQE args. Two acceptable formats are Ex1: '0~0,1~1' --> [(0, 0), (1, 1), ...] and Ex2: '0 0, 1 1' --> [(0, 0), (1, 1), ...]""" if '~' in v: v = [tuple([int(_x) for _x in x.split('~')]) for x in v.split(",")] else: v = [tuple([int(_x) for _x in x.split()]) for x in v.split(",")] return v def list_of_str_tuples(v): """Lists of string 2-tuples for various OQE args (ex. Polarization pairs). Two acceptable formats are Ex1: 'xx~xx,yy~yy' --> [('xx', 'xx'), ('yy', 'yy'), ...] and Ex2: 'xx xx, yy yy' --> [('xx', 'xx'), ('yy', 'yy'), ...]""" if '~' in v: v = [tuple([str(_x) for _x in x.split('~')]) for x in v.split(",")] else: v = [tuple([str(_x) for _x in x.split()]) for x in v.split(",")] return v def list_of_tuple_tuples(v): """List of tuple tuples for various OQE args (ex. baseline pair lists). 
Two acceptable formats are Ex1: '1~2~3~4,5~6~7~8' --> [((1 2), (3, 4)), ((5, 6), (7, 8)), ...] and Ex2: '1 2 3 4, 5 6 7 8' --> [((1 2), (3, 4)), ((5, 6), (7, 8)), ...])""" if '~' in v: v = [tuple([int(_x) for _x in x.split('~')]) for x in v.split(",")] else: v = [tuple([int(_x) for _x in x.split()]) for x in v.split(",")] v = [(x[:2], x[2:]) for x in v] return v a.add_argument("dsets", nargs='*', help="List of UVData objects or miriad filepaths.") a.add_argument("filename", type=str, help="Output filename of HDF5 container.") a.add_argument("--dsets_std", nargs='*', default=None, type=str, help="List of miriad filepaths to visibility standard deviations.") a.add_argument("--groupname", default=None, type=str, help="Groupname for the UVPSpec objects in the HDF5 container.") a.add_argument("--dset_pairs", default=None, type=list_of_int_tuples, help="List of dset pairings for OQE. Two acceptable formats are " "Ex1: '0~0,1~1' --> [(0, 0), (1, 1), ...] and " "Ex2: '0 0, 1 1' --> [(0, 0), (1, 1), ...]") a.add_argument("--dset_labels", default=None, type=str, nargs='*', help="List of string labels for each input dataset.") a.add_argument("--spw_ranges", default=None, type=list_of_int_tuples, help="List of spw channel selections. Two acceptable formats are " "Ex1: '200~300,500~650' --> [(200, 300), (500, 650), ...] and " "Ex2: '200 300, 500 650' --> [(200, 300), (500, 650), ...]") a.add_argument("--n_dlys", default=None, type=int, nargs='+', help="List of integers specifying number of delays to use per spectral window selection.") a.add_argument("--pol_pairs", default=None, type=list_of_str_tuples, help="List of pol-string pairs to use in OQE. Two acceptable formats are " "Ex1: 'xx~xx,yy~yy' --> [('xx', 'xx'), ('yy', 'yy'), ...] and " "Ex2: 'xx xx, yy yy' --> [('xx', 'xx'), ('yy', 'yy'), ...]") a.add_argument("--blpairs", default=None, type=list_of_tuple_tuples, help="List of baseline-pair antenna integers to run OQE on. Two acceptable formats are " "Ex1: '1~2~3~4,5~6~7~8' --> [((1 2), (3, 4)), ((5, 6), (7, 8)), ...] and " "Ex2: '1 2 3 4, 5 6 7 8' --> [((1 2), (3, 4)), ((5, 6), (7, 8)), ...]") a.add_argument("--input_data_weight", default='identity', type=str, help="Data weighting for OQE. See PSpecData.pspec for details.") a.add_argument("--norm", default='I', type=str, help='M-matrix normalization type for OQE. See PSpecData.pspec for details.') a.add_argument("--taper", default='none', type=str, help="Taper function to use in OQE delay transform. See PSpecData.pspec for details.") a.add_argument("--beam", default=None, type=str, help="Filepath to UVBeam healpix map of antenna beam.") a.add_argument("--cosmo", default=None, nargs='+', type=float, help="List of float values for [Om_L, Om_b, Om_c, H0, Om_M, Om_k].") a.add_argument("--rephase_to_dset", default=None, type=int, help="dset integer index to phase all other dsets to. 
Default is no rephasing.") a.add_argument("--trim_dset_lsts", default=False, action='store_true', help="Trim non-overlapping dset LSTs.") a.add_argument("--broadcast_dset_flags", default=False, action='store_true', help="Broadcast dataset flags across time according to time_thresh.") a.add_argument("--time_thresh", default=0.2, type=float, help="Fractional flagging threshold across time to trigger flag broadcast if broadcast_dset_flags is True") a.add_argument("--Jy2mK", default=False, action='store_true', help="Convert datasets from Jy to mK if a beam model is provided.") a.add_argument("--exclude_auto_bls", default=False, action='store_true', help='If blpairs is not provided, exclude all baselines paired with itself.') a.add_argument("--exclude_cross_bls", default=False, action='store_true', help='If blpairs is not provided, exclude all baselines paired with a different baseline.') a.add_argument("--exclude_permutations", default=False, action='store_true', help='If blpairs is not provided, exclude a basline-pair permutations. Ex: if (A, B) exists, exclude (B, A).') a.add_argument("--Nblps_per_group", default=None, type=int, help="If blpairs is not provided and group == True, set the number of blpairs in each group.") a.add_argument("--bl_len_range", default=(0, 1e10), nargs='+', type=float, help="If blpairs is not provided, limit the baselines used based on their minimum and maximum length in meters.") a.add_argument("--bl_deg_range", default=(0, 180), nargs='+', type=float, help="If blpairs is not provided, limit the baseline used based on a min and max angle cut in ENU frame in degrees.") a.add_argument("--bl_error_tol", default=1.0, type=float, help="If blpairs is not provided, this is the error tolerance in forming redundant baseline groups in meters.") a.add_argument("--store_cov", default=False, action='store_true', help="Compute and store covariance of bandpowers given dsets_std files or empirical covariance.") a.add_argument("--store_cov_diag", default=False, action='store_true', help="Compute and store the error bars calculated by QE formalism.") a.add_argument("--return_q", default=False, action='store_true', help="Return unnormalized bandpowers given dsets files.") a.add_argument("--overwrite", default=False, action='store_true', help="Overwrite output if it exists.") a.add_argument("--cov_model", default='empirical', type=str, help="Model for computing covariance, currently supports empirical or dsets") a.add_argument("--psname_ext", default='', type=str, help="Extension for pspectra name in PSpecContainer.") a.add_argument("--verbose", default=False, action='store_true', help="Report feedback to standard output.") a.add_argument("--file_type", default="uvh5", help="filetypes of input UVData. Default is 'uvh5'") a.add_argument("--filter_extensions", default=None, type=list_of_int_tuples, help="List of spw filter extensions wrapped in quotes. Ex:20~20,40~40' ->> [(20, 20), (40, 40), ...]") a.add_argument("--symmetric_taper", default=True, type=bool, help="If True, apply sqrt of taper before foreground filtering and then another sqrt after. If False, apply full taper after foreground Filter. 
") a.add_argument("--include_autocorrs", default=False, action="store_true", help="Include power spectra of autocorr visibilities.") a.add_argument("--exclude_crosscorrs", default=False, action="store_true", help="If True, exclude cross-correlations from power spectra (autocorr power spectra only).") a.add_argument("--interleave_times", default=False, action="store_true", help="Cross multiply even/odd time intervals.") a.add_argument("--xant_flag_thresh", default=0.95, type=float, help="fraction of baseline waterfall that needs to be flagged for entire baseline to be flagged (and excluded from pspec)") a.add_argument("--store_window", default=False, action="store_true", help="store window function array.") a.add_argument("--allow_fft", default=False, action="store_true", help="use an FFT to comptue q-hat.") return a def validate_blpairs(blpairs, uvd1, uvd2, baseline_tol=1.0, verbose=True): """ Validate baseline pairings in the blpair list are redundant within the specified tolerance. Parameters ---------- blpairs : list of baseline-pair tuples Ex. [((1,2),(1,2)), ((2,3),(2,3))] See docstring of PSpecData.pspec() for details on format. uvd1, uvd2 : UVData UVData instances containing visibility data that first/second bl in blpair will draw from baseline_tol : float, optional Distance tolerance for notion of baseline "redundancy" in meters. Default: 1.0. verbose : bool, optional If True report feedback to stdout. Default: True. """ # ensure uvd1 and uvd2 are UVData objects if isinstance(uvd1, UVData) == False: raise TypeError("uvd1 must be a UVData instance") if isinstance(uvd2, UVData) == False: raise TypeError("uvd2 must be a UVData instance") # get antenna position dictionary ap1, a1 = uvd1.get_ENU_antpos(pick_data_ants=True) ap2, a2 = uvd2.get_ENU_antpos(pick_data_ants=True) ap1 = dict(zip(a1, ap1)) ap2 = dict(zip(a2, ap2)) # ensure shared antenna keys match within tolerance shared = sorted(set(ap1.keys()) & set(ap2.keys())) for k in shared: assert np.linalg.norm(ap1[k] - ap2[k]) <= baseline_tol, \ "uvd1 and uvd2 don't agree on antenna positions within " \ "tolerance of {} m".format(baseline_tol) ap = ap1 ap.update(ap2) # iterate through baselines and check baselines crossed with each other # are within tolerance for i, blg in enumerate(blpairs): if isinstance(blg, tuple): blg = [blg] for blp in blg: bl1_vec = ap[blp[0][0]] - ap[blp[0][1]] bl2_vec = ap[blp[1][0]] - ap[blp[1][1]] if np.linalg.norm(bl1_vec - bl2_vec) >= baseline_tol: raise_warning("blpair {} exceeds redundancy tolerance of " "{} m".format(blp, baseline_tol), verbose=verbose) def raise_warning(warning, verbose=True): """ Warning function. """ if verbose: print(warning) def _load_dsets(fnames, bls=None, pols=None, logf=None, verbose=True, file_type='miriad', cals=None, cal_flag=True): """ Helper function for loading UVData-compatible datasets in pspec_run. Parameters ---------- fnames : list of str, or list of list of str Filenames of load. if an element in fnames is a list of str load them all in one call bls : list of tuples Baselines to load. Default is all. pols : list of str Polarizations to load, default is all. logf : file descriptor Log file to write to verbose : bool Report output to logfile. file_type : str File type of input files. 
Returns ------- list List of UVData objects """ ### TODO: data loading for cross-polarization power ### spectra is sub-optimal: only dset1 pol1 and dset2 pol2 ### is needed instead of pol1 & pol2 for dset1 & dset2 dsets = [] Ndsets = len(fnames) for i, dset in enumerate(fnames): utils.log("Reading {} / {} datasets...".format(i+1, Ndsets), f=logf, lvl=1, verbose=verbose) # read data uvd = UVData() if isinstance(dset, str): dfiles = glob.glob(dset) else: dfiles = dset uvd.read(dfiles, bls=bls, polarizations=pols, file_type=file_type, use_future_array_shapes=True) uvd.extra_keywords['filename'] = json.dumps(dfiles) dsets.append(uvd) return dsets def _load_cals(cnames, logf=None, verbose=True): """ Helper function for loading calibration files. Parameters ---------- cnames : list of str, or list of list of str Calfits filepaths to load. If an element in cnames is a list, load it all at once. logf : file descriptor Log file to write to. verbose : bool Report feedback to log file. Returns ------- list List of UVCal objects """ cals = [] Ncals = len(cnames) for i, cfile in enumerate(cnames): utils.log("Reading {} / {} calibrations...".format(i+1, Ncals), f=logf, lvl=1, verbose=verbose) # read data uvc = UVCal() if isinstance(cfile, str): uvc.read_calfits(glob.glob(cfile), use_future_array_shapes=True) else: uvc.read_calfits(cfile, use_future_array_shapes=True) uvc.extra_keywords['filename'] = json.dumps(cfile) cals.append(uvc) return cals
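# ----------------------------------------------------------------------
# End-to-end usage sketch for pspec_run(), the file-based driver defined
# above. This is illustrative only: the input files, beam file, spectral
# window and polarization pair are placeholders, and reading results back
# via PSpecContainer.get_pspec() assumes the default groupname/psname
# conventions documented in pspec_run().
if __name__ == "__main__":
    # run OQE on two hypothetical files and store the result in a container
    ds = pspec_run(
        ["even.LST.uvh5", "odd.LST.uvh5"],   # hypothetical UVData files
        "pspec_output.h5",                   # output PSpecContainer path
        dset_labels=["even", "odd"],         # no underscores allowed in labels
        spw_ranges=[(300, 400)],
        pol_pairs=[("xx", "xx")],
        beam="HERA_beam.fits",               # hypothetical beamfits file
        file_type="uvh5",
        input_data_weight="identity",
        norm="I",
        taper="blackman-harris",
        Jy2mK=True,
        verbose=False,
    )

    # read the stored UVPSpec back out of the container
    psc = container.PSpecContainer("pspec_output.h5", mode="r")
    uvp = psc.get_pspec(group="even_odd", psname="even_x_odd")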