Source code for hera_pspec.plot

import copy

import numpy as np
import uvtools
from pyuvdata import UVData

from . import conversions, utils, uvpspec


def _is_antpair_tuple(value):
    """Return True for a baseline tuple like (ant1, ant2)."""
    return (
        isinstance(value, tuple)
        and len(value) == 2
        and all(isinstance(ant, (int, np.integer)) for ant in value)
    )


def _is_blpair_tuple(value):
    """Return True for a blpair tuple like ((ant1, ant2), (ant3, ant4))."""
    return (
        isinstance(value, tuple)
        and len(value) == 2
        and all(_is_antpair_tuple(antpair) for antpair in value)
    )


def _normalize_plot_blpairs(uvp, blpairs, func_name):
    """
    Normalize plotting blpair input to a list of lists of blpair integers.
    """
    normalized = []
    for blpgrp in blpairs:
        if isinstance(blpgrp, list):
            group = blpgrp
        elif isinstance(blpgrp, (int, np.integer)) or _is_blpair_tuple(blpgrp):
            group = [blpgrp]
        elif _is_antpair_tuple(blpgrp):
            raise ValueError(
                f"{func_name} blpairs must be baseline-pair tuples or groups of "
                "baseline-pair tuples/integers. For a single blpair, use "
                "[((ant1, ant2), (ant3, ant4))]; to average a group, use "
                "[[blpair1, blpair2, ...]]."
            )
        else:
            raise TypeError(
                f"{func_name} blpairs must be an iterable of baseline-pair tuples, "
                "blpair integers, or lists of those values."
            )

        normalized_group = []
        for blp in group:
            if isinstance(blp, (int, np.integer)):
                normalized_group.append(blp)
            elif _is_blpair_tuple(blp):
                normalized_group.append(uvp.antnums_to_blpair(blp))
            elif _is_antpair_tuple(blp):
                raise ValueError(
                    f"{func_name} blpairs must be baseline-pair tuples or groups of "
                    "baseline-pair tuples/integers. For a single blpair, use "
                    "[((ant1, ant2), (ant3, ant4))]; to average a group, use "
                    "[[blpair1, blpair2, ...]]."
                )
            else:
                raise TypeError(
                    f"{func_name} blpairs must be baseline-pair tuples, blpair "
                    "integers, or lists of those values."
                )
        normalized.append(normalized_group)

    return normalized


def _format_blpair_group_label(uvp, blpair_group):
    """Format a plotted blpair or averaged blpair group for display."""
    group = [uvp.blpair_to_antnums(blp) for blp in blpair_group]
    if len(group) == 1:
        return str(group[0])
    return str(group)


def _normalize_polpair_label(pol):
    """Format a polarization input consistently for labels and titles."""
    if isinstance(pol, str):
        pol = (pol, pol)
    return str(pol)


def _format_lst_label(lst):
    """Format an LST in hours for display."""
    lst_hrs = (lst % (2 * np.pi)) * 12 / np.pi
    return f"lst={lst_hrs:0.3f} hr"


def _format_delay_spectrum_manual_label(label_type, metadata):
    """Format explicit manual labels for delay_spectrum."""
    if label_type == "key":
        return str(metadata["key"])
    if label_type == "blpair":
        return f"{metadata['blpair']}"
    if label_type == "blpairt":
        return f"{metadata['blpair']}, {metadata['time']:0.5f}"
    raise ValueError(f"Couldn't understand label_type {label_type}")


def _get_delay_spectrum_title_and_labels(
    series_metadata, label_type, title_legend, average_blpairs=False
):
    """
    Return the auto-generated title, per-series labels, and legend visibility.

    When *average_blpairs* is True the ``blpair`` field is never written to
    the title, because it would produce a very long list of all the averaged
    baseline-pairs. The blpair label is still used in the legend when multiple
    averaged groups are present.
    """
    if not title_legend:
        return "", [None for _ in series_metadata], False

    if label_type != "auto":
        labels = [
            _format_delay_spectrum_manual_label(label_type, metadata)
            for metadata in series_metadata
        ]
        return "", labels, True

    if not series_metadata:
        return "", [], False

    field_order = ("spw", "blpair", "pol", "lst")
    field_labels = {
        "spw": lambda value: f"spw={value}",
        "blpair": lambda value: f"blpair={value}",
        "pol": lambda value: f"pol={value}",
        "lst": _format_lst_label,
    }
    varying_fields = [
        field
        for field in field_order
        if len({metadata[field] for metadata in series_metadata}) > 1
    ]
    static_fields = [field for field in field_order if field not in varying_fields]

    # When blpairs are being averaged, skip the blpair label from the title
    # even if it is static (one averaged group). The list of all averaged
    # blpairs can be very long and does not help identify the plotted data.
    title_fields = [f for f in static_fields if not (average_blpairs and f == "blpair")]

    title = " | ".join(
        field_labels[field](series_metadata[0][field]) for field in title_fields
    )
    labels = []
    for metadata in series_metadata:
        parts = [field_labels[field](metadata[field]) for field in varying_fields]
        labels.append(", ".join(parts) if parts else None)
    return title, labels, any(label is not None for label in labels)


