PostgreSql транзакции и блокировка таблицы при долговыполняемых запросах
У меня есть две таблицы. Схема первой таблицы:
create table db.content
(
id int
constraint content_pk
primary key,
data text
);
create unique index content_data_uindex
on db.content (data);
Вторая таблица:
create table db.used_content
(
id int
constraint used_content_pk
primary key generated always as identity,
content_id int REFERENCES db.content (id)
);
В своей функции на GO я хочу:
- Получить все строки из таблицы
content, которых нет в таблицеused_content. - Произвести над некоторыми строками действия. Какие именно строки нужны - решит алгоритм, поэтому нужна вся таблица целиком.
- Сохранить список использованных ID в таблицу
used_content
Проблема возникает при многопоточности.
Я понимаю, что транзакции - это не мьютекс, который бы полностью решил эту проблему.
Я прочитал в интернете про блокировку таблицы с помощью SELECT ... FOR NO KEY UPDATE, но это, как я понимаю, не то что мне нужно. Проблему это не решает.
Нашёл пока что для себя одно решение - использовать sync.Mutex. Насколько это правильный подход?
Код программы ниже:
package main
import (
"example/pgxutil"
"context"
"errors"
"fmt"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"sync"
"time"
)
const _dbConnString = "user=postgres password=pass host=localhost port=5432 dbname=tst"
type PsqlDB interface {
Exec(ctx context.Context, sql string, arguments ...interface{}) (commandTag pgconn.CommandTag, err error)
Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error)
}
type Content struct {
ID int
Data string
}
func GetContent(db PsqlDB) ([]Content, error) {
const query = `SELECT id, data FROM db.content WHERE id NOT IN (SELECT content_id FROM db.used_content)`
rows, err := db.Query(context.Background(), query)
if err != nil {
return nil, err
}
defer rows.Close()
var res []Content
for rows.Next() {
var r Content
if err = rows.Scan(&r.ID, &r.Data); err != nil {
return nil, err
}
res = append(res, r)
}
if err = rows.Err(); err != nil {
return nil, err
}
return res, nil
}
func AddUsedContent(db PsqlDB, usedIds []int) error {
if len(usedIds) < 1 {
return errors.New("slice could not be nil or empty")
}
sql := "INSERT INTO db.used_content (content_id) VALUES "
for i := 0; i < len(usedIds); i++ {
sql += fmt.Sprintf("(%d)", usedIds[i])
if i != len(usedIds)-1 {
sql += ","
}
}
_, err := db.Exec(context.Background(), sql)
return err
}
var _mx sync.Mutex
func LongComputation(db *pgxpool.Pool, wait time.Duration) {
//_mx.Lock() // Эти две строки решают мою проблему!
//defer _mx.Unlock() // Но как сделать это более правильно? На уровне базы данных
trx, err := pgxutil.New(db)
if err != nil {
fmt.Println(err)
return
}
err = trx.InTx(context.Background(), pgx.Serializable, func(tx pgx.Tx) error {
content, err := GetContent(tx)
if err != nil {
return err
}
if len(content) == 0 {
return errors.New("content is empty")
}
time.Sleep(wait)
var usedIds []int
for i := 0; i < len(content); i++ {
usedIds = append(usedIds, content[i].ID)
}
return AddUsedContent(tx, usedIds)
})
fmt.Println("RESULT", err)
}
func main() {
config, err := pgxpool.ParseConfig(_dbConnString)
if err != nil {
return
}
db, err := pgxpool.ConnectConfig(context.Background(), config)
if err != nil {
return
}
go LongComputation(db, 10*time.Second)
time.Sleep(1 * time.Second)
go LongComputation(db, 4*time.Second)
time.Sleep(1 * time.Hour)
}
Вывод текущей программы:
RESULT nil
RESULT ERROR: could not serialize access due to read/write dependencies among transactions (SQLSTATE 40001)
Какой результат я ожидаю:
RESULT nil
RESULT content is empty