Оптимизация поиска непересечений в матрице битов

Есть таблица битов шириной в тысячу элементов и с кол-вом строк более 10 тысяч. Нужно сформировать строки в группы от одного до четырёх элементов, где каждая строка в такой группе не имеет бита 1 на одной позиции с другой строкой.

Ну т.е. если ширина была бы 3, а строк 5,

1 0 0
0 1 0
0 0 1
1 0 1
1 1 1

, то на выходе три группы

1)

1 0 0
0 1 0
0 0 1
1 0 1
1 1 1

Известно, что наиболее часто встречающийся случай, это когда нет непересекающихся строк, а значит групп будет столько сколько строк.

Задача именно найти непересекающиеся строки и сгруппировать их по четыре. Только если невозможно уложить непересекающуюся строку по причине переполнения подходящих групп, только тогда непересекающаяся строка может оказаться в группе из одной строки.

Пока решил только полным перебором https://dotnetfiddle.net/k6tBjL, где каждая строка сравнивается с каждой другой строкой (ну почти), что даёт неприятную производительность.

Есть ли способ снизить сложность алгоритма и ускорить это дело?


Ответы (4 шт):

Автор решения: 4per

Суть ответа в том, что даже полный перебор можно сделать по разному.

И волшебной палочкой для улучшения перебора служит BitArray (спасибо Alexander Petrov, за напоминание).

И так у нас есть N (10 000) строк и M (1000) столбцов.

Так что полный перебор это N*N*M, но есть нюанс.

Решение 0. Приведённое ссылкой в вопросе.

Тут я использовал BitArray только для хранения строчек, но не использовал его операции. Я сравнивал каждую строчку с каждой строчкой и в каждом таком сравнении использовал for для сравнения ячеек строк.

Мне показалось, что for быстрее BitArray.And, но это верно только в идеальном случае, когда совпадение единиц в ячейках лежит в начале строки.

Решение 1. Сравниваем строки черезBitArray.And.

В общем случае получаем прирост производительности, потому что хоть тут и N*N*M, но *M - дешёвое, засчет BitArray.And

Решение 2. Добавляем эвристику.

Как можно улучшить этот N*N[*M] ?

Можно немного сократить первый множитель. Находим столбец с наибольшим числом единичек, сразу засовывем строки с единичкой в группы.

Далее бегаем только по оставшимся строкам, и сравниваем каждую со всеми что есть в группах.

Так мы получаем (N-X)*N[*M], и если X большое, то имеем ощутимый выигрыш.

Решение 3. А что если удешевить *N вместо *M ?

Транспонируем таблицу, получаем BitArray для каждого столбца. Алгоритм меняем полностью. Пробегаем по строкам, и все её столбцы имеющие единицу соединяем через BitArray.Or. Далее пробегаем по этому результирующему BitArray и складываем в группу, те строки в которых он имеет 0 (пока группа не заполнится).

Это радикально ускоряет время выполнения. Я остановился на этом варианте.

Код с решениями https://dotnetfiddle.net/6PzuIr

using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Globalization;

namespace ConsoleApp16;

internal class Program
{
    static long _comparisons;

    static void Main(string[] args)
    {
        Console.WriteLine("Start Data creation");
        BitArray[] data = CreateData();
        Console.WriteLine("Data created");

        Console.WriteLine("Start looking for non-intersected");
        var stopwatch = Stopwatch.StartNew();
        var buckets = GetBuckets_ColumnMode(data);
        Console.WriteLine($"Completed looking for non-intersected in {stopwatch.Elapsed}");

        var checkCount = buckets.Sum(x => x.Items.Count());
        Console.WriteLine($"Packed {checkCount} items");

        var grouped = buckets
            .GroupBy(x => x.Items.Count())
            .Select(g => $"{g.Key}-item: {g.Count()} buckets");

        foreach (var g in grouped)
        {
            Console.WriteLine(g);
        }

        var comparisons = _comparisons.ToString("N0", new CultureInfo("en-us"));
        Console.WriteLine($"{comparisons} comparisons");

        Console.ReadKey();
    }

    private static List<Bucket> GetBuckets(BitArray[] data)
    {
        List<Bucket> buckets = new List<Bucket>(data.Length);

        foreach (var bitArray in data)
        {
            bool added = false;
            foreach (var bucket in buckets)
            {
                if (bucket.IsFull) continue;

                bool intersected = false;
                foreach (var otherBitArray in bucket.Items)
                {
                    if (AreIntersected(bitArray, otherBitArray))
                    {
                        intersected = true;
                        break;
                    }
                }

                if (!intersected)
                {
                    bucket.Add(bitArray);
                    added = true;
                    break;
                }
            }

            if (!added)
            {
                buckets.Add(new Bucket(bitArray));
            }
        }

        return buckets;
    }