[docs] def delay_spectrum( uvp, blpairs, spw, pol, average_blpairs=False, average_times=False, fold=False, plot_noise=False, delay=True, deltasq=False, legend=False, title_legend=True, ax=None, component="real", lines=True, markers=False, error=None, times=None, logscale=True, force_plot=False, label_type="auto", plot_stats=None, **kwargs, ): r""" Plot a 1D delay spectrum (or spectra) for a group of baselines. Parameters ---------- uvp : UVPspec UVPSpec object, containing delay spectra for a set of baseline-pairs, times, polarizations, and spectral windows. blpairs : sequence of baseline-pair identifiers Sequence of blpair integers, nested baseline-pair tuples like ``((ant1, ant2), (ant3, ant4))``, or lists containing either form. A list element is treated as one plotted/averaged group, so use ``[blpair1, blpair2]`` to plot two blpairs separately and ``[[blpair1, blpair2]]`` to average them together when ``average_blpairs=True``. spw : int Spectral-window index to plot. pol : tuple or str Polarization pair to plot. A string is interpreted as an auto-polarization, e.g. ``"xx"`` becomes ``("xx", "xx")``. average_blpairs : bool, optional If True, average over the baseline pairs within each group. average_times : bool, optional If True, average spectra over the time axis. Default: False. fold : bool, optional Whether to fold the power spectrum in :math:`|k_\parallel|`. Default: False. plot_noise : bool, optional Whether to plot noise power spectrum curves or not. Default: False. delay : bool, optional Whether to plot the power spectrum in delay units (ns) or cosmological units (h/Mpc). Default: True. deltasq : bool, optional If True, plot dimensionless power spectra, Delta^2. This is ignored if delay=True. Default: False. legend : bool, optional Whether to enable the plot legend. When ``label_type='auto'`` (the default), a legend is only rendered if there are varying metadata fields across the plotted spectra (e.g., multiple baseline-pairs or LSTs). With any other ``label_type`` value, the legend is always rendered when ``legend=True``. Has no effect when ``title_legend=False``. Default: False. title_legend : bool, optional If True, generate delay-spectrum title/legend metadata. When ``label_type="auto"``, static fields are written to the title and varying fields are written to the legend. If False, do not create the title or legend text automatically. Default: True. ax : matplotlib.axes, optional Use this to pass in an existing Axes object, which the power spectra will be added to. The smart title/legend behavior also applies when using a provided Axes unless ``title_legend=False``. If None, a new Axes object will be created. Default: None. component : str, optional Component of complex spectra to plot, options=['abs', 'real', 'imag']. Default: 'real'. lines : bool, optional If True, plot lines between bandpowers for a given pspectrum. Default: True. markers : bool, optional If True, plot circles at bandpowers. Filled for positive, empty for negative. Default: False. error : str, optional If not None and if error exists in uvp stats_array, plot errors on bandpowers. Default: None. times : array_like, optional Values from ``uvp.time_avg_array`` to plot. Use this to select which integrations appear; if ``label_type="blpairt"``, the legend labels include the selected time for each spectrum. logscale : bool, optional If True, put plot on a log-scale. Else linear scale. Default: True. force_plot : bool, optional If plotting a large number of spectra (>100), this function will error. Set this to True to override this large plot error and force plot. Default: False. label_type : str, optional Line label type in legend. Options are: ``'auto'`` to put static metadata in the title and varying metadata in the legend; ``'key'`` to label lines by ``(spw, blpair, pol)``; ``'blpair'`` to label by blpair; and ``'blpairt'`` to label by blpair-time. plot_stats : string, optional If not None, plot an entry in uvp.stats_array instead of power spectrum in uvp.data_array. kwargs : dict, optional Extra kwargs to pass to _all_ ax.plot calls. Returns ------- fig : matplotlib.pyplot.Figure Matplotlib Figure instance. """ import matplotlib.pyplot as plt # Create new Axes if none specified new_plot = False if ax is None: new_plot = True fig, ax = plt.subplots(1, 1) # Select times if requested if times is not None: uvp = uvp.select(times=times, inplace=False) # Add ungrouped baseline-pairs into a group of their own (expected by the # averaging routines) and reject malformed baseline tuples up front. blpairs = _normalize_plot_blpairs(uvp, blpairs, "delay_spectrum") # Average over blpairs or times if requested blpairs_in = copy.deepcopy(blpairs) # Save input blpair list if average_blpairs: uvp_plt = uvp.average_spectra( blpair_groups=blpairs, time_avg=average_times, inplace=False ) else: uvp_plt = copy.deepcopy(uvp) if average_times: # Average over times, but not baseline-pairs uvp_plt.average_spectra(time_avg=True, inplace=True) # Check plot size if uvp_plt.Ntimes * len(blpairs) > 100 and not force_plot: raise ValueError( "Trying to plot > 100 spectra... Set force_plot=True to continue." ) # Convert to Delta^2 units if requested if deltasq and not delay: uvp_plt.convert_to_deltasq() # Fold the power spectra if requested if fold: uvp_plt.fold_spectra() # Get x-axis units (delays in ns, or k_parallel in Mpc^-1 or h Mpc^-1) if delay: dlys = uvp_plt.get_dlys(spw) * 1e9 # ns x = dlys else: k_para = uvp_plt.get_kparas(spw) x = k_para # Check plot_stats if plot_stats is not None: assert plot_stats in uvp_plt.stats_array, ( f"specified key {plot_stats} not found in stats_array" ) # Collect power spectra and metadata before plotting so smart title/legend # formatting can tell which fields vary across the plotted set. series = [] for blgrp in blpairs: blgrp_label = _format_blpair_group_label(uvp_plt, blgrp) # Loop over blpairs in group and plot power spectrum for each one for blp in blgrp: # setup key and casting function key = (spw, blp, pol) pol_label = _normalize_polpair_label(pol) if component == "real": cast = np.real elif component == "imag": cast = np.imag elif component == "abs": cast = np.abs # get data array and repeat x array if plot_stats is None: data = cast(uvp_plt.get_data(key)) else: data = cast(uvp_plt.get_stats(plot_stats, key)) # flag records that have zero integration flags = np.isclose(uvp_plt.get_integrations(key), 0.0) data[flags] = np.nan # get errs if requessted errs = None if error is not None and hasattr(uvp_plt, "stats_array"): if error in uvp_plt.stats_array: errs = uvp_plt.get_stats(error, key) errs[flags] = np.nan else: raise KeyError( "Error variable '%s' not found in stats_array of UVPSpec object." % error ) elif error is not None: raise KeyError( "Error variable '%s' not found in stats_array of UVPSpec object." % error ) # get times blptimes = uvp_plt.time_avg_array[uvp_plt.blpair_to_indices(blp)] blplsts = uvp_plt.lst_avg_array[uvp_plt.blpair_to_indices(blp)] # iterate over integrations per blp series.extend( { "y": data[i], "t": blptimes[i], "lst": blplsts[i], "key": (spw, blgrp_label, pol_label), "blpair": blgrp_label, "spw": spw, "pol": pol_label, "errs": cast(errs[i]) if errs is not None else None, } for i in range(data.shape[0]) ) # If blpairs were averaged, only the first blpair in the group # exists any more (so skip the rest) if average_blpairs: break title, labels, show_legend = _get_delay_spectrum_title_and_labels( [ { "key": item["key"], "blpair": item["blpair"], "spw": item["spw"], "pol": item["pol"], "time": item["t"], "lst": item["lst"], } for item in series ], label_type, title_legend, average_blpairs=average_blpairs, ) for item, label in zip(series, labels, strict=True): y = item["y"] # plot elements cax = None if lines: if logscale: _y = np.abs(y) else: _y = y (cax,) = ax.plot(x, _y, marker="None", label=label, **kwargs) if markers: if cax is None: c = None else: c = cax.get_color() marker_label = None if lines else label # plot markers if logscale: # plot positive w/ filled circles (cax,) = ax.plot( x[y >= 0], np.abs(y[y >= 0]), c=c, ls="None", marker="o", markerfacecolor=c, markeredgecolor=c, label=marker_label, **kwargs, ) # plot negative w/ unfilled circles c = cax.get_color() (cax,) = ax.plot( x[y < 0], np.abs(y[y < 0]), c=c, ls="None", marker="o", markerfacecolor="None", markeredgecolor=c, **kwargs, ) else: (cax,) = ax.plot( x, y, c=c, ls="None", marker="o", markerfacecolor=c, markeredgecolor=c, label=marker_label, **kwargs, ) if item["errs"] is not None: if cax is None: c = None else: c = cax.get_color() if logscale: _y = np.abs(y) else: _y = y cax = ax.errorbar(x, _y, fmt="none", ecolor=c, yerr=item["errs"], **kwargs) # Set log scale if logscale: ax.set_yscale("log") if title: ax.set_title(title) # Add legend only when legend=True and show_legend=True. show_legend is # False when label_type='auto' and all metadata fields are identical # (a single spectrum or all blpairs at the same LST), because there is # nothing meaningful to distinguish in the legend entries. if legend and show_legend: ax.legend(loc="upper left") # Add labels with units if ax.get_xlabel() == "": if delay: ax.set_xlabel(r"$\tau$ $[{\rm ns}]$", fontsize=16) else: ax.set_xlabel(r"$k_{\parallel}\ h\ Mpc^{-1}$", fontsize=16) if ax.get_ylabel() == "" and plot_stats is None: # Sanitize power spectrum units psunits = uvp_plt.units if "h^-1" in psunits: psunits = psunits.replace(r"h^-1", "h^{-1}") if "h^-3" in psunits: psunits = psunits.replace(r"h^-3", "h^{-3}") if "Mpc" in psunits and "\\rm" not in psunits: psunits = psunits.replace("Mpc", r"{\rm Mpc}") if "pi" in psunits and "\\pi" not in psunits: psunits = psunits.replace("pi", r"\pi") # Power spectrum type if deltasq: ax.set_ylabel(r"$\Delta^2$ $[%s]$" % psunits, fontsize=16) else: ax.set_ylabel(r"$P(k_\parallel)$ $[%s]$" % psunits, fontsize=16) # Return Figure: the axis is an attribute of figure if new_plot: return fig
def delay_waterfall( uvp, blpairs, spw, pol, component="abs-real", average_blpairs=False, fold=False, delay=True, deltasq=False, log=True, lst_in_hrs=True, vmin=None, vmax=None, cmap="YlGnBu", axes=None, figsize=(14, 6), force_plot=False, times=None, title_type="blpair", colorbar=True, **kwargs, ): r""" Plot a 1D delay spectrum waterfall (or spectra) for a group of baselines. Parameters ---------- uvp : UVPspec UVPSpec object, containing delay spectra for a set of baseline-pairs, times, polarizations, and spectral windows. blpairs : sequence of baseline-pair identifiers Sequence of blpair integers, nested baseline-pair tuples like ``((ant1, ant2), (ant3, ant4))``, or lists containing either form. A list element is treated as one plotted/averaged group, so use ``[blpair1, blpair2]`` to plot two blpairs separately and ``[[blpair1, blpair2]]`` to average them together when ``average_blpairs=True``. spw, pol : int or str Which spectral window and polarization to plot. component : str Component of complex spectra to plot, options=['abs', 'real', 'imag', 'abs-real', 'abs-imag']. abs-real is abs(real(data)), whereas 'real' is real(data) Default: 'abs-real'. average_blpairs : bool, optional If True, average over the baseline pairs within each group. fold : bool, optional Whether to fold the power spectrum in :math:`|k_\parallel|`. Default: False. delay : bool, optional Whether to plot the power spectrum in delay units (ns) or cosmological units (h/Mpc). Default: True. deltasq : bool, optional If True, plot dimensionless power spectra, Delta^2. This is ignored if delay=True. Default: False. log : bool, optional Whether to plot the log10 of the data. Default: True. lst_in_hrs : bool, optional If True, LST is plotted in hours, otherwise its plotted in radians. vmin, vmax : float, optional Clip the color scale of the delay spectrum to these min./max. values. If None, use the natural range of the data. Default: None. cmap : str, optional Matplotlib colormap to use. Default: 'YlGnBu'. axes : array of matplotlib.axes, optional Use this to pass in an existing Axes object or array of axes, which the power spectra will be added to. (Warning: Labels and legends will not be altered in this case, even if the existing plot has completely different axis labels etc.) If None, a new Axes object will be created. Default: None. figsize : tuple len-2 integer tuple specifying figure size if axes is None force_plot : bool If plotting a large number of blpairs (>20), this routine will quit unless force_plot == True. times : array_like, optional Values from ``uvp.time_avg_array`` to plot. title_type : str, optional Type of title to put above plot(s). Options = ['blpair', 'blvec'] blpair : "bls: {bl1} x {bl2}" blvec : "bl len {len} m & ang {ang} deg" colorbar : bool, optional Whether to make a colorbar. Default: True kwargs : keyword arguments Additional kwargs to pass to ax.matshow() Returns ------- fig : matplotlib.pyplot.Figure Matplotlib Figure instance if input ax is None. """ import matplotlib as mpl import matplotlib.pyplot as plt # assert component assert component in ["real", "abs", "imag", "abs-real", "abs-imag"], ( f"Can't parse specified component {component}" ) fix_negval = component in ["real", "imag"] and log # Add ungrouped baseline-pairs into a group of their own (expected by the # averaging routines) and reject malformed baseline tuples up front. blpairs = _normalize_plot_blpairs(uvp, blpairs, "delay_waterfall") # Select times if requested if times is not None: uvp = uvp.select(times=times, inplace=False) # Average over blpairs or times if requested blpairs_in = copy.deepcopy(blpairs) # Save input blpair list if average_blpairs: uvp_plt = uvp.average_spectra( blpair_groups=blpairs, time_avg=False, inplace=False ) else: uvp_plt = copy.deepcopy(uvp) # Fold the power spectra if requested if fold: uvp_plt.fold_spectra() # Convert to Delta^2 units if requested if deltasq and not delay: uvp_plt.convert_to_deltasq() # Get x-axis units (delays in ns, or k_parallel in Mpc^-1 or h Mpc^-1) if delay: dlys = uvp_plt.get_dlys(spw) * 1e9 # ns x = dlys else: k_para = uvp_plt.get_kparas(spw) x = k_para # Extract power spectra into array waterfall = {} for blgrp in blpairs: # Loop over blpairs in group and plot power spectrum for each one for blp in blgrp: # make key key = (spw, blp, pol) # get power data power = uvp_plt.get_data(key, omit_flags=False) # set flagged power data to nan flags = np.isclose(uvp_plt.get_integrations(key), 0.0) power[flags, :] = np.nan # get component if component == "abs": power = np.abs(power) elif component == "real": power = np.real(power) elif component == "abs-real": power = np.abs(np.real(power)) elif component == "imag": power = np.imag(power) elif component == "abs-imag": power = np.abs(np.real(power)) # if real or imag and log is True, set negative values to near zero # this is done so that one can use cmap.set_under() and cmap.set_bad() separately if fix_negval: power[power < 0] = np.abs(power).min() * 1e-6 + 1e-10 # assign to waterfall waterfall[key] = power # If blpairs were averaged, only the first blpair in the group # exists any more (so skip the rest) if average_blpairs: break # check for reasonable number of blpairs to plot... Nkeys = len(waterfall) if Nkeys > 20 and not force_plot: raise ValueError("Nblps > 20 and force_plot == False, quitting...") # Take logarithm of data if requested if log: for k in waterfall: waterfall[k] = np.log10(np.abs(waterfall[k])) logunits = r"\log_{10}" else: logunits = "" # Create new Axes if none specified new_plot = False if axes is None: new_plot = True # figure out how many subplots to make Nkeys = len(waterfall) Nside = int(np.ceil(np.sqrt(Nkeys))) fig, axes = plt.subplots(Nside, Nside, figsize=figsize) # Ensure axes is an ndarray if isinstance(axes, mpl.axes.Axes): axes = np.array([[axes]]) if isinstance(axes, list): axes = np.array(axes) # Ensure its 2D and get side lengths if axes.ndim == 1: axes = axes[:, None] assert axes.ndim == 2, "input axes must have ndim == 2" Nvert, Nhorz = axes.shape # Get LST range: setting y-ticks is tricky due to LST wrapping... y = uvp_plt.lst_avg_array[uvp_plt.key_to_indices(list(waterfall.keys())[0])[1]] y = np.unwrap(y) if y[0] > np.pi: # if start is closer to 2pi than 0, lower axis by an octave y -= 2 * np.pi if lst_in_hrs: lst_units = "Hr" y *= 24 / (2 * np.pi) else: lst_units = "rad" # get baseline vectors blvecs = dict( zip( [uvp_plt.bl_to_antnums(bl) for bl in uvp_plt.bl_array], uvp_plt.get_ENU_bl_vecs(), ) ) # Sanitize power spectrum units psunits = uvp_plt.units if "h^-1" in psunits: psunits = psunits.replace("h^-1", "h^{-1}") if "h^-3" in psunits: psunits = psunits.replace("h^-3", "h^{-3}") if "Hz" in psunits: psunits = psunits.replace("Hz", r"{\rm Hz}") if "str" in psunits: psunits = psunits.replace("str", r"\,{\rm str}") if "Mpc" in psunits and "\\rm" not in psunits: psunits = psunits.replace("Mpc", r"{\rm Mpc}") if "pi" in psunits and "\\pi" not in psunits: psunits = psunits.replace("pi", r"\pi") if "beam normalization not specified" in psunits: psunits = psunits.replace("beam normalization not specified", r"{\rm unnormed}") # Iterate over waterfall keys keys = list(waterfall.keys()) k = 0 for i in range(Nvert): for j in range(Nhorz): # set ax ax = axes[i, j] # turn off subplot if no more plots to make if k >= Nkeys: ax.axis("off") continue # get blpair key for this subplot key = keys[k] blp = uvp_plt.blpair_to_antnums(key[1]) # plot waterfall cax = ax.matshow( waterfall[key], cmap=cmap, aspect="auto", vmin=vmin, vmax=vmax, extent=[x[0], x[-1], y[-1], y[0]], **kwargs, ) # ax config ax.xaxis.set_ticks_position("bottom") ax.tick_params(labelsize=12) if ax.get_title() == "": if title_type == "blpair": ax.set_title("bls: {} x {}".format(*blp), y=1) elif title_type == "blvec": blv = 0.5 * (blvecs[blp[0]] + blvecs[blp[1]]) lens, angs = utils.get_bl_lens_angs([blv], bl_error_tol=1.0) ax.set_title(f"bl len {lens[0]:0.2f} m & {angs[0]:0.0f} deg", y=1) # set colorbar if colorbar: if fix_negval: cb_extend = "min" else: cb_extend = "neither" cbar = ax.get_figure().colorbar(cax, ax=ax, extend=cb_extend) cbar.ax.tick_params(labelsize=14) if fix_negval: cbar.ax.set_title("$< 0$", y=-0.05, fontsize=16) # configure left-column plots if j == 0: # set yticks ax.set_ylabel(rf"LST [{lst_units}]", fontsize=16) else: ax.set_yticklabels([]) # configure bottom-row plots if k + Nhorz >= Nkeys: if ax.get_xlabel() == "": if delay: ax.set_xlabel(r"$\tau$ $[{\rm ns}]$", fontsize=16) else: ax.set_xlabel(r"$k_{\parallel}\ h\ Mpc^{-1}$", fontsize=16) else: ax.set_xticklabels([]) k += 1 # make suptitle if axes[0][0].get_figure()._suptitle is None: if deltasq: units = r"$%s\Delta^2$ $[%s]$" % (logunits, psunits) else: units = r"$%sP(k_\parallel)$ $[%s]$" % (logunits, psunits) spwrange = np.around(np.array(uvp_plt.get_spw_ranges()[spw][:2]) / 1e6, 2) axes[0][0].get_figure().suptitle( "{}\n{} polarization | {} -- {} MHz".format(units, pol, *spwrange), y=1.03, fontsize=14, ) # Return Axes if new_plot: return fig def delay_wedge( uvp, spw, pol, blpairs=None, times=None, error_weights=None, fold=False, delay=True, rotate=False, component="abs-real", log10=True, loglog=False, red_tol=1.0, center_line=False, horizon_lines=False, title=None, ax=None, cmap="viridis", figsize=(8, 6), deltasq=False, colorbar=False, cbax=None, vmin=None, vmax=None, edgecolor="none", flip_xax=False, flip_yax=False, lw=2, set_bl_tick_major=False, set_bl_tick_minor=False, xtick_size=10, xtick_rot=0, ytick_size=10, ytick_rot=0, **kwargs, ): """ Plot a 2D delay spectrum (or spectra) from a UVPSpec object. Note that all integrations and redundant baselines are averaged (unless specifying times) before plotting. Note: this deepcopies input uvp before averaging. Parameters ---------- uvp : UVPSpec UVPSpec object containing delay spectra to plot. spw : integer Which spectral window to plot. pol : int or tuple Polarization-pair integer or tuple, e.g. ('pI', 'pI') blpairs : list of tuples, optional List of baseline-pair tuples to use in plotting. times : list, optional An ndarray or list of times from uvp.time_avg_array to select on before plotting. Default: None. error_weights : string, optional error_weights specify which kind of errors we use for weights during averaging power spectra. fold : bool, optional Whether to fold the power spectrum in k_parallel. Default: False. delay : bool, optional Whether to plot the axes in tau (ns). If False, axes will be plotted in cosmological units. Default: True. rotate : bool, optional If False, use baseline-type as x-axis and delay as y-axis, else use baseline-type as y-axis and delay as x-axis. Default: False component : str, optional Component of complex spectra to plot. Options=['real', 'imag', 'abs', 'abs-real', 'abs-imag'] abs-real is abs(real(data)), whereas 'real' is real(data) Default: 'abs-real'. log10 : bool, optional If True, take log10 of data before plotting. Default: True loglog : bool, optional If True, turn x-axis and y-axis into log-log scale. Default: False red_tol : float, optional Redundancy tolerance when solving for redundant groups in meters. Default: 1.0 center_line : bool, optional Whether to plot a dotted line at k_perp = 0. Default: False. horizon_lines : bool, optional Whether to plot dotted lines along the horizon. Default: False. title : string, optional Title for subplot. Default: None. ax : matplotlib.axes, optional If not None, use this axes as a subplot for delay wedge. cmap : str, optional Colormap of wedge plot. Default: 'viridis' figsize : len-2 integer tuple, optional If ax is None, this is the new figure size. deltasq : bool, optional Convert to Delta^2 before plotting. This is ignored if delay=True. Default: False colorbar : bool, optional Add a colorbar to the plot. Default: False cbax : matplotlib.axes, optional Axis object for adding colorbar if True. Default: None vmin : float, optional Minimum range of colorscale. Default: None vmax : float, optional Maximum range of colorscale. Default: None edgecolor : str, optional Edgecolor of bins in pcolormesh. Default: 'none' flip_xax : bool, optional Flip xaxis if True. Default: False flip_yax : bool, optional Flip yaxis if True. Default: False lw : int, optional Line-width of horizon and center lines if plotted. Default: 2. set_bl_tick_major : bool, optional If True, use the baseline lengths as major ticks, rather than default uniform grid. set_bl_tick_minor : bool, optional If True, use the baseline lengths as minor ticks, which have no labels. kwargs : dictionary Additional keyword arguments to pass to pcolormesh() call. Returns ------- fig : matplotlib.pyplot.Figure Matplotlib Figure instance if ax is None. """ import matplotlib as mpl import matplotlib.pyplot as plt # type checking uvp = copy.deepcopy(uvp) assert isinstance(uvp, uvpspec.UVPSpec), "input uvp must be a UVPSpec object" assert isinstance(spw, (int, np.integer)) assert isinstance(pol, (int, np.integer, tuple)) fix_negval = component in ["real", "imag"] and log10 # check pspec units for little h little_h = "h^-3" in uvp.norm_units # Create new ax if none specified new_plot = False if ax is None: fig, ax = plt.subplots(figsize=figsize) new_plot = True else: fig = ax.get_figure() # Select out times and blpairs if provided if times is not None: uvp.select(blpairs=blpairs, times=times, inplace=True) # Average across redundant groups and time # this also ensures blpairs are ordered from short_bl --> long_bl blp_grps, lens, angs, tags = utils.get_blvec_reds( uvp, bl_error_tol=red_tol, match_bl_lens=True ) uvp.average_spectra( blpair_groups=blp_grps, time_avg=True, error_weights=error_weights, inplace=True ) # get blpairs and order by len and enforce bl len ordering anyways blpairs, blpair_seps = uvp.get_blpairs(), uvp.get_blpair_seps() osort = np.argsort(blpair_seps) blpairs, blpair_seps = [blpairs[oi] for oi in osort], blpair_seps[osort] if len(blpairs) < 2: raise ValueError( "delay_wedge requires at least two baseline pairs after selection " "and averaging." ) # Convert to DeltaSq if deltasq and not delay: uvp.convert_to_deltasq(inplace=True) # Fold array if fold: uvp.fold_spectra() # Format ticks if delay: x_axis = uvp.get_dlys(spw) * 1e9 y_axis = blpair_seps else: x_axis = uvp.get_kparas(spw, little_h=little_h) y_axis = uvp.get_kperps(spw, little_h=little_h) if rotate: _x_axis = y_axis y_axis = x_axis x_axis = _x_axis # Conigure Units psunits = rf"({uvp.vis_units})^2\ {uvp.norm_units}" if "h^-1" in psunits: psunits = psunits.replace("h^-1", r"h^{-1}\ ") if "h^-3" in psunits: psunits = psunits.replace(r"h^-3", r"h^{-3}\ ") if "Hz" in psunits: psunits = psunits.replace("Hz", r"{\rm Hz}\ ") if "str" in psunits: psunits = psunits.replace("str", r"\,{\rm str}\ ") if "Mpc" in psunits and "\\rm" not in psunits: psunits = psunits.replace("Mpc", r"{\rm Mpc}") if "pi" in psunits and "\\pi" not in psunits: psunits = psunits.replace("pi", r"\pi") if "beam normalization not specified" in psunits: psunits = psunits.replace("beam normalization not specified", r"{\rm unnormed}") # get data with shape (Nblpairs, Ndlys) data = [uvp.get_data((spw, blp, pol)).squeeze() for blp in blpairs] # get component if component == "real": data = np.real(data) elif component == "abs-real": data = np.abs(np.real(data)) elif component == "imag": data = np.imag(data) elif component == "abs-imag": data = np.abs(np.imag(data)) elif component == "abs": data = np.abs(data) else: raise ValueError(f"Did not understand component {component}") # if real or imag and log is True, set negative values to near zero # this is done so that one can use cmap.set_under() and cmap.set_bad() separately if fix_negval: data[data < 0] = np.abs(data).min() * 1e-6 + 1e-10 # take log10 if log10: data = np.log10(data) # loglog if loglog: ax.set_xscale("log") ax.set_yscale("log") ax.yaxis.set_major_formatter(mpl.ticker.LogFormatterSciNotation()) ax.yaxis.set_minor_formatter(mpl.ticker.NullFormatter()) ax.xaxis.set_major_formatter(mpl.ticker.LogFormatterSciNotation()) ax.xaxis.set_minor_formatter(mpl.ticker.NullFormatter()) # rotate if rotate: data = np.rot90(data[:, ::-1], k=1) # Get bin edges xdiff = np.diff(x_axis) x_edges = np.array( [x_axis[0] - xdiff[0] / 2.0] + list(x_axis[:-1] + xdiff / 2.0) + [x_axis[-1] + xdiff[-1] / 2.0] ) ydiff = np.diff(y_axis) y_edges = np.array( [y_axis[0] - ydiff[0] / 2.0] + list(y_axis[:-1] + ydiff / 2.0) + [y_axis[-1] + ydiff[-1] / 2.0] ) X, Y = np.meshgrid(x_edges, y_edges) # plot cax = ax.pcolormesh( X, Y, data, cmap=cmap, edgecolor=edgecolor, lw=0.01, vmin=vmin, vmax=vmax, **kwargs, ) # Configure ticks if set_bl_tick_major: if rotate: ax.set_xticks([np.around(x, _get_sigfig(x) + 2) for x in x_axis]) else: ax.set_yticks([np.around(x, _get_sigfig(x) + 2) for x in y_axis]) if set_bl_tick_minor: if rotate: ax.set_xticks( [np.around(x, _get_sigfig(x) + 2) for x in x_axis], minor=True ) else: ax.set_yticks( [np.around(x, _get_sigfig(x) + 2) for x in y_axis], minor=True ) # Add colorbar if colorbar: if fix_negval: cb_extend = "min" else: cb_extend = "neither" if cbax is None: cbax = ax cbar = fig.colorbar(cax, ax=cbax, extend=cb_extend) if deltasq: p = r"\Delta^2" else: p = "P" if delay: p = r"{}({},\ {})".format(p, r"\tau", r"|\vec{b}|") else: p = r"{}({},\ {})".format(p, r"k_\parallel", r"k_\perp") if log10: psunits = rf"$\log_{{10}}\ {p}\ [{psunits}]$" else: psunits = rf"${p}\ [{psunits}]$" cbar.set_label(psunits, fontsize=14) if fix_negval: cbar.ax.set_title("$< 0$", y=-0.05, fontsize=16) # Configure tick labels if delay: xlabel = r"$\tau$ $[{\rm ns}]$" ylabel = r"$|\vec{b}|$ $[{\rm m}]$" else: xlabel = r"$k_{\parallel}\ [h\ \rm Mpc^{-1}]$" ylabel = r"$k_{\perp}\ [h\ \rm Mpc^{-1}]$" if rotate: _xlabel = ylabel ylabel = xlabel xlabel = _xlabel if ax.get_xlabel() == "": ax.set_xlabel(xlabel, fontsize=16) if ax.get_ylabel() == "": ax.set_ylabel(ylabel, fontsize=16) # Configure center line if center_line: if rotate: ax.axhline(y=0, color="#000000", ls="--", lw=lw) else: ax.axvline(x=0, color="#000000", ls="--", lw=lw) # Plot horizons if horizon_lines: # get horizon in ns horizons = blpair_seps / conversions.units.c * 1e9 # convert to cosmological wave vector if not delay: # Get average redshift of spw avg_z = uvp.cosmo.f2z(np.mean(uvp.freq_array[uvp.spw_to_freq_indices(spw)])) horizons *= uvp.cosmo.tau_to_kpara(avg_z, little_h=little_h) / 1e9 # iterate over bins and plot lines if rotate: bin_edges = x_edges else: bin_edges = y_edges for i, hor in enumerate(horizons): if rotate: ax.plot( bin_edges[i : i + 2], [hor, hor], color="#ffffff", ls="--", lw=lw ) if not uvp.folded: ax.plot( bin_edges[i : i + 2], [-hor, -hor], color="#ffffff", ls="--", lw=lw, ) else: ax.plot( [hor, hor], bin_edges[i : i + 2], color="#ffffff", ls="--", lw=lw ) if not uvp.folded: ax.plot( [-hor, -hor], bin_edges[i : i + 2], color="#ffffff", ls="--", lw=lw, ) # flip axes if flip_xax: fig.sca(ax) fig.gca().invert_xaxis() if flip_yax: fig.sca(ax) fig.gca().invert_yaxis() # add title if title is not None: ax.set_title(title, fontsize=12) # Configure tick sizes and rotation [tl.set_size(xtick_size) for tl in ax.get_xticklabels()] [tl.set_rotation(xtick_rot) for tl in ax.get_xticklabels()] [tl.set_size(ytick_size) for tl in ax.get_yticklabels()] [tl.set_rotation(ytick_rot) for tl in ax.get_yticklabels()] # return figure if new_plot: return fig def plot_uvdata_waterfalls( uvd, basename, data="data", plot_mode="log", vmin=None, vmax=None, recenter=False, format="png", # noqa: A002 — public kwarg name; renaming is a breaking change **kwargs, ): """ Plot waterfalls for all baselines and polarizations within a UVData object, and save to individual files. Parameters ---------- uvd : UVData object Input data object. Waterfalls will be stored for all baselines and polarizations within the object; use uvd.select() to remove unwanted information. basename : str Base filename for the output plots. This must have two placeholders for baseline ID ('bl') and polarization ('pol'), e.g. basename='plots/uvdata.{pol}.{bl}'. data : str, optional Which data array to plot from the UVData object. Options are: 'data', 'flags', 'nsamples'. Default: 'data'. plot_mode : str, optional Plot mode, passed to uvtools.plot.waterfall. Default: 'log'. vmin, vmax : float, optional Min./max. values of the plot colorscale. Default: None (uses the min./max. of the data). recenter : bool, optional Whether to apply recentering (see uvtools.plot.waterfall). Default: False. format : str, optional The file format of the output images. If None, the image format will be deduced from the basename string. Default: 'png'. **kwargs : dict Keyword arguments passed to uvtools.plot.waterfall, which passes them on to matplotlib.imshow. """ import matplotlib.pyplot as plt assert isinstance(uvd, UVData), "'uvd' must be a UVData object." assert data in ["data", "flags", "nsamples"], ( "'%s' not a valid data array; use 'data', 'flags', or 'nsamples'" % data ) # Set plot colorscale max/min if specified drng = None if vmin is not None: assert vmax is not None, "Must also specify vmax if vmin is specified." drng = vmax - vmin # Empty figure fig, ax = plt.subplots(1, 1) # Loop over antenna pairs and pols for (ant1, ant2, pol), d in uvd.antpairpol_iter(): # Get chosen data array if data == "data": pass elif data == "flags": d = uvd.get_flags((ant1, ant2, pol)) elif data == "nsamples": d = uvd.get_nsamples((ant1, ant2, pol)) else: raise KeyError("Invalid data array type '%s'" % data) # Make plot img = uvtools.plot.waterfall( d, mode=plot_mode, mx=vmax, drng=drng, recenter=recenter, **kwargs ) fig.colorbar(img) # Save to file outfile = basename.format(bl="%d.%d" % (ant1, ant2), pol=pol) if format is not None: # Make sure format extension is given if outfile[-len(format)].lower() != format.lower(): outfile = "%s.%s" % (outfile, format) fig.tight_layout() fig.savefig(outfile, format=format) fig.clf() def _get_sigfig(x): return -int(np.floor(np.log10(np.abs(x)))) def _round_sigfig(x, up=True): sigfigs = _get_sigfig(x) if up: return np.ceil(10**sigfigs * x) / 10**sigfigs else: return np.floor(10**sigfigs * x) / 10**sigfigs