Оптимизация подсчёта несовпадающих элементов двух массивов
Есть программа на Си, которая подсчитывает количество несовпадающих элементов двух массивов:
#include <stddef.h>
size_t arr_dist(size_t n, const char a[n], const char b[n]) {
size_t result = 0;
for (size_t i = 0; i < n; i++)
result += a[i] != b[i];
return result;
}
Каким образом можно оптимизировать скорость выполнения данной программы?
Ответы (2 шт):
Решил я в перерывах между тревогами попробовать оптимизировать этот код. Берем код
size_t arr_dist(size_t n, const char a[n], const char b[n]) {
size_t result = 0;
for (size_t i = 0; i < n; i++)
result += a[i] != b[i];
return result;
}
как основу. Так как в коде я виду конструкцию const char a[n], то я предполагаю, что используется gcc. Дальнейшие замеры и тесты будут на gcc/clang.
Самое первое, что я сделал, это попробовал подключить openmp.
size_t arr_dist_mp(size_t n, const char a[n], const char b[n]) {
size_t result = 0;
#pragma omp parallel for reduction(+:result)
for (size_t i = 0; i < n; i++)
result += a[i] != b[i];
return result;
}
Но толи я не умею готовить openmp (для gcc нужно добавить параметр -fopenmp), но у меня получилось по скорости где то в раза два медленнее. Почитав детальнее, я понял, что видимо внутренний цикл у меня слишком маленький и по рекомендациям выделил внутренний цикл
size_t arr_dist_mp1(size_t n, const char a[n], const char b[n]) {
#define SIZE 1000000
size_t result = 0;
#pragma omp parallel for reduction(+:result)
for (size_t i = 0; i < n; i+=SIZE)
for (size_t j = 0; j < SIZE; j++) {
result += a[i+j] != b[i+j];
}
return result;
}
Но ничего толком это не поменяло. Ок, сказал я себе. Применим тяжелую артелерию - sse. Я использовал вторую версию, потому что с ней код будет работать практически на любом x86_64. Получилось вот так
size_t arr_dist_sse(size_t n, const char a[n], const char b[n]) {
size_t result = 0;
for (size_t i = 0; i < n; i+=16) {
__m128i x1 = _mm_loadu_si128((__m128i*)(a+i));
__m128i x2 = _mm_loadu_si128((__m128i*)(b+i));
__m128i r = _mm_cmpeq_epi8(x1, x2);
result += __builtin_popcount(_mm_movemask_epi8(r));
}
return n - result;
}
Ого, сказал я и давайте посмотрим на первые результаты
| name | result | time, s |
|---|---|---|
| base | 99609989 | 0.28245 |
| openmp | 99609989 | 0.55218 |
| openmp2 | 99609989 | 0.59675 |
| sse | 99609989 | 0.06872 |
(комментарии - я тестировал на 100кк элементов, которые заполнялись случайным порядком, поэтому, от теста к тесту средняя колонка может будет меняться, но разные функции должны выдавать одинаковые результаты в пределах теста. Тестовая машинка - старый сервачек, где находится процессор Intel(R) Core(TM) i5-4300U CPU @ 1.90GHz).
в 4 раза быстрее на ровном месте. Круто же. Но потом я вспомнил, что я не указывал никаких опций для оптимизации. Исправимся. gcc -O3 main.c -fopenmp
| name | result | time, s |
|---|---|---|
| base | 99609989 | 0.03494 |
| openmp | 99609989 | 0.06401 |
| openmp2 | 99609989 | 0.07796 |
| sse | 99609989 | 0.03862 |
Видно странность? базовый вариант и sse сравнялись. Вот и gcc.
ок, а давайте посмотрим на clang. (если будете тестить, то для убунты нужно доустановить libomp-dev, а gcc умеет "из коробки").
clang, без оптимизаций
| name | result | time, s |
|---|---|---|
| base | 99609989 | 0.28044 |
| openmp | 99609989 | 0.72413 |
| openmp2 | 99609989 | 0.67469 |
| sse | 99609989 | 0.37775 |
sse вариант даже отстает. что то пошло не так. явно не так. Включим чуточку оптимизаций (clang -O3 -fopenmp main.c)
| name | result | time, s |
|---|---|---|
| base | 99609989 | 0.09226 |
| openmp | 99609989 | 0.16055 |
| openmp2 | 99609989 | 0.15885 |
| sse | 99609989 | 0.13449 |
Похоже, по сравнению с gcc, clang не умеет таких оптимизаций (в 3 раза проигрывает). А с sse совсем никак. Но может готовить не умеем? добавим немного clang -fopenmp main.c -O3 -march=native и
| name | result | time, s |
|---|---|---|
| base | 99609989 | 0.02328 |
| openmp | 99609989 | 0.16114 |
| openmp2 | 99609989 | 0.16035 |
| sse | 99609989 | 0.05990 |
обычная версия выиграла. да ещё как. А попробуем для gcc такое добавить и запустить (gcc -fopenmp main.c -O3 -march=native)
| name | result | time, s |
|---|---|---|
| base | 99609989 | 0.02889 |
| openmp | 99609989 | 0.05447 |
| openmp2 | 99609989 | 0.04901 |
| sse | 99609989 | 0.02540 |
и тут gcc показал себя ещё как.
Вывод. Опции компиляции могут с наскока дать больше прирост (или соизмеримый) с ручной оптимизацией. Можно ли ещё ускорить? возможно, но даже при 0.025 сек на 100кк элементов получается 8 гигабайт в секунду (у нас массива два), а это 64гигабита в секунду. Возможно.
Для тех, кто решит побаловаться и потестить у себя (или меня покритиковать:) ), вот код. У него вывод странный, но это что бы для SO сразу табличку красивую, что бы просто копипастить и не форматировать вручную.
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <immintrin.h>
#include <emmintrin.h>
char* gen(int n) {
char* r = malloc(n);
for (int i = 0; i < n; i++) {
r[i] = rand() % 256;
}
return r;
}
size_t arr_dist(size_t n, const char a[n], const char b[n]) {
size_t result = 0;
for (size_t i = 0; i < n; i++)
result += a[i] != b[i];
return result;
}
size_t arr_dist_mp(size_t n, const char a[n], const char b[n]) {
size_t result = 0;
#pragma omp parallel for reduction(+:result)
for (size_t i = 0; i < n; i++)
result += a[i] != b[i];
return result;
}
size_t arr_dist_mp1(size_t n, const char a[n], const char b[n]) {
#define SIZE 1000000
size_t result = 0;
#pragma omp parallel for reduction(+:result)
for (size_t i = 0; i < n; i+=SIZE)
for (size_t j = 0; j < SIZE; j++) {
result += a[i+j] != b[i+j];
}
return result;
}
size_t arr_dist_sse(size_t n, const char a[n], const char b[n]) {
size_t result = 0;
for (size_t i = 0; i < n; i+=16) {
__m128i x1 = _mm_loadu_si128((__m128i*)(a+i));
__m128i x2 = _mm_loadu_si128((__m128i*)(b+i));
__m128i r = _mm_cmpeq_epi8(x1, x2);
result += __builtin_popcount(_mm_movemask_epi8(r));
}
return n - result;
}
void test(int n, const char* name, const char* a, const char* b, size_t(*func)(size_t, const char*, const char*)) {
clock_t start = clock();
size_t r = func(n, a, b);
clock_t stop = clock();
printf("| %10s | %10zu | %.5f |\n", name, r, (stop-start) * 1.f / CLOCKS_PER_SEC);
}
int main() {
#define TEST_SIZE 100000000
char* a = gen(TEST_SIZE);
char* b = gen(TEST_SIZE);
puts("| name | result | time, s |");
puts("|------------|------------|---------|");
test(TEST_SIZE, "base", a, b, arr_dist);
test(TEST_SIZE, "openmp", a, b, arr_dist_mp);
test(TEST_SIZE, "openmp2", a, b, arr_dist_mp1);
test(TEST_SIZE, "sse" , a, b, arr_dist_sse);
free(b);
free(a);
}
// SSE2 intel.com/content/www/us/en/docs/intrinsics-guide/index.html
size_t arr_dist_best (size_t n, const char a [n], const char b [n]) {
size_t ub = n - (n % 16) ;
__m128i sums = _mm_setzero_si128 () ;
__m128i ones = _mm_set1_epi8 (1) ;
for (size_t i = 0; i < ub ; i += 16) {
__m128i as = _mm_loadu_si128 ((const __m128i_u *) & a [i]) ;
__m128i bs = _mm_loadu_si128 ((const __m128i_u *) & b [i]) ;
__m128i cs = _mm_cmpeq_epi8 (as, bs) ;
cs = _mm_andnot_si128 (cs, ones) ; // eq/ff -> 00; noteq /00 -> 01
cs = _mm_sad_epu8 (cs, _mm_setzero_si128 ()) ;
sums = _mm_add_epi64 (sums, cs) ;
}
size_t res = sums [0] + sums [1];
for ( size_t i = ub ; i < n ; ++ i )
res += a [ i ] != b [ i ];
return res ;
}