Аналог метода pandas.Series.resample для numpy для ресемплирования временного ряда

Дано

Есть временные ряды. Приведу небольшой кусочек, в рабочей задаче количество элементов может достигать миллионов. Если для решения понадобится большой кусок - напишите, я предоставлю.

import datetime

time_series = [datetime.datetime(2024, 5, 13, 2, 9, 41, 713000),
               datetime.datetime(2024, 5, 13, 2, 9, 41, 713000),
               datetime.datetime(2024, 5, 13, 2, 9, 42, 228000),
               datetime.datetime(2024, 5, 13, 2, 9, 42, 228000), 
               datetime.datetime(2024, 5, 13, 2, 9, 43, 896000), 
               datetime.datetime(2024, 5, 13, 2, 9, 45, 673000), 
               datetime.datetime(2024, 5, 13, 2, 9, 45, 673000), 
               datetime.datetime(2024, 5, 13, 2, 9, 46, 241000), 
               datetime.datetime(2024, 5, 13, 2, 9, 56, 503000), 
               datetime.datetime(2024, 5, 13, 2, 9, 56, 549000), 
               datetime.datetime(2024, 5, 13, 2, 9, 56, 582000), 
               datetime.datetime(2024, 5, 13, 2, 9, 56, 591000), 
               datetime.datetime(2024, 5, 13, 2, 9, 56, 612000), 
               datetime.datetime(2024, 5, 13, 2, 9, 56, 612000)]

Про эти ряды надо знать, что они монотонно возрастают и временные промежутки между событиями случайны.

Требуется

В частном случае: получить индексы первых и последних объектов ряда ресемплированных по произвольному периоду относительно времени самого "молодого" события (конечного события ряда).

В общем случае: применить к группам ресемплированных индексов произвольную функцию.

Решение pandas

Эта задача достаточно просто решается с помощью pandas. Я решаю так:

import pandas as pd

time_series_pandas = pd.Series(range(len(time_series)), index=time_series))
df_resample = time_series_pandas.resample('60s', origin='end').agg(first='first', last='last')

# Так как мне нужны два списка, отсортированных по убыванию, то потом еще вот так:
start, end =  df_resample[::-1].T.values.tolist()

Это решение для частного случая. Для общего случая, вместо .agg можно применить .apply c произвольной функцией.

Вопрос: а как все тоже самое, но для numpy?

Так как ряды огромные, хочется оптимизировать, и первое, что приходит в голову это заменить pandas на numpy. Поиски в интернетах дали вот такое решение, но это совсем не то, тут ресемплинг по блокам из N элементов, а у меня временной ряд. И вот такое решение (второе решение). Это уже кое что, но я пока не до конца разобрался можно ли его применить конкретно к моей частной задаче и уж точно его не получится применить к общей задаче, так как мы ограничены набором интерполяций interp1d. (UPD Стал разбираться с этим методом, он не очень, так как np.linspace не работает с datetime)

Итого

Как для временного ряда заменить метод pandas.Series.resample на функционал numpy, разумеется, с учетом того, что это должно стать существенно быстрее?


Ответы (1 шт):

Автор решения: Alexey Trukhanov

Не прошло и двух лет как я нашел решение :) Мне кажется, это полезно, оставлю здесь.

Для начала, надо привести значения datetime в нампи массиве к UNIX-time. (В рабочей задаче время итак хранится в наносекундах UNIX-time, так что еще проще, но так как в постановке задачи я указал, что время - это datetime, то для чистоты эксперимента надо конвертировать)

import numpy as np

np_arr = np.array(time_series).astype("datetime64[ns]").astype('int64')

Далее, мы делим нацело этот массив на временной шаг, который нам требуется для ресемплинга, взятый в наносекундах.

np_arr_bins = np_arr // (60 * 1_000_000_000)

Мы получаем группы одинаковых чисел. Каждая группа одинаковых чисел и будет указывать нам на одну "корзину".

Следующим шагом мы последовательно применяем, сначала np.diff, чтобы оставить ненулевым только первое вхождение в каждой группе, остальные станут нулями, а затем np.flatnonzero(), который вернет нам индексы ненулевых элементов, которые и будут стартовыми индексами каждой корзины. Останется только добавить первый 0. Вот так:

change = np.flatnonzero(np.diff(np_arr_bins)) + 1
starts = np.concatenate(([0], change))

В получившемся массиве, каждые рядом стоящие элементы starts[i], starts[i+1] будут открывающим (закрытым) и закрывающим (открытым) индексами для каждой группы ресемплинга.

Чтобы применить к каждой группе агрегирующую функцию, применяется: np.ufunc.reduceat(data_array, starts), где ufunc - универсальная функция, например:

sums = np.add.reduceat(data_array, starts)

вычислит сумму каждой группы.

Конечно, такой способ синтаксически значительно более сложный, чем .resample, но по моим замерам выигрывает у пандаса два порядка в скорости.

→ Ссылка