Помощь в оптимизации участка кода
У меня стоит задача сформировать звуковой сигнал (меандр) из 8-ми гармоник, каждая из которой может иметь или не иметь сдвиг по фазе (на пи).
#data - массив с байтами, в формате ['01010101', '11110000']
ADS = len(data) * SPP #Вычисляю размер массива под сигнал
signal = []
for i in range(ADS):
signal.append(0)
# Формируем по одному периоду сигнала на каждый байт
for b in range(len(data)):
d = 0
for N in range(1, 17, 2):
for i in range(SPP):
index = i + (b * SPP)
signal[index] += harm(N, FREQ, i, SPS, int(data[b][d]))
#s.next()
d += 1
# Функция для вычисления отдельной гармоники
def harm(N, freq, cs, sps, ph):
arg = freq*N*2*np.pi*(cs/sps)
if ph:
arg += np.pi
return AMPLITUDE*(4/np.pi)*np.sin(arg)/N
Хотел бы узнать, могу ли я как-то оптимизировать данный участок кода. Я думал использовать map, но не придумал, как заменить i (номер текущего отсчёта). Заранее спасибо!
Ответы (2 шт):
Попробую переписать код, чтобы сделать некоторые оптимизации по итогам обсуждения. Хотя самое лучшее было бы написать "векторную" версию кода на Numpy, но это надо думать. Попробую оптимизировать по мелочи, ну и просто чуть попроще код написать. В чём суть изменений:
- кэшируем вызов функции
harm - меняем долгую инициализацию списка через
appendна быструю - выносим получение
data[b]из внутренних циклов, теперь этоitem - вычисление
int()для последнего аргумента функцииharmнаоборот переносим внутрь кэшированной функции - перебор элементов коллекций и их индексов делаем через enumerate - это просто удобнее
from functools import lru_cache
# Функция для вычисления отдельной гармоники
@lru_cache(maxsize=SPP*16)
def harm(N, freq, cs, sps, ph):
arg = freq*N*2*np.pi*(cs/sps)
if int(ph): # <-- перенёс преобразование в int сюда
arg += np.pi
return AMPLITUDE*(4/np.pi)*np.sin(arg)/N
#data - массив с байтами, в формате ['01010101', '11110000']
ADS = len(data) * SPP #Вычисляю размер массива под сигнал
signal = [0] * ADS # <-- Такая инициализация списка должна быть сильно быстрее
# Формируем по одному периоду сигнала на каждый байт
for b, item in enumerate(data):
for d, N in enumerate(range(1, 17, 2)):
for i in range(SPP):
index = i + (b * SPP)
signal[index] += harm(N, FREQ, i, SPS, item[d]) # <-- изменён последний аргумент!
Какие ещё можно попробовать сделать дальнейшие оптимизации:
- заменить функции от
Numpyкоторыеnp.на функции изmath, потому что для скаляров скорее всегоNumpyбудет даже медленнее;Numpyбыстрый только для массивов/векторов/матриц. - перебор элементов в
itemтоже вынести выше из внутреннего цикла, но это уже мелочи, наверное - ну и, наконец, вместо
int(ph)можно сделать словарик, это тоже будет быстрее:
str2int = {
'0': 0,
'1': 1
}
if str2int[ph]:
...
Значение signal[index] может быть вычислено как табличная функция:
def harm(N, cs, ph):
arg = FREQ*N*2*np.pi*(cs/SPS)
if ph:
arg += np.pi
return AMPLITUDE*(4/np.pi)*np.sin(arg)/N
@functools.cache
def harm_sum(i, byte):
return sum(
harm(N, i, int(bit))
for bit, N in zip(byte, range(1, 17, 2))
)
for j, byte in enumerate(data):
for i in range(SPP):
index = i + (j * SPP)
signal[index] = harm_sum(i, byte)
Функция harm_sum вызывается для SPP * 256 вариантов аргументов. Если принять SPP = 60 то получим таблицу из 15360 значений. Внутренности harm_sum не оптимизировались.
Тесты делались для констант
SPP = 60
FREQ = 300
SPS = 48000
AMPLITUDE = 2.5
Времена работы:
время работы число байт оригинальный harm_sum код 10 0.010 0.012 100 0.152 0.098 1 000 0.930 0.263 10 000 9.283 0.401 100 000 95.210 1.691 1 000 000 - 15.252 10 000 000 - 141.268
Если требуется ускорить код ещё, следует отказаться от байтов в виде строк, значения harm_sum сохранить в прямоугольной таблице SPP*256 адресуемой значением байта.
P.S. Странно что функция выродилась в таблицу. Возможно я сделал ошибку, хотя тесты показывают что старый и новый код считают одинаково.