Максимально быстрая фильтрация динамических данных по диапазону временного ряда

Примечание

В этой задаче можно использовать любые типы данных и модули, в том числе и сторонних разработчиков (numpy, pandas, sorted containers и т.д.). Главный критерий - скорость выполнения.

Дано:

На вход нашей логики последовательно поступают одинаковые по структуре словари, содержащие информацию (параметры) некоего события. В этих словарях несколько значений являются типами float (назовем ключи param_1, param_2...) и одно из значений является временной меткой этого события (назовем ключ dt). Мы разбираем этот словарь по отдельным спискам. Каждый параметр - в отдельный список, временной ряд - в отдельный список. Таким образом под одним и тем же индексом в разных списках содержится информация об одном событии. О временных метках нам известно, что они неубывающие, то есть последующая метка может быть либо равна либо больше предыдущей. Таймдельта между рядом стоящими метками - условно-случайна, может быть микросекунды, может быть минуты, может вообще условно не быть.

Так же нам задан временной интервал (назовем duration)

Задача

Для последующей обработки нам необходимо, чтобы в вышеуказанных списках содержалась информация о событиях, которые попадают в duration относительно последнего пришедшего события. Все события старше - удаляются.

Мое решение

Берем pandas.Dataframe. Приходит событие - раскладываем словарь по столбцам датафрейма. Фильтруем по столбцу dt по маске dt > время последнего события - duration. Разбирем датафрейм на списки.

Написал для наглядности симулятор процесса:

import time
import datetime
import random
import pandas as pd

random.seed(123)
duration = 30
df = pd.DataFrame(columns=['Param_1', 'Param_2', 'Param_3', 'dt'])

for _ in range(10000):
    # получили словарь
    input_dict = {'Param_1': random.random(),
                  'Param_2': random.random(),
                  'Param_3': random.random(),
                  'dt': datetime.datetime.now()}

    # добавили в датафрейм
    df.loc[0 if df.empty else df.index[-1]+1] = input_dict.values()

    # отфильтровали
    df = df.loc[df['dt'].gt(df.dt.iloc[-1] - datetime.timedelta(seconds=duration))]

    # разобрали по спискам
    param_1 = df.Param_1.to_list()
    param_2 = df.Param_2.to_list()
    param_3 = df.Param_3.to_list()
    dt = df.dt.to_list()

    # это рандомная задержка, чтобы эмулировать случайную разницу между событиями
    time.sleep(random.random() * 3)

Вопрос

Есть варианты быстрее? Буду благодарен за любые комментарии по оптимизации кода.


Ответы (2 шт):

Автор решения: CrazyElf

Примечание: Очередь deque из ответа MBo работает в 10 раз быстрее чем Pandas в этой задаче, лучше используйте её. По сути это и есть кольцевой буфер, про который я в конце ответа пишу.


Я немного погонял ваш код (выключив time.sleep и уменьшив duration). Довольно много времени (которое сильно зависит от того, какая часть данных попадает в интервал) у вас съедает вот этот кусок кода:

    # разобрали по спискам
    param_1 = df.Param_1.to_list()
    param_2 = df.Param_2.to_list()
    param_3 = df.Param_3.to_list()
    dt = df.dt.to_list()

Так что сам то код достаточно быстрый, а вот преобразование столбцов в list съедает тучу времени. Подумайте, нужны ли вам вообще именно списки на выходе. Если брать из Pandas прямо numpy.array, которые там внутри лежат, то на это время вообще не тратится:

    # разобрали по спискам
    param_1 = df.Param_1.values
    param_2 = df.Param_2.values
    param_3 = df.Param_3.values
    dt = df.dt.values

Кстати, вот это преобразование просто лишнее:

df.loc[len(df)] = [val for val in input_dict.values()]

Можно прямо взять и присвоить:

df.loc[len(df)] = input_dict.values()

Из дальнейшего, я бы попробовал перейти от дат к timestamp, может быть будет быстрее. В остальном не думаю, что код такой уж сильно неоптимальный и особо нуждается в переработке.

Ещё можно попробовать использовать какую-то in memory базу данных. Но будет ли быстрее - не знаю.

Так то если совсем алгоритмически оптимально подходить, то, наверное, можно завести нужное число кольцевых буферов (по числу нужных списков) и гонять данные по ним, просто сдвигая указатели начала и конца. Но опять же получение из них списков не будет тривиальным. )

→ Ссылка
Автор решения: MBo

Библиотеки вроде Pandas наверняка будут быстрее, а алгоритмически для этого может применяться двунаправленная очередь collections.deque

При добавлении в конец удаляются (вы сами удаляете) из начала записи, пока время не станет больше lasttime-duration

from collections import deque
from time import time, sleep
from random import randint

duration = 1

d = deque()
for i in range(100):
    sleep(randint(1,10)/100)
    t = time()
    while d and t-d[0]>duration:
        d.popleft()
    d.append(t)
    print(len(d))
→ Ссылка