как получить только первое сообщение из RabbitMq не задевая другие и завершить программу
есть код который получает сообщения из очереди с помощью библиотеки github.com/streadway/amqp
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
if err != nil {
panic(err)
}
defer conn.Close()
// Create a channel
ch, err := conn.Channel()
if err != nil {
panic(err)
}
defer ch.Close()
// Declare a queue
q, err := ch.QueueDeclare(
"main", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
panic(err)
}
// Receive messages
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
true, // no-wait
nil, // args
)
if err != nil {
panic(err)
}
var data []string
for d := range msgs {
data = append(data, string(d.Body))
}
Функция ch.Consume()
получает все сообщения из очереди в канал msgs, затем в цикле я записываю все сообщения в массив data. После чего все сообщения в очереди удаляются.
Мне же нужно получить только самое первое сообщение, обработать его и удалить , при этом вообще не затрагивая следующие сообщения в очереди, а затем завершить программу. То есть при следующем запуске программы она должна подобрать уже следующие сообщение, обработать и удалить, итд.
Как это реализовать?
Ответы (2 шт):
Я с этой библиотекой не работал, но, судя по документации, вы можете вместо ch.Consume
использовать ch.Get
, которое получает именно одно первое сообщение в очереди.
вот полный код функции решающий эту задачу
func getMessageFromRabbitMQ() string {
// RabbitMQ connection string
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
if err != nil {
panic(err)
}
defer conn.Close()
// Create a channel
ch, err := conn.Channel()
if err != nil {
panic(err)
}
defer ch.Close()
//Установка временного ограничения для чтения сообщений
err = ch.Qos(
1, // prefetchCount: максимальное количество сообщений, которые может принять потребитель до его подтверждения
0, // prefetchSize
false, // global
)
if err != nil {
panic(err)
}
// Declare a queue
q, err := ch.QueueDeclare(
"mainqueue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
panic(err)
}
// Receive messages
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
panic(err)
}
var data string
for item := range msgs {
data = string(item.Body)
item.Ack(false)
break
}
return data
}