Skip to content

Numerical

get_extrema_date_from_spline(spline, type='max', thresh=None, n_extrema=2)

Given a spline, this returns the dates (x variable) corresponding to the maxima or minima.

Parameters:

Name Type Description Default
spline CubicSpline

spline to find extrema of

required
type str

which extrema to find ('max' or 'min')

'max'
thresh Optional[float]

Only keep maxima (minima) with values above (below) this.

None
n_extrema int

Keep at most this many extrema, if more than this then will only keep highest (lowest).

2
Source code in isca_tools/utils/numerical.py
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
def get_extrema_date_from_spline(spline: CubicSpline, type: str = 'max', thresh: Optional[float] = None,
                                 n_extrema: int = 2) -> np.ndarray:
    """
    Given a spline, this returns the dates (x variable) corresponding to the maxima or minima.

    Args:
        spline: spline to find extrema of
        type: which extrema to find ('max' or 'min')
        thresh: Only keep maxima (minima) with values above (below) this.
        n_extrema: Keep at most this many extrema, if more than this then will only keep highest (lowest).
    """
    extrema_date = spline.derivative().roots(extrapolate=False)
    if type == 'max':
        extrema_date = extrema_date[spline(extrema_date, 2) < 0]  # maxima have a negative second derivative
    elif type == 'min':
        extrema_date = extrema_date[spline(extrema_date, 2) > 0]  # minima have a positive second derivative
    else:
        raise ValueError('type is not valid, it should be max or min')
    extrema_values = spline(extrema_date)
    if thresh is not None:
        # Only keep maxima with value above threshold
        if type == 'max':
            keep = extrema_values > thresh
        elif type == 'min':
            keep = extrema_values < thresh
        extrema_date = extrema_date[keep]
        extrema_values = extrema_values[keep]
    if len(extrema_date) > n_extrema:
        if type == 'max':
            keep_ind = np.argsort(extrema_values)[-n_extrema:]
        elif type == 'min':
            keep_ind = np.argsort(extrema_values)[:n_extrema]
        extrema_date = extrema_date[keep_ind]
    return extrema_date

get_var_extrema_date(time, var, smooth_window=1, type='max', thresh_extrema=None, max_extrema=2, smooth_method='convolve')

Finds the dates of extrema of a variable, given some smoothing is performed first. Also returns the splines themselves.

Parameters:

Name Type Description Default
time ndarray

float [n_time]
Time in days (assumes periodic e.g. annual mean, so time = np.arange(360))

required
var ndarray

float [n_time]
Value of variable at each time. Again, assume periodic

required
smooth_window int

Number of time steps to use to smooth var before finding extrema. Smaller equals more accurate fit. 1 is perfect fit.

1
type str

which extrema to find ('max' or 'min')

'max'
thresh_extrema Optional[float]

Only keep maxima (minima) with values above (below) this.

None
max_extrema int

Keep at most this many extrema, if more than this then will only keep highest (lowest).

2
smooth_method str

convolve or spline
If convolve, will smooth via convolution with window of length smooth_window. If spline, will fit a spline using every smooth_window values of time.

'convolve'

Returns:

Type Description
ndarray

extrema_date: float [max_extrema]
Dates of extrema of var

CubicSpline

spline_var: Spline fit to var to find the extrema.

Source code in isca_tools/utils/numerical.py
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
def get_var_extrema_date(time: np.ndarray, var: np.ndarray, smooth_window: int = 1,
                         type: str = 'max', thresh_extrema: Optional[float] = None,
                         max_extrema: int = 2, smooth_method: str = 'convolve') -> Tuple[np.ndarray, CubicSpline]:
    """
    Finds the dates of extrema of a variable, given some smoothing is performed first.
    Also returns the splines themselves.

    Args:
        time: `float [n_time]`</br>
            Time in days (assumes periodic e.g. annual mean, so `time = np.arange(360)`)
        var: `float [n_time]`</br>
            Value of variable at each time. Again, assume periodic
        smooth_window: Number of time steps to use to smooth `var` before finding extrema.
            Smaller equals more accurate fit. `1` is perfect fit.
        type: which extrema to find ('max' or 'min')
        thresh_extrema: Only keep maxima (minima) with values above (below) this.
        max_extrema: Keep at most this many extrema, if more than this then will only keep highest (lowest).
        smooth_method: `convolve` or `spline`</br>
            If `convolve`, will smooth via convolution with window of length `smooth_window`.
            If `spline`, will fit a spline using every `smooth_window` values of `time`.

    Returns:
        `extrema_date`: `float [max_extrema]`</br>
            Dates of extrema of var
        `spline_var`: Spline fit to var to find the extrema.
    """
    if smooth_method.lower() == 'spline':
        # Make so last element of arrays equal first as periodic
        time_smooth = np.append(time, time[-1] + 1)[::smooth_window]
        var_smooth = np.append(var, var[0])[::smooth_window]
    elif smooth_method.lower() == 'convolve':
        var_smooth = scipy.ndimage.convolve(var, np.ones(smooth_window) / smooth_window, mode='wrap')
        time_smooth = np.append(time, time[-1] + 1)
        var_smooth = np.append(var_smooth, var_smooth[0])
    else:
        raise ValueError('smooth_method must be either spline or convolve')
    # Spline var is the spline replicating var_smooth exactly i.e. spline_var(t) = var_smooth[t] if t in time_smooth
    spline_var = CubicSpline(time_smooth, var_smooth, bc_type='periodic')
    extrema_date = get_extrema_date_from_spline(spline_var, type, thresh_extrema, max_extrema)
    return extrema_date, spline_var

get_var_shift(x, shift_time=None, shift_phase=None, time=None, time_start=None, time_end=None)

Returns the periodic variable \(x(t-t_{shift})\) where \(t\)=time, and \(t_{shift}=\)shift_time. If shift_phase is provided, will set shift_time = shift_phase * period.

Parameters:

Name Type Description Default
x ndarray

float [n_x]
\(x\) variable such that x[i] is the value of \(x\) at time time[i].

required
shift_time Optional[float]

How much to shift \(x\) by in units of time.

None
shift_phase Optional[float]

Fraction of period to shift \(x\) by.

None
time Optional[ndarray]

float [n_x]
Time such that x[i] is \(x\) at time time[i]. If time provided, will use spline to apply shift to \(x\). If time not provided, assume time is np.arange(n_x), and will use np.roll to apply shift to \(x\).

None
time_start Optional[float]

Start time such that period is given by time_end - time_start + 1. If not provided, will set to min value in time.

None
time_end Optional[float]

