import copy
import numpy as np
import uvtools
from pyuvdata import UVData
from . import conversions, utils, uvpspec
def _is_antpair_tuple(value):
"""Return True for a baseline tuple like (ant1, ant2)."""
return (
isinstance(value, tuple)
and len(value) == 2
and all(isinstance(ant, (int, np.integer)) for ant in value)
)
def _is_blpair_tuple(value):
"""Return True for a blpair tuple like ((ant1, ant2), (ant3, ant4))."""
return (
isinstance(value, tuple)
and len(value) == 2
and all(_is_antpair_tuple(antpair) for antpair in value)
)
def _normalize_plot_blpairs(uvp, blpairs, func_name):
"""
Normalize plotting blpair input to a list of lists of blpair integers.
"""
normalized = []
for blpgrp in blpairs:
if isinstance(blpgrp, list):
group = blpgrp
elif isinstance(blpgrp, (int, np.integer)) or _is_blpair_tuple(blpgrp):
group = [blpgrp]
elif _is_antpair_tuple(blpgrp):
raise ValueError(
f"{func_name} blpairs must be baseline-pair tuples or groups of "
"baseline-pair tuples/integers. For a single blpair, use "
"[((ant1, ant2), (ant3, ant4))]; to average a group, use "
"[[blpair1, blpair2, ...]]."
)
else:
raise TypeError(
f"{func_name} blpairs must be an iterable of baseline-pair tuples, "
"blpair integers, or lists of those values."
)
normalized_group = []
for blp in group:
if isinstance(blp, (int, np.integer)):
normalized_group.append(blp)
elif _is_blpair_tuple(blp):
normalized_group.append(uvp.antnums_to_blpair(blp))
elif _is_antpair_tuple(blp):
raise ValueError(
f"{func_name} blpairs must be baseline-pair tuples or groups of "
"baseline-pair tuples/integers. For a single blpair, use "
"[((ant1, ant2), (ant3, ant4))]; to average a group, use "
"[[blpair1, blpair2, ...]]."
)
else:
raise TypeError(
f"{func_name} blpairs must be baseline-pair tuples, blpair "
"integers, or lists of those values."
)
normalized.append(normalized_group)
return normalized
def _format_blpair_group_label(uvp, blpair_group):
"""Format a plotted blpair or averaged blpair group for display."""
group = [uvp.blpair_to_antnums(blp) for blp in blpair_group]
if len(group) == 1:
return str(group[0])
return str(group)
def _normalize_polpair_label(pol):
"""Format a polarization input consistently for labels and titles."""
if isinstance(pol, str):
pol = (pol, pol)
return str(pol)
def _format_lst_label(lst):
"""Format an LST in hours for display."""
lst_hrs = (lst % (2 * np.pi)) * 12 / np.pi
return f"lst={lst_hrs:0.3f} hr"
def _format_delay_spectrum_manual_label(label_type, metadata):
"""Format explicit manual labels for delay_spectrum."""
if label_type == "key":
return str(metadata["key"])
if label_type == "blpair":
return f"{metadata['blpair']}"
if label_type == "blpairt":
return f"{metadata['blpair']}, {metadata['time']:0.5f}"
raise ValueError(f"Couldn't understand label_type {label_type}")
def _get_delay_spectrum_title_and_labels(
series_metadata, label_type, title_legend, average_blpairs=False
):
"""
Return the auto-generated title, per-series labels, and legend visibility.
When *average_blpairs* is True the ``blpair`` field is never written to
the title, because it would produce a very long list of all the averaged
baseline-pairs. The blpair label is still used in the legend when multiple
averaged groups are present.
"""
if not title_legend:
return "", [None for _ in series_metadata], False
if label_type != "auto":
labels = [
_format_delay_spectrum_manual_label(label_type, metadata)
for metadata in series_metadata
]
return "", labels, True
if not series_metadata:
return "", [], False
field_order = ("spw", "blpair", "pol", "lst")
field_labels = {
"spw": lambda value: f"spw={value}",
"blpair": lambda value: f"blpair={value}",
"pol": lambda value: f"pol={value}",
"lst": _format_lst_label,
}
varying_fields = [
field
for field in field_order
if len({metadata[field] for metadata in series_metadata}) > 1
]
static_fields = [field for field in field_order if field not in varying_fields]
# When blpairs are being averaged, skip the blpair label from the title
# even if it is static (one averaged group). The list of all averaged
# blpairs can be very long and does not help identify the plotted data.
title_fields = [f for f in static_fields if not (average_blpairs and f == "blpair")]
title = " | ".join(
field_labels[field](series_metadata[0][field]) for field in title_fields
)
labels = []
for metadata in series_metadata:
parts = [field_labels[field](metadata[field]) for field in varying_fields]
labels.append(", ".join(parts) if parts else None)
return title, labels, any(label is not None for label in labels)
[docs]
def delay_spectrum(
uvp,
blpairs,
spw,
pol,
average_blpairs=False,
average_times=False,
fold=False,
plot_noise=False,
delay=True,
deltasq=False,
legend=False,
title_legend=True,
ax=None,
component="real",
lines=True,
markers=False,
error=None,
times=None,
logscale=True,
force_plot=False,
label_type="auto",
plot_stats=None,
**kwargs,
):
r"""
Plot a 1D delay spectrum (or spectra) for a group of baselines.
Parameters
----------
uvp : UVPspec
UVPSpec object, containing delay spectra for a set of baseline-pairs,
times, polarizations, and spectral windows.
blpairs : sequence of baseline-pair identifiers
Sequence of blpair integers, nested baseline-pair tuples like
``((ant1, ant2), (ant3, ant4))``, or lists containing either form.
A list element is treated as one plotted/averaged group, so use
``[blpair1, blpair2]`` to plot two blpairs separately and
``[[blpair1, blpair2]]`` to average them together when
``average_blpairs=True``.
spw : int
Spectral-window index to plot.
pol : tuple or str
Polarization pair to plot. A string is interpreted as an auto-polarization,
e.g. ``"xx"`` becomes ``("xx", "xx")``.
average_blpairs : bool, optional
If True, average over the baseline pairs within each group.
average_times : bool, optional
If True, average spectra over the time axis. Default: False.
fold : bool, optional
Whether to fold the power spectrum in :math:`|k_\parallel|`.
Default: False.
plot_noise : bool, optional
Whether to plot noise power spectrum curves or not. Default: False.
delay : bool, optional
Whether to plot the power spectrum in delay units (ns) or cosmological
units (h/Mpc). Default: True.
deltasq : bool, optional
If True, plot dimensionless power spectra, Delta^2. This is ignored if
delay=True. Default: False.
legend : bool, optional
Whether to enable the plot legend. When ``label_type='auto'`` (the
default), a legend is only rendered if there are varying metadata
fields across the plotted spectra (e.g., multiple baseline-pairs or
LSTs). With any other ``label_type`` value, the legend is always
rendered when ``legend=True``. Has no effect when
``title_legend=False``. Default: False.
title_legend : bool, optional
If True, generate delay-spectrum title/legend metadata. When
``label_type="auto"``, static fields are written to the title and
varying fields are written to the legend. If False, do not create the
title or legend text automatically. Default: True.
ax : matplotlib.axes, optional
Use this to pass in an existing Axes object, which the power spectra
will be added to. The smart title/legend behavior also applies when
using a provided Axes unless ``title_legend=False``. If None, a new
Axes object will be created. Default: None.
component : str, optional
Component of complex spectra to plot, options=['abs', 'real', 'imag'].
Default: 'real'.
lines : bool, optional
If True, plot lines between bandpowers for a given pspectrum.
Default: True.
markers : bool, optional
If True, plot circles at bandpowers. Filled for positive, empty
for negative. Default: False.
error : str, optional
If not None and if error exists in uvp stats_array, plot errors
on bandpowers. Default: None.
times : array_like, optional
Values from ``uvp.time_avg_array`` to plot. Use this to select which
integrations appear; if ``label_type="blpairt"``, the legend labels
include the selected time for each spectrum.
logscale : bool, optional
If True, put plot on a log-scale. Else linear scale. Default: True.
force_plot : bool, optional
If plotting a large number of spectra (>100), this function will error.
Set this to True to override this large plot error and force plot.
Default: False.
label_type : str, optional
Line label type in legend. Options are:
``'auto'`` to put static metadata in the title and varying metadata in
the legend; ``'key'`` to label lines by ``(spw, blpair, pol)``;
``'blpair'`` to label by blpair; and ``'blpairt'`` to label by
blpair-time.
plot_stats : string, optional
If not None, plot an entry in uvp.stats_array instead
of power spectrum in uvp.data_array.
kwargs : dict, optional
Extra kwargs to pass to _all_ ax.plot calls.
Returns
-------
fig : matplotlib.pyplot.Figure
Matplotlib Figure instance.
"""
import matplotlib.pyplot as plt
# Create new Axes if none specified
new_plot = False
if ax is None:
new_plot = True
fig, ax = plt.subplots(1, 1)
# Select times if requested
if times is not None:
uvp = uvp.select(times=times, inplace=False)
# Add ungrouped baseline-pairs into a group of their own (expected by the
# averaging routines) and reject malformed baseline tuples up front.
blpairs = _normalize_plot_blpairs(uvp, blpairs, "delay_spectrum")
# Average over blpairs or times if requested
blpairs_in = copy.deepcopy(blpairs) # Save input blpair list
if average_blpairs:
uvp_plt = uvp.average_spectra(
blpair_groups=blpairs, time_avg=average_times, inplace=False
)
else:
uvp_plt = copy.deepcopy(uvp)
if average_times:
# Average over times, but not baseline-pairs
uvp_plt.average_spectra(time_avg=True, inplace=True)
# Check plot size
if uvp_plt.Ntimes * len(blpairs) > 100 and not force_plot:
raise ValueError(
"Trying to plot > 100 spectra... Set force_plot=True to continue."
)
# Convert to Delta^2 units if requested
if deltasq and not delay:
uvp_plt.convert_to_deltasq()
# Fold the power spectra if requested
if fold:
uvp_plt.fold_spectra()
# Get x-axis units (delays in ns, or k_parallel in Mpc^-1 or h Mpc^-1)
if delay:
dlys = uvp_plt.get_dlys(spw) * 1e9 # ns
x = dlys
else:
k_para = uvp_plt.get_kparas(spw)
x = k_para
# Check plot_stats
if plot_stats is not None:
assert plot_stats in uvp_plt.stats_array, (
f"specified key {plot_stats} not found in stats_array"
)
# Collect power spectra and metadata before plotting so smart title/legend
# formatting can tell which fields vary across the plotted set.
series = []
for blgrp in blpairs:
blgrp_label = _format_blpair_group_label(uvp_plt, blgrp)
# Loop over blpairs in group and plot power spectrum for each one
for blp in blgrp:
# setup key and casting function
key = (spw, blp, pol)
pol_label = _normalize_polpair_label(pol)
if component == "real":
cast = np.real
elif component == "imag":
cast = np.imag
elif component == "abs":
cast = np.abs
# get data array and repeat x array
if plot_stats is None:
data = cast(uvp_plt.get_data(key))
else:
data = cast(uvp_plt.get_stats(plot_stats, key))
# flag records that have zero integration
flags = np.isclose(uvp_plt.get_integrations(key), 0.0)
data[flags] = np.nan
# get errs if requessted
errs = None
if error is not None and hasattr(uvp_plt, "stats_array"):
if error in uvp_plt.stats_array:
errs = uvp_plt.get_stats(error, key)
errs[flags] = np.nan
else:
raise KeyError(
"Error variable '%s' not found in stats_array of UVPSpec object."
% error
)
elif error is not None:
raise KeyError(
"Error variable '%s' not found in stats_array of UVPSpec object."
% error
)
# get times
blptimes = uvp_plt.time_avg_array[uvp_plt.blpair_to_indices(blp)]
blplsts = uvp_plt.lst_avg_array[uvp_plt.blpair_to_indices(blp)]
# iterate over integrations per blp
series.extend(
{
"y": data[i],
"t": blptimes[i],
"lst": blplsts[i],
"key": (spw, blgrp_label, pol_label),
"blpair": blgrp_label,
"spw": spw,
"pol": pol_label,
"errs": cast(errs[i]) if errs is not None else None,
}
for i in range(data.shape[0])
)
# If blpairs were averaged, only the first blpair in the group
# exists any more (so skip the rest)
if average_blpairs:
break
title, labels, show_legend = _get_delay_spectrum_title_and_labels(
[
{
"key": item["key"],
"blpair": item["blpair"],
"spw": item["spw"],
"pol": item["pol"],
"time": item["t"],
"lst": item["lst"],
}
for item in series
],
label_type,
title_legend,
average_blpairs=average_blpairs,
)
for item, label in zip(series, labels, strict=True):
y = item["y"]
# plot elements
cax = None
if lines:
if logscale:
_y = np.abs(y)
else:
_y = y
(cax,) = ax.plot(x, _y, marker="None", label=label, **kwargs)
if markers:
if cax is None:
c = None
else:
c = cax.get_color()
marker_label = None if lines else label
# plot markers
if logscale:
# plot positive w/ filled circles
(cax,) = ax.plot(
x[y >= 0],
np.abs(y[y >= 0]),
c=c,
ls="None",
marker="o",
markerfacecolor=c,
markeredgecolor=c,
label=marker_label,
**kwargs,
)
# plot negative w/ unfilled circles
c = cax.get_color()
(cax,) = ax.plot(
x[y < 0],
np.abs(y[y < 0]),
c=c,
ls="None",
marker="o",
markerfacecolor="None",
markeredgecolor=c,
**kwargs,
)
else:
(cax,) = ax.plot(
x,
y,
c=c,
ls="None",
marker="o",
markerfacecolor=c,
markeredgecolor=c,
label=marker_label,
**kwargs,
)
if item["errs"] is not None:
if cax is None:
c = None
else:
c = cax.get_color()
if logscale:
_y = np.abs(y)
else:
_y = y
cax = ax.errorbar(x, _y, fmt="none", ecolor=c, yerr=item["errs"], **kwargs)
# Set log scale
if logscale:
ax.set_yscale("log")
if title:
ax.set_title(title)
# Add legend only when legend=True and show_legend=True. show_legend is
# False when label_type='auto' and all metadata fields are identical
# (a single spectrum or all blpairs at the same LST), because there is
# nothing meaningful to distinguish in the legend entries.
if legend and show_legend:
ax.legend(loc="upper left")
# Add labels with units
if ax.get_xlabel() == "":
if delay:
ax.set_xlabel(r"$\tau$ $[{\rm ns}]$", fontsize=16)
else:
ax.set_xlabel(r"$k_{\parallel}\ h\ Mpc^{-1}$", fontsize=16)
if ax.get_ylabel() == "" and plot_stats is None:
# Sanitize power spectrum units
psunits = uvp_plt.units
if "h^-1" in psunits:
psunits = psunits.replace(r"h^-1", "h^{-1}")
if "h^-3" in psunits:
psunits = psunits.replace(r"h^-3", "h^{-3}")
if "Mpc" in psunits and "\\rm" not in psunits:
psunits = psunits.replace("Mpc", r"{\rm Mpc}")
if "pi" in psunits and "\\pi" not in psunits:
psunits = psunits.replace("pi", r"\pi")
# Power spectrum type
if deltasq:
ax.set_ylabel(r"$\Delta^2$ $[%s]$" % psunits, fontsize=16)
else:
ax.set_ylabel(r"$P(k_\parallel)$ $[%s]$" % psunits, fontsize=16)
# Return Figure: the axis is an attribute of figure
if new_plot:
return fig
def delay_waterfall(
uvp,
blpairs,
spw,
pol,
component="abs-real",
average_blpairs=False,
fold=False,
delay=True,
deltasq=False,
log=True,
lst_in_hrs=True,
vmin=None,
vmax=None,
cmap="YlGnBu",
axes=None,
figsize=(14, 6),
force_plot=False,
times=None,
title_type="blpair",
colorbar=True,
**kwargs,
):
r"""
Plot a 1D delay spectrum waterfall (or spectra) for a group of baselines.
Parameters
----------
uvp : UVPspec
UVPSpec object, containing delay spectra for a set of baseline-pairs,
times, polarizations, and spectral windows.
blpairs : sequence of baseline-pair identifiers
Sequence of blpair integers, nested baseline-pair tuples like
``((ant1, ant2), (ant3, ant4))``, or lists containing either form.
A list element is treated as one plotted/averaged group, so use
``[blpair1, blpair2]`` to plot two blpairs separately and
``[[blpair1, blpair2]]`` to average them together when
``average_blpairs=True``.
spw, pol : int or str
Which spectral window and polarization to plot.
component : str
Component of complex spectra to plot, options=['abs', 'real', 'imag', 'abs-real', 'abs-imag'].
abs-real is abs(real(data)), whereas 'real' is real(data)
Default: 'abs-real'.
average_blpairs : bool, optional
If True, average over the baseline pairs within each group.
fold : bool, optional
Whether to fold the power spectrum in :math:`|k_\parallel|`.
Default: False.
delay : bool, optional
Whether to plot the power spectrum in delay units (ns) or cosmological
units (h/Mpc). Default: True.
deltasq : bool, optional
If True, plot dimensionless power spectra, Delta^2. This is ignored if
delay=True. Default: False.
log : bool, optional
Whether to plot the log10 of the data. Default: True.
lst_in_hrs : bool, optional
If True, LST is plotted in hours, otherwise its plotted in radians.
vmin, vmax : float, optional
Clip the color scale of the delay spectrum to these min./max. values.
If None, use the natural range of the data. Default: None.
cmap : str, optional
Matplotlib colormap to use. Default: 'YlGnBu'.
axes : array of matplotlib.axes, optional
Use this to pass in an existing Axes object or array of axes, which
the power spectra will be added to. (Warning: Labels and legends will
not be altered in this case, even if the existing plot has completely different axis
labels etc.) If None, a new Axes object will be created. Default: None.
figsize : tuple
len-2 integer tuple specifying figure size if axes is None
force_plot : bool
If plotting a large number of blpairs (>20), this routine will quit
unless force_plot == True.
times : array_like, optional
Values from ``uvp.time_avg_array`` to plot.
title_type : str, optional
Type of title to put above plot(s). Options = ['blpair', 'blvec']
blpair : "bls: {bl1} x {bl2}"
blvec : "bl len {len} m & ang {ang} deg"
colorbar : bool, optional
Whether to make a colorbar. Default: True
kwargs : keyword arguments
Additional kwargs to pass to ax.matshow()
Returns
-------
fig : matplotlib.pyplot.Figure
Matplotlib Figure instance if input ax is None.
"""
import matplotlib as mpl
import matplotlib.pyplot as plt
# assert component
assert component in ["real", "abs", "imag", "abs-real", "abs-imag"], (
f"Can't parse specified component {component}"
)
fix_negval = component in ["real", "imag"] and log
# Add ungrouped baseline-pairs into a group of their own (expected by the
# averaging routines) and reject malformed baseline tuples up front.
blpairs = _normalize_plot_blpairs(uvp, blpairs, "delay_waterfall")
# Select times if requested
if times is not None:
uvp = uvp.select(times=times, inplace=False)
# Average over blpairs or times if requested
blpairs_in = copy.deepcopy(blpairs) # Save input blpair list
if average_blpairs:
uvp_plt = uvp.average_spectra(
blpair_groups=blpairs, time_avg=False, inplace=False
)
else:
uvp_plt = copy.deepcopy(uvp)
# Fold the power spectra if requested
if fold:
uvp_plt.fold_spectra()
# Convert to Delta^2 units if requested
if deltasq and not delay:
uvp_plt.convert_to_deltasq()
# Get x-axis units (delays in ns, or k_parallel in Mpc^-1 or h Mpc^-1)
if delay:
dlys = uvp_plt.get_dlys(spw) * 1e9 # ns
x = dlys
else:
k_para = uvp_plt.get_kparas(spw)
x = k_para
# Extract power spectra into array
waterfall = {}
for blgrp in blpairs:
# Loop over blpairs in group and plot power spectrum for each one
for blp in blgrp:
# make key
key = (spw, blp, pol)
# get power data
power = uvp_plt.get_data(key, omit_flags=False)
# set flagged power data to nan
flags = np.isclose(uvp_plt.get_integrations(key), 0.0)
power[flags, :] = np.nan
# get component
if component == "abs":
power = np.abs(power)
elif component == "real":
power = np.real(power)
elif component == "abs-real":
power = np.abs(np.real(power))
elif component == "imag":
power = np.imag(power)
elif component == "abs-imag":
power = np.abs(np.real(power))
# if real or imag and log is True, set negative values to near zero
# this is done so that one can use cmap.set_under() and cmap.set_bad() separately
if fix_negval:
power[power < 0] = np.abs(power).min() * 1e-6 + 1e-10
# assign to waterfall
waterfall[key] = power
# If blpairs were averaged, only the first blpair in the group
# exists any more (so skip the rest)
if average_blpairs:
break
# check for reasonable number of blpairs to plot...
Nkeys = len(waterfall)
if Nkeys > 20 and not force_plot:
raise ValueError("Nblps > 20 and force_plot == False, quitting...")
# Take logarithm of data if requested
if log:
for k in waterfall:
waterfall[k] = np.log10(np.abs(waterfall[k]))
logunits = r"\log_{10}"
else:
logunits = ""
# Create new Axes if none specified
new_plot = False
if axes is None:
new_plot = True
# figure out how many subplots to make
Nkeys = len(waterfall)
Nside = int(np.ceil(np.sqrt(Nkeys)))
fig, axes = plt.subplots(Nside, Nside, figsize=figsize)
# Ensure axes is an ndarray
if isinstance(axes, mpl.axes.Axes):
axes = np.array([[axes]])
if isinstance(axes, list):
axes = np.array(axes)
# Ensure its 2D and get side lengths
if axes.ndim == 1:
axes = axes[:, None]
assert axes.ndim == 2, "input axes must have ndim == 2"
Nvert, Nhorz = axes.shape
# Get LST range: setting y-ticks is tricky due to LST wrapping...
y = uvp_plt.lst_avg_array[uvp_plt.key_to_indices(list(waterfall.keys())[0])[1]]
y = np.unwrap(y)
if y[0] > np.pi:
# if start is closer to 2pi than 0, lower axis by an octave
y -= 2 * np.pi
if lst_in_hrs:
lst_units = "Hr"
y *= 24 / (2 * np.pi)
else:
lst_units = "rad"
# get baseline vectors
blvecs = dict(
zip(
[uvp_plt.bl_to_antnums(bl) for bl in uvp_plt.bl_array],
uvp_plt.get_ENU_bl_vecs(),
)
)
# Sanitize power spectrum units
psunits = uvp_plt.units
if "h^-1" in psunits:
psunits = psunits.replace("h^-1", "h^{-1}")
if "h^-3" in psunits:
psunits = psunits.replace("h^-3", "h^{-3}")
if "Hz" in psunits:
psunits = psunits.replace("Hz", r"{\rm Hz}")
if "str" in psunits:
psunits = psunits.replace("str", r"\,{\rm str}")
if "Mpc" in psunits and "\\rm" not in psunits:
psunits = psunits.replace("Mpc", r"{\rm Mpc}")
if "pi" in psunits and "\\pi" not in psunits:
psunits = psunits.replace("pi", r"\pi")
if "beam normalization not specified" in psunits:
psunits = psunits.replace("beam normalization not specified", r"{\rm unnormed}")
# Iterate over waterfall keys
keys = list(waterfall.keys())
k = 0
for i in range(Nvert):
for j in range(Nhorz):
# set ax
ax = axes[i, j]
# turn off subplot if no more plots to make
if k >= Nkeys:
ax.axis("off")
continue
# get blpair key for this subplot
key = keys[k]
blp = uvp_plt.blpair_to_antnums(key[1])
# plot waterfall
cax = ax.matshow(
waterfall[key],
cmap=cmap,
aspect="auto",
vmin=vmin,
vmax=vmax,
extent=[x[0], x[-1], y[-1], y[0]],
**kwargs,
)
# ax config
ax.xaxis.set_ticks_position("bottom")
ax.tick_params(labelsize=12)
if ax.get_title() == "":
if title_type == "blpair":
ax.set_title("bls: {} x {}".format(*blp), y=1)
elif title_type == "blvec":
blv = 0.5 * (blvecs[blp[0]] + blvecs[blp[1]])
lens, angs = utils.get_bl_lens_angs([blv], bl_error_tol=1.0)
ax.set_title(f"bl len {lens[0]:0.2f} m & {angs[0]:0.0f} deg", y=1)
# set colorbar
if colorbar:
if fix_negval:
cb_extend = "min"
else:
cb_extend = "neither"
cbar = ax.get_figure().colorbar(cax, ax=ax, extend=cb_extend)
cbar.ax.tick_params(labelsize=14)
if fix_negval:
cbar.ax.set_title("$< 0$", y=-0.05, fontsize=16)
# configure left-column plots
if j == 0:
# set yticks
ax.set_ylabel(rf"LST [{lst_units}]", fontsize=16)
else:
ax.set_yticklabels([])
# configure bottom-row plots
if k + Nhorz >= Nkeys:
if ax.get_xlabel() == "":
if delay:
ax.set_xlabel(r"$\tau$ $[{\rm ns}]$", fontsize=16)
else:
ax.set_xlabel(r"$k_{\parallel}\ h\ Mpc^{-1}$", fontsize=16)
else:
ax.set_xticklabels([])
k += 1
# make suptitle
if axes[0][0].get_figure()._suptitle is None:
if deltasq:
units = r"$%s\Delta^2$ $[%s]$" % (logunits, psunits)
else:
units = r"$%sP(k_\parallel)$ $[%s]$" % (logunits, psunits)
spwrange = np.around(np.array(uvp_plt.get_spw_ranges()[spw][:2]) / 1e6, 2)
axes[0][0].get_figure().suptitle(
"{}\n{} polarization | {} -- {} MHz".format(units, pol, *spwrange),
y=1.03,
fontsize=14,
)
# Return Axes
if new_plot:
return fig
def delay_wedge(
uvp,
spw,
pol,
blpairs=None,
times=None,
error_weights=None,
fold=False,
delay=True,
rotate=False,
component="abs-real",
log10=True,
loglog=False,
red_tol=1.0,
center_line=False,
horizon_lines=False,
title=None,
ax=None,
cmap="viridis",
figsize=(8, 6),
deltasq=False,
colorbar=False,
cbax=None,
vmin=None,
vmax=None,
edgecolor="none",
flip_xax=False,
flip_yax=False,
lw=2,
set_bl_tick_major=False,
set_bl_tick_minor=False,
xtick_size=10,
xtick_rot=0,
ytick_size=10,
ytick_rot=0,
**kwargs,
):
"""
Plot a 2D delay spectrum (or spectra) from a UVPSpec object. Note that
all integrations and redundant baselines are averaged (unless specifying
times) before plotting.
Note: this deepcopies input uvp before averaging.
Parameters
----------
uvp : UVPSpec
UVPSpec object containing delay spectra to plot.
spw : integer
Which spectral window to plot.
pol : int or tuple
Polarization-pair integer or tuple, e.g. ('pI', 'pI')
blpairs : list of tuples, optional
List of baseline-pair tuples to use in plotting.
times : list, optional
An ndarray or list of times from uvp.time_avg_array to
select on before plotting. Default: None.
error_weights : string, optional
error_weights specify which kind of errors we use for weights
during averaging power spectra.
fold : bool, optional
Whether to fold the power spectrum in k_parallel.
Default: False.
delay : bool, optional
Whether to plot the axes in tau (ns). If False, axes will
be plotted in cosmological units.
Default: True.
rotate : bool, optional
If False, use baseline-type as x-axis and delay as y-axis,
else use baseline-type as y-axis and delay as x-axis.
Default: False
component : str, optional
Component of complex spectra to plot. Options=['real', 'imag', 'abs', 'abs-real', 'abs-imag']
abs-real is abs(real(data)), whereas 'real' is real(data)
Default: 'abs-real'.
log10 : bool, optional
If True, take log10 of data before plotting. Default: True
loglog : bool, optional
If True, turn x-axis and y-axis into log-log scale. Default: False
red_tol : float, optional
Redundancy tolerance when solving for redundant groups in meters.
Default: 1.0
center_line : bool, optional
Whether to plot a dotted line at k_perp = 0.
Default: False.
horizon_lines : bool, optional
Whether to plot dotted lines along the horizon.
Default: False.
title : string, optional
Title for subplot. Default: None.
ax : matplotlib.axes, optional
If not None, use this axes as a subplot for delay wedge.
cmap : str, optional
Colormap of wedge plot. Default: 'viridis'
figsize : len-2 integer tuple, optional
If ax is None, this is the new figure size.
deltasq : bool, optional
Convert to Delta^2 before plotting. This is ignored if delay=True.
Default: False
colorbar : bool, optional
Add a colorbar to the plot. Default: False
cbax : matplotlib.axes, optional
Axis object for adding colorbar if True. Default: None
vmin : float, optional
Minimum range of colorscale. Default: None
vmax : float, optional
Maximum range of colorscale. Default: None
edgecolor : str, optional
Edgecolor of bins in pcolormesh. Default: 'none'
flip_xax : bool, optional
Flip xaxis if True. Default: False
flip_yax : bool, optional
Flip yaxis if True. Default: False
lw : int, optional
Line-width of horizon and center lines if plotted. Default: 2.
set_bl_tick_major : bool, optional
If True, use the baseline lengths as major ticks, rather than default
uniform grid.
set_bl_tick_minor : bool, optional
If True, use the baseline lengths as minor ticks, which have no labels.
kwargs : dictionary
Additional keyword arguments to pass to pcolormesh() call.
Returns
-------
fig : matplotlib.pyplot.Figure
Matplotlib Figure instance if ax is None.
"""
import matplotlib as mpl
import matplotlib.pyplot as plt
# type checking
uvp = copy.deepcopy(uvp)
assert isinstance(uvp, uvpspec.UVPSpec), "input uvp must be a UVPSpec object"
assert isinstance(spw, (int, np.integer))
assert isinstance(pol, (int, np.integer, tuple))
fix_negval = component in ["real", "imag"] and log10
# check pspec units for little h
little_h = "h^-3" in uvp.norm_units
# Create new ax if none specified
new_plot = False
if ax is None:
fig, ax = plt.subplots(figsize=figsize)
new_plot = True
else:
fig = ax.get_figure()
# Select out times and blpairs if provided
if times is not None:
uvp.select(blpairs=blpairs, times=times, inplace=True)
# Average across redundant groups and time
# this also ensures blpairs are ordered from short_bl --> long_bl
blp_grps, lens, angs, tags = utils.get_blvec_reds(
uvp, bl_error_tol=red_tol, match_bl_lens=True
)
uvp.average_spectra(
blpair_groups=blp_grps, time_avg=True, error_weights=error_weights, inplace=True
)
# get blpairs and order by len and enforce bl len ordering anyways
blpairs, blpair_seps = uvp.get_blpairs(), uvp.get_blpair_seps()
osort = np.argsort(blpair_seps)
blpairs, blpair_seps = [blpairs[oi] for oi in osort], blpair_seps[osort]
if len(blpairs) < 2:
raise ValueError(
"delay_wedge requires at least two baseline pairs after selection "
"and averaging."
)
# Convert to DeltaSq
if deltasq and not delay:
uvp.convert_to_deltasq(inplace=True)
# Fold array
if fold:
uvp.fold_spectra()
# Format ticks
if delay:
x_axis = uvp.get_dlys(spw) * 1e9
y_axis = blpair_seps
else:
x_axis = uvp.get_kparas(spw, little_h=little_h)
y_axis = uvp.get_kperps(spw, little_h=little_h)
if rotate:
_x_axis = y_axis
y_axis = x_axis
x_axis = _x_axis
# Conigure Units
psunits = rf"({uvp.vis_units})^2\ {uvp.norm_units}"
if "h^-1" in psunits:
psunits = psunits.replace("h^-1", r"h^{-1}\ ")
if "h^-3" in psunits:
psunits = psunits.replace(r"h^-3", r"h^{-3}\ ")
if "Hz" in psunits:
psunits = psunits.replace("Hz", r"{\rm Hz}\ ")
if "str" in psunits:
psunits = psunits.replace("str", r"\,{\rm str}\ ")
if "Mpc" in psunits and "\\rm" not in psunits:
psunits = psunits.replace("Mpc", r"{\rm Mpc}")
if "pi" in psunits and "\\pi" not in psunits:
psunits = psunits.replace("pi", r"\pi")
if "beam normalization not specified" in psunits:
psunits = psunits.replace("beam normalization not specified", r"{\rm unnormed}")
# get data with shape (Nblpairs, Ndlys)
data = [uvp.get_data((spw, blp, pol)).squeeze() for blp in blpairs]
# get component
if component == "real":
data = np.real(data)
elif component == "abs-real":
data = np.abs(np.real(data))
elif component == "imag":
data = np.imag(data)
elif component == "abs-imag":
data = np.abs(np.imag(data))
elif component == "abs":
data = np.abs(data)
else:
raise ValueError(f"Did not understand component {component}")
# if real or imag and log is True, set negative values to near zero
# this is done so that one can use cmap.set_under() and cmap.set_bad() separately
if fix_negval:
data[data < 0] = np.abs(data).min() * 1e-6 + 1e-10
# take log10
if log10:
data = np.log10(data)
# loglog
if loglog:
ax.set_xscale("log")
ax.set_yscale("log")
ax.yaxis.set_major_formatter(mpl.ticker.LogFormatterSciNotation())
ax.yaxis.set_minor_formatter(mpl.ticker.NullFormatter())
ax.xaxis.set_major_formatter(mpl.ticker.LogFormatterSciNotation())
ax.xaxis.set_minor_formatter(mpl.ticker.NullFormatter())
# rotate
if rotate:
data = np.rot90(data[:, ::-1], k=1)
# Get bin edges
xdiff = np.diff(x_axis)
x_edges = np.array(
[x_axis[0] - xdiff[0] / 2.0]
+ list(x_axis[:-1] + xdiff / 2.0)
+ [x_axis[-1] + xdiff[-1] / 2.0]
)
ydiff = np.diff(y_axis)
y_edges = np.array(
[y_axis[0] - ydiff[0] / 2.0]
+ list(y_axis[:-1] + ydiff / 2.0)
+ [y_axis[-1] + ydiff[-1] / 2.0]
)
X, Y = np.meshgrid(x_edges, y_edges)
# plot
cax = ax.pcolormesh(
X,
Y,
data,
cmap=cmap,
edgecolor=edgecolor,
lw=0.01,
vmin=vmin,
vmax=vmax,
**kwargs,
)
# Configure ticks
if set_bl_tick_major:
if rotate:
ax.set_xticks([np.around(x, _get_sigfig(x) + 2) for x in x_axis])
else:
ax.set_yticks([np.around(x, _get_sigfig(x) + 2) for x in y_axis])
if set_bl_tick_minor:
if rotate:
ax.set_xticks(
[np.around(x, _get_sigfig(x) + 2) for x in x_axis], minor=True
)
else:
ax.set_yticks(
[np.around(x, _get_sigfig(x) + 2) for x in y_axis], minor=True
)
# Add colorbar
if colorbar:
if fix_negval:
cb_extend = "min"
else:
cb_extend = "neither"
if cbax is None:
cbax = ax
cbar = fig.colorbar(cax, ax=cbax, extend=cb_extend)
if deltasq:
p = r"\Delta^2"
else:
p = "P"
if delay:
p = r"{}({},\ {})".format(p, r"\tau", r"|\vec{b}|")
else:
p = r"{}({},\ {})".format(p, r"k_\parallel", r"k_\perp")
if log10:
psunits = rf"$\log_{{10}}\ {p}\ [{psunits}]$"
else:
psunits = rf"${p}\ [{psunits}]$"
cbar.set_label(psunits, fontsize=14)
if fix_negval:
cbar.ax.set_title("$< 0$", y=-0.05, fontsize=16)
# Configure tick labels
if delay:
xlabel = r"$\tau$ $[{\rm ns}]$"
ylabel = r"$|\vec{b}|$ $[{\rm m}]$"
else:
xlabel = r"$k_{\parallel}\ [h\ \rm Mpc^{-1}]$"
ylabel = r"$k_{\perp}\ [h\ \rm Mpc^{-1}]$"
if rotate:
_xlabel = ylabel
ylabel = xlabel
xlabel = _xlabel
if ax.get_xlabel() == "":
ax.set_xlabel(xlabel, fontsize=16)
if ax.get_ylabel() == "":
ax.set_ylabel(ylabel, fontsize=16)
# Configure center line
if center_line:
if rotate:
ax.axhline(y=0, color="#000000", ls="--", lw=lw)
else:
ax.axvline(x=0, color="#000000", ls="--", lw=lw)
# Plot horizons
if horizon_lines:
# get horizon in ns
horizons = blpair_seps / conversions.units.c * 1e9
# convert to cosmological wave vector
if not delay:
# Get average redshift of spw
avg_z = uvp.cosmo.f2z(np.mean(uvp.freq_array[uvp.spw_to_freq_indices(spw)]))
horizons *= uvp.cosmo.tau_to_kpara(avg_z, little_h=little_h) / 1e9
# iterate over bins and plot lines
if rotate:
bin_edges = x_edges
else:
bin_edges = y_edges
for i, hor in enumerate(horizons):
if rotate:
ax.plot(
bin_edges[i : i + 2], [hor, hor], color="#ffffff", ls="--", lw=lw
)
if not uvp.folded:
ax.plot(
bin_edges[i : i + 2],
[-hor, -hor],
color="#ffffff",
ls="--",
lw=lw,
)
else:
ax.plot(
[hor, hor], bin_edges[i : i + 2], color="#ffffff", ls="--", lw=lw
)
if not uvp.folded:
ax.plot(
[-hor, -hor],
bin_edges[i : i + 2],
color="#ffffff",
ls="--",
lw=lw,
)
# flip axes
if flip_xax:
fig.sca(ax)
fig.gca().invert_xaxis()
if flip_yax:
fig.sca(ax)
fig.gca().invert_yaxis()
# add title
if title is not None:
ax.set_title(title, fontsize=12)
# Configure tick sizes and rotation
[tl.set_size(xtick_size) for tl in ax.get_xticklabels()]
[tl.set_rotation(xtick_rot) for tl in ax.get_xticklabels()]
[tl.set_size(ytick_size) for tl in ax.get_yticklabels()]
[tl.set_rotation(ytick_rot) for tl in ax.get_yticklabels()]
# return figure
if new_plot:
return fig
def plot_uvdata_waterfalls(
uvd,
basename,
data="data",
plot_mode="log",
vmin=None,
vmax=None,
recenter=False,
format="png", # noqa: A002 — public kwarg name; renaming is a breaking change
**kwargs,
):
"""
Plot waterfalls for all baselines and polarizations within a UVData object,
and save to individual files.
Parameters
----------
uvd : UVData object
Input data object. Waterfalls will be stored for all baselines and
polarizations within the object; use uvd.select() to remove unwanted
information.
basename : str
Base filename for the output plots. This must have two placeholders
for baseline ID ('bl') and polarization ('pol'),
e.g. basename='plots/uvdata.{pol}.{bl}'.
data : str, optional
Which data array to plot from the UVData object. Options are:
'data', 'flags', 'nsamples'. Default: 'data'.
plot_mode : str, optional
Plot mode, passed to uvtools.plot.waterfall. Default: 'log'.
vmin, vmax : float, optional
Min./max. values of the plot colorscale. Default: None (uses the
min./max. of the data).
recenter : bool, optional
Whether to apply recentering (see uvtools.plot.waterfall).
Default: False.
format : str, optional
The file format of the output images. If None, the image format will be
deduced from the basename string. Default: 'png'.
**kwargs : dict
Keyword arguments passed to uvtools.plot.waterfall, which passes them
on to matplotlib.imshow.
"""
import matplotlib.pyplot as plt
assert isinstance(uvd, UVData), "'uvd' must be a UVData object."
assert data in ["data", "flags", "nsamples"], (
"'%s' not a valid data array; use 'data', 'flags', or 'nsamples'" % data
)
# Set plot colorscale max/min if specified
drng = None
if vmin is not None:
assert vmax is not None, "Must also specify vmax if vmin is specified."
drng = vmax - vmin
# Empty figure
fig, ax = plt.subplots(1, 1)
# Loop over antenna pairs and pols
for (ant1, ant2, pol), d in uvd.antpairpol_iter():
# Get chosen data array
if data == "data":
pass
elif data == "flags":
d = uvd.get_flags((ant1, ant2, pol))
elif data == "nsamples":
d = uvd.get_nsamples((ant1, ant2, pol))
else:
raise KeyError("Invalid data array type '%s'" % data)
# Make plot
img = uvtools.plot.waterfall(
d, mode=plot_mode, mx=vmax, drng=drng, recenter=recenter, **kwargs
)
fig.colorbar(img)
# Save to file
outfile = basename.format(bl="%d.%d" % (ant1, ant2), pol=pol)
if format is not None:
# Make sure format extension is given
if outfile[-len(format)].lower() != format.lower():
outfile = "%s.%s" % (outfile, format)
fig.tight_layout()
fig.savefig(outfile, format=format)
fig.clf()
def _get_sigfig(x):
return -int(np.floor(np.log10(np.abs(x))))
def _round_sigfig(x, up=True):
sigfigs = _get_sigfig(x)
if up:
return np.ceil(10**sigfigs * x) / 10**sigfigs
else:
return np.floor(10**sigfigs * x) / 10**sigfigs