Оптимизация поиска непересечений в матрице битов
Есть таблица битов шириной в тысячу элементов и с кол-вом строк более 10 тысяч. Нужно сформировать строки в группы от одного до четырёх элементов, где каждая строка в такой группе не имеет бита 1 на одной позиции с другой строкой.
Ну т.е. если ширина была бы 3, а строк 5,
1 0 0
0 1 0
0 0 1
1 0 1
1 1 1
, то на выходе три группы
1)
1 0 0
0 1 0
0 0 1
1 0 1
1 1 1
Известно, что наиболее часто встречающийся случай, это когда нет непересекающихся строк, а значит групп будет столько сколько строк.
Задача именно найти непересекающиеся строки и сгруппировать их по четыре. Только если невозможно уложить непересекающуюся строку по причине переполнения подходящих групп, только тогда непересекающаяся строка может оказаться в группе из одной строки.
Пока решил только полным перебором https://dotnetfiddle.net/k6tBjL, где каждая строка сравнивается с каждой другой строкой (ну почти), что даёт неприятную производительность.
Есть ли способ снизить сложность алгоритма и ускорить это дело?
Ответы (4 шт):
Суть ответа в том, что даже полный перебор можно сделать по разному.
И волшебной палочкой для улучшения перебора служит BitArray (спасибо Alexander Petrov, за напоминание).
И так у нас есть N (10 000) строк и M (1000) столбцов.
Так что полный перебор это N*N*M
, но есть нюанс.
Решение 0. Приведённое ссылкой в вопросе.
Тут я использовал BitArray только для хранения строчек, но не использовал его операции. Я сравнивал каждую строчку с каждой строчкой и в каждом таком сравнении использовал for для сравнения ячеек строк.
Мне показалось, что for быстрее BitArray.And
, но это верно только в идеальном случае, когда совпадение единиц в ячейках лежит в начале строки.
Решение 1. Сравниваем строки черезBitArray.And
.
В общем случае получаем прирост производительности, потому что хоть тут и N*N*M
, но *M
- дешёвое, засчет BitArray.And
Решение 2. Добавляем эвристику.
Как можно улучшить этот N*N[*M]
?
Можно немного сократить первый множитель. Находим столбец с наибольшим числом единичек, сразу засовывем строки с единичкой в группы.
Далее бегаем только по оставшимся строкам, и сравниваем каждую со всеми что есть в группах.
Так мы получаем (N-X)*N[*M]
, и если X большое, то имеем ощутимый выигрыш.
Решение 3. А что если удешевить *N
вместо *M
?
Транспонируем таблицу, получаем BitArray для каждого столбца.
Алгоритм меняем полностью. Пробегаем по строкам, и все её столбцы имеющие единицу соединяем через BitArray.Or
. Далее пробегаем по этому результирующему BitArray и складываем в группу, те строки в которых он имеет 0 (пока группа не заполнится).
Это радикально ускоряет время выполнения. Я остановился на этом варианте.
Код с решениями https://dotnetfiddle.net/6PzuIr
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Globalization;
namespace ConsoleApp16;
internal class Program
{
static long _comparisons;
static void Main(string[] args)
{
Console.WriteLine("Start Data creation");
BitArray[] data = CreateData();
Console.WriteLine("Data created");
Console.WriteLine("Start looking for non-intersected");
var stopwatch = Stopwatch.StartNew();
var buckets = GetBuckets_ColumnMode(data);
Console.WriteLine($"Completed looking for non-intersected in {stopwatch.Elapsed}");
var checkCount = buckets.Sum(x => x.Items.Count());
Console.WriteLine($"Packed {checkCount} items");
var grouped = buckets
.GroupBy(x => x.Items.Count())
.Select(g => $"{g.Key}-item: {g.Count()} buckets");
foreach (var g in grouped)
{
Console.WriteLine(g);
}
var comparisons = _comparisons.ToString("N0", new CultureInfo("en-us"));
Console.WriteLine($"{comparisons} comparisons");
Console.ReadKey();
}
private static List<Bucket> GetBuckets(BitArray[] data)
{
List<Bucket> buckets = new List<Bucket>(data.Length);
foreach (var bitArray in data)
{
bool added = false;
foreach (var bucket in buckets)
{
if (bucket.IsFull) continue;
bool intersected = false;
foreach (var otherBitArray in bucket.Items)
{
if (AreIntersected(bitArray, otherBitArray))
{
intersected = true;
break;
}
}
if (!intersected)
{
bucket.Add(bitArray);
added = true;
break;
}
}
if (!added)
{
buckets.Add(new Bucket(bitArray));
}
}
return buckets;
}
private static bool AreIntersected(BitArray bitArray, BitArray otherBitArray)
{
_comparisons++;
var clone = new BitArray(bitArray);
clone = clone.And(otherBitArray);
return clone.HasAnySet();
//for loop is better in edge cases only, when intersections are placed in the very left columns
//
//for (int i = 0; i < bitArray.Length; i++)
//{
// if (bitArray[i] && otherBitArray[i])
// {
// return true;
// }
//}
//
//return false;
}
private static List<Bucket> GetBuckets_WithHeuristic(BitArray[] data)
{
HashSet<int> knownIntersected = GetMaxColumn(data);
List<Bucket> buckets = new List<Bucket>(data.Length);
buckets.AddRange(data
.Select((x, i) => (x, i))
.Where(y => knownIntersected.Contains(y.i))
.Select(y => new Bucket(y.x)));
for (int bitArrayIndex = 0; bitArrayIndex < data.Length; bitArrayIndex++)
{
if (knownIntersected.Contains(bitArrayIndex))
{
continue;
}
bool added = false;
foreach (var bucket in buckets)
{
if (bucket.IsFull) continue;
bool crossed = false;
foreach (var otherBitArray in bucket.Items)
{
if (AreIntersected(data[bitArrayIndex], otherBitArray))
{
crossed = true;
break;
}
}
if (!crossed)
{
bucket.Add(data[bitArrayIndex]);
added = true;
break;
}
}
if (!added)
{
buckets.Add(new Bucket(data[bitArrayIndex]));
}
}
return buckets;
}
private static HashSet<int> GetMaxColumn(BitArray[] data)
{
HashSet<int>[] columns = new HashSet<int>[data[0].Length];
for (int col = 0; col < data[0].Length; col++)
{
columns[col] = new HashSet<int>(data.Length);
for (int row = 0; row < data.Length; row++)
{
if (data[row][col])
{
columns[col].Add(row);
}
}
}
return columns.MaxBy(x => x.Count)!;
}
private static List<Bucket> GetBuckets_ColumnMode(BitArray[] data)
{
Stopwatch sw = Stopwatch.StartNew();
var columns = Transform(data);
Console.WriteLine($"Transformed in {sw.Elapsed}");
var used = new BitArray(data.Length);
var buckets = new List<Bucket>(data.Length);
for (int itemIndex = 0; itemIndex < data.Length; itemIndex++)
{
if (used[itemIndex])
{
continue;
}
var item = data[itemIndex];
var mask = new BitArray(data.Length);
for (int bitIndex = 0; bitIndex < data[0].Length; bitIndex++)
{
if (item[bitIndex])
{
mask = mask.Or(columns[bitIndex]);
_comparisons++;
}
if (mask.HasAllSet())
{
break;
}
}
used[itemIndex] = true;
var bucket = new Bucket(item);
buckets.Add(bucket);
if (!mask.HasAllSet())
{
for (int otherItemIndex = 0; otherItemIndex < data.Length && !bucket.IsFull; otherItemIndex++)
{
if (otherItemIndex == itemIndex || used[otherItemIndex] || mask[otherItemIndex])
{
continue;
}
bucket.Add(data[otherItemIndex]);
used[otherItemIndex] = true;
}
}
}
return buckets;
}
private static BitArray[] Transform(BitArray[] data)
{
var result = new BitArray[data[0].Length];
for (int row = 0; row < data.Length; row++)
{
for (int col = 0; col < data[0].Length; col++)
{
result[col] ??= new BitArray(data.Length);
result[col][row] = data[row][col];
}
}
return result;
}
private static BitArray[] CreateData()
{
const int itemCount = 10_000;
const int bitCount = 1_000;
var result = new BitArray[itemCount];
for (int i = 0; i < itemCount; i++)
{
var bitArray = new BitArray(bitCount);
for (int j = 0; j < bitCount; j++)
{
bitArray.Set(j, Random.Shared.Next(0, 2) == 1); // random
// bitArray.Set(j, false); // all false, uncomment to have all items intersected
}
result[i] = bitArray;
}
return result;
}
}
internal class Bucket
{
public bool IsFull => Items.Length == 4;
public BitArray[] Items { get; private set; }
public Bucket(BitArray firstItem)
{
Items = [firstItem];
}
public void Add(BitArray nextItem)
{
if (Items.Length == 4)
{
throw new InvalidOperationException("Bucket is full");
}
Items = [.. Items, nextItem];
}
}
Когда вы проверяете текущую группу на наличие пересечений с новым элементом, нет никакой необходимости сравнивать новый элемент со всеми членами группы. Вместо этого группа может хранить побитовое ИЛИ всех её членов, и тогда можно будет сравнивать новый элемент с побитовым ИЛИ. Это уменьшит количество сравнений.
Однако в конкретно вашем примере со случайной генерацией битовых последовательностей пересечения встречаются во всех элементах, поэтому количество групп всегда равно количеству элементов, с одним элементом в каждой группе. Если всё-таки тестировать алгоритм на ваших реальных данных, в которых действительно встречаются непересекающиеся битовые последовательности, то после добавления каждого нового члена в группу количество последующих сравнений уменьшится.
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Globalization;
namespace ConsoleApp16;
internal class Program
{
static long _comparisons;
static void Main(string[] args)
{
Console.WriteLine("Start Data creation");
BitArray[] data = CreateData();
Console.WriteLine("Data created");
Console.WriteLine("Start looking for non-intersected");
var stopwatch = Stopwatch.StartNew();
var buckets = GetBuckets(data);
Console.WriteLine($"Completed looking for non-intersected in {stopwatch.Elapsed}");
var checkCount = buckets.Sum(x => x.Items.Count());
Console.WriteLine($"Packed {checkCount} items");
var grouped = buckets
.GroupBy(x => x.Items.Count())
.Select(g => $"{g.Key}-item: {g.Count()} buckets");
foreach (var g in grouped)
{
Console.WriteLine(g);
}
var comparisons = _comparisons.ToString("N0", new CultureInfo("en-us"));
Console.WriteLine($"{comparisons} comparisons");
Console.ReadKey();
}
private static List<Bucket> GetBuckets(BitArray[] data)
{
List<Bucket> buckets = new List<Bucket>(data.Length);
foreach (var bitArray in data)
{
bool added = false;
foreach (var bucket in buckets)
{
if (bucket.IsFull) continue;
var copy = new BitArray(bucket.or_overall);
bool intersected = copy.And(bitArray).HasAnySet();
if (!intersected)
{
bucket.Add(bitArray);
added = true;
break;
}
}
if (!added)
{
buckets.Add(new Bucket(bitArray));
}
}
return buckets;
}
private static BitArray[] CreateData()
{
const int itemCount = 10_000;
const int bitCount = 1_000;
var result = new BitArray[itemCount];
for (int i = 0; i < itemCount; i++)
{
var bitArray = new BitArray(bitCount);
for (int j = 0; j < bitCount; j++)
{
bitArray.Set(j, Random.Shared.Next(0, 2) == 1); // random
// bitArray.Set(j, false); // all false, uncomment to have all items intersected
}
result[i] = bitArray;
}
return result;
}
}
internal class Bucket
{
public bool IsFull => Items.Length == 4;
public BitArray[] Items { get; private set; }
public BitArray or_overall;
public Bucket(BitArray firstItem)
{
Items = [firstItem];
or_overall = new BitArray(firstItem);
}
public void Add(BitArray nextItem)
{
if (Items.Length == 4)
{
throw new InvalidOperationException("Bucket is full");
}
or_overall.Or(nextItem);
Items = [.. Items, nextItem];
}
}
Пройдите матрицу и просуммируйте биты 1 для каждой строки.
!!! Сравнивать на пересечение строки, для которых сумма битов 1 больше количества битов в строке, не имеет смысла
if (Sum[i] + Sum[j] > bitCount) continue;
В принципе можно отсортировать по Sum[i] и сравнивать только со строками, для которых Sum[j] <= bitCount - Sum[i]
.
Как вариант,
- заводим
map
, где ключ - сумма битов 1, значение - это структура, которая хранит номера непересекающихся строк и их объединение. - на каждой итерации берём запись с максимальным ключом и пытаемся срастить его с другими, пока это возможно или пока не достигнуто предельное количество сращиваний (4).
- получившееся значение убираем из map'ы и сохраняем в вектор результатов.
- повторяем п. 2 пока map'а не пустая
По мне BitArray для задачи не очень подходит. Он упаковывает по 8 элементов в 1 байт. Кодирование повторяется с каждым новым элементом при каждой записи и, соответственно, декодирование при каждом чтении. Это не может не сказываться на производительности алгоритма.
В самом простейшем случае для хранения 1000 бит можно использовать массив из 32 элементов uint
или 16 элементов ulong
. То есть в одном элементе uint
мы помещаем 32 бита данных. Сравнить их на не пересечение очень просто:
Также последовательно перебираем элементы. Проверку можно проводить в любом направлении пока не будет найдено пересечение. Два числа, если их биты не пересекаются, в операции a & b
вернут 0. Если a & b
больше 0, то проверку последовательности можно завершить.
if ((a[i] & b[i]) != 0) return true;
По однопоточным тестам uint
оказался быстрее где-то на 8-10%, чем ulong
.
На рандомных данных это лучший результат в однопотоке.
Но на органических данных потенциально возможно увеличение производительности за счет использования для групп суммарно-общее значение через логическое ИЛИ. То есть сравнивать строку сначала с масками групп, а потом с оставшимися строками.
Также потенциально может помочь начальная сортировка строк по весу Хэмминга. Какие-то очень добрые люди удалили страницу из русской википедии.
Alexander Petrov в комментарии указал ссылку на документацию.
Также можно подключить параллельную обработку данных с помощью Parallel.ForEach
. Тогда для хранения Bucket
следует использовать ConcurrentBag
. Но по тестам на dotnetfiddle это, очевидно, не даёт никакого прироста к скорости выполнения. По всей видимости, выполнение идёт также в один процесс, но с многопоточным оверхедом.
Так что самый производительный вариант беглых тестов https://dotnetfiddle.net/YuZD30
Скопирую код также сюда.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
namespace ConsoleApp16
{
internal class Program
{
static long _comparisons;
static void Main(string[] args)
{
Console.WriteLine("Start Data creation");
uint[][] data = CreateData();
Console.WriteLine("Data created");
Console.WriteLine("Start looking for non-intersected");
var stopwatch = Stopwatch.StartNew();
var buckets = GetBuckets(data);
Console.WriteLine($"Completed looking for non-intersected in {stopwatch.Elapsed}");
var checkCount = buckets.Sum(x => x.Items.Count());
Console.WriteLine($"Packed {checkCount} items");
var grouped = buckets
.GroupBy(x => x.Items.Count())
.Select(g => $"{g.Key}-item: {g.Count()} buckets");
foreach (var g in grouped)
{
Console.WriteLine(g);
}
var comparisons = _comparisons.ToString("N0", new CultureInfo("en-us"));
Console.WriteLine($"{comparisons} comparisons");
}
private static List<Bucket> GetBuckets(uint[][] data)
{
List<Bucket> buckets = new List<Bucket>(data.Length);
foreach (var item in data)
{
bool added = false;
foreach (var bucket in buckets)
{
if (bucket.IsFull) continue;
bool intersected = false;
foreach (var other in bucket.Items)
{
if (AreIntersected(item, other))
{
intersected = true;
break;
}
}
if (!intersected)
{
bucket.Add(item);
added = true;
break;
}
}
if (!added)
{
buckets.Add(new Bucket(item));
}
}
return buckets;
}
private static bool AreIntersected(uint[] bitArray, uint[] otherBitArray)
{
_comparisons++;
for (int i = 0; i < bitArray.Length; i++)
{
if ((bitArray[i] & otherBitArray[i]) != 0)
{
return true;
}
}
return false;
}
private static uint[][] CreateData()
{
const int itemCount = 10_000;
const int bitCount = 1_000;
const int uintCount = (bitCount + 31) / 32; // 32 uints for 1000 bits
var result = new uint[itemCount][];
Random random = new Random();
for (int i = 0; i < itemCount; i++)
{
var uintArray = new uint[uintCount];
for (int j = 0; j < uintCount; j++)
{
uintArray[j] = (uint)random.Next(0, int.MaxValue);
}
result[i] = uintArray;
}
return result;
}
}
internal class Bucket
{
public bool IsFull => Items.Length == 4;
public uint[][] Items { get; private set; }
public Bucket(uint[] firstItem)
{
Items = new[] { firstItem };
}
public void Add(uint[] nextItem)
{
if (Items.Length == 4)
{
throw new InvalidOperationException("Bucket is full");
}
var newItems = new uint[Items.Length + 1][];
Items.CopyTo(newItems, 0);
newItems[^1] = nextItem;
Items = newItems;
}
}
}