
Consuming stream gets stuck on 0.13 #133

Closed
tyranron opened this issue Jul 11, 2018 · 14 comments
@tyranron

The consuming stream (returned by channel.basic_consume()) consumes newly arrived deliveries from RabbitMQ (in the RabbitMQ admin UI they are marked as Unacked) but does not propagate them to the .for_each() closure.

How to reproduce:

  1. Push 2 deliveries to a RabbitMQ queue.
  2. Start the consumer application. It consumes both deliveries and processes them in .for_each() just fine.
  3. When the queue is empty and the consumer is waiting for deliveries, push one more delivery.
  4. On the RabbitMQ server it becomes Unacked (so the consumer has read it), but the .for_each() closure is never invoked.

The heartbeat running in the background works OK.

Downgrading back to 0.12 solves the problem: the .for_each() closure is invoked every time the consumer receives a delivery.
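
For reference, the consumption pattern described above looks roughly like this (a minimal sketch in the futures-0.1 style lapin used at the time, not a complete program: channel and queue are assumed to already exist from the connection and queue-declaration setup, and the consumer tag "my-consumer" is made up for illustration):

channel
    .basic_consume(
        &queue,
        "my-consumer",
        BasicConsumeOptions::default(),
        FieldTable::new())
    .and_then(|stream| {
        stream.for_each(|delivery| {
            // Every delivery pulled from the queue should show up here.
            // In the setup described above, new deliveries get marked
            // Unacked on the broker, but this closure is never reached
            // once the queue has drained.
            println!("got delivery: {:?}", delivery);
            Ok(())
        })
    })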

@Keruspe
Collaborator

Keruspe commented Jul 12, 2018

Do you have a code example that reproduces your issue? I cannot reproduce it locally.

@tyranron
Author

@Keruspe I will try to create an example repo that reproduces the error.

@tyranron
Author

@Keruspe I was able to reproduce it in this repo.

The interesting part is that if I call .basic_ack() inside .for_each(), everything is OK. But when I call .basic_ack() from another place and the .for_each() closure just returns Ok(()), it gets stuck.

On 0.12 it worked in both cases.
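
Concretely, the working variant is the one where the ack future is what the for_each closure returns, while the stuck variant only returns Ok(()) and leaves the ack to something else. A rough sketch of the two shapes, reusing the call shapes from this thread (ack_channel stands for a clone of the consuming channel, and tag_tx is a hypothetical futures::sync::mpsc sender used only to illustrate handing the tag to another task):

// Works on 0.12 and 0.13: the ack future is returned from the closure,
// so it is driven as part of the consumer stream itself.
stream.for_each(move |delivery| {
    ack_channel.basic_ack(delivery.delivery_tag, false)
})

// Alternative that worked on 0.12 but gets stuck on 0.13: the closure
// only returns Ok(()) and the ack is issued later from another place
// that holds its own channel clone.
stream.for_each(move |delivery| {
    tag_tx.unbounded_send(delivery.delivery_tag).expect("ack task gone");
    Ok(())
})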

@Keruspe
Collaborator

Keruspe commented Jul 12, 2018

Thanks, I’ll try reproducing with that tomorrow.
Please note, though, that RabbitMQ highly recommends not sharing channels across threads, so that might be at least part of your issue.

@fstephany

I have the same kind of symptoms, but I'm almost sure that my own code is to blame. If there are messages in the queue, it sometimes hangs when processing the first message (the server shows it as 'Unacked', but the "Got message. Raw: {:?}" log line is never printed).

I basically do three operations:

  • process messages in stream.for_each,
  • publish a result to another queue (but on the same channel),
  • ack the message.

I'm new to Rust and RabbitMQ, so those clone() calls are probably not ideal and might be completely wrong. Should I start by using different channels for the basic_consume and the publish?

.and_then(move |queue| {
    debug!("commits queue setup");
    channel.basic_consume(
        &queue,
        "worker",
        BasicConsumeOptions::default(),
        FieldTable::new())
})
.and_then(move |stream| {
    debug!("RabbitMQ queues and stream are setup");
    stream.for_each(move |message| {
        debug!("Got message. Raw: {:?}", message);
        // Per-delivery clones so the inner futures can own their handles.
        let msg_clone = message.clone();
        let channel_clone = ch2.clone();

        let decoded_msg = std::str::from_utf8(&message.data).unwrap();
        let metadata = processor.process(decoded_msg);

        let serialized_result = match metadata {
            Ok(data) => serde_json::to_string(&data).unwrap(),
            Err(e) => format!("Error while processing commit: {}", e)
        };
        info!("Publishing result to RabbitMQ");

        // Publish the result, then ack the original delivery.
        ch2.basic_publish(
            "",
            "analysis-results",
            serialized_result.into_bytes(),
            BasicPublishOptions::default(),
            BasicProperties::default())
        .map(|_option| ())
        .and_then(move |()| {
            info!("Result published. Sending ack.");
            channel_clone.basic_ack(msg_clone.delivery_tag, false)
        })
    })
})
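
Regarding the question above about separate channels, the split might look roughly like this (a sketch only, not a drop-in replacement: consume_channel and publish_channel stand for two channels created from the same client during setup, queue is the queue declared in the earlier step, and the real processing/serialization is replaced by a fixed payload):

let ack_source = consume_channel.clone();
consume_channel
    .basic_consume(
        &queue,
        "worker",
        BasicConsumeOptions::default(),
        FieldTable::new())
    .and_then(move |stream| {
        stream.for_each(move |message| {
            // Clone per delivery, as in the snippet above, so the inner
            // future can own its handle.
            let ack_channel = ack_source.clone();
            let delivery_tag = message.delivery_tag;
            // Publish the result on its own channel...
            publish_channel
                .basic_publish(
                    "",
                    "analysis-results",
                    b"result".to_vec(),
                    BasicPublishOptions::default(),
                    BasicProperties::default())
                .map(|_| ())
                // ...then ack the original delivery on the consuming channel.
                .and_then(move |()| ack_channel.basic_ack(delivery_tag, false))
        })
    })

Whether this helps with the stall itself is a separate question, but it keeps consuming and publishing from sharing the single ch2 channel.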

@fstephany

@tyranron Have you found the cause of the issue? I've created a minimal project illustrating the issue I have (which seems related to yours): https://github.com/fstephany/lapin-issue-133
If the README is not clear, let me know and I'll update it.

@Keruspe Did you manage to reproduce it on your machine? If you have time, can you have a look at the consumer code and check for any obvious mistake? If you're a freelancer, a paid remote debug session is also a possibility ;)

@tyranron
Author

@fstephany nope, sorry... I haven't worked on it recently.

@Keruspe
Collaborator

Keruspe commented Mar 10, 2019

FWIW, I'm working on a rework/refactor that should land by the end of the month and should improve this.

@fstephany

Thanks!
If you need help/testing, do not hesitate to ping.

@Keruspe
Collaborator

Keruspe commented May 10, 2019

Could you try with 0.19.0?
I'm releasing a few alphas until it's properly ready, but things should already be better.

@fstephany

I'll try to test it this week and let you know.

@gustabot42

The 0.19.0 alpha still has the issue: once the queue has been emptied, the stream gets stuck after reading the first new message.

@Keruspe
Collaborator

Keruspe commented Jun 8, 2019

Care to try with 0.19.0 beta1?

@Keruspe
Collaborator

Keruspe commented Jun 16, 2019

This should definitely be OK now.

Please open a new issue with a code sample if you ever manage to reproduce it again.

Keruspe closed this as completed Jun 16, 2019