# Source code for datacube.utils.masking

# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2025 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
"""
Tools for masking data based on a bit-mask variable with attached definition.

The main functions are `make_mask(variable, **flags)` and `describe_variable_flags(variable)`.
"""

from collections.abc import Iterable

import pandas
import xarray
from xarray import DataArray, Dataset

from datacube.utils.math import valid_mask

FLAGS_ATTR_NAME = "flags_definition"


def list_flag_names(variable):
    """
    List the names of the masking flags defined for the variable, in sorted order

    :param variable: Masking xarray.Dataset or xarray.DataArray
    :return: list
    """
    # Iterating the flags definition yields its flag names
    return sorted(get_flags_def(variable))


def describe_variable_flags(variable, with_pandas: bool = True):
    """
    Describe the flags available on a masking variable.

    Reads the `flags_definition` attribute found on the provided variable and
    presents it either as a Pandas Dataframe indexed by flag name
    (with_pandas=True - default) or as a plain-text table built by
    `describe_flags_def` (with_pandas=False).

    :param variable: Masking xarray.Dataset or xarray.DataArray
    :param with_pandas: Return a Pandas Dataframe when true, a string otherwise
    :return: Pandas Dataframe or str
    """
    definition = get_flags_def(variable)
    if with_pandas:
        # orient="index": each flag name becomes a row label
        return pandas.DataFrame.from_dict(definition, orient="index")
    return describe_flags_def(definition)
def describe_flags_def(flags_def) -> str:
    """
    Render a flags definition as a plain-text table.

    :param flags_def: Mapping of flag name -> definition. Each definition has
        ``bits`` and ``values`` entries and may have a ``description``.
    :return: The table rows joined with newlines
    """
    return "\n".join(generate_table(list(_table_contents(flags_def))))


def _table_contents(flags_def):
    """
    Yield the rows of the flags table as tuples of strings.

    The first row is the header. Flags follow, ordered by their lowest bit,
    with one row per entry in the flag's ``values``. Only the first row of a
    flag carries its name and description; the following rows leave those two
    cells empty.

    :param flags_def: Mapping of flag name -> definition
    """
    yield "Flag name", "Description", "Bit. No", "Value", "Meaning"
    for flag_name, defn in sorted(flags_def.items(), key=_order_bitdefs_by_bits):
        # A definition without a description gets an empty cell instead of
        # failing with a KeyError.
        name, desc = flag_name, defn.get("description", "")
        bits = str(defn["bits"])
        for value, meaning in defn["values"].items():
            yield name, desc, bits, str(value), str(meaning)
            name, desc = "", ""


def _order_bitdefs_by_bits(bitdef):
    """
    Sort key for ``(flag_name, definition)`` pairs: the lowest bit of the flag.

    :param bitdef: A ``(flag_name, definition)`` pair
    :return: The smallest entry of ``bits``, or ``bits`` itself when it is a
        single number
    """
    _, defn = bitdef
    bits = defn["bits"]
    try:
        return min(bits)
    except TypeError:
        # A single bit is stored as a bare number, which min() cannot iterate
        return bits
def make_mask(variable: Dataset | DataArray, **flags):
    """
    Build a boolean mask from the requested flag values.

    Multiple flags are combined in a logical AND fashion: an element is True
    only when every requested flag holds the requested value.

    For example:

    >>> make_mask(pqa, cloud_acca=False, cloud_fmask=False, land_obs=True)  # doctest: +SKIP

    OR

    >>> make_mask(pqa, **GOOD_PIXEL_FLAGS)  # doctest: +SKIP

    where `GOOD_PIXEL_FLAGS` is a dict of flag_name to True/False

    :param variable: Masking variable carrying a flags definition
    :param flags: flag_name=value pairs to select on
    :return: boolean xarray.DataArray or xarray.Dataset
    """
    bit_mask, expected = create_mask_value(get_flags_def(variable), **flags)
    # Keep only the bits of the requested flags, then compare them as a whole
    selected_bits = variable & bit_mask
    return selected_bits == expected
def valid_data_mask(data):
    """
    Build bool arrays marking where the data is not `nodata`

    A Dataset is handled by mapping this function over it. For a DataArray the
    `nodata` value is read from its attributes (None when the attribute is
    absent) and passed to `valid_mask` together with the data.

    :param Dataset or DataArray data:
    :return: Dataset or DataArray
    """
    if isinstance(data, DataArray):
        return xarray.apply_ufunc(
            valid_mask,
            data,
            data.attrs.get("nodata"),
            dask="parallelized",
            output_dtypes=[bool],
        )

    if isinstance(data, Dataset):
        return data.map(valid_data_mask)

    raise TypeError(f"valid_data_mask not supported for type {type(data)}")
def mask_invalid_data(data, keep_attrs: bool = True):
    """
    Sets all `nodata` values to ``nan``.

    This will convert numeric data to type `float`. A DataArray without a
    `nodata` attribute is returned unchanged.

    :param Dataset or DataArray data:
    :param keep_attrs: If the attributes of the data should be included in the
        returned data. When true, the result carries the input attributes
        except `nodata`.
    :return: Dataset or DataArray
    """
    if isinstance(data, Dataset):
        # keep_attrs is given to Dataset.map itself and, through args, handed
        # on as a positional argument to each per-DataArray call
        return data.map(mask_invalid_data, keep_attrs=keep_attrs, args=(keep_attrs,))

    if not isinstance(data, DataArray):
        raise TypeError(f"mask_invalid_data not supported for type {type(data)}")

    if "nodata" not in data.attrs:
        return data

    masked = data.where(data != data.nodata)
    if keep_attrs:
        masked.attrs = {
            attr_name: attr_value
            for attr_name, attr_value in data.attrs.items()
            if attr_name != "nodata"
        }
    return masked
