PostgreSql транзакции и блокировка таблицы при долговыполняемых запросах

У меня есть две таблицы. Схема первой таблицы:

create table db.content
(
    id   int
        constraint content_pk
            primary key,
    data text
);

create unique index content_data_uindex
    on db.content (data);

Вторая таблица:

create table db.used_content
(
    id         int
        constraint used_content_pk
             primary key generated always as identity,
    content_id int REFERENCES db.content (id)
);

В своей функции на GO я хочу:

  1. Получить все строки из таблицы content, которых нет в таблице used_content.
  2. Произвести над некоторыми строками действия. Какие именно строки нужны - решит алгоритм, поэтому нужна вся таблица целиком.
  3. Сохранить список использованных ID в таблицу used_content

Проблема возникает при многопоточности.

Я понимаю, что транзакции - это не мьютекс, который бы полностью решил эту проблему. Я прочитал в интернете про блокировку таблицы с помощью SELECT ... FOR NO KEY UPDATE, но это, как я понимаю, не то что мне нужно. Проблему это не решает.

Нашёл пока что для себя одно решение - использовать sync.Mutex. Насколько это правильный подход?

Код программы ниже:

package main

import (
    "example/pgxutil"
    "context"
    "errors"
    "fmt"
    "github.com/jackc/pgconn"
    "github.com/jackc/pgx/v4"
    "github.com/jackc/pgx/v4/pgxpool"
    "sync"
    "time"
)

const _dbConnString = "user=postgres password=pass host=localhost port=5432 dbname=tst"

type PsqlDB interface {
    Exec(ctx context.Context, sql string, arguments ...interface{}) (commandTag pgconn.CommandTag, err error)
    Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
    QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
    QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error)
}

type Content struct {
    ID   int
    Data string
}

func GetContent(db PsqlDB) ([]Content, error) {
    const query = `SELECT id, data FROM db.content WHERE id NOT IN (SELECT content_id FROM db.used_content)`

    rows, err := db.Query(context.Background(), query)
    if err != nil {
        return nil, err
    }

    defer rows.Close()

    var res []Content

    for rows.Next() {
        var r Content
        if err = rows.Scan(&r.ID, &r.Data); err != nil {
            return nil, err
        }
        res = append(res, r)
    }

    if err = rows.Err(); err != nil {
        return nil, err
    }

    return res, nil
}

func AddUsedContent(db PsqlDB, usedIds []int) error {
    if len(usedIds) < 1 {
        return errors.New("slice could not be nil or empty")
    }

    sql := "INSERT INTO db.used_content (content_id) VALUES "

    for i := 0; i < len(usedIds); i++ {
        sql += fmt.Sprintf("(%d)", usedIds[i])
        if i != len(usedIds)-1 {
            sql += ","
        }
    }

    _, err := db.Exec(context.Background(), sql)

    return err
}

var _mx sync.Mutex

func LongComputation(db *pgxpool.Pool, wait time.Duration) {
    //_mx.Lock()          // Эти две строки решают мою проблему!
    //defer _mx.Unlock()  // Но как сделать это более правильно? На уровне базы данных
    trx, err := pgxutil.New(db)

    if err != nil {
        fmt.Println(err)
        return
    }

    err = trx.InTx(context.Background(), pgx.Serializable, func(tx pgx.Tx) error {

        content, err := GetContent(tx)
        if err != nil {
            return err
        }

        if len(content) == 0 {
            return errors.New("content is empty")
        }

        time.Sleep(wait)
        var usedIds []int
        for i := 0; i < len(content); i++ {
            usedIds = append(usedIds, content[i].ID)
        }

        return AddUsedContent(tx, usedIds)
    })

    fmt.Println("RESULT", err)
}

func main() {

    config, err := pgxpool.ParseConfig(_dbConnString)

    if err != nil {
        return
    }

    db, err := pgxpool.ConnectConfig(context.Background(), config)
    if err != nil {
        return
    }

    go LongComputation(db, 10*time.Second)
    time.Sleep(1 * time.Second)
    go LongComputation(db, 4*time.Second)

    time.Sleep(1 * time.Hour)
}

Вывод текущей программы:

RESULT nil

RESULT ERROR: could not serialize access due to read/write dependencies among transactions (SQLSTATE 40001)

Какой результат я ожидаю:

RESULT nil

RESULT content is empty


Ответы (0 шт):