    private static bool AreIntersected(BitArray bitArray, BitArray otherBitArray)
    {
        _comparisons++;

        var clone = new BitArray(bitArray);
        clone = clone.And(otherBitArray);
        return clone.HasAnySet();

        //for loop is better in edge cases only, when intersections are placed in the very left columns
        //
        //for (int i = 0; i < bitArray.Length; i++)
        //{
        //    if (bitArray[i] && otherBitArray[i])
        //    {
        //        return true;
        //    }
        //}
        //
        //return false;
    }

    private static List<Bucket> GetBuckets_WithHeuristic(BitArray[] data)
    {
        HashSet<int> knownIntersected = GetMaxColumn(data);

        List<Bucket> buckets = new List<Bucket>(data.Length);

        buckets.AddRange(data
            .Select((x, i) => (x, i))
            .Where(y => knownIntersected.Contains(y.i))
            .Select(y => new Bucket(y.x)));

        for (int bitArrayIndex = 0; bitArrayIndex < data.Length; bitArrayIndex++)
        {
            if (knownIntersected.Contains(bitArrayIndex))
            {
                continue;
            }

            bool added = false;
            foreach (var bucket in buckets)
            {
                if (bucket.IsFull) continue;

                bool crossed = false;
                foreach (var otherBitArray in bucket.Items)
                {
                    if (AreIntersected(data[bitArrayIndex], otherBitArray))
                    {
                        crossed = true;
                        break;
                    }
                }

                if (!crossed)
                {
                    bucket.Add(data[bitArrayIndex]);
                    added = true;
                    break;
                }
            }

            if (!added)
            {
                buckets.Add(new Bucket(data[bitArrayIndex]));
            }
        }

        return buckets;
    }

    private static HashSet<int> GetMaxColumn(BitArray[] data)
    {
        HashSet<int>[] columns = new HashSet<int>[data[0].Length];
        for (int col = 0; col < data[0].Length; col++)
        {
            columns[col] = new HashSet<int>(data.Length);
            for (int row = 0; row < data.Length; row++)
            {
                if (data[row][col])
                {
                    columns[col].Add(row);
                }
            }
        }

        return columns.MaxBy(x => x.Count)!;
    }

    private static List<Bucket> GetBuckets_ColumnMode(BitArray[] data)
    {
        Stopwatch sw = Stopwatch.StartNew();
        var columns = Transform(data);
        Console.WriteLine($"Transformed in {sw.Elapsed}");

        var used = new BitArray(data.Length);
        var buckets = new List<Bucket>(data.Length);
        for (int itemIndex = 0; itemIndex < data.Length; itemIndex++)
        {
            if (used[itemIndex])
            {
                continue;
            }
            var item = data[itemIndex];

            var mask = new BitArray(data.Length);
            for (int bitIndex = 0; bitIndex < data[0].Length; bitIndex++)
            {
                if (item[bitIndex])
                {
                    mask = mask.Or(columns[bitIndex]);
                    _comparisons++;
                }
                if (mask.HasAllSet())
                {
                    break;
                }
            }

            used[itemIndex] = true;
            var bucket = new Bucket(item);
            buckets.Add(bucket);

            if (!mask.HasAllSet())
            {
                for (int otherItemIndex = 0; otherItemIndex < data.Length && !bucket.IsFull; otherItemIndex++)
                {
                    if (otherItemIndex == itemIndex || used[otherItemIndex] || mask[otherItemIndex])
                    {
                        continue;
                    }

                    bucket.Add(data[otherItemIndex]);
                    used[otherItemIndex] = true;
                }
            }
        }

        return buckets;
    }

    private static BitArray[] Transform(BitArray[] data)
    {
        var result = new BitArray[data[0].Length];
        for (int row = 0; row < data.Length; row++)
        {
            for (int col = 0; col < data[0].Length; col++)
            {
                result[col] ??= new BitArray(data.Length);
                result[col][row] = data[row][col];
            }
        }
        return result;
    }

    private static BitArray[] CreateData()
    {
        const int itemCount = 10_000;
        const int bitCount = 1_000;

        var result = new BitArray[itemCount];
        for (int i = 0; i < itemCount; i++)
        {
            var bitArray = new BitArray(bitCount);
            for (int j = 0; j < bitCount; j++)
            {
                bitArray.Set(j, Random.Shared.Next(0, 2) == 1); // random
                // bitArray.Set(j, false); // all false, uncomment to have all items intersected
            }
            result[i] = bitArray;
        }

        return result;
    }
}

internal class Bucket
{
    public bool IsFull => Items.Length == 4;
    public BitArray[] Items { get; private set; }

    public Bucket(BitArray firstItem)
    {
        Items = [firstItem];
    }

