Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

better async/pipelining support? #916

Closed
mberndt123 opened this issue Nov 10, 2023 · 0 comments · Fixed by #917
Closed

better async/pipelining support? #916

mberndt123 opened this issue Nov 10, 2023 · 0 comments · Fixed by #917
Labels
enhancement New feature or request

Comments

@mberndt123
Copy link
Contributor

mberndt123 commented Nov 10, 2023

Hey @mijicd,

I think I have run into a limitation of the zio-redis architecture that makes it hard for me to solve a quite straight-forward problem.

When you run a command, zio-redis will create a Promise for the response and place it into the requests Queue along with the command. Then there are some fibers that take care of sending that command to Redis and completing the Promise as soon as the response comes back from Redis. I think this is a great design because it enables pipelining without the user having to worry about it explicitly.

However it has one drawback: when you want to send several commands to the server that need to run in a specific order, you have to wait for each command to finish before sending the next one. This is because the return type of all the command methods is something like IO[RedisError, A]. What would be needed is something like UIO[IO[RedisError, A]], where the outer UIO finishes as soon as the command was placed in the requests Queue, and the inner IO then waits for the response from the server.

I'm not proposing to change all the method return types because that would be a very invasive change and everybody would have to call flatten all the time. Instead, my suggestion would be to add a G[_] type parameter to the Redis trait so that it's possible to have both a "synchronous" (return type IO[RedisError, A]) and an "asynchronous" (return type UIO[IO[RedisError, A]]) variant of the Redis trait without having to duplicate all the methods. I'd be glad to work on the implementation of such a design.

For motivation, I'd like to explain how I ran into this problem.

I'm reading messages (key-value pairs) from a Kafka topic, and I want to store these messages in Redis so that I can always retrieve the latest value for any given key. A naïve solution would be something like this:

Consumer.plainStream(…stuff…)
  .mapZIO(record => serviceWithZIO[Redis](_.hSet(record.key, "foo" -> record.value.foo)).as(record))
  .mapZIO(_.offset.commit)
}

This works, but performance is bad when you do it this way because it will wait for each hSet command to complete before sending the next one. But the question is, what to do instead? If I replace the first mapZIO with mapZIOPar, then the program no longer works correctly. When two messages with the same key arrive within a short time window, there is no guarantee which one will end up being sent to Redis first, and so the value of an older message might overwrite the value from a newer one. I also considered mapZIOParByKey, but this is also not a good solution (poor performance if the number of distinct keys is small, and it reorders messages, making it hard to commit the Kafka offsets).

@mijicd mijicd added the enhancement New feature or request label Nov 10, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants