Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add default slot and keys recognition #754

Merged
merged 1 commit into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions redis/src/main/scala/zio/redis/ClusterExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ final case class ClusterExecutor(
}

for {
key <- ZIO.attempt(command.args(1).value).orElseFail(CusterKeyError)
keySlot = Slot((key.asCRC16 % SlotsAmount).toLong)
keyOpt <- ZIO.succeed(command.args.collectFirst { case key: RespArgument.Key => key })
keySlot = keyOpt.fold(Slot.Default)(key => Slot((key.asCRC16 & (SlotsAmount - 1)).toLong))
result <- executeSafe(keySlot)
} yield result
}
Expand Down Expand Up @@ -165,12 +165,8 @@ object ClusterExecutor {
for (i <- p.slotRange.start to p.slotRange.end) yield Slot(i) -> p.master.address
}.toMap

private final val CusterKeyError =
RedisError.ProtocolError("Key doesn't found. No way to dispatch this command to Redis Cluster")
private final val CusterKeyExecutorError =
RedisError.IOError(
new IOException("Executor for key doesn't found. No way to dispatch this command to Redis Cluster")
)
RedisError.IOError(new IOException("Executor doesn't found. No way to dispatch this command to Redis Cluster"))
private final val CusterConnectionError =
RedisError.IOError(new IOException("The connection to cluster has been failed. Can't reach a single startup node."))
}
7 changes: 7 additions & 0 deletions redis/src/main/scala/zio/redis/RespArgument.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package zio.redis

import zio.Chunk
import zio.redis.RespValue.BulkString
import zio.redis.codecs.CRC16
import zio.schema.Schema
import zio.schema.codec.BinaryCodec

Expand Down Expand Up @@ -48,6 +49,12 @@ object RespArgument {

final case class Key(bytes: Chunk[Byte]) extends RespArgument {
lazy val value: BulkString = RespValue.BulkString(bytes)

lazy val asCRC16: Int = {
val betweenBraces = bytes.dropWhile(b => b != '{').drop(1).takeWhile(b => b != '}')
val key = if (betweenBraces.isEmpty) bytes else betweenBraces
CRC16.get(key)
}
}

object Key {
Expand Down
7 changes: 0 additions & 7 deletions redis/src/main/scala/zio/redis/RespValue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package zio.redis

import zio._
import zio.redis.codecs.CRC16
import zio.redis.options.Cluster.Slot
import zio.stream._

Expand Down Expand Up @@ -75,12 +74,6 @@ object RespValue {
private[redis] def asString: String = decode(value)

private[redis] def asLong: Long = internal.unsafeReadLong(asString, 0)

private[redis] def asCRC16: Int = {
val betweenBraces = value.dropWhile(b => b != '{').drop(1).takeWhile(b => b != '}')
val key = if (betweenBraces.isEmpty) value else betweenBraces
CRC16.get(key)
}
}

final case class Array(values: Chunk[RespValue]) extends RespValue
Expand Down
4 changes: 4 additions & 0 deletions redis/src/main/scala/zio/redis/options/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ object Cluster {

final case class Slot(number: Long) extends AnyVal

object Slot {
val Default: Slot = Slot(1)
}

final case class Node(id: String, address: RedisUri)

final case class SlotRange(start: Long, end: Long) {
Expand Down
4 changes: 2 additions & 2 deletions redis/src/test/scala/zio/redis/ApiSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ object ApiSpec
sortedSetsSuite,
hyperLogLogSuite,
geoSuite,
streamsSuite @@ clusterExecutorUnsupported,
scriptingSpec @@ clusterExecutorUnsupported,
streamsSuite,
scriptingSpec,
clusterSpec
).provideShared(
ClusterExecutor.layer,
Expand Down
2 changes: 1 addition & 1 deletion redis/src/test/scala/zio/redis/BaseSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ trait BaseSpec extends ZIOSpecDefault {

/* TODO
* We can try to support the most unsupported commands for cluster with:
* - default connection for commands without a key and for multiple key commands with
* - [DONE] default connection for commands without a key and for multiple key commands with
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can remove the [DONE] :)

* the limitation that all keys have to be in the same slot
* - fork/join approach for commands that operate on keys with different slots
*/
Expand Down
23 changes: 23 additions & 0 deletions redis/src/test/scala/zio/redis/RespArgumentSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package zio.redis

import zio.test._

object RespArgumentSpec extends BaseSpec {
def spec: Spec[Any, RedisError.ProtocolError] =
suite("RespArgument")(
suite("BulkString.asCRC16")(
test("key without braces") {
val key = RespArgument.Key("hello world")
assertTrue(15332 == key.asCRC16)
Comment on lines +10 to +11
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick (I'm fine with leaving it as it is as well):

Suggested change
val key = RespArgument.Key("hello world")
assertTrue(15332 == key.asCRC16)
assertTrue(15332 == RespArgument.Key("hello world").asCRC16)

},
test("key between braces") {
val key = RespArgument.Key("hello{key1}wor}ld")
assertTrue(41957 == key.asCRC16)
},
test("empty key between braces") {
val key = RespArgument.Key("hello{}world")
assertTrue(40253 == key.asCRC16)
}
)
)
}
14 changes: 0 additions & 14 deletions redis/src/test/scala/zio/redis/RespValueSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,6 @@ object RespValueSpec extends BaseSpec {
.runCollect
.map(assert(_)(equalTo(values)))
}
),
suite("BulkString.asCRC16")(
test("key without braces") {
val str = RespValue.bulkString("hello world")
assertTrue(15332 == str.asCRC16)
},
test("key between braces") {
val str = RespValue.bulkString("hello{key1}wor}ld")
assertTrue(41957 == str.asCRC16)
},
test("empty key between braces") {
val str = RespValue.bulkString("hello{}world")
assertTrue(40253 == str.asCRC16)
}
)
)
}