    public void Add(BitArray nextItem)
    {
        if (Items.Length == 4)
        {
            throw new InvalidOperationException("Bucket is full");
        }

        Items = [.. Items, nextItem];
    }
}
→ Ссылка
Автор решения: maestro

Когда вы проверяете текущую группу на наличие пересечений с новым элементом, нет никакой необходимости сравнивать новый элемент со всеми членами группы. Вместо этого группа может хранить побитовое ИЛИ всех её членов, и тогда можно будет сравнивать новый элемент с побитовым ИЛИ. Это уменьшит количество сравнений.

Однако в конкретно вашем примере со случайной генерацией битовых последовательностей пересечения встречаются во всех элементах, поэтому количество групп всегда равно количеству элементов, с одним элементом в каждой группе. Если всё-таки тестировать алгоритм на ваших реальных данных, в которых действительно встречаются непересекающиеся битовые последовательности, то после добавления каждого нового члена в группу количество последующих сравнений уменьшится.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Globalization;

namespace ConsoleApp16;

internal class Program
{
    static long _comparisons;

    static void Main(string[] args)
    {
        Console.WriteLine("Start Data creation");
        BitArray[] data = CreateData();
        Console.WriteLine("Data created");

        Console.WriteLine("Start looking for non-intersected");
        var stopwatch = Stopwatch.StartNew();
        var buckets = GetBuckets(data);
        Console.WriteLine($"Completed looking for non-intersected in {stopwatch.Elapsed}");

        var checkCount = buckets.Sum(x => x.Items.Count());
        Console.WriteLine($"Packed {checkCount} items");

        var grouped = buckets
            .GroupBy(x => x.Items.Count())
            .Select(g => $"{g.Key}-item: {g.Count()} buckets");

        foreach (var g in grouped)
        {
            Console.WriteLine(g);
        }

        var comparisons = _comparisons.ToString("N0", new CultureInfo("en-us"));
        Console.WriteLine($"{comparisons} comparisons");

        Console.ReadKey();
    }

    private static List<Bucket> GetBuckets(BitArray[] data)
    {
        List<Bucket> buckets = new List<Bucket>(data.Length);

        foreach (var bitArray in data)
        {
            bool added = false;
            foreach (var bucket in buckets)
            {
                if (bucket.IsFull) continue;
                var copy = new BitArray(bucket.or_overall);
                bool intersected = copy.And(bitArray).HasAnySet();
                if (!intersected)
                {
                    bucket.Add(bitArray);
                    added = true;
                    break;
                }
            }
            if (!added)
            {
                buckets.Add(new Bucket(bitArray));
            }
        }

        return buckets;
    }

    private static BitArray[] CreateData()
    {
        const int itemCount = 10_000;
        const int bitCount = 1_000;

        var result = new BitArray[itemCount];
        for (int i = 0; i < itemCount; i++)
        {
            var bitArray = new BitArray(bitCount);
            for (int j = 0; j < bitCount; j++)
            {
                bitArray.Set(j, Random.Shared.Next(0, 2) == 1); // random
                // bitArray.Set(j, false); // all false, uncomment to have all items intersected
            }
            result[i] = bitArray;
        }

        return result;
    }
}

internal class Bucket
{
    public bool IsFull => Items.Length == 4;
    public BitArray[] Items { get; private set; }
    public BitArray or_overall;

    public Bucket(BitArray firstItem)
    {
        Items = [firstItem];
        or_overall = new BitArray(firstItem);
    }

    public void Add(BitArray nextItem)
    {
        if (Items.Length == 4)
        {
            throw new InvalidOperationException("Bucket is full");
        }
        or_overall.Or(nextItem);
        Items = [.. Items, nextItem];
    }
}
→ Ссылка
Автор решения: sibedir

Пройдите матрицу и просуммируйте биты 1 для каждой строки.

!!! Сравнивать на пересечение строки, для которых сумма битов 1 больше количества битов в строке, не имеет смысла

if (Sum[i] + Sum[j] > bitCount) continue;

В принципе можно отсортировать по Sum[i] и сравнивать только со строками, для которых Sum[j] <= bitCount - Sum[i].

Как вариант,

  1. заводим map, где ключ - сумма битов 1, значение - это структура, которая хранит номера непересекающихся строк и их объединение.
  2. на каждой итерации берём запись с максимальным ключом и пытаемся срастить его с другими, пока это возможно или пока не достигнуто предельное количество сращиваний (4).
  3. получившееся значение убираем из map'ы и сохраняем в вектор результатов.
  4. повторяем п. 2 пока map'а не пустая
→ Ссылка
Автор решения: DiD

По мне BitArray для задачи не очень подходит. Он упаковывает по 8 элементов в 1 байт. Кодирование повторяется с каждым новым элементом при каждой записи и, соответственно, декодирование при каждом чтении. Это не может не сказываться на производительности алгоритма.

