# This file is part of the Open Data Cube, see https://opendatacube.org for more information
#
# Copyright (c) 2015-2025 ODC Contributors
# SPDX-License-Identifier: Apache-2.0
"""
Storage Query and Access API module
"""
import collections
import datetime
import logging
import math
import warnings
from collections.abc import Iterable
from typing import TYPE_CHECKING, Union
import numpy as np
import pandas
import xarray
from odc.geo import Geometry
from odc.geo.geobox import GeoBox
from odc.geo.geom import lonlat_bounds, mid_longitude
from pandas import to_datetime as pandas_to_datetime
from typing_extensions import override
from datacube.index import Index
from ..index import extract_geom_from_query, strip_all_spatial_fields_from_query
from ..migration import ODC2DeprecationWarning
from ..model import Dataset, QueryField, Range
from ..utils.dates import normalise_dt, tz_aware
if TYPE_CHECKING:
from datacube.utils.geometry import GeoBox as LegacyGeoBox
_LOG: logging.Logger = logging.getLogger(__name__)
class GroupBy:
def __init__(
self, group_by_func, dimension, units, sort_key=None, group_key=None
) -> None:
"""
GroupBy Object
:param group_by_func: Dataset -> group identifier
:param dimension: dimension of the group key
:param units: units of the group key
:param sort_key: how to sort datasets in a group internally
:param group_key: the coordinate value for a group
list[Dataset] -> coord value
"""
self.group_by_func = group_by_func
self.dimension = dimension
self.units = units
if sort_key is None:
sort_key = group_by_func
self.sort_key = sort_key
if group_key is None:
group_key = lambda datasets: group_by_func(datasets[0]) # noqa: E731
self.group_key = group_key
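# Illustrative sketch: the built-in "time" grouping constructed in
# query_group_by() below is equivalent to building a GroupBy directly:
#
#   by_time = GroupBy(
#       group_by_func=lambda ds: normalise_dt(ds.center_time),
#       dimension="time",
#       units="seconds since 1970-01-01 00:00:00",
#   )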
OTHER_KEYS = (
"measurements",
"group_by",
"output_crs",
"resolution",
"set_nan",
"product",
"geopolygon",
"like",
"source_filter",
)
class Query:
def __init__(
self,
index: Index | None = None,
product: str | None = None,
geopolygon=None,
like: GeoBox | xarray.Dataset | xarray.DataArray | None = None,
**search_terms,
) -> None:
"""Parses search terms in preparation for querying the Data Cube Index.
Create a :class:`Query` object by passing it a set of search terms as keyword arguments.
>>> query = Query(product='ls5_nbar_albers', time=('2001-01-01', '2002-01-01'))
Use by accessing :attr:`search_terms`:
>>> query.search_terms['time'] # doctest: +NORMALIZE_WHITESPACE
Range(begin=datetime.datetime(2001, 1, 1, 0, 0, tzinfo=tzutc()), \
end=datetime.datetime(2002, 1, 1, 23, 59, 59, 999999, tzinfo=tzutc()))
By passing in an ``index``, the search parameters will be validated as existing on the ``product``,
and a spatial search appropriate for the index driver can be extracted.
Used by :meth:`datacube.Datacube.find_datasets` and :meth:`datacube.Datacube.load`.
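        A bounding-box search can also be expressed with plain ``lat``/``lon`` range
        terms (illustrative, using the same example product as above)::

            query = Query(product='ls5_nbar_albers',
                          lat=(-35.5, -35.0), lon=(148.0, 149.0),
                          time=('2008-01-01', '2008-03-31'))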
        :param index: An optional ``index`` object, if validation of field names is desired.
        :param product: Name of the product to search.
        :param geopolygon: The spatial boundaries of the search. Can be:

            * an :class:`odc.geo.geom.Geometry` object;
            * any string or JSON-like object that can be converted to a Geometry object;
            * an iterable of either of the above; or
            * None: no geopolygon defined (one may be derived from ``like`` or the
              lat/lon/x/y/crs search terms).
        :param like: The spatio-temporal bounds of ``like`` are used for the search.
        :param search_terms:
            * ``measurements`` - list of measurements to retrieve
            * ``latitude``, ``lat``, ``y``, ``longitude``, ``lon``, ``long``, ``x`` - tuples
              (min, max) bounding the spatial dimensions
            * ``extra_dimension_name`` (e.g. ``z``) - tuples (min, max) bounding extra
              dimensions, specified by name, for 3D datasets, e.g. ``z=(10, 30)``
            * ``crs`` - spatial coordinate reference system in which to interpret the spatial bounds
            * ``group_by`` - observation grouping method, one of ``time`` or ``solar_day``; default is ``time``
"""
self.index = index
self.product = product
self.geopolygon = extract_geom_from_query(geopolygon=geopolygon, **search_terms)
if (
"source_filter" in search_terms
and search_terms["source_filter"] is not None
):
self.source_filter: Query | None = Query(**search_terms["source_filter"])
else:
self.source_filter = None
search_terms = strip_all_spatial_fields_from_query(search_terms)
remaining_keys = set(search_terms.keys()) - set(OTHER_KEYS)
if index:
# Retrieve known keys for extra dimensions
known_dim_keys: set = set()
if product is not None:
datacube_products: Iterable = index.products.search(product=product)
else:
datacube_products = index.products.get_all()
for datacube_product in datacube_products:
known_dim_keys.update(datacube_product.extra_dimensions.dims.keys())
remaining_keys -= known_dim_keys
unknown_keys = remaining_keys - set(index.products.get_field_names())
            # TODO: What about keys in source filters, and what if the keys don't match up with this product?
if unknown_keys:
raise LookupError("Unknown arguments: ", unknown_keys)
self.search = {}
for key in remaining_keys:
self.search.update(_values_to_search(**{key: search_terms[key]}))
if like:
assert self.geopolygon is None, (
"'like' with other spatial bounding parameters is not supported"
)
like = _normalise_geobox(like)
self.geopolygon = like.extent
if "time" not in self.search:
time_coord = like.coords.get("time") # type: ignore[attr-defined]
if time_coord is not None:
self.search["time"] = _time_to_search_dims(
# convert from np.datetime64 to datetime.datetime
(
pandas_to_datetime(time_coord.values[0]).to_pydatetime(),
pandas_to_datetime(time_coord.values[-1]).to_pydatetime(),
)
)
@property
def search_terms(self) -> dict:
"""
Access the search terms as a dictionary.
"""
kwargs = {}
kwargs.update(self.search)
if self.geopolygon:
if self.index and self.index.supports_spatial_indexes:
kwargs["geopolygon"] = self.geopolygon
else:
geo_bb = lonlat_bounds(self.geopolygon, resolution="auto")
if math.isclose(geo_bb.bottom, geo_bb.top, abs_tol=1e-5):
kwargs["lat"] = geo_bb.bottom
else:
kwargs["lat"] = Range(geo_bb.bottom, geo_bb.top)
if math.isclose(geo_bb.left, geo_bb.right, abs_tol=1e-5):
kwargs["lon"] = geo_bb.left
else:
kwargs["lon"] = Range(geo_bb.left, geo_bb.right)
if self.product:
kwargs["product"] = self.product
if self.source_filter:
kwargs["source_filter"] = self.source_filter.search_terms
return kwargs
@override
def __repr__(self) -> str:
return self.__str__()
@override
def __str__(self) -> str:
return f"""Datacube Query:
type = {self.product}
search = {self.search}
geopolygon = {self.geopolygon}
"""
def _extract_time_from_ds(ds: Dataset) -> datetime.datetime:
return normalise_dt(ds.center_time)
def query_group_by(
group_by: str | GroupBy | None = "time", **kwargs: QueryField
) -> GroupBy:
"""
Group by function for loading datasets
:param group_by: group_by name, supported strings are
- `time` (default)
- `solar_day`, see :func:`datacube.api.query.solar_day`
or pass a :class:`datacube.api.query.GroupBy` object.
:return: :class:`datacube.api.query.GroupBy`
:raises LookupError: when group_by string is not a valid dictionary key.
"""
if isinstance(group_by, GroupBy):
return group_by
if not isinstance(group_by, str):
group_by = None
time_grouper = GroupBy(
group_by_func=_extract_time_from_ds,
dimension="time",
units="seconds since 1970-01-01 00:00:00",
)
solar_day_grouper = GroupBy(
group_by_func=solar_day,
dimension="time",
units="seconds since 1970-01-01 00:00:00",
sort_key=_extract_time_from_ds,
group_key=lambda datasets: _extract_time_from_ds(datasets[0]),
)
group_by_map: dict[str | None, GroupBy] = {
None: time_grouper,
"time": time_grouper,
"solar_day": solar_day_grouper,
}
try:
return group_by_map[group_by]
except KeyError:
raise LookupError(
f"No group by function for {group_by}, valid options are: {group_by_map.keys()}",
) from None
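# Illustrative usage of query_group_by:
#
#   query_group_by()                       # default "time" grouping
#   query_group_by(group_by="solar_day")   # group scenes into solar days
#   query_group_by(group_by="fortnight")   # raises LookupError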
def _value_to_range(
value: str | int | float | list[str | int | float],
) -> tuple[float, float]:
if isinstance(value, str | float | int):
value = float(value)
return value, value
else:
return float(value[0]), float(value[-1])
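# Sketch of the helper above: scalars collapse to a degenerate range, while
# sequences use their first and last elements:
#   _value_to_range(3)         -> (3.0, 3.0)
#   _value_to_range([1, 2, 5]) -> (1.0, 5.0)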
def _values_to_search(**kwargs) -> dict:
search = {}
for key, value in kwargs.items():
if key.lower() in ("time", "t"):
search["time"] = _time_to_search_dims(value)
elif key not in ["latitude", "lat", "y"] + ["longitude", "lon", "x"]:
# If it's not a string, but is a sequence of length 2, then it's a Range
if (
not isinstance(value, str)
and isinstance(value, collections.abc.Sequence)
and len(value) == 2
):
search[key] = Range(*value)
# All other cases are default
else:
search[key] = value
return search
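# For example (illustrative values),
#   _values_to_search(time="2020", platform=("LANDSAT_7", "LANDSAT_8"))
# yields roughly
#   {"time": Range(2020-01-01 00:00:00, 2020-12-31 23:59:59.999999),
#    "platform": Range("LANDSAT_7", "LANDSAT_8")}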
def _time_to_search_dims(time_range):
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
tr_start, tr_end = time_range, time_range
if hasattr(time_range, "__iter__") and not isinstance(time_range, str):
tmp = list(time_range)
if len(tmp) > 2:
raise ValueError("Please supply start and end date only for time query")
tr_start, tr_end = tmp[0], tmp[-1]
if isinstance(tr_start, int | float) or isinstance(tr_end, int | float):
raise TypeError("Time dimension must be provided as a datetime or a string")
if tr_start is None:
start = datetime.datetime.fromtimestamp(0)
elif not isinstance(tr_start, datetime.datetime):
# convert to datetime.datetime
if hasattr(tr_start, "isoformat"):
tr_start = tr_start.isoformat()
start = pandas_to_datetime(tr_start).to_pydatetime()
else:
start = tr_start
if tr_end is None:
tr_end = datetime.datetime.now().strftime("%Y-%m-%d")
# Attempt conversion to isoformat
# allows pandas.Period to handle datetime objects
if hasattr(tr_end, "isoformat"):
tr_end = tr_end.isoformat()
# get end of period to ensure range is inclusive
end = pandas.Period(tr_end).end_time.to_pydatetime()
tr = Range(tz_aware(start), tz_aware(end))
if start == end:
return tr[0]
return tr
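# Illustrative behaviour of the period handling above (results are timezone-aware):
#   _time_to_search_dims("2008")          -> Range covering all of 2008
#   _time_to_search_dims(("2008", None))  -> Range from 2008-01-01 to the end of today
#   _time_to_search_dims((None, "2008"))  -> Range from the Unix epoch to the end of 2008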
def _convert_to_solar_time(utc, longitude: float) -> datetime.datetime:
seconds_per_degree = 240
offset_seconds = int(longitude * seconds_per_degree)
offset = datetime.timedelta(seconds=offset_seconds)
return utc + offset
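# Worked example for the conversion above (240 s = 86400 s per day / 360 degrees):
# at longitude 147.0 deg E the offset is 147 * 240 s = 35280 s (9 h 48 min),
# so 00:30 UTC corresponds to roughly 10:18 local solar time.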
def _ds_mid_longitude(dataset: Dataset) -> float | None:
m = dataset.metadata
if hasattr(m, "lon"):
lon = m.lon
return (lon.begin + lon.end) * 0.5
return None
def solar_day(dataset: Dataset, longitude: float | None = None) -> np.datetime64:
"""
    Adjust a Dataset's timestamp for local solar time at its location and convert
    to a day-resolution :class:`numpy.datetime64`.

    :param dataset: Dataset object from which to read the time and location
    :param longitude: If supplied, correct the timestamp for this longitude
        rather than for the mid-point of the Dataset's footprint
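
    Example (illustrative)::

        solar_day(ds)          # longitude derived from the dataset's footprint
        solar_day(ds, 147.0)   # or corrected for an explicit longitude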
"""
utc = dataset.center_time.astimezone(datetime.timezone.utc)
if longitude is None:
_lon = _ds_mid_longitude(dataset)
if _lon is None:
raise ValueError(
"Cannot compute solar_day: dataset is missing spatial info"
)
longitude = _lon
solar_time = _convert_to_solar_time(utc, longitude)
return np.datetime64(solar_time.date(), "D")
def solar_offset(geom: Geometry | Dataset, precision: str = "h") -> datetime.timedelta:
"""
    Given a geometry or a Dataset, compute the offset to add to a UTC timestamp
    to get the solar day right. This only works when the geometry is "local enough".
:param geom: Geometry with defined CRS
:param precision: one of ``'h'`` or ``'s'``, defaults to hour precision
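
    Example (illustrative, for a geometry near longitude 147 degrees east)::

        solar_offset(geom)                 # -> datetime.timedelta(hours=10)
        solar_offset(geom, precision="s")  # second precision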
"""
if isinstance(geom, Geometry):
lon = mid_longitude(geom)
else:
_lon = _ds_mid_longitude(geom)
if _lon is None:
raise ValueError(
"Cannot compute solar offset, dataset is missing spatial info"
)
lon = _lon
if precision == "h":
return datetime.timedelta(hours=round(lon * 24 / 360))
# 240 == (24*60*60)/360 (seconds of a day per degree of longitude)
return datetime.timedelta(seconds=int(lon * 240))
def _normalise_geobox(
gbox: Union[GeoBox, "LegacyGeoBox", xarray.Dataset, xarray.DataArray],
) -> GeoBox:
"""Retain support for legacy geoboxes by converting them to odc.geo GeoBoxes."""
if isinstance(gbox, GeoBox):
# Is already a GeoBox
return gbox
if isinstance(gbox, xarray.Dataset | xarray.DataArray):
# Is an Xarray object
return gbox.odc.geobox
# Is a legacy GeoBox: convert to odc.geo.geobox.GeoBox.
warnings.warn(
"The use of datacube.utils.geometry.GeoBox objects is deprecated, "
"and support will be removed in a future release.\n"
"Now converting to an odc.geo GeoBox.",
ODC2DeprecationWarning,
)
crs = None if gbox.crs is None else gbox.crs._str
return GeoBox(shape=gbox.shape, affine=gbox.affine, crs=crs)