End time such that period is given by time_end - time_start + 1. If not provided, will set to max value in time.

None

Returns:

Name Type Description
x_shift ndarray

float [n_x]
\(x\) variable shifted in time such that x_shift[i] is value of \(x\) at time time[i] - shift_time.

Source code in isca_tools/utils/numerical.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def get_var_shift(x: np.ndarray, shift_time: Optional[float] = None, shift_phase: Optional[float] = None,
                  time: Optional[np.ndarray] = None, time_start: Optional[float] = None,
                  time_end: Optional[float] = None) -> np.ndarray:
    """
    Returns the periodic variable $x(t-t_{shift})$ where $t$=`time`, and $t_{shift}=$`shift_time`.
    If `shift_phase` is provided, will set `shift_time = shift_phase * period`.

    Args:
        x: `float [n_x]`</br>
            $x$ variable such that `x[i]` is the value of $x$ at time `time[i]`.
        shift_time: How much to shift $x$ by in units of `time`.
        shift_phase: Fraction of period to shift $x$ by.
        time: `float [n_x]`</br>
            Time such that `x[i]` is $x$ at time `time[i]`.</n>
            If `time` provided, will use spline to apply shift to $x$.</n>
            If `time` not provided, assume time is `np.arange(n_x)`, and will use `np.roll` to apply shift to $x$.
        time_start: Start time such that period is given by `time_end - time_start + 1`.
            If not provided, will set to min value in `time`.
        time_end: End time such that period is given by `time_end - time_start + 1`.
            If not provided, will set to max value in `time`.

    Returns:
        x_shift: `float [n_x]`</br>
            $x$ variable shifted in time such that `x_shift[i]` is value of $x$ at time `time[i] - shift_time`.
    """
    if time is not None:
        ind = np.argsort(time)
        if time_start is None:
            time_start = time[ind][0]
        if time_end is None:
            time_end = time[ind][-1]
        if time[ind][0] < time_start:
            raise ValueError(f'Min time={time[ind][0]} is less than time_start={time_start}')
        if time[ind][-1] > time_end:
            raise ValueError(f'Max time={time[ind][-1]} is greater than time_end={time_end}')
        x_spline_fit = CubicSpline(np.append(time[ind], time_end + time[ind][0] - time_start + 1),
                                   np.append(x[ind], x[ind][0]),
                                   bc_type='periodic')
        period = time_end - time_start + 1
        if shift_phase is not None:
            shift_time = shift_phase * period
        x_shift = x_spline_fit(time - shift_time)
    else:
        if shift_phase is not None:
            shift_time = shift_phase * x.size
        if int(np.round(shift_time)) != shift_time:
            raise ValueError(f'shift_time={shift_time} is not a whole number - '
                             f'may be better using spline by providing time.')
        x_shift = np.roll(x, int(np.round(shift_time)))
    return x_shift

hybrid_root_find(residual, guess, search_radius, n_bracket_samples=32)

Find a root of a 1-D function using a fast secant/brentq hybrid method.

Attempts

1) Solve using two secant initialisations (fastest). 2) If those fail or leave the search interval, construct a local bracket by scanning the interval at fixed resolution. 3) Solve inside the constructed bracket using brentq (guaranteed).

Parameters:

Name Type Description Default
residual Callable

Function whose root is sought. Called as residual(x).

required
guess float

Initial guess around which to search for a solution.

required
search_radius float

Half-width of the interval in which the root is assumed to lie. The search interval is [guess - search_radius, guess + search_radius].

required
n_bracket_samples int

Number of sample points used to locate a sign change when constructing a fallback bracket (1-D array of this length is evaluated).

32

Returns:

Type Description
float

The root of residual inside the specified interval.

Raises:

Type Description
ValueError

If no sign change is located in the interval despite an assumed root, or if brentq fails to converge.

Source code in isca_tools/utils/numerical.py
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
def hybrid_root_find(
        residual: Callable,
        guess: float,
        search_radius: float,
        n_bracket_samples: int = 32
) -> float:
    """Find a root of a 1-D function using a fast secant/brentq hybrid method.

    Attempts:
      1) Solve using two secant initialisations (fastest).
      2) If those fail or leave the search interval, construct a local bracket
         by scanning the interval at fixed resolution.
      3) Solve inside the constructed bracket using brentq (guaranteed).

    Args:
        residual: Function whose root is sought. Called as residual(x).
        guess: Initial guess around which to search for a solution.
        search_radius: Half-width of the interval in which the root is assumed
            to lie. The search interval is [guess - search_radius,
            guess + search_radius].
        n_bracket_samples: Number of sample points used to locate a sign
            change when constructing a fallback bracket (1-D array of this
            length is evaluated).

    Returns:
        The root of `residual` inside the specified interval.

    Raises:
        ValueError: If no sign change is located in the interval despite an
            assumed root, or if brentq fails to converge.
    """
    lower = guess - search_radius
    upper = guess + search_radius

    # ---- 1. Fast attempt: secant method --------
    try:
        sol = root_scalar(
            residual,
            x0=guess,
            x1=guess + 0.5 * search_radius,
            method="secant"
        )
        if sol.converged and (lower <= sol.root <= upper):
            return sol.root
    except Exception:
        pass

    # ---- 2. Second secant attempt --------
    try:
        sol = root_scalar(
            residual,
            x0=guess - 0.5 * search_radius,
            x1=guess + search_radius,
            method="secant"
        )
        if sol.converged and (lower <= sol.root <= upper):
            return sol.root
    except Exception:
        pass

    # ---- 3. Construct a local bracket by scanning --------
    grid = np.linspace(lower, upper, n_bracket_samples)
    fvals = np.array([residual(x) for x in grid])

    # Find intervals with sign change
    sign_idx = np.where(np.signbit(fvals[:-1]) != np.signbit(fvals[1:]))[0]
    if len(sign_idx) == 0:
        raise ValueError(
            "No sign change found in interval despite assumed root."
        )

    # Take the first sign-change interval
    i = sign_idx[0]
    a, b = grid[i], grid[i + 1]

    # ---- 4. Guaranteed convergence using brentq --------
    sol = root_scalar(residual, bracket=[a, b], method='brentq')
    if not sol.converged:
        raise ValueError("brentq did not converge within constructed bracket.")

    return sol.root

interp_nan(x, y)

Set all nan values in y based on the two nearest values of x for which y is not nan.

Parameters:

Name Type Description Default
x ndarray

[n_points] Independent variable e.g. pressure

required
y ndarray

[n_points] Dependent variable e.g. temperature

required

Returns:

Name Type Description
y ndarray

[n_points] Same as input but with all nans replaced through interpolation.

