Подключение к стриму RabbitMQ (AMQP) из Flutter приложения, ошибка consumer prefetch count is not set for 'queue
Всем доброго времени суток. Я начинающий во флаттере, и столкнулся с проблеммой. Я разрабатываю приложение для андроид и айос. В приложении будет экран, на котором будут данные, которые я получаю из СТРИМА брокера сообщений (RabbitMQ, AMQP 0.9.1). Когда я подключаюсь к обычной очереди - всё в порядке, всё работает, уходит и приходит. Но когда я пытаюсь подключится к стриму, получаю ошибку
E/flutter (18563): [ERROR:flutter/runtime/dart_vm_initializer.cc(41)] Unhandled Exception: ChannelException(PRECONDITION_FAILED): PRECONDITION_FAILED - consumer prefetch count is not set for 'queue 'ENBfqmhud95FsU5VwmRrvp0mDY-5e4wlLk5XApzMF7M' in vhost '/''
Вроде бы не хватает prefetch, но я нигде в стримах не указываю явно prefetch. Да и на сервере (в менеджинге) никаких дополнительных аргументов у стрима нет. Вот мой код в дарте:
_channel = await _clientGet.channel();
await _channel.qos(0, 100, global: true);
Queue queue = await _channel.queue(AppConstants.queueName, durable: true, arguments: <String, Object>{'x-queue-type': 'stream'});
Consumer consumer = await queue.consume(noAck: false, arguments: <String, Object>{"prefetch": 100});
consumer.listen((AmqpMessage message) {
// setState(() {
// _updateLocation();
// });
print('xxxReceived message: ${message.payloadAsString}');
});
Я уже пробовал(по ощущениям) всё что можно было, но ни информации, ни примера, ни решения не нашёл. Уже 2 дня борюсь с этими стримами.
Вот пример кода из фронта(JS), который подключается к этим же стримам:
const amqp = new AMQPWebSocketClient(element.mapRow.getAttribute('data-rabbitmq-url'), "/", element.mapRow.getAttribute('data-rabbitmq-user'), element.mapRow.getAttribute('data-rabbitmq-pass'))
const conn = await amqp.connect()
if(!conn) throw new Error('No connect to RMQ');
const ch = await conn.channel()
let mainQueue = await ch.queue(queueName, {}, {"x-queue-type": "stream" })
let reverseQueue = await ch.queue(queueName + reversePrefix, {}, {"x-queue-type": "stream" })
const consumer = await mainQueue.subscribe({ noAck: false, args: { "x-stream-offset": "last" } }, (msg) => {
...
}
}
П.С: изменить стримы на обычные очереди нельзя, на них держится фронт в проде.