В самом простейшем случае для хранения 1000 бит можно использовать массив из 32 элементов uint или 16 элементов ulong. То есть в одном элементе uint мы помещаем 32 бита данных. Сравнить их на не пересечение очень просто:

Также последовательно перебираем элементы. Проверку можно проводить в любом направлении пока не будет найдено пересечение. Два числа, если их биты не пересекаются, в операции a & b вернут 0. Если a & b больше 0, то проверку последовательности можно завершить.

if ((a[i] & b[i]) != 0) return true;

По однопоточным тестам uint оказался быстрее где-то на 8-10%, чем ulong. На рандомных данных это лучший результат в однопотоке.

Но на органических данных потенциально возможно увеличение производительности за счет использования для групп суммарно-общее значение через логическое ИЛИ. То есть сравнивать строку сначала с масками групп, а потом с оставшимися строками.

Также потенциально может помочь начальная сортировка строк по весу Хэмминга. Какие-то очень добрые люди удалили страницу из русской википедии.

Alexander Petrov в комментарии указал ссылку на документацию.

Также можно подключить параллельную обработку данных с помощью Parallel.ForEach. Тогда для хранения Bucket следует использовать ConcurrentBag. Но по тестам на dotnetfiddle это, очевидно, не даёт никакого прироста к скорости выполнения. По всей видимости, выполнение идёт также в один процесс, но с многопоточным оверхедом.

Так что самый производительный вариант беглых тестов https://dotnetfiddle.net/YuZD30

Скопирую код также сюда.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;

namespace ConsoleApp16
{
    internal class Program
    {
        static long _comparisons;

        static void Main(string[] args)
        {
            Console.WriteLine("Start Data creation");
            uint[][] data = CreateData();
            Console.WriteLine("Data created");

            Console.WriteLine("Start looking for non-intersected");
            var stopwatch = Stopwatch.StartNew();
            var buckets = GetBuckets(data);
            Console.WriteLine($"Completed looking for non-intersected in {stopwatch.Elapsed}");

            var checkCount = buckets.Sum(x => x.Items.Count());
            Console.WriteLine($"Packed {checkCount} items");

            var grouped = buckets
                .GroupBy(x => x.Items.Count())
                .Select(g => $"{g.Key}-item: {g.Count()} buckets");

            foreach (var g in grouped)
            {
                Console.WriteLine(g);
            }

            var comparisons = _comparisons.ToString("N0", new CultureInfo("en-us"));
            Console.WriteLine($"{comparisons} comparisons");
        }

        private static List<Bucket> GetBuckets(uint[][] data)
        {
            List<Bucket> buckets = new List<Bucket>(data.Length);

            foreach (var item in data)
            {
                bool added = false;

                foreach (var bucket in buckets)
                {
                    if (bucket.IsFull) continue;

                    bool intersected = false;
                    foreach (var other in bucket.Items)
                    {
                        if (AreIntersected(item, other))
                        {
                            intersected = true;
                            break;
                        }
                    }

                    if (!intersected)
                    {
                        bucket.Add(item);
                        added = true;
                        break;
                    }
                }

                if (!added)
                {
                    buckets.Add(new Bucket(item));
                }
            }

            return buckets;
        }

        private static bool AreIntersected(uint[] bitArray, uint[] otherBitArray)
        {
            _comparisons++;

            for (int i = 0; i < bitArray.Length; i++)
            {
                if ((bitArray[i] & otherBitArray[i]) != 0)
                {
                    return true;
                }
            }
            return false;
        }

        private static uint[][] CreateData()
        {
            const int itemCount = 10_000;
            const int bitCount = 1_000;
            const int uintCount = (bitCount + 31) / 32; // 32 uints for 1000 bits

            var result = new uint[itemCount][];
            Random random = new Random();

            for (int i = 0; i < itemCount; i++)
            {
                var uintArray = new uint[uintCount];

                for (int j = 0; j < uintCount; j++)
                {
                    uintArray[j] = (uint)random.Next(0, int.MaxValue);
                }
                result[i] = uintArray;
            }

            return result;
        }
    }

    internal class Bucket
    {
        public bool IsFull => Items.Length == 4;
        public uint[][] Items { get; private set; }

        public Bucket(uint[] firstItem)
        {
            Items = new[] { firstItem };
        }

        public void Add(uint[] nextItem)
        {
            if (Items.Length == 4)
            {
                throw new InvalidOperationException("Bucket is full");
            }

            var newItems = new uint[Items.Length + 1][];
            Items.CopyTo(newItems, 0);
            newItems[^1] = nextItem;
            Items = newItems;
        }
    }
}
→ Ссылка