-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(new sink): Initial pulsar
sink implementation
#1665
Conversation
pulsar
sink implementation
@bruceg do you mind reviewing this? |
And thanks for doing this @leshow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that it is mostly an adaptation of the kafka
sink, it looks pretty straight forward. I presume at some point authentication support would be useful to add.
Without working healthcheck and tests I can't of course fully approve it, but what I see looks like a good start.
let consumer = pulsar | ||
.consumer() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know in detail how Pulsar handles authorization, but trying to access the remote as a consumer in healthcheck
but as a producer in the actual sink could hit different authorization paths.
I see the kafka
sink also uses this approach, but it is a potential issue to be aware of.
src/sinks/pulsar.rs
Outdated
} | ||
self.acker.ack(num_to_ack); | ||
} | ||
Err(e) => error!("future cancelled: {}", e), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could use a better message. The kafka
error at least referenced that it was a "delivery" future. The term "future" probably has little meaning to the average Vector user.
It looks like a non-issue to me, but I'd feel more comfortable if @LucioFranco or @lukesteensen took a glance at the |
BTW @leshow you can mark a pull request as "draft" when you submit it by clicking the arrow beside the "Create pull request". |
Thanks for the feedback. Okay, will do for next time, I don't see an option to do it now. So long as the structure is okay I'll wrap up the healthcheck and tests hopefully in the next few days. |
Hi, @leshow there is currently no authentication support on that PR, right? |
@KannarFr Correct, however if all you need is the ability to pass the |
Pulsar is not using user/pass auth but I'm sure it's not difficult, I think
I will commit that PR. :)
Le lun. 3 févr. 2020 à 18:40, Evan Cameron <[email protected]> a
écrit :
… @KannarFr <https://github.com/KannarFr> Correct, however if all you need
is the ability to pass the Authentication user/pass combo to pulsar I'm
pretty sure I can add that in for you.
https://docs.rs/pulsar/0.2.0/pulsar/struct.Pulsar.html#method.new
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#1665>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/ABZLQJOAMG3WKDL7SGYK72LRBBJJFANCNFSM4KONMSCA>
.
|
Done in leshow#1. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SinkContext
changes look good 👍
I've added healthcheck & an integration test. Tried it locally with pulsar in the background and seems to work. I need to add another |
@leshow sounds good. Just re-request a review when you're ready. |
pulsar
sink implementationpulsar
sink implementation
pulsar
sink implementation
Okay, I've moved this out of WIP. It's working locally, if you want to reproduce, run
I've added what I think are the relevant configs and edit: I'm not sure if you want to wait until |
Seems like I still need to figure out the testing CI. |
@leshow you can run them locally via Docker. See this README. Once you get the tests passing we'll review. |
pulsar
sink implementation
@leshow do you have time to do it? I can do it by my side if needed. |
@KannarFr if you'd like to do the docker bit and submit it as a PR to my fork, sure. Other things have come up that have taken some time. You need to get pulsar-standalone running in a container so the integration tests can run. |
Cargo.toml
Outdated
@@ -141,6 +141,7 @@ colored = "1.9" | |||
warp = "0.1.20" | |||
evmap = { version = "7", features = ["bytes"] } | |||
logfmt = "0.0.2" | |||
pulsar = { git = "https://github.com/wyyerd/pulsar-rs" } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At a minimum, we should be putting a checkout for this. (I realize the other git dependencies don't have one... Sorry for coming down on you.) Notably lockfiles aren't always used rust-lang/cargo#7169, and it would be nice to regain the ability to run cargo update
or support cargo install
.
Also, would we be comfortable waiting for a release of this library?
There is a big warning on https://github.com/wyyerd/pulsar-rs about future API changes:
Current status: Simple functionality, but expect API to change. Major API changes will come simultaneous with async-await stability, so look for that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Absolutely, I mentioned elsewhere in this thread I'm not sure if you actually want to merge this until they do a release. In the meantime, I can add commit hash.
I'm at a bit of a loss as to how to add pulsar to the container in order to get the tests I added to run. I've also had an issue on at least 2 different machines, where running
Has anyone had a problem like this? |
@Hoverbear you had requested some changes previously. Are your concerns handled now? |
I think it's good. :) |
Sorry I took so much of your time working through the build issues. If you start moving things to async/await and need some help feel free to ping me, I can pick up a few things. |
Thank you so much @leshow , we are starting to move to that! You can see #1922 for some details about that and @LucioFranco or @MOZGIII may be able to direct your efforts! |
Signed-off-by: binarylogic <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work! Thanks.
While looking into the units of the batch size, we discovered that it appears to be disconnected the pulsar crate (it is only implemented for the consumer). Have you tested that the batch size is actually constrained by the option? Did we guess right that it is supposed to be a number of messages? Obviously if the option is disconnected in the upstream crate, this isn't your problem, but we should not claim to support it then. |
I just looked again and I think you're correct, the batch size only looks like it's being used for consumers, I'll remove it as an option. |
Signed-off-by: Evan Cameron <[email protected]>
Thanks a lot for your work on this @leshow! We're happy to add your contribution. |
Agree! Thank you for pushing this to the finish line. |
Yey! Thanks! |
@bruceg do you plan to release with pulsar sink soon? |
|
Thanks for you answer, any news? Where are benchmark output? |
Documentation missing these fields on pulsar sink configuration:
And, I've got the following error:
When using the configuration:
As I'm running master maybe it's not related to your code. |
Thanks. I've fixed the documentation errors, they will be merged today. And we're planning to release
We use to run a variety of tests via https://github.com/timberio/vector-test-harness. All performance data is stored in a public S# bucket as documented in the readme. Hope that helps! |
@binarylogic thanks for your answer. Another point: address seems to must be an ip, can it be an hostname too? |
@KannarFr have a look at the docs for |
Got it, will contribute to pulsar-rs :). |
@binarylogic any news for 0.9.0? |
@KannarFr should be coming very very soon :) |
Ahaha, I'm sorry but I need this tag to define the exherbo.org package using the new pulsar sink. Maybe can you release a 0.9.0 with it and a 0.10.0 very very soon? |
I am not totally sure what the status on 0.10 is, though I think @binarylogic currently working on 0.9 as we speak. |
closes #690
Missing tests and proper healthcheck right now, I just wanted to show this version to get some feedback. It's based largely on the kafka sink. It impl's
Sink
, like the kafka sink it holds the in flight futures andAck
s back after completion.Let me know if this approach is suitable, if there are some configuration options missing we'd like to add, etc. I'd like to hold off on SSL at the moment because I don't think it's well supported in the underlying crate. This change depends on a change to
SinkContext
andpulsar-rs
also, worth looking at that.