not_valid_idx ndarray

[n_not_valid] Indices of nans in y that were replaced through interpolation.

Source code in isca_tools/utils/numerical.py
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
def interp_nan(x: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Set all nan values in `y` based on the two nearest values of `x` for which `y` is not nan.

    Args:
        x: `[n_points]` Independent variable e.g. pressure
        y: `[n_points]` Dependent variable e.g. temperature

    Returns:
        y: `[n_points]` Same as input but with all nans replaced through interpolation.
        not_valid_idx: `[n_not_valid]` Indices of nans in `y` that were replaced through interpolation.
    """
    # Set all nan values in x based on the two nearest indices that are not nan
    # using linear interpolation
    not_valid = np.isnan(y)
    if not_valid.sum() == 0:
        return y, np.zeros(0)
    else:
        valid_idx = np.where(~not_valid)[0]
        not_valid_idx = np.where(not_valid)[0]
        for i in not_valid_idx:
            if i < valid_idx[0]:
                # Case: before first valid point
                j1, j2 = valid_idx[0], valid_idx[1]
            elif i > valid_idx[-1]:
                # Case: after last valid point
                j1, j2 = valid_idx[-2], valid_idx[-1]
            else:
                # Case: between valid points
                # nearest valid indices around i
                j2 = valid_idx[valid_idx > i][0]
                j1 = valid_idx[valid_idx < i][-1]
            # Linear interpolation between (x[j1], y[j1]) and (x[j2], y[j2])
            slope = (y[j2] - y[j1]) / (x[j2] - x[j1])
            y[i] = y[j1] + slope * (x[i] - x[j1])
    return y, not_valid_idx

polyfit_fixed_coefs(x, y, deg, c0=None, w=None, rcond=None)

Least-squares polynomial fit with optional fixed coefficients.

Fits p(x) = sum_{j=0..deg} coef[j] * x**(deg-j) using least squares, where some coef entries may be fixed.

This matches np.polyfit coefficient order: coef[0] multiplies x**deg and coef[-1] is the constant term.

Parameters:

Name Type Description Default
x ndarray

1D array of x values, shape (n,).

required
y ndarray

1D array of y values, shape (n,).

required
deg

Polynomial degree (>= 0).

required
c0 Optional[List]

Optional sequence of length deg+1. Use None for unknown coefficients to be solved; use a number for coefficients to be held fixed. If None, all coefficients are solved (equivalent to np.polyfit).

None
w Optional[ndarray]

Optional 1D weights, shape (n,). Minimizes sum((w(y - p(x)))*2), same convention as np.polyfit.

None
rcond

Passed to np.linalg.lstsq (default uses numpy's default).

None

Returns:

Name Type Description
coef ndarray

1D array of length deg+1, in np.polyfit order.

Raises:

Type Description
ValueError

If c0 length is not deg+1, or if all coefficients are fixed but they don't match y in a least-squares sense (you still get the coefficients; this is just not "fit").

Source code in isca_tools/utils/numerical.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def polyfit_fixed_coefs(x: np.ndarray, y: np.ndarray, deg, c0: Optional[List] = None,
                        w: Optional[np.ndarray] = None, rcond=None) -> np.ndarray:
    """Least-squares polynomial fit with optional fixed coefficients.

    Fits p(x) = sum_{j=0..deg} coef[j] * x**(deg-j) using least squares,
    where some coef entries may be fixed.

    This matches np.polyfit coefficient order: coef[0] multiplies x**deg and
    coef[-1] is the constant term.

    Args:
        x: 1D array of x values, shape (n,).
        y: 1D array of y values, shape (n,).
        deg: Polynomial degree (>= 0).
        c0: Optional sequence of length deg+1. Use None for unknown coefficients
            to be solved; use a number for coefficients to be held fixed.
            If None, all coefficients are solved (equivalent to np.polyfit).
        w: Optional 1D weights, shape (n,). Minimizes sum((w*(y - p(x)))**2),
            same convention as np.polyfit.
        rcond: Passed to np.linalg.lstsq (default uses numpy's default).

    Returns:
        coef: 1D array of length deg+1, in np.polyfit order.

    Raises:
        ValueError: If c0 length is not deg+1, or if all coefficients are fixed
            but they don't match y in a least-squares sense (you still get the
            coefficients; this is just not "fit").
    """
    if c0 is None:
        return np.polyfit(x, y, deg, w=w)
    else:
        x = np.asarray(x)
        y = np.asarray(y)

        if x.ndim != 1 or y.ndim != 1:
            raise ValueError("x and y must be 1D arrays.")
        if x.shape[0] != y.shape[0]:
            raise ValueError("x and y must have the same length.")
        if deg < 0:
            raise ValueError("deg must be >= 0.")

        # Vandermonde with columns [x**deg, ..., x**0]
        V = np.vander(x, N=deg + 1, increasing=False)

        if c0 is None:
            solved_mask = np.ones(deg + 1, dtype=bool)
            fixed = np.zeros(deg + 1, dtype=float)
        else:
            if len(c0) != deg + 1:
                raise ValueError(f"c0 must have length deg+1 = {deg + 1}.")
            solved_mask = np.array([ci is None for ci in c0], dtype=bool)
            fixed = np.array([0.0 if ci is None else float(ci) for ci in c0], dtype=float)

        # If everything is fixed, nothing to solve: just return fixed coefficients.
        if not np.any(solved_mask):
            coef = fixed.copy()
            return coef

        # Move contribution of fixed coefficients to RHS: y' = y - V_fixed @ fixed_vals
        V_fixed = V[:, ~solved_mask]
        V_free = V[:, solved_mask]
        y_adj = y - V_fixed @ fixed[~solved_mask]

        # Optional weights like np.polyfit: minimize ||w*(y_adj - V_free @ a_free)||^2
        if w is not None:
            w = np.asarray(w)
            if w.ndim != 1 or w.shape[0] != x.shape[0]:
                raise ValueError("w must be a 1D array with same length as x.")
            V_free = V_free * w[:, None]
            y_adj = y_adj * w

        coef_free, *_ = np.linalg.lstsq(V_free, y_adj, rcond=rcond)

        coef = fixed.copy()
        coef[solved_mask] = coef_free

        return coef

polyfit_phase(x, y, deg, time=None, time_start=None, time_end=None, deg_phase_calc=10, resample=False, include_phase=True, fourier_harmonics=None, integ_method='spline', coef_fix=None, pad_coefs_phase=False)

This fits a polynomial y_approx(x) = p[0] * x**deg + ... + p[deg] of degree deg to points (x, y) as np.polyfit but also includes additional phase shift term such that the total approximation for y is:

\(y_{approx} = \frac{1}{2} \lambda_{phase}(x(t-T/4) - x(t+T/4)) + \sum_{n=0}^{n_{deg}} \lambda_n x^n\)

where \(\lambda_n=\)poly_coefs[-1-n] and \(\lambda_{phase}=\)poly_coefs[0]. \(x\) is assumed periodic with period \(T=\)time[-1]-time[0]+time_spacing.

The phase component, \(y_{phase}=\frac{1}{2} \lambda_{phase}(x(t-T/4) - x(t+T/4))\), is found first from the residual of \(y-y_{best}\), where \(y_{best}\) is the polynomial approximation of degree deg_phase_calc.

\(\sum_{n=0}^{n_{deg}} \lambda_n x^n\) is then found by doing the normal polynomial approximation of degree deg to the residual \(y-y_{phase}\).

If fourier_harmonics is provided, then a fourier series will also be added to \(y_{approx}\) containing all harmonics, \(n\), in fourier_harmonics:

\(y_{approx} + \sum_{n} F_n\cos(2n\pi ft - \Phi_n)\)

The idea behind this is to account for part of \(y\) not directly related to \(x\).

Parameters:

Name Type Description Default
x ndarray

float [n_x]
\(x\) coordinates used to approximate \(y\). x[i] is value at time time[i].

required
y ndarray

float [n_x]
\(y\) coordinate correesponding to each \(x\).

required
deg int

Degree of the fitting polynomial. If negative, will only do the phase fitting.

required
time Optional[ndarray]

float [n_x]
Time such that x[i] is \(x\) and y[i] is \(y\) at time time[i]. If time provided, will use spline to apply phase shift to \(x\). If time not provided, assume time is np.arange(n_x), and will use np.roll to apply phase shift to \(x\).

None
time_start Optional[float]

Start time such that period is given by time_end - time_start + 1. If not provided, will set to min value in time.

None
time_end Optional[float]

End time such that period is given by time_end - time_start + 1. If not provided, will set to max value in time.

None
deg_phase_calc int

Degree of the fitting polynomial to use in the phase term calculation. Should be a large integer.

10
resample bool

If True, will use resample_data to resample x and y before each calling of np.polyfit.

False
include_phase bool

If False, will only call np.polyfit, but first return value will be 0 indicating no phase shift. Only makes sense to call this rather than np.polyfit if you want to use resample.

True
fourier_harmonics Optional[Union[int, ndarray]]

int [n_harmonics_include]
After applied polynomial of degree deg_phase_calc and fit the phase factor, a residual is obtained y_residual = y - y_best - y_phase. If fourier_harmonics is provided, a fourier series will be directly fit to this residual including all the harmonics in fourier_harmonics. If fourier_harmonics is an integer, all harmonics up to and including the value will be fit.
The final polynomial of degree deg will then be fit to y - y_phase - y_fourier.
If provide array, only harmonics included in array will be fit, but will return coefficients for all up to the max harmonic given, with zeros wherever harmonic not in fourier_harmonics.
Idea behind this is to account for part of \(y\) not directly related to \(x\).

None
integ_method str

How to perform the integration when calculating Fourier coefficients.
If spline, will fit a spline and then integrate the spline, otherwise will use scipy.integrate.simpson.

'spline'
coef_fix Optional[List]

Option to fix some coefs in poly_coefs, same order as output with first being phase coef. All coefs in array that are None will be computed.

None
pad_coefs_phase bool

If True, will set coefs_fourier_phase to length fourier_harmonics.max()+1, with the first value as zero. Otherwise will be size fourier_harmonics.max().

False

Returns:

Name Type Description
poly_coefs Union[ndarray, Tuple[ndarray, ndarray, ndarray]]

float [n_deg+2] Polynomial coefficients, phase first and then normal output of np.polyfit with lowest power last.

coefs_fourier_amp Union[ndarray, Tuple[ndarray, ndarray, ndarray]]

float [fourier_harmonics.max()+1]
coefs_fourier_amp[n] is the amplitude fourier coefficient for the \(n^{th}\) harmonic.
First value will be zero, because \(0^{th}\) harmonic is just a constant and so will be found in polynomial fitting.
Only returned if fourier_harmonics is provided.

coefs_fourier_phase Union[ndarray, Tuple[ndarray, ndarray, ndarray]]

float [fourier_harmonics.max()]
coefs_fourier_phase[n] is the phase fourier coefficient for the \((n+1)^{th}\) harmonic.
Only returned if fourier_harmonics is provided. If pad_coefs_phase, will pad at start with a zero so of length fourier_harmonics.max()+1.

Source code in isca_tools/utils/numerical.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def polyfit_phase(x: np.ndarray, y: np.ndarray,
                  deg: int, time: Optional[np.ndarray] = None, time_start: Optional[float] = None,
                  time_end: Optional[float] = None,
                  deg_phase_calc: int = 10, resample: bool = False,
                  include_phase: bool = True, fourier_harmonics: Optional[Union[int, np.ndarray]] = None,
                  integ_method: str = 'spline',
                  coef_fix: Optional[List] = None,
                  pad_coefs_phase: bool = False) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray, np.ndarray]]:
    """
    This fits a polynomial `y_approx(x) = p[0] * x**deg + ... + p[deg]` of degree `deg` to points (x, y) as `np.polyfit`
    but also includes additional phase shift term such that the total approximation for y is:

    $y_{approx} = \\frac{1}{2} \lambda_{phase}(x(t-T/4) - x(t+T/4)) + \sum_{n=0}^{n_{deg}} \lambda_n x^n$

    where $\lambda_n=$`poly_coefs[-1-n]` and $\lambda_{phase}=$`poly_coefs[0]`.
    $x$ is assumed periodic with period $T=$`time[-1]-time[0]+time_spacing`.

    The phase component, $y_{phase}=\\frac{1}{2} \lambda_{phase}(x(t-T/4) - x(t+T/4))$, is found first from the
    residual of $y-y_{best}$, where $y_{best}$ is the polynomial approximation of degree `deg_phase_calc`.

    $\sum_{n=0}^{n_{deg}} \lambda_n x^n$ is then found by doing the normal polynomial approximation of degree
    `deg` to the residual $y-y_{phase}$.

    If `fourier_harmonics` is provided, then a fourier series will also be added to $y_{approx}$ containing all
    harmonics, $n$, in `fourier_harmonics`:

    $y_{approx} + \\sum_{n} F_n\\cos(2n\\pi ft - \\Phi_n)$

    The idea behind this is to account for part of $y$ not directly related to $x$.

    Args:
        x: `float [n_x]`</br>
            $x$ coordinates used to approximate $y$. `x[i]` is value at time `time[i]`.
        y: `float [n_x]`</br>
            $y$ coordinate correesponding to each $x$.
        deg: Degree of the fitting polynomial. If negative, will only do the phase fitting.
        time: `float [n_x]`</br>
            Time such that `x[i]` is $x$ and `y[i]` is $y$ at time `time[i]`.</n>
            If time provided, will use spline to apply phase shift to $x$.</n>
            If time not provided, assume time is `np.arange(n_x)`, and will use `np.roll` to apply phase shift to $x$.
        time_start: Start time such that period is given by `time_end - time_start + 1`.
            If not provided, will set to min value in `time`.
        time_end: End time such that period is given by `time_end - time_start + 1`.
            If not provided, will set to max value in `time`.
        deg_phase_calc: Degree of the fitting polynomial to use in the phase term calculation.
            Should be a large integer.
        resample: If `True`, will use `resample_data` to resample x and y before each calling of
            `np.polyfit`.
        include_phase: If `False`, will only call `np.polyfit`, but first return value will be 0 indicating
            no phase shift. Only makes sense to call this rather than `np.polyfit` if you want to use `resample`.
        fourier_harmonics: `int [n_harmonics_include]`</br>
            After applied polynomial of degree `deg_phase_calc` and fit the phase factor, a residual is obtained
            `y_residual = y - y_best - y_phase`. If `fourier_harmonics` is provided, a fourier
            series will be directly fit to this residual including all the harmonics in `fourier_harmonics`.
            If `fourier_harmonics` is an integer, all harmonics up to and including the value will be fit.</br>
            The final polynomial of degree `deg` will then be fit to `y - y_phase - y_fourier`.</br>
            If provide array, only harmonics included in array will be fit, but will return coefficients for
            all up to the max harmonic given, with zeros wherever harmonic not in `fourier_harmonics`.</br>
            Idea behind this is to account for part of $y$ not directly related to $x$.
        integ_method: How to perform the integration when calculating Fourier coefficients.</br>
            If `spline`, will fit a spline and then integrate the spline, otherwise will use `scipy.integrate.simpson`.
        coef_fix: Option to fix some coefs in poly_coefs, same order as output with first being phase coef.
            All coefs in array that are None will be computed.
        pad_coefs_phase: If `True`, will set `coefs_fourier_phase` to length `fourier_harmonics.max()+1`, with
            the first value as zero. Otherwise will be size `fourier_harmonics.max()`.

    Returns:
        poly_coefs: `float [n_deg+2]`
            Polynomial coefficients, phase first and then normal output of `np.polyfit` with lowest power last.
        coefs_fourier_amp: `float [fourier_harmonics.max()+1]`</br>
            `coefs_fourier_amp[n]` is the amplitude fourier coefficient for the $n^{th}$ harmonic.</br>
            First value will be zero, because $0^{th}$ harmonic is just a constant and so will be found in
            polynomial fitting.</br>
            Only returned if `fourier_harmonics` is provided.
        coefs_fourier_phase: `float [fourier_harmonics.max()]`</br>
            `coefs_fourier_phase[n]` is the phase fourier coefficient for the $(n+1)^{th}$ harmonic.</br>
            Only returned if `fourier_harmonics` is provided.
            If `pad_coefs_phase`, will pad at start with a zero so of length `fourier_harmonics.max()+1`.
    """
    coefs = np.zeros(np.clip(deg, 0, 1000) + 2)  # first coef is phase coef
    if resample:
        x_fit, y_fit = resample_data(time, x, y)[1:]
    else:
        x_fit = x
        y_fit = y
    if coef_fix is not None:
        if len(coef_fix) != deg + 2:
            raise ValueError(f"coef_fix must have length deg+2={deg + 2} not {len(coef_fix)}.\n"
                             f"First is phase coef, then the polynomial coefs")
        coef_fix_phase = coef_fix[0]
        coef_fix = coef_fix[1:]
    else:
        coef_fix_phase = None
    if not include_phase:
        coefs[1:] = polyfit_fixed_coefs(x_fit, y_fit, deg, coef_fix)  # don't do phase stuff so 1st value is 0
    else:
        y_best_polyfit = np.polyval(np.polyfit(x_fit, y_fit, deg_phase_calc), x)
        x_shift = 0.5 * (get_var_shift(x, shift_phase=0.25, time=time, time_start=time_start, time_end=time_end) -
                         get_var_shift(x, shift_phase=-0.25, time=time, time_start=time_start, time_end=time_end))
        if resample:
            x_shift_fit, y_residual_fit = resample_data(time, x_shift, y - y_best_polyfit)[1:]
        else:
            x_shift_fit = x_shift
            y_residual_fit = y - y_best_polyfit
        if (coef_fix is not None) and (coef_fix[-1] is not None):
            # Constraint the deg=0 coefficient in this case
            coefs[[0, -1]] = polyfit_fixed_coefs(x_shift_fit, y_residual_fit, 1,
                                                 c0=[None, coef_fix[-1]])
        else:
            coefs[[0, -1]] = np.polyfit(x_shift_fit, y_residual_fit, 1)
        if coef_fix_phase is not None:
            # If supplied coef_phase, then set this here
            coefs[0] = coef_fix_phase       # fit the
        y_no_phase = y - polyval_phase(coefs, x, time, time_start,
                                       time_end)  # residual after removing phase dependent term

        if fourier_harmonics is not None:
            time_use = np.arange(x.size) if time is None else time
            if not all(np.ediff1d(time_use) == np.ediff1d(time_use)[0]):
                raise ValueError('Can only include Fourier with evenly spaced data')
            if isinstance(fourier_harmonics, int):
                # if int, use all harmonics up to value indicated but without 0th harmonic
                fourier_harmonics = np.arange(1, fourier_harmonics + 1)
            coefs_fourier_amp = np.zeros(np.max(fourier_harmonics) + 1)
            coefs_fourier_phase = np.zeros(np.max(fourier_harmonics))
            for n in fourier_harmonics:
                if n == 0:
                    warnings.warn('Will not fit 0th harmonic as constant will be fit in polynomial')
                else:
                    coefs_fourier_amp[n], coefs_fourier_phase[n - 1] = \
                        get_fourier_coef(time_use, y_no_phase - y_best_polyfit, n, integ_method)
            y_residual_fourier = fourier_series(time_use, coefs_fourier_amp, coefs_fourier_phase)
            y_no_phase = y_no_phase - y_residual_fourier

        if deg >= 0:
            if resample:
                x_fit, y_no_phase_fit = resample_data(time, x, y_no_phase)[1:]
            else:
                x_fit = x
                y_no_phase_fit = y_no_phase
            coefs[1:] += polyfit_fixed_coefs(x_fit, y_no_phase_fit, deg, coef_fix)

    if fourier_harmonics is None:
        return coefs
    else:
        if pad_coefs_phase:
            # Set first value to zero
            coefs_fourier_phase = np.hstack((np.zeros(1), coefs_fourier_phase))
        return coefs, coefs_fourier_amp, coefs_fourier_phase

polyval_phase(poly_coefs, x, time=None, time_start=None, time_end=None, coefs_fourier_amp=None, coefs_fourier_phase=None, pad_coefs_phase=False)

Given the polynomial coefficients found by polyfit_phase for fitting a polynomial of degree len(polyfit)-2 of \(x\) to \(y\), this will return the approximation of \(y\):

\(y_{approx} = \frac{1}{2} \lambda_{phase}(x(t-T/4) - x(t+T/4)) + \sum_{n=0}^{n_{deg}} \lambda_n x^n\)

where \(\lambda_n=\)poly_coefs[-1-n], \(\lambda_{phase}=\)poly_coefs[0] and \(x\) is assumed periodic with period \(T\).

If coefs_fourier_amp is provided, then a fourier series will also be added to \(y_{approx}\):

\(y_{approx} + \frac{F_0}{2} + \sum_{n=1}^{N} F_n\cos(2n\pi ft - \Phi_n)\)

Parameters:

Name Type Description Default
poly_coefs ndarray

float [n_deg+2]
Polynomial coefficients as output by polyfit_phase, lowest power last.
\(\lambda_{phase}=\)poly_coefs[0] and \(\lambda_n=\)poly_coefs[-1-n].

required
x ndarray

float [n_x]
\(x\) coordinates used to approximate \(y\).

required
time Optional[ndarray]

float [n_x]
Time such that x[i] is \(x\) at time time[i]. If time provided, will use spline to apply shift to \(x\). If time not provided, assume time is np.arange(n_x), and will use np.roll to apply shift to \(x\).

None
time_start Optional[float]

Start time such that period is given by time_end - time_start + 1. If not provided, will set to min value in time.

None
time_end Optional[float]

End time such that period is given by time_end - time_start + 1. If not provided, will set to max value in time.

None
coefs_fourier_amp Optional[ndarray]

float [n_harmonics+1]
The amplitude Fourier coefficients \(F_n\).

None
coefs_fourier_phase Optional[ndarray]

float [n_harmonics]
The phase Fourier coefficients \(\Phi_n\).

None
pad_coefs_phase bool

If True, expect coefs_fourier_phase to be of length n_harmonics+1 with first value equal to zero.

False

Returns:

Name Type Description
y_approx ndarray

float [n_x]
Polynomial approximation to \(y\), possibly including phase and Fourier terms.

Source code in isca_tools/utils/numerical.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def polyval_phase(poly_coefs: np.ndarray, x: np.ndarray, time: Optional[np.ndarray] = None,
                  time_start: Optional[float] = None, time_end: Optional[float] = None,
                  coefs_fourier_amp: Optional[np.ndarray] = None,
                  coefs_fourier_phase: Optional[np.ndarray] = None,
                  pad_coefs_phase: bool = False) -> np.ndarray:
    """
    Given the polynomial coefficients found by `polyfit_phase` for fitting a polynomial
    of degree `len(polyfit)-2` of $x$ to $y$, this will return the approximation of $y$:

    $y_{approx} = \\frac{1}{2} \lambda_{phase}(x(t-T/4) - x(t+T/4)) + \sum_{n=0}^{n_{deg}} \lambda_n x^n$

    where $\lambda_n=$`poly_coefs[-1-n]`, $\lambda_{phase}=$`poly_coefs[0]` and
    $x$ is assumed periodic with period $T$.

    If `coefs_fourier_amp` is provided, then a fourier series will also be added to $y_{approx}$:

    $y_{approx} + \\frac{F_0}{2} + \\sum_{n=1}^{N} F_n\\cos(2n\\pi ft - \\Phi_n)$

    Args:
        poly_coefs: `float [n_deg+2]`</br>
            Polynomial coefficients as output by `polyfit_phase`, lowest power last.</br>
            $\lambda_{phase}=$`poly_coefs[0]` and $\lambda_n=$`poly_coefs[-1-n]`.
        x: `float [n_x]`</br>
            $x$ coordinates used to approximate $y$.
        time: `float [n_x]`</br>
            Time such that `x[i]` is $x$ at time `time[i]`.</n>
            If `time` provided, will use spline to apply shift to $x$.</n>
            If `time` not provided, assume time is `np.arange(n_x)`, and will use `np.roll` to apply shift to $x$.
        time_start: Start time such that period is given by `time_end - time_start + 1`.
            If not provided, will set to min value in `time`.
        time_end: End time such that period is given by `time_end - time_start + 1`.
            If not provided, will set to max value in `time`.
        coefs_fourier_amp: `float [n_harmonics+1]`</br>
            The amplitude Fourier coefficients $F_n$.
        coefs_fourier_phase: `float [n_harmonics]`</br>
            The phase Fourier coefficients $\\Phi_n$.
        pad_coefs_phase: If `True`, expect `coefs_fourier_phase` to be of length `n_harmonics+1` with first value
            equal to zero.

    Returns:
        y_approx: `float [n_x]`</br>
            Polynomial approximation to $y$, possibly including phase and Fourier terms.
    """
    # In this case, poly_coefs are output of polyfit_with_phase so first coefficient is the phase coefficient
    y_approx = np.polyval(poly_coefs[1:], x)
    x_shift = 0.5 * (get_var_shift(x, shift_phase=0.25, time=time, time_start=time_start, time_end=time_end) -
                     get_var_shift(x, shift_phase=-0.25, time=time, time_start=time_start, time_end=time_end))
    if coefs_fourier_amp is not None:
        time_use = np.arange(x.size) if time is None else time
        y_residual_fourier = fourier_series(time_use, coefs_fourier_amp, coefs_fourier_phase, pad_coefs_phase)
    else:
        y_residual_fourier = 0
    return y_approx + poly_coefs[0] * x_shift + y_residual_fourier

resample_data(time, x, y, x_return=None, n_return=None, bc_type='periodic', extrapolate=False)

Given that x[i] and y[i] both occur at time time[i], this resamples data to return values of y corresponding to x_return.

Parameters:

Name Type Description Default
time Optional[ndarray]

float [n_time]
Times such that x[i] and y[i] correspond to time time[i]. It assumes time has a spacing of 1, and starts with 0, so for a 360-day year, it would be np.arange(360).

required
x ndarray

float [n_time]
Value of variable \(x\) at each time.

required
y ndarray

float [n_time]
Value of variable \(y\) at each time.

required
x_return Optional[ndarray]

float [n_return]
Values of \(x\) for the resampled \(y\) data to be returned. If not provided, will use np.linspace(x.min(), x.max(), n_return).

None
n_return Optional[int]

Number of resampled data if x_return is not provided. Will set to n_time neither this nor x_return provided.

None
bc_type str

Boundary condition type in scipy.interpolate.CubicSpline.

'periodic'
extrapolate bool

Whether to extrapolate if any x_return outsidex is provided.

False

Returns:

Name Type Description
times_return ndarray

float [n_return_out]
Times corresponding to x_return. Not necessarily n_return values because can have multiple \(y\) values for each \(x\).

x_return_out ndarray

float [n_return_out]
\(x\) values corresponding to times_return. Will only contain values in x_return, but may contain multiple of each.

y_return ndarray

float [n_return_out]
\(y\) values corresponding to times_return and x_return_out.

Source code in isca_tools/utils/numerical.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
def resample_data(time: Optional[np.ndarray], x: np.ndarray, y: np.ndarray, x_return: Optional[np.ndarray] = None,
                  n_return: Optional[int] = None, bc_type: str = 'periodic',
                  extrapolate: bool = False) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Given that `x[i]` and `y[i]` both occur at time `time[i]`, this resamples data to return values of `y`
    corresponding to `x_return`.

    Args:
        time: `float [n_time]`</br>
            Times such that `x[i]` and `y[i]` correspond to time `time[i]`. It assumes time has a spacing of 1, and
            starts with 0, so for a 360-day year, it would be `np.arange(360)`.
        x: `float [n_time]`</br>
            Value of variable $x$ at each time.
        y: `float [n_time]`</br>
            Value of variable $y$ at each time.
        x_return: `float [n_return]`</br>
            Values of $x$ for the resampled $y$ data to be returned. If not provided, will use
            `np.linspace(x.min(), x.max(), n_return)`.
        n_return: Number of resampled data if `x_return` is not provided. Will set to `n_time` neither this
            nor `x_return` provided.
        bc_type: Boundary condition type in `scipy.interpolate.CubicSpline`.
        extrapolate: Whether to extrapolate if any `x_return` outside`x` is provided.

    Returns:
        times_return: `float [n_return_out]`</br>
            Times corresponding to `x_return`. Not necessarily `n_return` values because can have multiple $y$
            values for each $x$.
        x_return_out: `float [n_return_out]`</br>
            $x$ values corresponding to `times_return`. Will only contain values in `x_return`, but may contain
            multiple of each.
        y_return: `float [n_return_out]`</br>
            $y$ values corresponding to `times_return` and `x_return_out`.
    """
    if n_return is None:
        n_return = x.size
    if time is None:
        time = np.arange(x.size)
    time_spacing = np.median(np.ediff1d(time))
    if 'periodic' in bc_type:
        x_spline = CubicSpline(np.append(time, [time[-1] + time_spacing]), np.append(x, x[0]),
                               bc_type=bc_type)
        y_spline = CubicSpline(np.append(time, [time[-1] + time_spacing]), np.append(y, y[0]),
                               bc_type=bc_type)
    else:
        x_spline = CubicSpline(time, x, bc_type=bc_type)
        y_spline = CubicSpline(time, y, bc_type=bc_type)
    if x_return is None:
        x_return = np.linspace(x.min(), x.max(), n_return)
    times_return = []
    for i in range(x_return.size):
        times_return += [*x_spline.solve(x_return[i], extrapolate=extrapolate)]
    times_return = np.asarray(times_return) % time[-1]  # make return times between 0 and time[-1]
    return times_return, x_spline(times_return), y_spline(times_return)

resample_data_distance(time, x, y, n_return=None, bc_type='periodic', norm=False)

Given that x[i] and y[i] both occur at time time[i], this resamples data to return n_return values of \(x\) and \(y\) evenly spaced along the line connecting all (x, y) coordinates.

Parameters:

Name Type Description Default
time Optional[ndarray]

float [n_time]
Times such that x[i] and y[i] correspond to time time[i].
If time not provided, assume time is np.arange(n_x).

required
x ndarray

float [n_time]
Value of variable \(x\) at each time.

required
y ndarray

float [n_time]
Value of variable \(y\) at each time.

required
n_return Optional[int]

Number of resampled data, will set to n_time if not provided.

None
bc_type str

Boundary condition type in scipy.interpolate.CubicSpline for x and y.

'periodic'
norm bool

If True will normalize x and y so both have a range of 1, before calculating distance along line.

False

Returns:

Name Type Description
times_return ndarray

float [n_return]
Times of returned \(x\) and \(y\) such that they are evenly spaced along line connecting all input (x, y) coordinates.

x_return_out ndarray

float [n_return_out]
\(x\) values corresponding to times_return.

y_return ndarray

float [n_return_out]
\(y\) values corresponding to times_return and x_return.

Source code in isca_tools/utils/numerical.py
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
def resample_data_distance(time: Optional[np.ndarray], x: np.ndarray, y: np.ndarray, n_return: Optional[int] = None,
                           bc_type: str = 'periodic', norm: bool = False) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Given that `x[i]` and `y[i]` both occur at time `time[i]`, this resamples data to return `n_return` values of
    $x$ and $y$ evenly spaced along the line connecting all (x, y) coordinates.

    Args:
        time: `float [n_time]`</br>
            Times such that `x[i]` and `y[i]` correspond to time `time[i]`.</br>
            If time not provided, assume time is `np.arange(n_x)`.
        x: `float [n_time]`</br>
            Value of variable $x$ at each time.
        y: `float [n_time]`</br>
            Value of variable $y$ at each time.
        n_return:  Number of resampled data, will set to `n_time` if not provided.
        bc_type: Boundary condition type in `scipy.interpolate.CubicSpline` for `x` and `y`.
        norm: If `True` will normalize `x` and `y` so both have a range of 1, before calculating distance along line.

    Returns:
        times_return: `float [n_return]`</br>
            Times of returned $x$ and $y$ such that they are evenly spaced along line connecting all input
            (x, y) coordinates.
        x_return_out: `float [n_return_out]`</br>
            $x$ values corresponding to `times_return`.
        y_return: `float [n_return_out]`</br>
            $y$ values corresponding to `times_return` and `x_return`.
    """
    if n_return is None:
        n_return = x.size
    if time is None:
        time = np.arange(x.size)
    time_spacing = np.median(np.ediff1d(time))
    # dist[i] is distance along line from (x[0], y[0]) at time=time[i]
    if norm:
        coords_dist_calc = np.vstack((x / (np.max(x) - np.min(x)), y / (np.max(y) - np.min(y))))
    else:
        coords_dist_calc = np.vstack((x, y))
    dist = np.append(0, np.cumsum(np.sqrt(np.sum(np.diff(coords_dist_calc, axis=1) ** 2, axis=0))))
    x_spline = CubicSpline(np.append(time, [time[-1] + time_spacing]), np.append(x, x[0]),
                           bc_type=bc_type)
    y_spline = CubicSpline(np.append(time, [time[-1] + time_spacing]), np.append(y, y[0]),
                           bc_type=bc_type)
    dist_spline = CubicSpline(time, dist)
    dist_return = np.linspace(dist[0], dist[-1], n_return)
    # Adjust first and last values by tiny amount, to ensure that within the range when trying to solve
    small = 0.0001 * (dist_return[1] - dist_return[0])
    dist_return[0] += small
    dist_return[-1] -= small
    time_resample = np.zeros(n_return)
    for i in range(n_return):
        time_resample[i] = dist_spline.solve(dist_return[i], extrapolate=False)[0]
    return time_resample, x_spline(time_resample), y_spline(time_resample)

spline_integral(x, dy_dx, y0=0, x0=None, x_return=None, periodic=False)

Uses spline integration to solve for \(y\) given \(\frac{dy}{dx}\) such that \(y=y_0\) at \(x=x_0\).

Parameters:

Name Type Description Default
x ndarray

float [n_x]
Values of \(x\) where \(\frac{dy}{dx}\) given, and used to fit the spline.

required
dy_dx ndarray

float [n_x]
Values of \(\frac{dy}{dx}\) corresponding to \(x\), and used to fit the spline.

required
y0 float

Boundary condition, \(y(x_0)=y_0\).

0
x0 Optional[float]

Boundary condition, \(y(x_0)=y_0\).
If not given, will assume x0=x_return[0].

None
x_return Optional[ndarray]

float [n_x_return]
Values of \(y\) are returned for these \(x\) values. If not given, will set to x.

None
periodic bool

Whether to use periodic boundary condition.
If periodic expect \(\frac{dy}{dx}\) at \(x=\)x[-1]+x_spacing is equal to dy_dx[0].

False

Returns:

Name Type Description
y_return ndarray

float [n_x_return]
Values of \(y\) corresponding to `x_return.

Source code in isca_tools/utils/numerical.py
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
def spline_integral(x: np.ndarray, dy_dx: np.ndarray, y0: float = 0, x0: Optional[float] = None,
                    x_return: Optional[np.ndarray] = None,
                    periodic: bool = False) -> np.ndarray:
    """
    Uses spline integration to solve for $y$ given $\\frac{dy}{dx}$ such that $y=y_0$ at $x=x_0$.

    Args:
        x: `float [n_x]`</br>
            Values of $x$ where $\\frac{dy}{dx}$ given, and used to fit the spline.
        dy_dx: `float [n_x]`</br>
            Values of $\\frac{dy}{dx}$ corresponding to $x$, and used to fit the spline.
        y0: Boundary condition, $y(x_0)=y_0$.
        x0: Boundary condition, $y(x_0)=y_0$.</br>
            If not given, will assume `x0=x_return[0]`.
        x_return: `float [n_x_return]`</br>
            Values of $y$ are returned for these $x$ values. If not given, will set to `x`.
        periodic: Whether to use periodic boundary condition.</br>
            If periodic expect $\\frac{dy}{dx}$ at $x=$`x[-1]+x_spacing` is equal to `dy_dx[0]`.

    Returns:
        y_return: `float [n_x_return]`</br>
            Values of $y$ corresponding to `x_return.
    """
    if periodic:
        x_spacing = np.median(np.ediff1d(x))
        spline_use = CubicSpline(np.append(x, x[-1] + x_spacing), np.append(dy_dx, dy_dx[0]), bc_type='periodic')
    else:
        spline_use = CubicSpline(x, dy_dx)
    if x_return is None:
        x_return = x
    if x0 is None:
        x0 = x_return[0]
    y = np.full_like(x_return, y0, dtype=float)
    for i in range(x_return.size):
        y[i] += spline_use.integrate(x0, x_return[i], extrapolate='periodic' if periodic else None)
    return y

weighted_median(x, w)

Compute the (lower) weighted median of 1D data.

This returns the first value x[i] (after sorting by x) for which the cumulative weight is at least half of the total weight. This is a common convention and always yields a valid weighted median (though the weighted median set can be an interval in special tie cases).

Parameters:

Name Type Description Default
x ndarray

1D array of values, shape (n,).

required
w ndarray

1D array of non-negative weights, shape (n,). Typically you want strictly positive weights on the points you include.

required

Returns:

Type Description
float

Weighted median value (float). By construction this implementation

float

returns a value that is an element of x.

Raises:

Type Description
ValueError

If x and w have different shapes, are not 1D, or if the total weight is zero.

Source code in isca_tools/utils/numerical.py
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
def weighted_median(x: np.ndarray, w: np.ndarray) -> float:
    """Compute the (lower) weighted median of 1D data.

    This returns the first value `x[i]` (after sorting by `x`) for which the
    cumulative weight is at least half of the total weight. This is a common
    convention and always yields a valid weighted median (though the weighted
    median set can be an interval in special tie cases).

    Args:
        x: 1D array of values, shape `(n,)`.
        w: 1D array of non-negative weights, shape `(n,)`. Typically you want
            strictly positive weights on the points you include.

    Returns:
        Weighted median value (float). By construction this implementation
        returns a value that is an element of `x`.

    Raises:
        ValueError: If `x` and `w` have different shapes, are not 1D, or if the
            total weight is zero.
    """
    x = np.asarray(x)
    w = np.asarray(w)

    if x.ndim != 1 or w.ndim != 1:
        raise ValueError("x and w must be 1D arrays.")
    if x.shape != w.shape:
        raise ValueError("x and w must have the same shape.")
    wsum = np.sum(w)
    if wsum <= 0:
        raise ValueError("Sum of weights must be > 0.")

    order = np.argsort(x)
    x = x[order]
    w = w[order]

    cw = np.cumsum(w)
    cutoff = 0.5 * wsum
    return float(x[np.searchsorted(cw, cutoff, side="left")])