def create_mask_value(bits_def, **flags) -> tuple[int, int]:
    """
    Build the bit mask and the expected masked value for the requested flags.

    :param bits_def: Mapping of flag name -> definition with ``bits`` (a single
        bit number or an iterable of bit numbers) and ``values`` (bit value ->
        meaning) entries
    :param flags: flag_name=meaning pairs; each meaning must match exactly one
        entry of that flag's ``values``
    :return: ``(mask, value)``: the bits covered by the requested flags, and
        the value those bits must hold
    :raises ValueError: for an unknown flag name, or when the requested meaning
        does not identify exactly one integer bit value of the flag
    """
    mask = 0
    value = 0

    for flag_name, flag_ref in flags.items():
        defn = bits_def.get(flag_name, None)
        if defn is None:
            raise ValueError(f'Unknown flag: "{flag_name}"')

        try:
            # Unpacking into a one-element list insists on exactly one match
            [flag_value] = (
                bit_val
                for bit_val, val_ref in defn["values"].items()
                if val_ref == flag_ref
            )
            flag_value = int(flag_value)  # Might be string if coming from DB
        except ValueError:
            raise ValueError(
                f"Unknown value {flag_ref} specified for flag {flag_name}"
            ) from None

        if isinstance(defn["bits"], Iterable):  # Multi-bit flag
            # Set mask
            for bit in defn["bits"]:
                mask = set_value_at_index(mask, bit, True)

            # The flag value is relative to the flag's lowest bit
            shift = min(defn["bits"])
            real_val = flag_value << shift

            value |= real_val
        else:
            bit = defn["bits"]
            mask = set_value_at_index(mask, bit, True)
            value = set_value_at_index(value, bit, bool(flag_value))

    return mask, value


def mask_to_dict(bits_def: dict, mask_value: int) -> dict:
    """
    Describes which flags are set for a mask value

    :param bits_def: Mapping of flag name -> definition with ``bits`` (a single
        bit number or an iterable of bit numbers) and ``values`` entries
    :param mask_value: The integer value to decode
    :return: Mapping of flag_name -> set_value
    """
    return_dict = {}

    for flag_name, flag_defn in bits_def.items():
        # Make bits a list, even if there is only one. Any iterable of bits is
        # accepted, matching what create_mask_value treats as a multi-bit flag.
        flag_bits = flag_defn["bits"]
        if isinstance(flag_bits, Iterable):
            flag_bits = list(flag_bits)
        else:
            flag_bits = [flag_bits]

        # The amount to shift flag_value to line up with mask_value
        flag_shift = min(flag_bits)

        # Mask our mask_value, we are only interested in the bits for this flag
        flag_mask = 0
        for i in flag_bits:
            flag_mask |= 1 << i
        masked_mask_value = mask_value & flag_mask

        for flag_value, value in flag_defn["values"].items():
            shifted_value = int(flag_value) << flag_shift
            if shifted_value == masked_mask_value:
                assert flag_name not in return_dict
                return_dict[flag_name] = value

    return return_dict


def get_flags_def(variable):
    """
    Find the flags definition attached to a variable.

    The variable's own flags-definition attribute is used when present.
    Otherwise, if the variable exposes ``data_vars``, the first data variable
    carrying the attribute supplies the definition.

    :param variable: Object to inspect
    :return: The flags definition
    :raises ValueError: when no flags definition is found
    """
    flags = getattr(variable, FLAGS_ATTR_NAME, None)
    if flags is not None:
        return flags

    data_vars = getattr(variable, "data_vars", None)
    if data_vars is not None:
        # Maybe we have a DataSet, not a DataArray
        for var in data_vars.values():
            flags = getattr(var, FLAGS_ATTR_NAME, None)
            if flags is not None:
                return flags

    raise ValueError("No masking variable found")


def set_value_at_index(bitmask: int, index: int, value: bool) -> int:
    """
    Set a bit value onto an integer bitmask

    eg. set bits 2 and 4 to True

    >>> mask = 0
    >>> mask = set_value_at_index(mask, 2, True)
    >>> mask = set_value_at_index(mask, 4, True)
    >>> print(bin(mask))
    0b10100
    >>> mask = set_value_at_index(mask, 2, False)
    >>> print(bin(mask))
    0b10000

    :param bitmask: existing int bitmask to alter
    :param index: position of the bit to change, 0 being the lowest bit
    :param value: True to set the bit, False to clear it
    :return: the altered bitmask
    """
    bit_val = 2**index
    if value:
        bitmask |= bit_val
    else:
        bitmask &= ~bit_val
    return bitmask


def generate_table(rows):
    """
    Yield strings to print a table using the data in `rows`.

    Every column is padded to the width of its longest cell. A separator line
    is yielded before a data row whose first cell is non-empty, provided the
    previous data row had an empty first cell or there was no previous data
    row. Nothing is yielded when `rows` is empty.

    TODO: Maybe replace with Pandas

    :param rows: An iterable of sequences of strings with the 0th element being
        the table header
    """
    # Materialise once: rows is walked for the widths and then indexed, which a
    # one-shot iterable would not survive.
    rows = list(rows)
    if not rows:
        return

    # - figure out column widths
    widths = [max(len(cell) for cell in column) for column in zip(*rows)]

    # - print the header
    header, data = rows[0], rows[1:]
    yield " | ".join(format(title, f"{width}s") for width, title in zip(widths, header))

    first_col = ""
    # - print the data
    for row in data:
        if first_col == "" and row[0] != "":
            # - print the separator
            yield "-+-".join("-" * width for width in widths)
        first_col = row[0]

        yield " | ".join(format(cdata, f"{width}s") for width, cdata in zip(widths, row))