как получить только первое сообщение из RabbitMq не задевая другие и завершить программу

есть код который получает сообщения из очереди с помощью библиотеки github.com/streadway/amqp

    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    // Create a channel
    ch, err := conn.Channel()
    if err != nil {
        panic(err)
    }
    defer ch.Close()

    // Declare a queue
    q, err := ch.QueueDeclare(
        "main", // name
        true,         // durable
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    if err != nil {
        panic(err)
    }

    // Receive messages
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        true,   // no-wait
        nil,    // args
    )
    if err != nil {
        panic(err)
    }

    var data []string

    for d := range msgs {
        data = append(data, string(d.Body))
    }

Функция ch.Consume() получает все сообщения из очереди в канал msgs, затем в цикле я записываю все сообщения в массив data. После чего все сообщения в очереди удаляются.

Мне же нужно получить только самое первое сообщение, обработать его и удалить , при этом вообще не затрагивая следующие сообщения в очереди, а затем завершить программу. То есть при следующем запуске программы она должна подобрать уже следующие сообщение, обработать и удалить, итд.

Как это реализовать?


Ответы (2 шт):

Автор решения: Xander

Я с этой библиотекой не работал, но, судя по документации, вы можете вместо ch.Consume использовать ch.Get, которое получает именно одно первое сообщение в очереди.

https://pkg.go.dev/github.com/streadway/amqp#Channel.Get

→ Ссылка
Автор решения: Dmitriy

вот полный код функции решающий эту задачу

func getMessageFromRabbitMQ() string {
    // RabbitMQ connection string
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    // Create a channel
    ch, err := conn.Channel()
    if err != nil {
        panic(err)
    }
    defer ch.Close()
    //Установка временного ограничения для чтения сообщений
    err = ch.Qos(
        1,     // prefetchCount: максимальное количество сообщений, которые может принять потребитель до его подтверждения
        0,     // prefetchSize
        false, // global
    )
    if err != nil {
        panic(err)
    }

    // Declare a queue
    q, err := ch.QueueDeclare(
        "mainqueue", // name
        true,        // durable
        false,       // delete when unused
        false,       // exclusive
        false,       // no-wait
        nil,         // arguments
    )
    if err != nil {
        panic(err)
    }

    // Receive messages
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        panic(err)
    }

    var data string
    for item := range msgs {
        data = string(item.Body)
        item.Ack(false)
        break
    }

    return data
}
→ Ссылка