Source code for gsolve.tide.earth_tide

# GSolve - gravity processing software.
# Copyright (c) 2026 Earth Sciences New Zealand.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
# SPDX-License-Identifier: GPLv3

# Copyright (c) 2025 Earth Sciences New Zealand.

import dataclasses
import warnings
from typing import Literal, Protocol, Self, overload, runtime_checkable

import numpy as _np
import pandas as _pd
import pygtide
from matplotlib.pyplot import axis
from numpy import arccos, arcsin, arctan, cos, deg2rad, pi, sin, sqrt
from numpy.typing import ArrayLike, NDArray
from pandas.api.interchange import from_dataframe
from pandas.api.typing import NaTType
from PIL.Image import new

from gsolve.core._typing import (
    DatetimeArray,
    DatetimeScalar,
    FilePath,
    FloatArray,
    SiteIDArray,
    StringArray,
    TimedeltaScalar,
)
from gsolve.core.utils import (
    GSolveDataWarning,
    dms2rad,
    to_1d_ndarray,
    to_naive_utc_datetime,
)

__all__ = [
    "EarthTideCorrectionProvider",
    "LongmanTidalCorrection",
    "EternaPredictTidalCorrection",
    "LongmanConstants",
    "gravimetric_factor",
]


[docs] @runtime_checkable class EarthTideCorrectionProvider(Protocol): """Protocol defining interface for classes that provide earth tide corrections."""
[docs] def tidal_correction(
self, lat: FloatArray, lon: FloatArray, elev: FloatArray, date_time: DatetimeArray, site_id: SiteIDArray | None = None, **kwargs, ) -> NDArray[_np.float64]: ...
[docs] def identifier(self, **kwargs) -> str: ...
[docs] def gravimetric_factor( k2: float = 0.2980, h2: float = 0.6032 ) -> _np.float64 | NDArray[_np.float64]: """Compute gravimetric factor from Love numbers ``k2`` and ``h2``. The gravimetric factor accounts for the non-rigidity of the Earth. Default values for standard modern earth model (PREM) are from [1]_Agnew (2007) . Parameters ---------- k2, h2 : float or array-like, optional Love numbers for earth response to semi-diurnal tides. Returns ------- delta2: ndarray or float The gravimetric factor, float if both `k2` & `h2` are floats. References ---------- .. [1] Agnew, D. C. (2007). 3.06 Earth Tides. In Treatise on Geophysics (pp. 163-195). Elsevier. https://doi.org/10.1016/B978-044452748-6.00056-0 """ _h2 = to_1d_ndarray(h2).astype(float) _k2 = to_1d_ndarray(k2, expected_size=_h2.size).astype(float) gfactor = 1 + _h2 - 1.5 * _k2 return gfactor[0] if gfactor.size == 1 else gfactor
[docs] @dataclasses.dataclass class LongmanConstants: """Constants used in Longman gravitational potential calculations. Parameters ---------- a : float, default 6.3781366e08 Earth's equatorial radius in cm (after UNSO 2011). c : float, default 3.84399e10 Mean distance between the centers earth-moon (cm). c1 : float, default 1.495983e13 Mean distance between centers earth-sun (cm). e : float, default 0.054900489 Eccentricity of the moon's orbit. i : float, default 0.08979719 Inclination of moon's orbit to the ecliptic, 5.145 degrees. m : float, default 0.074804 Ratio of mean motion of the sun to that of the moon. mu : float, default 6.67428e-08 Newton's gravitational constant, 6.670e-8 in orig. M : float, default 7.3477e25 Mass of the moon in grams. omega : float, default 0.409314616 Inclination of Earth's equator to ecliptic 23.452 degrees. S : float, default 1.98840987e33 Mass of the sun in grams https://aa.usno.navy.mil/downloads/publications/Constants_2021.pdf. """ a: float = 6.3781366e08 # Earth's equatorial radius in cm (after UNSO 2011) c: float = 3.84399e10 # Mean distance between the centers earth-moon (cm) c1: float = 1.495983e13 # Mean distance between centers earth-sun (cm) e: float = 0.054900489 # Eccentricity of the moon's orbit i: float = round( deg2rad(5.145), 9 ) # = 0.08979719 Inclination of moon's orbit to the ecliptic m: float = 0.074804 # Ratio of mean motion of the sun to that of the moon mu: float = 6.67428e-08 # Newton's gravitational constant, 6.670e-8 in orig. M: float = 7.3477e25 # Mass of the moon in grams omega: float = round( deg2rad(23.452), 9 ) # = 0.409315 Incl. of Earth's equator to ecliptic S: float = 1.98840987e33 # Mass of the sun in grams
# https://aa.usno.navy.mil/downloads/publications/Constants_2021.pdf
[docs] class LongmanTidalCorrection(EarthTideCorrectionProvider): """Class to compute lunar and solar gravitational effects using Longman's method. Parameters ---------- amp_factor : float, default 1.2 The default amplification factor. kwargs : optional Additional keyword arguments are used when instantiating a ``LongmanConstants`` object, allowing the user to override the default constants. Attributes ---------- amp_factor : float The amplification factor applied when calculating gravity corrections using :meth:`tidal_correction`. constants : LongmanConstants The physical constants used in the Longman method. See Also -------- gsolve.tide.earth_tide.LongmanConstants : Physical constants used in Longman's method. gsolve.tide.earth_tide.EarthTideCorrectionProvider : Protocol defining interface for classes implementing earth tide correction. """ def __init__(self, amp_factor: float = 1.2, **kwargs) -> None: self.amp_factor = float(amp_factor) self.constants = LongmanConstants(**kwargs) def __repr__(self) -> str: return f"{self.__class__.__name__}(amp_factor={self.amp_factor})"
[docs] def identifier(self, **kwargs) -> str: if not kwargs: return self.__repr__() afactor = kwargs.pop("amp_factor", self.amp_factor) _kwargs = dict(amp_factor=afactor, **kwargs) suffix = ",".join(f"{k}={v}" for k, v in _kwargs.items()) return f"{self.__class__.__name__}({suffix})"
[docs] def gravity_accelerations( self, lat: FloatArray, lon: FloatArray, elev: FloatArray, dt: DatetimeScalar | DatetimeArray, ) -> tuple[NDArray[_np.float64], NDArray[_np.float64]]: """Compute lunar and solar gravitational accelerations. Parameters ---------- lat : scalar or array_like of shape(M,) Latitude in decimal degrees. lon : scalar or array_like of shape(M,) Longitude in decimal degrees. elev : scalar or array_like of shape(M,) Elevation in meters (datum independent) dt : str, datetime, or array_like of shape(M,) The date-times to be at which to calculate corrections. Can be any format parsable by Pandas.to_datetime() method. amp_factor : float, or array_like of shape(M,), optional Factor to apply to tidal corrections to account for earth's deformation response to tidal forces. Tyypically in range 1.14-1.2 for semi-diurnal tides (default 1.2). To calculate this factor using h2 and k2 parameters the gravimetric_factor function can be used. Returns ------- g_lunar, g_solar : The tidal acceleration in milligals due to the moon and sun. Scalar if all input args are scalar. """ try: _lon = to_1d_ndarray(lon).astype(float) _lat = to_1d_ndarray(lat, expected_size=_lon.size).astype(float) _elev = to_1d_ndarray(elev, expected_size=_lon.size).astype(float) except ValueError: raise ValueError( "Invalid lat, lon, or elev: must be equal sized 1d arrays of floats" ) T = _decimal_julian_century(dt) t0 = _decimal_hour_of_day(dt) # original Longman notation for lat, lon and elevation lmbda = deg2rad(_lat) L = -1 * _lon H = _elev * 100.0 # in cm T = _decimal_julian_century(dt) T_2 = T**2 T_3 = T**3 t0 = _decimal_hour_of_day(dt) a = self.constants.a mu = self.constants.mu M = self.constants.M S = self.constants.S e = self.constants.e m = self.constants.m c = self.constants.c c1 = self.constants.c1 i = self.constants.i omega = self.constants.omega rev = 2 * pi # Lunar Calculations # Longman eq 10' (s) Mean longitude of moon in its orbit reckoned # from the referred equinox. This is Bartels(1957) formula. s = ( dms2rad(270, 26, 11.72) + (1336 * rev + dms2rad(0, 0, 1108406.05)) * T + dms2rad(0, 0, 7.128) * T_2 + dms2rad(0, 0, 0.0072) * T_3 ) # Longman eq 11' (p) Mean longitude of lunar perigee. This is Bartels(1957) formula. p = ( dms2rad(334, 19, 46.42) + (11 * rev + dms2rad(0, 0, 392522.51)) * T + dms2rad(0, 0, 37.15) * T_2 + dms2rad(0, 0, 0.036) * T_3 ) # Longman eq 12' (h) Mean longitude of the sun. This is Bartels(1957) formula. h = ( dms2rad(279, 41, 48.05) + dms2rad(0, 0, 129602768.11) * T + dms2rad(0, 0, 1.080) * T_2 ) # Longman eq 19 (N) Longitude of the moon's ascending node in its orbit. From Schureman(1941) N = ( dms2rad(259.0, 10.0, 57.12) - (5 * rev + dms2rad(0, 0, 482912.63)) * T + dms2rad(0, 0, 7.480) * T_2 + dms2rad(0, 0, 0.007) * T_3 ) # eq. 20 (I) Inclination of the moon's orbit to the equator I = arccos(cos(omega) * cos(i) - sin(omega) * sin(i) * cos(N)) # eq. 21 (nu) Longitude in the celestial equator of its intersection A with the moon's orbit nu = arcsin(sin(i) * sin(N) / sin(I)) # eq. 24 (t) Hour angle of mean sun measured west-ward from # the place of observations t = deg2rad(15.0 * (t0 - 12) - L) # eq. 23 (chi) right ascension of meridian of place of observations reckoned from A chi = t + h - nu # eq. 15, 16, 18 cos_alpha = cos(N) * cos(nu) + sin(N) * sin(nu) * cos(omega) sin_alpha = sin(omega) * sin(N) / sin(I) alpha = 2 * arctan(sin_alpha / (1 + cos_alpha)) # eq. 14 (xi) Longitude in the moon's orbit of its ascending # intersection with the celestial equator xi = N - alpha # eq. 13 (sigma) Mean longitude of moon in radians in its orbit # reckoned from A sigma = s - xi # eq. 9 (l) Longitude of moon in its orbit reckoned from its ascending # intersection with the equator l = ( # noqa: E741 sigma + 2 * e * sin(s - p) + 5.0 / 4 * e**2 * sin(2 * (s - p)) + 15.0 / 4 * m * e * sin(s - 2 * h + p) + 11.0 / 8 * m**2 * sin(2 * (s - h)) ) # Solar Calculations # eq. 26 (p1) Mean longitude of solar perigee p1 = ( dms2rad(281, 13, 15.00) + dms2rad(0, 0, 6189.03) * T + dms2rad(0, 0, 1.63) * T**2 + dms2rad(0, 0, 0.012) * T**3 ) # eq. 27 (e1) Eccentricity of the Earth's orbit e1 = 0.01675104 - 0.00004180 * T - 0.000000126 * T**2 # eq. 28 (chi1) right ascension of meridian of place of observations # reckoned from the vernal equinox chi1 = t + h # eq. 25 (l1) Longitude of sun in the ecliptic reckoned from the # vernal equinox l1 = h + 2 * e1 * sin(h - p1) # eq. 7 cosine(theta) Theta represents the zenith angle of the moon cos_theta = sin(lmbda) * sin(I) * sin(l) + cos(lmbda) * ( cos(I / 2) ** 2 * cos(l - chi) + sin(I / 2) ** 2 * cos(l + chi) ) # eq. 8 cosine(phi) Phi represents the zenith angle of the run cos_phi = sin(lmbda) * sin(omega) * sin(l1) + cos(lmbda) * ( cos(omega / 2) ** 2 * cos(l1 - chi1) + sin(omega / 2) ** 2 * cos(l1 + chi1) ) # Distance Calculations # eq. 34 (C) Distance parameter C = sqrt(1.0 / (1 + 0.006738 * sin(lmbda) ** 2)) # eq. 33 (r) Distance from point P to the center of the Earth r = C * a + H # eq. 31 & 32 (a') Distance parameter, equation 31 a_prime = 1.0 / (c * (1 - e**2)) a1_prime = 1.0 / (c1 * (1 - e1**2)) # eq. 29 & 30 (d) Distance between centers of the Earth and the moon d = ( 1 / c + a_prime * e * cos(s - p) + a_prime * e**2 * cos(2 * (s - p)) + (15 / 8) * a_prime * m * e * cos(s - 2 * h + p) + a_prime * m**2 * cos(2 * (s - h)) ) ** -1 # (D) Distance between centers of the Earth and the sun D = (1 / c1 + a1_prime * e1 * cos(h - p1)) ** -1 # eq. 1 (g_lunar) Vertical componet of tidal acceleration due to the moon g_lunar = ( mu * M * r * d**-3 * (3 * cos_theta**2 - 1) + (3 / 2) * mu * M * r**2 / d**4 * (5 * cos_theta**3 - 3 * cos_theta) ) * 1e03 # eq. 3 (g_solar) Vertical componet of tidal acceleration due to the sun g_solar = mu * S * r * D**-3 * (3 * cos_phi**2 - 1) * 1e03 return g_lunar, g_solar
[docs] def tidal_correction( self, lat: FloatArray, lon: FloatArray, elev: FloatArray, date_time: DatetimeArray, site_id: SiteIDArray | None = None, **kwargs, ) -> NDArray[_np.float64]: """Compute tidal corrections at specified locations and times. Parameters ---------- site_id : array_like REMOVE lat : float or array_like Latitude in decimal degrees. lon : float or array_like Longitude in decimal degrees. elev : float or array_like Elevation in meters date_time : array_like The datetimes at which to calculate corrections. Returns ------- corrections : NDArray The tidal corrections in milligals. """ g = self.gravity_accelerations(lat, lon, elev, date_time) return (g[0] + g[1]) * self.amp_factor
[docs] def time_series( self, starttime: DatetimeScalar, endtime: DatetimeScalar, step: TimedeltaScalar, lat: float, lon: float, elev: float = 0.0, method: Literal["correction", "acceleration"] = "correction", ) -> _pd.Series: """Compute a time series of tidal corrections at some location. Parameters ---------- starttime : str, datetime The start time of the time series. endtime : str, datetime The end time of the time series. step : int, float, str, Timedelta The sampling interval of the time series. Can be a timedelta-like object, a timedelta string (e.g. "1S", "1H", "1D"), or a number of seconds. lat : float Latitude in decimal degrees. lon : float Longitude in decimal degrees. elev : float Elevation in meters (datum independent) method: {'correction', 'acceleration'}, default 'correction' Whether to return gravity corrections or accelerations. Returns ------- time_series : Series The time series of tidal corrections or accelerations. """ valid_methods = ("correction", "acceleration") if method not in valid_methods: raise ValueError( f"method parameter must be one of {valid_methods}, not '{method}'." ) try: step = _pd.Timedelta(step) if not isinstance(step, _pd.Timedelta): raise ValueError("step must be a valid timedelta or timedelta string.") except ValueError as e: raise ValueError(f"error parsing step: {e}") from e try: t0 = to_naive_utc_datetime(starttime, allow_nat=False) t1 = to_naive_utc_datetime(endtime, allow_nat=False) if not isinstance(t0, _pd.Timestamp) or not isinstance(t1, _pd.Timestamp): raise ValueError("not convertible to Timestamp.") if t0 >= t1: raise ValueError("starttime is after or equal to endtime.") except ValueError as e: raise ValueError(f"error parsing starttime and endtime: {e}") from None t_idx = _pd.date_range(t0, t1, freq=step) _lat = _np.full(len(t_idx), lat) _lon = _np.full(len(t_idx), lon) _elev = _np.full(len(t_idx), elev) if method == "correction": tseries = _pd.Series( data=self.tidal_correction( site_id=None, lat=_lat, lon=_lon, elev=_elev, date_time=t_idx ), index=t_idx, name=method, ) elif method == "acceleration": a, b = self.gravity_accelerations(_lat, _lon, _elev, t_idx) tseries = _pd.Series( data=a + b, index=t_idx, name=method, ) else: raise ValueError(f"Invalid method: {method}") return tseries
@overload def _decimal_julian_century(dt: DatetimeScalar, **kwargs) -> _np.float64: ... @overload def _decimal_julian_century(dt: DatetimeArray, **kwargs) -> NDArray[_np.float64]: ... def _decimal_julian_century( dt: DatetimeScalar | DatetimeArray, **kwargs ) -> NDArray[_np.float64] | _np.float64: """ Convert datetime to Julian Centuries since epoch B1900.0. A Julian Century is 100 * 365.25 * 86400 SI seconds. This function returns decimal Julian Centuries since 1899-12-31T12:00:00. Parameters ---------- dt : str, datetime, array-like The date-times to be converted. Can be any format parsable by ``pandas.to_datetime`` method. Datetimes without a timezone are assumed to be in UTC. Returns ------- julian_century : ndarray or scalar This is scalar if `dt` is a single value. See Also -------- to_naive_utc_datetime : Convert a datetime to a naive UTC datetime. pandas.to_datetime : Convert argument to datetime. """ _dt = to_naive_utc_datetime(dt, allow_nat=False, **kwargs) if _dt is None or isinstance(_dt, NaTType): raise ValueError("dt cannot be NaT or None.") if isinstance(_dt, _pd.Timestamp): _dt = _pd.DatetimeIndex([_dt]) elif isinstance(_dt, (_pd.DatetimeIndex, _pd.Series)): _dt = _pd.DatetimeIndex(_dt) else: raise ValueError( "dt could not be resolved to a datetime, DatetimeIndex, Series, or array-like of datetimes." ) julian_century_origin = _pd.Timestamp("1899-12-31T12:00:00", tz=None) td_seconds = (_dt - julian_century_origin).total_seconds() julian_century_seconds: int = 86400 * 36525 jcentury = _np.atleast_1d(td_seconds / julian_century_seconds) return jcentury[0] if jcentury.size == 1 else jcentury @overload def _decimal_hour_of_day(date_time: DatetimeScalar) -> _np.float64: ... @overload def _decimal_hour_of_day(date_time: DatetimeArray) -> NDArray[_np.float64]: ... def _decimal_hour_of_day( date_time: DatetimeScalar | DatetimeArray, ) -> _np.float64 | NDArray[_np.float64]: """Compute time of day in decimal hours. Parameters ---------- date_time : str, datetime, array-like The date-times to be processed. Can be any format parsable by pandas.to_datetime() method. Returns ------- hour : ndarray or scalar The decimal hour. This is a scalar if `arg` is a single value. """ _dt = to_naive_utc_datetime(date_time, allow_nat=False) if isinstance(_dt, NaTType) or _dt is None: raise ValueError("date_time cannot be NaT or None.") if not isinstance(_dt, (_pd.Timestamp, _pd.Series, _pd.DatetimeIndex)): raise ValueError( "date_time could not be resolved to a datetime, DatetimeIndex, Series, or array-like of datetimes." ) if isinstance(_dt, _pd.Timestamp): _dt = _pd.DatetimeIndex([_dt]) elif isinstance(_dt, _pd.Series): _dt = _pd.DatetimeIndex(_dt) _tdelta = ((_dt - _dt.normalize()).total_seconds() / 3600.0).to_numpy(float) return _tdelta[0] if _tdelta.size == 1 else _tdelta class EternaTidalParameters: """Class to store tidal parameters for use with ETERNA/pygtide. Tidal parameters (``TIDALPARAM`` in ETERNA, ``wave_group`` in pygtide) define the ampltude factors and phase leads to be applied to the tidal potentials provided by a given tidal catalogue. A single tidal parameter is comprises 4 values - ``freq_start``: the start frequency of the tidal constituent in cycles per day (cpd). - ``freq_stop``: the stop frequency of the tidal constituent in cpd. - ``amplitude_factor``: multiply the amplitude of the tidal constituent. - ``phase_lead``: the phase lead in degrees An example tidal parameter (from ETERNA documentation) covering M2, the semi-diurnal lunar tide constituent is ``[1.914129, 1.950419, 1.18705, 2.0327]``. Multiple tidal parameters can be combined to cover the full spectrum of tidal constituents. Attributes ---------- data : pd.DataFrame A DataFrame containing the tidal parameters under the columns 'freq_start', 'freq_stop', 'amplitude', and 'phase_lead'. Other columns such as 'wave_group' label may be included but are not used. Parameters ---------- freq_start : array_like The start frequency of the tidal constituent in cycles per day (cpd). freq_stop : array_like The stop frequency of the tidal constituent in cycles per day (cpd). amplitude : array_like The amplitude factor to apply to the tidal constituent. phase_lead : array_like The phase lead in degrees to apply to the tidal constituent. wave_group : array_like, optional Labels for the tidal constituent, e.g. "M2". **kwargs: array_likes, optional Other columns to include in the ``data`` DataFrame. Must be array-like of the same size as the other parameters. Returns ------- EternaTidalParameters An instance of the class containing the tidal parameters. See Also -------- EternaPredictTidalCorrection : Class to compute tidal corrections using ETERNA/pygtide. pygtide : Python package for tidal predictions using ETERNA catalogues. """ def __init__( self, freq_start: FloatArray, freq_stop: FloatArray, amplitude: FloatArray, phase_lead: FloatArray, wave_group: StringArray | None = None, **kwargs, ) -> None: self.data: _pd.DataFrame try: freq_start = to_1d_ndarray(freq_start).astype(float) s = freq_start.size freq_stop = to_1d_ndarray(freq_stop, expected_size=s).astype(float) amplitude = to_1d_ndarray(amplitude, expected_size=s).astype(float) phase_lead = to_1d_ndarray(phase_lead, expected_size=s).astype(float) if wave_group is not None: wave_group = to_1d_ndarray(wave_group, expected_size=s).astype(str) except ValueError as e: raise ValueError(f"Error parsing tidal parameters: {e}") self.data = _pd.DataFrame.from_dict( data={ "freq_start": freq_start, "freq_stop": freq_stop, "amplitude": amplitude, "phase_lead": phase_lead, }, orient="columns", ) if wave_group is not None: self.data["wave_group"] = wave_group for k, v in kwargs.items(): self.data[k] = v self.sort_and_validate() def __repr__(self) -> str: return f"{self.__class__.__name__}(nparam={self.data.shape[0]})" def __copy__(self) -> Self: return self.__class__.from_dataframe(self.data.copy()) def copy(self) -> Self: return self.__copy__() def sort_and_validate(self, gap_threshold: float | None = None) -> None: """Sort tidal parameters by ``start_freq`` and validate data. This method ensures that the following conditions are met: - All data are floats with no NaN. - Frequency intervals are positive and increasing i.e. ``freq_start`` < ``freq_stop``. - Frequency intervals are discrete and do not overlap. - Warn if there are gaps between successivefrequency intervals. Raises ------ ValueError If any of the above validations fail. """ emsg = GSolveDataWarning(prefix=f"{type(self).__name__} error", show=True) errors = [] data_subset = self.data.loc[ :, ["freq_start", "freq_stop", "amplitude", "phase_lead"] ] nan_rows = data_subset.index[data_subset.isna().any(axis=1)].to_list() if nan_rows: emsg(f"NaN values found in rows {nan_rows}") bad_freq_rows = self.data.index[ (self.data["freq_stop"] <= self.data["freq_start"]) ].to_list() if bad_freq_rows: emsg(f"'freq_start' >= 'freq_stop' in rows {bad_freq_rows}.") self.data = self.data.sort_values(by=["freq_start"], ignore_index=True) overlaps = _np.full_like(self.data.index, False, dtype=bool) overlaps[:-1] = ( self.data.iloc[:-1, 1].to_numpy() >= self.data.iloc[1:, 0].to_numpy() ) if overlaps.any(): overlap_rows = self.data.index[overlaps].to_list() emsg( "Frequency intervals must be discrete. Overlapping " f"frequency found in rows {overlap_rows}." ) if emsg.count > 0: raise ValueError(f"Validation failed, {emsg.count} error(s) found.") if gap_threshold is not None and self.data.shape[0] > 1: gap_threshold = float(gap_threshold) gaps = ( self.data.iloc[:-1, 1] - self.data.iloc[1:, 0] > gap_threshold ).to_numpy() if gaps.any(): n_gaps = gaps.sum() warnings.warn( f"some frequency intervals separated by greater than {gap_threshold} ", UserWarning, ) @classmethod def from_array(cls, arr: ArrayLike) -> Self: """ Create an EternaTidalParameters instance from an array-like object. Parameters ---------- arr : array_like A 1D array with at least 4 elements or a 2D array with at least 4 columns. Cells/columns 0-3 correspond to 'freq_start', 'freq_stop', 'amplitude', and 'phase_lead' in that order. Returns ------- EternaTidalParameters """ if isinstance(arr, _pd.DataFrame): return cls.from_dataframe( arr, ) arr = _np.atleast_2d(arr).astype(float) if arr.ndim != 2 or arr.shape[1] != 4: raise ValueError("Input array must be 2D with at least 4 columns.") kwargs = {f"col_{i}": arr[:, i] for i in range(4, arr.shape[1])} return cls( freq_start=arr[:, 0], freq_stop=arr[:, 1], amplitude=arr[:, 2], phase_lead=arr[:, 3], **kwargs, ) @classmethod def from_dataframe(cls, df: _pd.DataFrame, include_index: bool = False) -> Self: """ Create an EternaTidalParameters object from a DataFrame. Parameters ---------- df : pd.DataFrame A DataFrame containing columns 'freq_start', 'freq_stop', 'amplitude', and 'phase_lead'. Other columns will be included in the object's ``data`` attribute but are not required. include_index : bool, default False If True, the index will be treated as a column when parsing the DataFrame. Returns ------- EternaTidalParameters """ if include_index: df = df.reset_index() expected_columns = {"freq_start", "freq_stop", "amplitude", "phase_lead"} if not expected_columns.issubset(df.columns): missing = list(expected_columns - set(df.columns)) raise ValueError(f"DataFrame is missing required columns: {missing}") return cls( freq_start=df["freq_start"], freq_stop=df["freq_stop"], amplitude=df["amplitude"], phase_lead=df["phase_lead"], **{k: df[k] for k in df.columns if k not in expected_columns}, ) @classmethod def from_excel(cls, fname: FilePath, sheet_name: str | int = 0) -> Self: """ Read an EternaTidalParameters object from an Excel worksheet. The spreadsheet must contain columns labelled 'freq_start', 'freq_stop', 'amplitude', and 'phase_lead'. Other columns will be loaded but are not used. Parameters ---------- fname : FilePath The Excel/spreadsheet file to be read. Can be any object type supported by or spreadsheet format (e.g. odf) supported by ``pandas.read_excel()``, sheet_name : str | int, default is 0 The name or index of the worksheet to read. Default is 0, the first sheet in `fname`. Returns ------- EternaTidalParameters See Also -------- EternaTidalParameters.from_dataframe : Create an EternaTidalParameters object from a DataFrame. pandas.read_excel : Read an Excel file into a DataFrame. """ df = _pd.read_excel(fname, sheet_name=sheet_name) if isinstance(df, dict): raise ValueError( "Excel file contains multiple sheets. Please specify sheet_name." ) return cls.from_dataframe(df) @classmethod def quick_tide_pro_defaults(cls) -> Self: """Return an EternaTidalParameters instance with the default parameters used by QuickTide Pro. Quick tide pro uses the Tamura (1987) tidal potential catalogue. Using thee parameters with higer resolution tide catalogues may produce unexpected results. """ df = _pd.DataFrame.from_dict( data={ "DC": [0.000000, 0.000001, 1.000000, 0.0000], "Long": [0.000002, 0.249951, 1.160000, 0.0000], "Q1": [0.721500, 0.906315, 1.154250, 0.0000], "O1": [0.921941, 0.974188, 1.154240, 0.0000], "P1": [0.989049, 0.998028, 1.149150, 0.0000], "K1": [0.999853, 1.216397, 1.134890, 0.0000], "N2": [1.719381, 1.906462, 1.161720, 0.0000], "M2": [1.923766, 1.976926, 1.161720, 0.0000], "S2": [1.991787, 2.002885, 1.161720, 0.0000], "K2": [2.003032, 2.182843, 1.161720, 0.0000], "M3": [2.753244, 3.081254, 1.07338, 0.0000], "other": [3.791964, 3.937897, 1.03900, 0.0000], }, orient="index", columns=["freq_start", "freq_stop", "amplitude", "phase_lead"], ).rename_axis("wave_group") return cls.from_dataframe(df, include_index=True) @classmethod def default( cls, freq_start: float = 0.0, freq_stop: float = 10.0, amplitude_factor: float = 1.16, phase_lead: float = 0.0, ) -> Self: """Return a tidal parameters object with default values used by GSolve.""" return cls.from_array([freq_start, freq_stop, amplitude_factor, phase_lead]) def pygtide_wavegroup_arg(self) -> NDArray[_np.float64]: """Return a copy of the paramaters as an ndarray (shape= (N, 4)) for use as the argument to ``pygtide.set_wavegroup()`` method. """ return ( self.data.loc[:, ["freq_start", "freq_stop", "amplitude", "phase_lead"]] .to_numpy() .copy() )
[docs] class EternaPredictTidalCorrection(EarthTideCorrectionProvider): """Compute earth tide gravity corrections using PREDICT from ETERNA34. The Attributes ---------- tidal_params : EternaTidalParameters Parameters ---------- tidal_params : array-like | EternaTidalParameters, optional The tidal parameters applied to discrete components of the tidal catalogue. This is a 2D array with 4 columns, where each row corresponds to a tidal component and the columns are: tidalpoten : int, default 8 The tide potential catalogue to use. ETERNA/pygtide provides 8 potential catalogues of increasing resolution and therefore computational cost. The most commonly used cataloges: - ``4`` : Tamura (1987), 1200 waves. - ``7`` : Hartmann and Wenzel (1995), 12935 waves. - ``8`` (Default) : Kudryavtsev (2004), 28806 waves (highest resolution). See ETERNA/pygtide for the full list of available catalogues. tidalcompo : int, default 0 The tidal component to calculate. The default 0 is earth tide gravity, but other components such as displacement, tilt or strain can also be calculated. See pygtide or ETERNA documentation for details. amtruncate : float, default 1e-10 Amplitude threshold for components to be included in the tidal calculation. Waves with amplitudes below this threshold are excluded. Higher values will reduce computation time, but also the accuracy of results. poletidecor : float, default 1.16 Amplitude factor for of pole tide gravity component. Pole tides are caused to variations in the Earth's rotation axis (Chandler Wobble) and are not included in the standard tidal potential catalogues. Pole tide solutions are dependent on observational data provided by IERS, so the user should that they periodically run ``pgtide.update()`` to ensure these data are up to date. lodtidecor : float, default 1.16 Amplitude factor for of Length Of Day tide gravity component, due to variations in the Earth's rotation rate and are not included in the standard tidal potential catalogues.. LOD corrections are depenedent on observational data provided by IERS, so the user should that they periodically run ``pgtide.update()`` to ensure these data are up to date. See Also -------- EternaTidalParameters : Class to store tidal parameters for use with ETERNA/pygtide. pygtide : Python package for tidal predictions using ETERNA catalogues. """ def __init__( self, tidal_params: ArrayLike | EternaTidalParameters | None = None, tidalpoten: int = 8, tidalcompo: int = 0, amtruncate: float = 1e-10, poletidecor: float = 1.16, lodtidecor: float = 1.16, **kwargs, ) -> None: self._pgt: pygtide.pygtide = pygtide.pygtide(msg=False) self._pgt.msg = False self._pgt_kwargs: dict[str, float | int] = { "tidalpoten": tidalpoten, "tidalcompo": tidalcompo, "amtruncate": amtruncate, "poletidecor": poletidecor, "lodtidecor": lodtidecor, } self.tidal_params: EternaTidalParameters if tidal_params is None: self.tidal_params = EternaTidalParameters.default() self.set_tidal_params(self.tidal_params.pygtide_wavegroup_arg()) elif isinstance(tidal_params, EternaTidalParameters): self.set_tidal_params(tidal_params) else: self.set_tidal_params(tidal_params) def __repr__(self) -> str: suffix = ",".join(f"{k}={v}" for k, v in self._pgt_kwargs.items()) return f"{self.__class__.__name__}({suffix})"
[docs] def set_tidal_params(self, tidal_params: ArrayLike | EternaTidalParameters) -> None: """Set the tidal parameters to be applied.""" if not isinstance(tidal_params, EternaTidalParameters): self.tidal_params = EternaTidalParameters.from_array(tidal_params) else: self.tidal_params = tidal_params.copy() self._pgt.set_wavegroup(self.tidal_params.pygtide_wavegroup_arg())
[docs] def identifier(self, **kwargs) -> str: if not kwargs: return self.__repr__() self._pgt_kwargs.update(kwargs) suffix = ",".join(f"{k}={v}" for k, v in kwargs.items()) return f"{self.__repr__()}({suffix})"
[docs] def time_series( self, lat: float, lon: float, elev: float, starttime: DatetimeScalar, duration: TimedeltaScalar, sample_interval: int = 60, unit: Literal["mgal", "ugal", "nm/s^2"] = "mgal", **kwargs, ) -> _pd.DataFrame: """Compute a time series of tidal corrections at some location. A limitation of ETERNA is that tides are always calculated from the start of a UTC day. Parameters ---------- lat : float Latitude of the location in degrees. lon : float Longitude of the location in degrees. elev : float Elevation of the location in meters. starttime : DatetimeScalar Start duration : TimedeltaScalar Duration of the time series. sample_interval : int, optional Sampling interval in seconds (default is 60). unit : {"mgal", "ugal", "nm/s^2"}, optional Unit of the output (default is "mgal"). **kwargs : dict Additional keyword arguments to pass to the underlying pygtide predictor. Returns ------- DataFrame DataFrame containing the tidal corrections. """ this_run_kwargs = self._pgt_kwargs.copy() this_run_kwargs.update(kwargs) _starttime = ( to_naive_utc_datetime(starttime, allow_nat=False) .normalize() .to_pydatetime() ) if isinstance(duration, (int, float)): _duration = int(duration) else: _duration = int( _np.ceil(_pd.to_timedelta(duration).total_seconds() / 3600.0) ) if _duration <= 0: raise ValueError( "duration must be a positive timedelta or number of hours." ) sample_interval = int(sample_interval) if sample_interval <= 0: raise ValueError( "sample_interval must be a positive integer number of seconds." ) with warnings.catch_warnings(): self._pgt.msg = False warnings.filterwarnings( action="ignore", message="Please consider updating the leap second database", category=UserWarning, ) self._pgt.predict( latitude=lat, longitude=lon, height=elev, startdate=_starttime, duration=_duration, samprate=sample_interval, **this_run_kwargs, ) df = self._pgt.results() if not isinstance(df, _pd.DataFrame): raise ValueError("No results returned from pygtide prediction.") normalised_cols = ["datetime", "signal", "tide", "pole_tide", "lod_tide"] df = df.rename( columns={a: b for a, b in zip(df.columns, normalised_cols)} ).set_index("datetime") df = df.set_index(to_naive_utc_datetime(df.index)) if unit == "nm/s^2": return df elif unit == "ugal": return df * 1e-3 elif unit == "mgal": return df * 1e-4
[docs] def tidal_correction( self, lat: FloatArray, lon: FloatArray, elev: FloatArray, date_time: DatetimeArray, site_id: SiteIDArray | None = None, unit: Literal["mgal", "ugal", "nm/s^2"] = "mgal", sample_interval: int = 60, **kwargs, ) -> NDArray[_np.float64]: lat = to_1d_ndarray(lat).astype(float) lon = to_1d_ndarray(lon, expected_size=lat.size).astype(float) elev = to_1d_ndarray(elev, expected_size=lat.size).astype(float) date_time = _pd.DatetimeIndex( to_naive_utc_datetime(date_time, allow_nat=False) ).round(freq="1s") # datume for converting date_time to seconds since epoch for interpolation. # - set to a day before the minimum ensure that all are captured t0 = date_time.floor(freq="s").min() - _pd.Timedelta(days=1) # date_time_seconds = (date_time - t0).total_seconds().to_numpy(float) date_time_seconds = date_time.astype("int64") if site_id is None: raise ValueError( "site_id is a required parameter for " "EternaPredictTidalCorrection tidal_correction method." ) elif isinstance(site_id, str): site_id = [site_id] * lat.size site_id = to_1d_ndarray(site_id, expected_size=lat.size).astype(str) corrs = _np.zeros_like(lat, dtype=float) for site in _np.unique(site_id): site_mask = site_id == site # ensure at least 1 hour of data before and after the range # of date_time for this site t0 = (date_time[site_mask].min() - _pd.Timedelta(hours=1)).normalize() _duration_hrs = ( int( _np.ceil((date_time[site_mask].max() - t0).total_seconds() / 3600.0) ) ) + 1 ts = self.time_series( lat=lat[site_mask][0], lon=lon[site_mask][0], elev=elev[site_mask][0], starttime=t0, duration=_duration_hrs, sample_interval=sample_interval, unit=unit, ).mul(-1.0) if not isinstance(ts.index, _pd.DatetimeIndex): raise ValueError( "Unexpected time series index type from pygtide results." ) ts = ts.set_index(ts.index.round(freq="1s")) if not (corrs[site_mask] == 0.0).all(): raise ValueError("Unexpected non-zero values in corrs for site mask.") corrs[site_mask] = _np.interp( x=date_time[site_mask], xp=ts.index, fp=ts["signal"].to_numpy(), ) return corrs