Максимально быстрая фильтрация динамических данных по диапазону временного ряда
Примечание
В этой задаче можно использовать любые типы данных и модули, в том числе и сторонних разработчиков (numpy, pandas, sorted containers
и т.д.). Главный критерий - скорость выполнения.
Дано:
На вход нашей логики последовательно поступают одинаковые по структуре словари, содержащие информацию (параметры) некоего события. В этих словарях несколько значений являются типами float
(назовем ключи param_1, param_2...
) и одно из значений является временной меткой этого события (назовем ключ dt
). Мы разбираем этот словарь по отдельным спискам. Каждый параметр - в отдельный список, временной ряд - в отдельный список. Таким образом под одним и тем же индексом в разных списках содержится информация об одном событии. О временных метках нам известно, что они неубывающие, то есть последующая метка может быть либо равна либо больше предыдущей. Таймдельта между рядом стоящими метками - условно-случайна, может быть микросекунды, может быть минуты, может вообще условно не быть.
Так же нам задан временной интервал (назовем duration
)
Задача
Для последующей обработки нам необходимо, чтобы в вышеуказанных списках содержалась информация о событиях, которые попадают в duration
относительно последнего пришедшего события. Все события старше - удаляются.
Мое решение
Берем pandas.Dataframe
. Приходит событие - раскладываем словарь по столбцам датафрейма. Фильтруем по столбцу dt
по маске dt > время последнего события - duration
. Разбирем датафрейм на списки.
Написал для наглядности симулятор процесса:
import time
import datetime
import random
import pandas as pd
random.seed(123)
duration = 30
df = pd.DataFrame(columns=['Param_1', 'Param_2', 'Param_3', 'dt'])
for _ in range(10000):
# получили словарь
input_dict = {'Param_1': random.random(),
'Param_2': random.random(),
'Param_3': random.random(),
'dt': datetime.datetime.now()}
# добавили в датафрейм
df.loc[0 if df.empty else df.index[-1]+1] = input_dict.values()
# отфильтровали
df = df.loc[df['dt'].gt(df.dt.iloc[-1] - datetime.timedelta(seconds=duration))]
# разобрали по спискам
param_1 = df.Param_1.to_list()
param_2 = df.Param_2.to_list()
param_3 = df.Param_3.to_list()
dt = df.dt.to_list()
# это рандомная задержка, чтобы эмулировать случайную разницу между событиями
time.sleep(random.random() * 3)
Вопрос
Есть варианты быстрее? Буду благодарен за любые комментарии по оптимизации кода.
Ответы (2 шт):
Примечание: Очередь deque
из ответа MBo работает в 10 раз быстрее чем Pandas
в этой задаче, лучше используйте её. По сути это и есть кольцевой буфер, про который я в конце ответа пишу.
Я немного погонял ваш код (выключив time.sleep
и уменьшив duration
). Довольно много времени (которое сильно зависит от того, какая часть данных попадает в интервал) у вас съедает вот этот кусок кода:
# разобрали по спискам
param_1 = df.Param_1.to_list()
param_2 = df.Param_2.to_list()
param_3 = df.Param_3.to_list()
dt = df.dt.to_list()
Так что сам то код достаточно быстрый, а вот преобразование столбцов в list
съедает тучу времени. Подумайте, нужны ли вам вообще именно списки на выходе. Если брать из Pandas
прямо numpy.array
, которые там внутри лежат, то на это время вообще не тратится:
# разобрали по спискам
param_1 = df.Param_1.values
param_2 = df.Param_2.values
param_3 = df.Param_3.values
dt = df.dt.values
Кстати, вот это преобразование просто лишнее:
df.loc[len(df)] = [val for val in input_dict.values()]
Можно прямо взять и присвоить:
df.loc[len(df)] = input_dict.values()
Из дальнейшего, я бы попробовал перейти от дат к timestamp
, может быть будет быстрее. В остальном не думаю, что код такой уж сильно неоптимальный и особо нуждается в переработке.
Ещё можно попробовать использовать какую-то in memory базу данных. Но будет ли быстрее - не знаю.
Так то если совсем алгоритмически оптимально подходить, то, наверное, можно завести нужное число кольцевых буферов (по числу нужных списков) и гонять данные по ним, просто сдвигая указатели начала и конца. Но опять же получение из них списков не будет тривиальным. )
Библиотеки вроде Pandas наверняка будут быстрее, а алгоритмически для этого может применяться двунаправленная очередь collections.deque
При добавлении в конец удаляются (вы сами удаляете) из начала записи, пока время не станет больше lasttime-duration
from collections import deque
from time import time, sleep
from random import randint
duration = 1
d = deque()
for i in range(100):
sleep(randint(1,10)/100)
t = time()
while d and t-d[0]>duration:
d.popleft()
d.append(t)
print(len(d))