import numpy as np
import pyuvdata
import copy
from collections import OrderedDict as odict
import astropy.units as u
import astropy.constants as c
from pyuvdata import UVData
import uvtools
from . import conversions, uvpspec, utils
[docs]def delay_spectrum(uvp, blpairs, spw, pol, average_blpairs=False,
average_times=False, fold=False, plot_noise=False,
delay=True, deltasq=False, legend=False, ax=None,
component='real', lines=True, markers=False, error=None,
times=None, logscale=True, force_plot=False,
label_type='blpairt', plot_stats=None, **kwargs):
"""
Plot a 1D delay spectrum (or spectra) for a group of baselines.
Parameters
----------
uvp : UVPspec
UVPSpec object, containing delay spectra for a set of baseline-pairs,
times, polarizations, and spectral windows.
blpairs : list of tuples or lists of tuples
List of baseline-pair tuples, or groups of baseline-pair tuples.
spw, pol : int or str
Which spectral window and polarization to plot.
average_blpairs : bool, optional
If True, average over the baseline pairs within each group.
average_times : bool, optional
If True, average spectra over the time axis. Default: False.
fold : bool, optional
Whether to fold the power spectrum in :math:`|k_\parallel|`.
Default: False.
plot_noise : bool, optional
Whether to plot noise power spectrum curves or not. Default: False.
delay : bool, optional
Whether to plot the power spectrum in delay units (ns) or cosmological
units (h/Mpc). Default: True.
deltasq : bool, optional
If True, plot dimensionless power spectra, Delta^2. This is ignored if
delay=True. Default: False.
legend : bool, optional
Whether to switch on the plot legend. Default: False.
ax : matplotlib.axes, optional
Use this to pass in an existing Axes object, which the power spectra
will be added to. (Warning: Labels and legends will not be altered in
this case, even if the existing plot has completely different axis
labels etc.) If None, a new Axes object will be created. Default: None.
component : str, optional
Component of complex spectra to plot, options=['abs', 'real', 'imag'].
Default: 'real'.
lines : bool, optional
If True, plot lines between bandpowers for a given pspectrum.
Default: True.
markers : bool, optional
If True, plot circles at bandpowers. Filled for positive, empty
for negative. Default: False.
error : str, optional
If not None and if error exists in uvp stats_array, plot errors
on bandpowers. Default: None.
times : array_like, optional
Float ndarray containing elements from time_avg_array to plot.
logscale : bool, optional
If True, put plot on a log-scale. Else linear scale. Default: True.
force_plot : bool, optional
If plotting a large number of spectra (>100), this function will error.
Set this to True to override this large plot error and force plot.
Default: False.
label_type : int, optional
Line label type in legend, options=['key', 'blpair', 'blpairt'].
key : Label lines based on (spw, blpair, pol) key.
blpair : Label lines based on blpair.
blpairt : Label lines based on blpair-time.
plot_stats : string, optional
If not None, plot an entry in uvp.stats_array instead
of power spectrum in uvp.data_array.
kwargs : dict, optional
Extra kwargs to pass to _all_ ax.plot calls.
Returns
-------
fig : matplotlib.pyplot.Figure
Matplotlib Figure instance.
"""
import matplotlib
import matplotlib.pyplot as plt
# Create new Axes if none specified
new_plot = False
if ax is None:
new_plot = True
fig, ax = plt.subplots(1, 1)
# Select times if requested
if times is not None:
uvp = uvp.select(times=times, inplace=False)
# Add ungrouped baseline-pairs into a group of their own (expected by the
# averaging routines)
blpairs_in = blpairs
blpairs = [] # Must be a list, not an array
for i, blpgrp in enumerate(blpairs_in):
if not isinstance(blpgrp, list):
blpairs.append([blpairs_in[i],])
else:
blpairs.append(blpairs_in[i])
# Average over blpairs or times if requested
blpairs_in = copy.deepcopy(blpairs) # Save input blpair list
if average_blpairs:
uvp_plt = uvp.average_spectra(blpair_groups=blpairs,
time_avg=average_times, inplace=False)
else:
uvp_plt = copy.deepcopy(uvp)
if average_times:
# Average over times, but not baseline-pairs
uvp_plt.average_spectra(time_avg=True, inplace=True)
# Check plot size
if uvp_plt.Ntimes * len(blpairs) > 100 and force_plot == False:
raise ValueError("Trying to plot > 100 spectra... Set force_plot=True to continue.")
# Fold the power spectra if requested
if fold:
uvp_plt.fold_spectra()
# Convert to Delta^2 units if requested
if deltasq and not delay:
uvp_plt.convert_to_deltasq()
# Get x-axis units (delays in ns, or k_parallel in Mpc^-1 or h Mpc^-1)
if delay:
dlys = uvp_plt.get_dlys(spw) * 1e9 # ns
x = dlys
else:
k_para = uvp_plt.get_kparas(spw)
x = k_para
# Check plot_stats
if plot_stats is not None:
assert plot_stats in uvp_plt.stats_array, "specified key {} not found in stats_array".format(plot_stats)
# Plot power spectra
for blgrp in blpairs:
# Loop over blpairs in group and plot power spectrum for each one
for blp in blgrp:
# setup key and casting function
key = (spw, blp, pol)
if component == 'real':
cast = np.real
elif component == 'imag':
cast = np.imag
elif component == 'abs':
cast = np.abs
# get data array and repeat x array
if plot_stats is None:
data = cast(uvp_plt.get_data(key))
else:
data = cast(uvp_plt.get_stats(plot_stats, key))
# flag records that have zero integration
flags = np.isclose(uvp_plt.get_integrations(key), 0.0)
data[flags] = np.nan
# get errs if requessted
if error is not None and hasattr(uvp_plt, 'stats_array'):
if error in uvp_plt.stats_array:
errs = uvp_plt.get_stats(error, key)
errs[flags] = np.nan
# get times
blptimes = uvp_plt.time_avg_array[uvp_plt.blpair_to_indices(blp)]
# iterate over integrations per blp
for i in range(data.shape[0]):
# get y data
y = data[i]
t = blptimes[i]
# form label
if label_type == 'key':
label = "{}".format(key)
elif label_type == 'blpair':
label = "{}".format(blp)
elif label_type == 'blpairt':
label = "{}, {:0.5f}".format(blp, t)
else:
raise ValueError("Couldn't undestand label_type {}".format(label_type))
# plot elements
cax = None
if lines:
if logscale:
_y = np.abs(y)
else:
_y = y
cax, = ax.plot(x, _y, marker='None', label=label, **kwargs)
if markers:
if cax is None:
c = None
else:
c = cax.get_color()
if lines:
label = None
# plot markers
if logscale:
# plot positive w/ filled circles
cax, = ax.plot(x[y >= 0], np.abs(y[y >= 0]), c=c, ls='None', marker='o',
markerfacecolor=c, markeredgecolor=c, label=label, **kwargs)
# plot negative w/ unfilled circles
c = cax.get_color()
cax, = ax.plot(x[y < 0], np.abs(y[y < 0]), c=c, ls='None', marker='o',
markerfacecolor='None', markeredgecolor=c, **kwargs)
else:
cax, = ax.plot(x, y, c=c, ls='None', marker='o',
markerfacecolor=c, markeredgecolor=c, label=label, **kwargs)
if error is not None and hasattr(uvp_plt, 'stats_array'):
if error in uvp_plt.stats_array:
if cax is None:
c = None
else:
c = cax.get_color()
if logscale:
_y = np.abs(y)
else:
_y = y
cax = ax.errorbar(x, _y, fmt='none', ecolor=c, yerr=cast(errs[i]), **kwargs)
else:
raise KeyError("Error variable '%s' not found in stats_array of UVPSpec object." % error)
# If blpairs were averaged, only the first blpair in the group
# exists any more (so skip the rest)
if average_blpairs: break
# Set log scale
if logscale:
ax.set_yscale('log')
# Add legend
if legend:
ax.legend(loc='upper left')
# Add labels with units
if ax.get_xlabel() == "":
if delay:
ax.set_xlabel(r"$\tau$ $[{\rm ns}]$", fontsize=16)
else:
ax.set_xlabel("$k_{\parallel}\ h\ Mpc^{-1}$", fontsize=16)
if ax.get_ylabel() == "" and plot_stats is None:
# Sanitize power spectrum units
psunits = uvp_plt.units
if "h^-1" in psunits: psunits = psunits.replace("h^-1", "h^{-1}")
if "h^-3" in psunits: psunits = psunits.replace("h^-3", "h^{-3}")
if "Mpc" in psunits and "\\rm" not in psunits:
psunits = psunits.replace("Mpc", r"{\rm Mpc}")
if "pi" in psunits and "\\pi" not in psunits:
psunits = psunits.replace("pi", r"\pi")
# Power spectrum type
if deltasq:
ax.set_ylabel("$\Delta^2$ $[%s]$" % psunits, fontsize=16)
else:
ax.set_ylabel("$P(k_\parallel)$ $[%s]$" % psunits, fontsize=16)
# Return Figure: the axis is an attribute of figure
if new_plot:
return fig
def delay_waterfall(uvp, blpairs, spw, pol, component='abs-real',
average_blpairs=False, fold=False, delay=True,
deltasq=False, log=True, lst_in_hrs=True,
vmin=None, vmax=None, cmap='YlGnBu', axes=None,
figsize=(14, 6), force_plot=False, times=None,
title_type='blpair', colorbar=True, **kwargs):
"""
Plot a 1D delay spectrum waterfall (or spectra) for a group of baselines.
Parameters
----------
uvp : UVPspec
UVPSpec object, containing delay spectra for a set of baseline-pairs,
times, polarizations, and spectral windows.
blpairs : list of tuples or lists of tuples
List of baseline-pair tuples, or groups of baseline-pair tuples.
spw, pol : int or str
Which spectral window and polarization to plot.
component : str
Component of complex spectra to plot, options=['abs', 'real', 'imag', 'abs-real', 'abs-imag'].
abs-real is abs(real(data)), whereas 'real' is real(data)
Default: 'abs-real'.
average_blpairs : bool, optional
If True, average over the baseline pairs within each group.
fold : bool, optional
Whether to fold the power spectrum in :math:`|k_\parallel|`.
Default: False.
delay : bool, optional
Whether to plot the power spectrum in delay units (ns) or cosmological
units (h/Mpc). Default: True.
deltasq : bool, optional
If True, plot dimensionless power spectra, Delta^2. This is ignored if
delay=True. Default: False.
log : bool, optional
Whether to plot the log10 of the data. Default: True.
lst_in_hrs : bool, optional
If True, LST is plotted in hours, otherwise its plotted in radians.
vmin, vmax : float, optional
Clip the color scale of the delay spectrum to these min./max. values.
If None, use the natural range of the data. Default: None.
cmap : str, optional
Matplotlib colormap to use. Default: 'YlGnBu'.
axes : array of matplotlib.axes, optional
Use this to pass in an existing Axes object or array of axes, which
the power spectra will be added to. (Warning: Labels and legends will
not be altered in this case, even if the existing plot has completely different axis
labels etc.) If None, a new Axes object will be created. Default: None.
figsize : tuple
len-2 integer tuple specifying figure size if axes is None
force_plot : bool
If plotting a large number of blpairs (>20), this routine will quit
unless force_plot == True.
times : array_like, optional
Float ndarray containing elements from time_avg_array to plot.
title_type : str, optional
Type of title to put above plot(s). Options = ['blpair', 'blvec']
blpair : "bls: {bl1} x {bl2}"
blvec : "bl len {len} m & ang {ang} deg"
colorbar : bool, optional
Whether to make a colorbar. Default: True
kwargs : keyword arguments
Additional kwargs to pass to ax.matshow()
Returns
-------
fig : matplotlib.pyplot.Figure
Matplotlib Figure instance if input ax is None.
"""
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.axes
# assert component
assert component in ['real', 'abs', 'imag', 'abs-real', 'abs-imag'], "Can't parse specified component {}".format(component)
fix_negval = component in ['real', 'imag'] and log
# Add ungrouped baseline-pairs into a group of their own (expected by the
# averaging routines)
blpairs_in = blpairs
blpairs = [] # Must be a list, not an array
for i, blpgrp in enumerate(blpairs_in):
if not isinstance(blpgrp, list):
blpairs.append([blpairs_in[i],])
else:
blpairs.append(blpairs_in[i])
# iterate through and make sure they are blpair integers
_blpairs = []
for blpgrp in blpairs:
_blpgrp = []
for blp in blpgrp:
if isinstance(blp, tuple):
blp_int = uvp.antnums_to_blpair(blp)
else:
blp_int = blp
_blpgrp.append(blp_int)
_blpairs.append(_blpgrp)
blpairs = _blpairs
# Select times if requested
if times is not None:
uvp = uvp.select(times=times, inplace=False)
# Average over blpairs or times if requested
blpairs_in = copy.deepcopy(blpairs) # Save input blpair list
if average_blpairs:
uvp_plt = uvp.average_spectra(blpair_groups=blpairs,
time_avg=False, inplace=False)
else:
uvp_plt = copy.deepcopy(uvp)
# Fold the power spectra if requested
if fold:
uvp_plt.fold_spectra()
# Convert to Delta^2 units if requested
if deltasq and not delay:
uvp_plt.convert_to_deltasq()
# Get x-axis units (delays in ns, or k_parallel in Mpc^-1 or h Mpc^-1)
if delay:
dlys = uvp_plt.get_dlys(spw) * 1e9 # ns
x = dlys
else:
k_para = uvp_plt.get_kparas(spw)
x = k_para
# Extract power spectra into array
waterfall = odict()
for blgrp in blpairs:
# Loop over blpairs in group and plot power spectrum for each one
for blp in blgrp:
# make key
key = (spw, blp, pol)
# get power data
power = uvp_plt.get_data(key, omit_flags=False)
# set flagged power data to nan
flags = np.isclose(uvp_plt.get_integrations(key), 0.0)
power[flags, :] = np.nan
# get component
if component == 'abs':
power = np.abs(power)
elif component == 'real':
power = np.real(power)
elif component == 'abs-real':
power = np.abs(np.real(power))
elif component == 'imag':
power = np.imag(power)
elif component == 'abs-imag':
power = np.abs(np.real(power))
# if real or imag and log is True, set negative values to near zero
# this is done so that one can use cmap.set_under() and cmap.set_bad() separately
if fix_negval:
power[power < 0] = np.abs(power).min() * 1e-6 + 1e-10
# assign to waterfall
waterfall[key] = power
# If blpairs were averaged, only the first blpair in the group
# exists any more (so skip the rest)
if average_blpairs: break
# check for reasonable number of blpairs to plot...
Nkeys = len(waterfall)
if Nkeys > 20 and force_plot == False:
raise ValueError("Nblps > 20 and force_plot == False, quitting...")
# Take logarithm of data if requested
if log:
for k in waterfall:
waterfall[k] = np.log10(np.abs(waterfall[k]))
logunits = "\log_{10}"
else:
logunits = ""
# Create new Axes if none specified
new_plot = False
if axes is None:
new_plot = True
# figure out how many subplots to make
Nkeys = len(waterfall)
Nside = int(np.ceil(np.sqrt(Nkeys)))
fig, axes = plt.subplots(Nside, Nside, figsize=figsize)
# Ensure axes is an ndarray
if isinstance(axes, matplotlib.axes.Axes):
axes = np.array([[axes]])
if isinstance(axes, list):
axes = np.array(axes)
# Ensure its 2D and get side lengths
if axes.ndim == 1:
axes = axes[:, None]
assert axes.ndim == 2, "input axes must have ndim == 2"
Nvert, Nhorz = axes.shape
# Get LST range: setting y-ticks is tricky due to LST wrapping...
y = uvp_plt.lst_avg_array[
uvp_plt.key_to_indices(list(waterfall.keys())[0])[1] ]
y = np.unwrap(y)
if y[0] > np.pi:
# if start is closer to 2pi than 0, lower axis by an octave
y -= 2 * np.pi
if lst_in_hrs:
lst_units = "Hr"
y *= 24 / (2 * np.pi)
else:
lst_units = "rad"
# get baseline vectors
blvecs = dict(zip([uvp_plt.bl_to_antnums(bl) for bl in uvp_plt.bl_array], uvp_plt.get_ENU_bl_vecs()))
# Sanitize power spectrum units
psunits = uvp_plt.units
if "h^-1" in psunits: psunits = psunits.replace("h^-1", "h^{-1}")
if "h^-3" in psunits: psunits = psunits.replace("h^-3", "h^{-3}")
if "Hz" in psunits: psunits = psunits.replace("Hz", r"{\rm Hz}")
if "str" in psunits: psunits = psunits.replace("str", r"\,{\rm str}")
if "Mpc" in psunits and "\\rm" not in psunits:
psunits = psunits.replace("Mpc", r"{\rm Mpc}")
if "pi" in psunits and "\\pi" not in psunits:
psunits = psunits.replace("pi", r"\pi")
if "beam normalization not specified" in psunits:
psunits = psunits.replace("beam normalization not specified",
r"{\rm unnormed}")
# Iterate over waterfall keys
keys = list(waterfall.keys())
k = 0
for i in range(Nvert):
for j in range(Nhorz):
# set ax
ax = axes[i, j]
# turn off subplot if no more plots to make
if k >= Nkeys:
ax.axis('off')
continue
# get blpair key for this subplot
key = keys[k]
blp = uvp_plt.blpair_to_antnums(key[1])
# plot waterfall
cax = ax.matshow(waterfall[key], cmap=cmap, aspect='auto',
vmin=vmin, vmax=vmax,
extent=[x[0], x[-1], y[-1], y[0]], **kwargs)
# ax config
ax.xaxis.set_ticks_position('bottom')
ax.tick_params(labelsize=12)
if ax.get_title() == '':
if title_type == 'blpair':
ax.set_title("bls: {} x {}".format(*blp), y=1)
elif title_type == 'blvec':
blv = 0.5 * (blvecs[blp[0]] + blvecs[blp[1]])
lens, angs = utils.get_bl_lens_angs([blv], bl_error_tol=1.0)
ax.set_title("bl len {len:0.2f} m & {ang:0.0f} deg".format(len=lens[0], ang=angs[0]), y=1)
# set colorbar
if colorbar:
if fix_negval:
cb_extend = 'min'
else:
cb_extend = 'neither'
cbar = ax.get_figure().colorbar(cax, ax=ax, extend=cb_extend)
cbar.ax.tick_params(labelsize=14)
if fix_negval:
cbar.ax.set_title("$< 0$",y=-0.05, fontsize=16)
# configure left-column plots
if j == 0:
# set yticks
ax.set_ylabel(r"LST [{}]".format(lst_units), fontsize=16)
else:
ax.set_yticklabels([])
# configure bottom-row plots
if k + Nhorz >= Nkeys:
if ax.get_xlabel() == "":
if delay:
ax.set_xlabel(r"$\tau$ $[{\rm ns}]$", fontsize=16)
else:
ax.set_xlabel("$k_{\parallel}\ h\ Mpc^{-1}$", fontsize=16)
else:
ax.set_xticklabels([])
k += 1
# make suptitle
if axes[0][0].get_figure()._suptitle is None:
if deltasq:
units = "$%s\Delta^2$ $[%s]$" % (logunits, psunits)
else:
units = "$%sP(k_\parallel)$ $[%s]$" % (logunits, psunits)
spwrange = np.around(np.array(uvp_plt.get_spw_ranges()[spw][:2]) / 1e6, 2)
axes[0][0].get_figure().suptitle("{}\n{} polarization | {} -- {} MHz".format(units, pol, *spwrange),
y=1.03, fontsize=14)
# Return Axes
if new_plot:
return fig
def delay_wedge(uvp, spw, pol, blpairs=None, times=None, error_weights=None, fold=False, delay=True,
rotate=False, component='abs-real', log10=True, loglog=False,
red_tol=1.0, center_line=False, horizon_lines=False,
title=None, ax=None, cmap='viridis', figsize=(8, 6),
deltasq=False, colorbar=False, cbax=None, vmin=None, vmax=None,
edgecolor='none', flip_xax=False, flip_yax=False, lw=2,
set_bl_tick_major=False, set_bl_tick_minor=False,
xtick_size=10, xtick_rot=0, ytick_size=10, ytick_rot=0,
**kwargs):
"""
Plot a 2D delay spectrum (or spectra) from a UVPSpec object. Note that
all integrations and redundant baselines are averaged (unless specifying
times) before plotting.
Note: this deepcopies input uvp before averaging.
Parameters
----------
uvp : UVPSpec
UVPSpec object containing delay spectra to plot.
spw : integer
Which spectral window to plot.
pol : int or tuple
Polarization-pair integer or tuple, e.g. ('pI', 'pI')
blpairs : list of tuples, optional
List of baseline-pair tuples to use in plotting.
times : list, optional
An ndarray or list of times from uvp.time_avg_array to
select on before plotting. Default: None.
error_weights : string, optional
error_weights specify which kind of errors we use for weights
during averaging power spectra.
fold : bool, optional
Whether to fold the power spectrum in k_parallel.
Default: False.
delay : bool, optional
Whether to plot the axes in tau (ns). If False, axes will
be plotted in cosmological units.
Default: True.
rotate : bool, optional
If False, use baseline-type as x-axis and delay as y-axis,
else use baseline-type as y-axis and delay as x-axis.
Default: False
component : str, optional
Component of complex spectra to plot. Options=['real', 'imag', 'abs', 'abs-real', 'abs-imag']
abs-real is abs(real(data)), whereas 'real' is real(data)
Default: 'abs-real'.
log10 : bool, optional
If True, take log10 of data before plotting. Default: True
loglog : bool, optional
If True, turn x-axis and y-axis into log-log scale. Default: False
red_tol : float, optional
Redundancy tolerance when solving for redundant groups in meters.
Default: 1.0
center_line : bool, optional
Whether to plot a dotted line at k_perp = 0.
Default: False.
horizon_lines : bool, optional
Whether to plot dotted lines along the horizon.
Default: False.
title : string, optional
Title for subplot. Default: None.
ax : matplotlib.axes, optional
If not None, use this axes as a subplot for delay wedge.
cmap : str, optional
Colormap of wedge plot. Default: 'viridis'
figsize : len-2 integer tuple, optional
If ax is None, this is the new figure size.
deltasq : bool, optional
Convert to Delta^2 before plotting. This is ignored if delay=True.
Default: False
colorbar : bool, optional
Add a colorbar to the plot. Default: False
cbax : matplotlib.axes, optional
Axis object for adding colorbar if True. Default: None
vmin : float, optional
Minimum range of colorscale. Default: None
vmax : float, optional
Maximum range of colorscale. Default: None
edgecolor : str, optional
Edgecolor of bins in pcolormesh. Default: 'none'
flip_xax : bool, optional
Flip xaxis if True. Default: False
flip_yax : bool, optional
Flip yaxis if True. Default: False
lw : int, optional
Line-width of horizon and center lines if plotted. Default: 2.
set_bl_tick_major : bool, optional
If True, use the baseline lengths as major ticks, rather than default
uniform grid.
set_bl_tick_minor : bool, optional
If True, use the baseline lengths as minor ticks, which have no labels.
kwargs : dictionary
Additional keyword arguments to pass to pcolormesh() call.
Returns
-------
fig : matplotlib.pyplot.Figure
Matplotlib Figure instance if ax is None.
"""
import matplotlib
import matplotlib.pyplot as plt
# type checking
uvp = copy.deepcopy(uvp)
assert isinstance(uvp, uvpspec.UVPSpec), "input uvp must be a UVPSpec object"
assert isinstance(spw, (int, np.integer))
assert isinstance(pol, (int, np.integer, tuple))
fix_negval = component in ['real', 'imag'] and log10
# check pspec units for little h
little_h = 'h^-3' in uvp.norm_units
# Create new ax if none specified
new_plot = False
if ax is None:
fig, ax = plt.subplots(figsize=figsize)
new_plot = True
else:
fig = ax.get_figure()
# Select out times and blpairs if provided
if times is not None:
uvp.select(blpairs=blpairs, times=times, inplace=True)
# Average across redundant groups and time
# this also ensures blpairs are ordered from short_bl --> long_bl
blp_grps, lens, angs, tags = utils.get_blvec_reds(uvp, bl_error_tol=red_tol,
match_bl_lens=True)
uvp.average_spectra(blpair_groups=blp_grps, time_avg=True, error_weights=error_weights, inplace=True)
# get blpairs and order by len and enforce bl len ordering anyways
blpairs, blpair_seps = uvp.get_blpairs(), uvp.get_blpair_seps()
osort = np.argsort(blpair_seps)
blpairs, blpair_seps = [blpairs[oi] for oi in osort], blpair_seps[osort]
# Convert to DeltaSq
if deltasq and not delay:
uvp.convert_to_deltasq(inplace=True)
# Fold array
if fold:
uvp.fold_spectra()
# Format ticks
if delay:
x_axis = uvp.get_dlys(spw) * 1e9
y_axis = blpair_seps
else:
x_axis = uvp.get_kparas(spw, little_h=little_h)
y_axis = uvp.get_kperps(spw, little_h=little_h)
if rotate:
_x_axis = y_axis
y_axis = x_axis
x_axis = _x_axis
# Conigure Units
psunits = "({})^2\ {}".format(uvp.vis_units, uvp.norm_units)
if "h^-1" in psunits: psunits = psunits.replace("h^-1", "h^{-1}\ ")
if "h^-3" in psunits: psunits = psunits.replace("h^-3", "h^{-3}\ ")
if "Hz" in psunits: psunits = psunits.replace("Hz", r"{\rm Hz}\ ")
if "str" in psunits: psunits = psunits.replace("str", r"\,{\rm str}\ ")
if "Mpc" in psunits and "\\rm" not in psunits:
psunits = psunits.replace("Mpc", r"{\rm Mpc}")
if "pi" in psunits and "\\pi" not in psunits:
psunits = psunits.replace("pi", r"\pi")
if "beam normalization not specified" in psunits:
psunits = psunits.replace("beam normalization not specified",
r"{\rm unnormed}")
# get data with shape (Nblpairs, Ndlys)
data = [uvp.get_data((spw, blp, pol)).squeeze() for blp in blpairs]
# get component
if component == 'real':
data = np.real(data)
elif component == 'abs-real':
data = np.abs(np.real(data))
elif component == 'imag':
data = np.imag(data)
elif component == 'abs-imag':
data = np.abs(np.imag(data))
elif component == 'abs':
data = np.abs(data)
else:
raise ValueError("Did not understand component {}".format(component))
# if real or imag and log is True, set negative values to near zero
# this is done so that one can use cmap.set_under() and cmap.set_bad() separately
if fix_negval:
data[data < 0] = np.abs(data).min() * 1e-6 + 1e-10
# take log10
if log10:
data = np.log10(data)
# loglog
if loglog:
ax.set_xscale('log')
ax.set_yscale('log')
ax.yaxis.set_major_formatter(matplotlib.ticker.LogFormatterSciNotation())
ax.yaxis.set_minor_formatter(matplotlib.ticker.NullFormatter())
ax.xaxis.set_major_formatter(matplotlib.ticker.LogFormatterSciNotation())
ax.xaxis.set_minor_formatter(matplotlib.ticker.NullFormatter())
# rotate
if rotate:
data = np.rot90(data[:, ::-1], k=1)
# Get bin edges
xdiff = np.diff(x_axis)
x_edges = np.array([x_axis[0]-xdiff[0]/2.0] + list(x_axis[:-1]+xdiff/2.0) + [x_axis[-1]+xdiff[-1]/2.0])
ydiff = np.diff(y_axis)
y_edges = np.array([y_axis[0]-ydiff[0]/2.0] + list(y_axis[:-1]+ydiff/2.0) + [y_axis[-1]+ydiff[-1]/2.0])
X, Y = np.meshgrid(x_edges, y_edges)
# plot
cax = ax.pcolormesh(X, Y, data, cmap=cmap, edgecolor=edgecolor, lw=0.01,
vmin=vmin, vmax=vmax, **kwargs)
# Configure ticks
if set_bl_tick_major:
if rotate:
ax.set_xticks([np.around(x, _get_sigfig(x)+2) for x in x_axis])
else:
ax.set_yticks([np.around(x, _get_sigfig(x)+2) for x in y_axis])
if set_bl_tick_minor:
if rotate:
ax.set_xticks([np.around(x, _get_sigfig(x)+2) for x in x_axis],
minor=True)
else:
ax.set_yticks([np.around(x, _get_sigfig(x)+2) for x in y_axis],
minor=True)
# Add colorbar
if colorbar:
if fix_negval:
cb_extend = 'min'
else:
cb_extend = 'neither'
if cbax is None:
cbax = ax
cbar = fig.colorbar(cax, ax=cbax, extend=cb_extend)
if deltasq:
p = "\Delta^2"
else:
p = "P"
if delay:
p = "{}({},\ {})".format(p, r'\tau', r'|\vec{b}|')
else:
p = "{}({},\ {})".format(p, r'k_\parallel', r'k_\perp')
if log10:
psunits = r"$\log_{{10}}\ {}\ [{}]$".format(p, psunits)
else:
psunits = r"${}\ [{}]$".format(p, psunits)
cbar.set_label(psunits, fontsize=14)
if fix_negval:
cbar.ax.set_title("$< 0$",y=-0.05, fontsize=16)
# Configure tick labels
if delay:
xlabel = r"$\tau$ $[{\rm ns}]$"
ylabel = r"$|\vec{b}|$ $[{\rm m}]$"
else:
xlabel = r"$k_{\parallel}\ [h\ \rm Mpc^{-1}]$"
ylabel = r"$k_{\perp}\ [h\ \rm Mpc^{-1}]$"
if rotate:
_xlabel = ylabel
ylabel = xlabel
xlabel = _xlabel
if ax.get_xlabel() == '':
ax.set_xlabel(xlabel, fontsize=16)
if ax.get_ylabel() == '':
ax.set_ylabel(ylabel, fontsize=16)
# Configure center line
if center_line:
if rotate:
ax.axhline(y=0, color='#000000', ls='--', lw=lw)
else:
ax.axvline(x=0, color='#000000', ls='--', lw=lw)
# Plot horizons
if horizon_lines:
# get horizon in ns
horizons = blpair_seps / conversions.units.c * 1e9
# convert to cosmological wave vector
if not delay:
# Get average redshift of spw
avg_z = uvp.cosmo.f2z(np.mean(uvp.freq_array[uvp.spw_to_freq_indices(spw)]))
horizons *= uvp.cosmo.tau_to_kpara(avg_z, little_h=little_h) / 1e9
# iterate over bins and plot lines
if rotate:
bin_edges = x_edges
else:
bin_edges = y_edges
for i, hor in enumerate(horizons):
if rotate:
ax.plot(bin_edges[i:i+2], [hor, hor], color='#ffffff', ls='--', lw=lw)
if not uvp.folded:
ax.plot(bin_edges[i:i+2], [-hor, -hor], color='#ffffff', ls='--', lw=lw)
else:
ax.plot([hor, hor], bin_edges[i:i+2], color='#ffffff', ls='--', lw=lw)
if not uvp.folded:
ax.plot([-hor, -hor], bin_edges[i:i+2], color='#ffffff', ls='--', lw=lw)
# flip axes
if flip_xax:
fig.sca(ax)
fig.gca().invert_xaxis()
if flip_yax:
fig.sca(ax)
fig.gca().invert_yaxis()
# add title
if title is not None:
ax.set_title(title, fontsize=12)
# Configure tick sizes and rotation
[tl.set_size(xtick_size) for tl in ax.get_xticklabels()]
[tl.set_rotation(xtick_rot) for tl in ax.get_xticklabels()]
[tl.set_size(ytick_size) for tl in ax.get_yticklabels()]
[tl.set_rotation(ytick_rot) for tl in ax.get_yticklabels()]
# return figure
if new_plot:
return fig
def plot_uvdata_waterfalls(uvd, basename, data='data', plot_mode='log',
vmin=None, vmax=None, recenter=False, format='png',
**kwargs):
"""
Plot waterfalls for all baselines and polarizations within a UVData object,
and save to individual files.
Parameters
----------
uvd : UVData object
Input data object. Waterfalls will be stored for all baselines and
polarizations within the object; use uvd.select() to remove unwanted
information.
basename : str
Base filename for the output plots. This must have two placeholders
for baseline ID ('bl') and polarization ('pol'),
e.g. basename='plots/uvdata.{pol}.{bl}'.
data : str, optional
Which data array to plot from the UVData object. Options are:
'data', 'flags', 'nsamples'. Default: 'data'.
plot_mode : str, optional
Plot mode, passed to uvtools.plot.waterfall. Default: 'log'.
vmin, vmax : float, optional
Min./max. values of the plot colorscale. Default: None (uses the
min./max. of the data).
recenter : bool, optional
Whether to apply recentering (see uvtools.plot.waterfall).
Default: False.
format : str, optional
The file format of the output images. If None, the image format will be
deduced from the basename string. Default: 'png'.
**kwargs : dict
Keyword arguments passed to uvtools.plot.waterfall, which passes them
on to matplotlib.imshow.
"""
import matplotlib
import matplotlib.pyplot as plt
assert isinstance(uvd, UVData), "'uvd' must be a UVData object."
assert data in ['data', 'flags', 'nsamples'], \
"'%s' not a valid data array; use 'data', 'flags', or 'nsamples'" \
% data
# Set plot colorscale max/min if specified
drng = None
if vmin is not None:
assert vmax is not None, "Must also specify vmax if vmin is specified."
drng = vmax - vmin
# Empty figure
fig, ax = plt.subplots(1, 1)
# Loop over antenna pairs and pols
for (ant1, ant2, pol), d in uvd.antpairpol_iter():
# Get chosen data array
if data == 'data':
pass
elif data == 'flags':
d = uvd.get_flags((ant1, ant2, pol))
elif data == 'nsamples':
d = uvd.get_nsamples((ant1, ant2, pol))
else:
raise KeyError("Invalid data array type '%s'" % data)
# Make plot
img = uvtools.plot.waterfall(d, mode=plot_mode, mx=vmax, drng=drng,
recenter=recenter, **kwargs)
fig.colorbar(img)
# Save to file
outfile = basename.format(bl="%d.%d"%(ant1, ant2), pol=pol)
if format is not None:
# Make sure format extension is given
if outfile[-len(format)].lower() != format.lower():
outfile = "%s.%s" % (outfile, format)
fig.tight_layout()
fig.savefig(outfile, format=format)
fig.clf()
def _get_sigfig(x):
return -int(np.floor(np.log10(np.abs(x))))
def _round_sigfig(x, up=True):
sigfigs = get_sigfig(x)
if up:
return np.ceil(10**sigfigs * x) / 10**sigfigs
else:
return np.floor(10**sigfigs * x) / 10**sigfigs