feat(new sink): Initial pulsar sink implementation #1665

Merged
merged 16 commits into from
Mar 20, 2020

Conversation

leshow
Contributor

@leshow leshow commented Jan 31, 2020

closes #690

Tests and a proper healthcheck are still missing; I just wanted to show this version to get some feedback. It's based largely on the kafka sink. It implements Sink and, like the kafka sink, holds the in-flight futures and acks once they complete.

Let me know if this approach is suitable, whether there are configuration options missing that we'd like to add, etc. I'd like to hold off on SSL for the moment because I don't think it's well supported in the underlying crate. This change also depends on changes to SinkContext and pulsar-rs, which are worth looking at.
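
A minimal sketch of the in-flight tracking and acking pattern described above, using only the standard library. `Acker` and `InFlight` are hypothetical stand-ins rather than Vector's actual types, and completion is simulated with direct calls instead of real futures. The point is only that a send which completes out of order is not acked until every earlier send has completed too.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for an acker: it only counts acknowledged events.
#[derive(Default)]
struct Acker {
    acked: usize,
}

impl Acker {
    fn ack(&mut self, num: usize) {
        self.acked += num;
    }
}

/// Tracks sends by sequence number and acks only the contiguous completed prefix.
#[derive(Default)]
struct InFlight {
    seq_head: usize,              // next sequence number to hand out
    seq_tail: usize,              // oldest sequence number not yet acked
    pending_acks: HashSet<usize>, // completed sends still waiting on the tail
    acker: Acker,
}

impl InFlight {
    /// Called when a send starts; returns the sequence number assigned to it.
    fn start_send(&mut self) -> usize {
        let seqno = self.seq_head;
        self.seq_head += 1;
        seqno
    }

    /// Called when the delivery for `seqno` has completed successfully.
    fn complete(&mut self, seqno: usize) {
        self.pending_acks.insert(seqno);
        let mut num_to_ack = 0;
        // Advance the tail over every completed send that is next in line.
        while self.pending_acks.remove(&self.seq_tail) {
            num_to_ack += 1;
            self.seq_tail += 1;
        }
        self.acker.ack(num_to_ack);
    }
}

fn main() {
    let mut sink = InFlight::default();
    let a = sink.start_send();
    let b = sink.start_send();
    let c = sink.start_send();

    sink.complete(c); // finishes first, but cannot be acked yet
    println!("acked after c: {}", sink.acker.acked);
    sink.complete(a);
    println!("acked after a: {}", sink.acker.acked);
    sink.complete(b); // unblocks both b and c
    println!("acked after b: {}", sink.acker.acked);
}
```

Acking only the contiguous prefix keeps the count passed to the acker consistent with event order, at the cost of delaying acks behind a slow delivery.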

@leshow leshow requested a review from lukesteensen as a code owner January 31, 2020 20:51
@binarylogic binarylogic changed the title WIP: feat(pulsar sink): add pulsar sink WIP: feat(new sink): Initial pulsar sink implementation Jan 31, 2020
@binarylogic binarylogic requested review from bruceg and removed request for lukesteensen January 31, 2020 21:59
@binarylogic
Contributor

@bruceg do you mind reviewing this?

@binarylogic
Contributor

And thanks for doing this @leshow.

Member

@bruceg bruceg left a comment

Given that it is mostly an adaptation of the kafka sink, it looks pretty straightforward. I presume at some point authentication support would be useful to add.

Without a working healthcheck and tests I can't, of course, fully approve it, but what I see looks like a good start.

Comment on lines +169 to +176
let consumer = pulsar
.consumer()
Member

I don't know in detail how Pulsar handles authorization, but accessing the remote as a consumer in the healthcheck and as a producer in the actual sink could hit different authorization paths.

I see the kafka sink also uses this approach, but it is a potential issue to be aware of.

}
self.acker.ack(num_to_ack);
}
Err(e) => error!("future cancelled: {}", e),
Member

This could use a better message. The kafka error at least referenced that it was a "delivery" future. The term "future" probably has little meaning to the average Vector user.

@bruceg
Member

bruceg commented Feb 1, 2020

It looks like a non-issue to me, but I'd feel more comfortable if @LucioFranco or @lukesteensen took a glance at the SinkContext change to make sure there wasn't some obvious problem with exposing the executor this way.

@bruceg
Member

bruceg commented Feb 1, 2020

BTW @leshow you can mark a pull request as "draft" when you submit it by clicking the arrow beside the "Create pull request".

@leshow
Contributor Author

leshow commented Feb 1, 2020

Thanks for the feedback. Okay, will do for next time, I don't see an option to do it now. So long as the structure is okay I'll wrap up the healthcheck and tests hopefully in the next few days.

@KannarFr
Contributor

KannarFr commented Feb 3, 2020

Hi, @leshow there is currently no authentication support on that PR, right?

@leshow
Contributor Author

leshow commented Feb 3, 2020

@KannarFr Correct, however if all you need is the ability to pass the Authentication user/pass combo to pulsar I'm pretty sure I can add that in for you. https://docs.rs/pulsar/0.2.0/pulsar/struct.Pulsar.html#method.new

@KannarFr
Contributor

KannarFr commented Feb 3, 2020 via email

@KannarFr
Contributor

KannarFr commented Feb 3, 2020

Done in leshow#1.

Contributor

@LucioFranco LucioFranco left a comment

SinkContext changes look good 👍

@leshow
Contributor Author

leshow commented Feb 6, 2020

I've added healthcheck & an integration test. Tried it locally with pulsar in the background and seems to work. I need to add another Consumer in the test still. This branch is also pointing at the master branch of pulsar-rs, which could be problematic.

@binarylogic
Contributor

@leshow sounds good. Just re-request a review when you're ready.

@leshow leshow changed the title WIP: feat(new sink): Initial pulsar sink implementation feat(new sink): pulsar sink implementation Feb 7, 2020
@leshow leshow changed the title feat(new sink): pulsar sink implementation feat(new sink): Pulsar sink implementation Feb 7, 2020
@leshow
Contributor Author

leshow commented Feb 7, 2020

Okay, I've moved this out of WIP. It's working locally, if you want to reproduce, run pulsar-standalone in the background, and the integration test with:

cargo test pulsar --features pulsar-integration-test

I've added what I think are the relevant configs and toml files, but I may have missed something. I was just looking at other PRs and copying what they did.

edit: I'm not sure if you want to wait until pulsar-rs has a release before you actually merge this, I'll leave that up to you folks.

@binarylogic binarylogic requested a review from bruceg February 7, 2020 16:47
@leshow
Contributor Author

leshow commented Feb 8, 2020

Seems like I still need to figure out the testing CI.

@binarylogic binarylogic added the needs: docs Needs documentation updates label Feb 9, 2020
@binarylogic
Contributor

@leshow you can run them locally via Docker. See this README. Once you get the tests passing we'll review.

@binarylogic binarylogic changed the title feat(new sink): Pulsar sink implementation feat(new sink): Initial pulsar sink implementation Feb 10, 2020
@KannarFr
Contributor

KannarFr commented Feb 12, 2020

@leshow do you have time to do it? I can do it on my side if needed.

@leshow
Contributor Author

leshow commented Feb 13, 2020

@KannarFr if you'd like to do the docker bit and submit it as a PR to my fork, sure. Other things have come up that have taken some time. You need to get pulsar-standalone running in a container so the integration tests can run.

Cargo.toml Outdated
@@ -141,6 +141,7 @@ colored = "1.9"
warp = "0.1.20"
evmap = { version = "7", features = ["bytes"] }
logfmt = "0.0.2"
pulsar = { git = "https://github.com/wyyerd/pulsar-rs" }
Contributor

At a minimum, we should pin this to a specific checkout. (I realize the other git dependencies don't have one... Sorry for coming down on you.) Notably, lockfiles aren't always used (see rust-lang/cargo#7169), and it would be nice to regain the ability to run cargo update or support cargo install.

Also, would we be comfortable waiting for a release of this library?

There is a big warning on https://github.com/wyyerd/pulsar-rs about future API changes:

Current status: Simple functionality, but expect API to change. Major API changes will come simultaneous with async-await stability, so look for that.

Contributor Author

Absolutely. I mentioned elsewhere in this thread that I'm not sure you actually want to merge this until they do a release. In the meantime, I can add a commit hash.

@leshow
Contributor Author

leshow commented Feb 14, 2020

I'm at a bit of a loss as to how to add pulsar to the container in order to get the tests I added to run. I've also had an issue on at least 2 different machines, where running make test will consume all available system memory then crash with an error like:

Starting vector_elasticsearch_1 ... done
   Compiling vector v0.8.0 (/home/leshow/dev/rust/vector)
error: linking with `cc` failed: exit code: 1
  |
  = note: "cc" "-Wl,--as-needed" "-Wl,-z,noexecstack" "-m64" "-L" "/home/leshow/.rustup/toolchains/1.41.0-x86_64-unknown-linux-gnu/lib/rustlib/x86_64-unknown-linux-gnu/lib" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.103giqk6wtmhpcv6.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.13obbnmdfykyhc3l.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.14zo4roytzzhj6mi.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.15lz2hk08g91v568.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.16whzk623alwv7i8.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.16y5fyibfga3p1jo.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.19h3e3n1uyqbaszt.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.19kbg2dommqxb82f.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.19kc5nhqnompj6au.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.1a7fu4u3qti5a3jv.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.1hq4w0rrd58d3n8s.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.1kgz424er569b6k.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.1l3ufz9yktzay4at.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.1l3x534mfyibiaa8.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.1ohibu38sjtvp7dj.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.1prpmiq1z5kawlhi.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.1qzrfow0172vxc85.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.1tbyvnu4qugdayz1.rcgu.o" 
"/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.1vvmc14v4jr6rfa2.rcgu.o" "/home/leshow/dev/rust/vector/target/debug/deps/vector-a2c9ee1397164ad8.1vxpu4w64y2rr5y5.rcgu.o" "/home/leshow/dev/rust/vector/tar 

...

Has anyone had a problem like this?

@bruceg
Member

bruceg commented Mar 17, 2020

@Hoverbear you had requested some changes previously. Are your concerns handled now?

@Hoverbear
Contributor

I think it's good. :)

@leshow
Contributor Author

leshow commented Mar 17, 2020

Sorry I took so much of your time working through the build issues. If you start moving things to async/await and need some help feel free to ping me, I can pick up a few things.

@Hoverbear
Contributor

Thank you so much @leshow, we are starting to move to that! You can see #1922 for some details, and @LucioFranco or @MOZGIII may be able to direct your efforts!

Contributor

@binarylogic binarylogic left a comment

Nice work! Thanks.

@bruceg
Member

bruceg commented Mar 17, 2020

While looking into the units of the batch size, we discovered that it appears to be disconnected in the pulsar crate (it is only implemented for the consumer). Have you tested that the batch size is actually constrained by the option? Did we guess right that it is supposed to be a number of messages?

Obviously if the option is disconnected in the upstream crate, this isn't your problem, but we should not claim to support it then.

@bruceg bruceg removed the needs: docs Needs documentation updates label Mar 17, 2020
@leshow
Contributor Author

leshow commented Mar 17, 2020

I just looked again and I think you're correct, the batch size only looks like it's being used for consumers, I'll remove it as an option.

Signed-off-by: Evan Cameron <[email protected]>
@bruceg bruceg merged commit c8b47b3 into vectordotdev:master Mar 20, 2020
@bruceg
Member

bruceg commented Mar 20, 2020

Thanks a lot for your work on this @leshow! We're happy to add your contribution.

@binarylogic
Contributor

Agree! Thank you for pushing this to the finish line.

@KannarFr
Contributor

Yey! Thanks!

@KannarFr
Contributor

@bruceg do you plan to release with pulsar sink soon?

@binarylogic
Contributor

0.10.0 should go out next week pending benchmark results on our test harness

@KannarFr
Contributor

@binarylogic

Thanks for your answer. Any news? Where is the benchmark output?

@KannarFr
Contributor

@leshow

Documentation missing these fields on pulsar sink configuration:

  type = "pulsar" # required
  inputs = ["source_pulsar_sink"] # required

And, I've got the following error:

kannar@pond ~/git/cc/conf/vector % ../../vector/target/debug/vector validate vector.toml                                                                                            
Mar 31 09:46:48.024 ERROR vector: Configuration error: unknown variant `codec`, expected `text` or `json` for key `sinks.pulsar_output`
Mar 31 09:46:48.024 ERROR vector: Failed to parse config file. path="vector.toml"

When using the configuration:

[sinks.pulsar_output]
  type = "pulsar" # required
  inputs = ["input"] # required
  address = "address" # required
  topic = "topic" # required

  # Auth
  auth.name = "${AUTH_PROVIDER}" # optional, no default
  auth.token = "${AUTH_TOKEN}" # optional, no default

  encoding.codec = "text" # required

As I'm running master maybe it's not related to your code.

@binarylogic
Contributor

Thanks. I've fixed the documentation errors, they will be merged today. And we're planning to release 0.9.0 this week. Benchmarks look good.

Where is the benchmark output?

We run a variety of tests via https://github.com/timberio/vector-test-harness. All performance data is stored in a public S3 bucket as documented in the readme. Hope that helps!

@KannarFr
Contributor

KannarFr commented Mar 31, 2020

@binarylogic thanks for your answer.

Another point: address seems to need to be an IP. Can it be a hostname too?

@leshow
Contributor Author

leshow commented Apr 1, 2020

@KannarFr have a look at the docs for pulsar-rs, the address is passed off to them.
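
One possible reason for the IP-only behaviour, sketched with the standard library only: if an address string is parsed directly as a `std::net::SocketAddr`, a hostname is rejected, whereas `ToSocketAddrs` resolves it first. Whether pulsar-rs handles the address this way is an assumption, not something confirmed in this thread. The helper names and the port 6650 are illustrative.

```rust
use std::net::{SocketAddr, ToSocketAddrs};

/// Strict parse: accepts only a literal `ip:port`.
fn parse_ip_only(address: &str) -> Option<SocketAddr> {
    address.parse().ok()
}

/// Resolution: accepts `ip:port` or `hostname:port` and returns the first result.
fn resolve(address: &str) -> Option<SocketAddr> {
    address.to_socket_addrs().ok()?.next()
}

fn main() {
    for address in &["127.0.0.1:6650", "localhost:6650"] {
        println!(
            "{}: parse = {:?}, resolve = {:?}",
            address,
            parse_ip_only(address),
            resolve(address)
        );
    }
}
```

With the strict parse, `localhost:6650` yields `None`, while resolution depends on the local resolver configuration.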

@KannarFr
Contributor

KannarFr commented Apr 3, 2020

Got it, will contribute to pulsar-rs :).

@KannarFr
Contributor

KannarFr commented Apr 9, 2020

@binarylogic any news for 0.9.0?

@LucioFranco
Contributor

@KannarFr should be coming very very soon :)

@KannarFr
Contributor

Ahaha, I'm sorry, but I need this tag to define the exherbo.org package using the new pulsar sink. Could you maybe release a 0.9.0 with it, and a 0.10.0 very very soon?

@KannarFr
Contributor

@LucioFranco ^

@LucioFranco
Contributor

I am not totally sure what the status of 0.10 is, though I think @binarylogic is currently working on 0.9 as we speak.

Development

Successfully merging this pull request may close these issues.

New apache_pulsar sink
6 participants