Аналог метода pandas.Series.resample для numpy для ресемплирования временного ряда
Дано
Есть временные ряды. Приведу небольшой кусочек, в рабочей задаче количество элементов может достигать миллионов. Если для решения понадобится большой кусок - напишите, я предоставлю.
import datetime
time_series = [datetime.datetime(2024, 5, 13, 2, 9, 41, 713000),
datetime.datetime(2024, 5, 13, 2, 9, 41, 713000),
datetime.datetime(2024, 5, 13, 2, 9, 42, 228000),
datetime.datetime(2024, 5, 13, 2, 9, 42, 228000),
datetime.datetime(2024, 5, 13, 2, 9, 43, 896000),
datetime.datetime(2024, 5, 13, 2, 9, 45, 673000),
datetime.datetime(2024, 5, 13, 2, 9, 45, 673000),
datetime.datetime(2024, 5, 13, 2, 9, 46, 241000),
datetime.datetime(2024, 5, 13, 2, 9, 56, 503000),
datetime.datetime(2024, 5, 13, 2, 9, 56, 549000),
datetime.datetime(2024, 5, 13, 2, 9, 56, 582000),
datetime.datetime(2024, 5, 13, 2, 9, 56, 591000),
datetime.datetime(2024, 5, 13, 2, 9, 56, 612000),
datetime.datetime(2024, 5, 13, 2, 9, 56, 612000)]
Про эти ряды надо знать, что они монотонно возрастают и временные промежутки между событиями случайны.
Требуется
В частном случае: получить индексы первых и последних объектов ряда ресемплированных по произвольному периоду относительно времени самого "молодого" события (конечного события ряда).
В общем случае: применить к группам ресемплированных индексов произвольную функцию.
Решение pandas
Эта задача достаточно просто решается с помощью pandas. Я решаю так:
import pandas as pd
time_series_pandas = pd.Series(range(len(time_series)), index=time_series))
df_resample = time_series_pandas.resample('60s', origin='end').agg(first='first', last='last')
# Так как мне нужны два списка, отсортированных по убыванию, то потом еще вот так:
start, end = df_resample[::-1].T.values.tolist()
Это решение для частного случая. Для общего случая, вместо .agg можно применить .apply c произвольной функцией.
Вопрос: а как все тоже самое, но для numpy?
Так как ряды огромные, хочется оптимизировать, и первое, что приходит в голову это заменить pandas на numpy. Поиски в интернетах дали вот такое решение, но это совсем не то, тут ресемплинг по блокам из N элементов, а у меня временной ряд. И вот такое решение (второе решение). Это уже кое что, но я пока не до конца разобрался можно ли его применить конкретно к моей частной задаче и уж точно его не получится применить к общей задаче, так как мы ограничены набором интерполяций interp1d. (UPD Стал разбираться с этим методом, он не очень, так как np.linspace не работает с datetime)
Итого
Как для временного ряда заменить метод pandas.Series.resample на функционал numpy, разумеется, с учетом того, что это должно стать существенно быстрее?
Ответы (1 шт):
Не прошло и двух лет как я нашел решение :) Мне кажется, это полезно, оставлю здесь.
Для начала, надо привести значения datetime в нампи массиве к UNIX-time.
(В рабочей задаче время итак хранится в наносекундах UNIX-time, так что еще проще, но так как в постановке задачи я указал, что время - это datetime, то для чистоты эксперимента надо конвертировать)
import numpy as np
np_arr = np.array(time_series).astype("datetime64[ns]").astype('int64')
Далее, мы делим нацело этот массив на временной шаг, который нам требуется для ресемплинга, взятый в наносекундах.
np_arr_bins = np_arr // (60 * 1_000_000_000)
Мы получаем группы одинаковых чисел. Каждая группа одинаковых чисел и будет указывать нам на одну "корзину".
Следующим шагом мы последовательно применяем, сначала np.diff, чтобы оставить ненулевым только первое вхождение в каждой группе, остальные станут нулями, а затем np.flatnonzero(), который вернет нам индексы ненулевых элементов, которые и будут стартовыми индексами каждой корзины. Останется только добавить первый 0. Вот так:
change = np.flatnonzero(np.diff(np_arr_bins)) + 1
starts = np.concatenate(([0], change))
В получившемся массиве, каждые рядом стоящие элементы starts[i], starts[i+1] будут открывающим (закрытым) и закрывающим (открытым) индексами для каждой группы ресемплинга.
Чтобы применить к каждой группе агрегирующую функцию, применяется: np.ufunc.reduceat(data_array, starts), где ufunc - универсальная функция, например:
sums = np.add.reduceat(data_array, starts)
вычислит сумму каждой группы.
Конечно, такой способ синтаксически значительно более сложный, чем .resample, но по моим замерам выигрывает у пандаса два порядка в скорости.