diff --git a/docs/index.md b/docs/index.md index 6e7c3ef97..b9478ea0f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -49,21 +49,19 @@ object ZIORedisExample extends ZIOAppDefault { def get[A: Schema]: BinaryCodec[A] = ProtobufCodec.protobufCodec } - val myApp: ZIO[Redis, RedisError, Unit] = for { - redis <- ZIO.service[Redis] - _ <- redis.set("myKey", 8L, Some(1.minutes)) - v <- redis.get("myKey").returning[Long] - _ <- Console.printLine(s"Value of myKey: $v").orDie - _ <- redis.hSet("myHash", ("k1", 6), ("k2", 2)) - _ <- redis.rPush("myList", 1, 2, 3, 4) - _ <- redis.sAdd("mySet", "a", "b", "a", "c") - } yield () - - override def run = myApp.provide( - Redis.layer, - SingleNodeExecutor.local, - ZLayer.succeed[CodecSupplier](ProtobufCodecSupplier) - ) + val myApp: ZIO[Redis, RedisError, Unit] = + for { + redis <- ZIO.service[Redis] + _ <- redis.set("myKey", 8L, Some(1.minutes)) + v <- redis.get("myKey").returning[Long] + _ <- Console.printLine(s"Value of myKey: $v").orDie + _ <- redis.hSet("myHash", ("k1", 6), ("k2", 2)) + _ <- redis.rPush("myList", 1, 2, 3, 4) + _ <- redis.sAdd("mySet", "a", "b", "a", "c") + } yield () + + override def run = + myApp.provide(Redis.local, ZLayer.succeed[CodecSupplier](ProtobufCodecSupplier)) } ``` @@ -108,9 +106,8 @@ object EmbeddedRedisSpec extends ZIOSpecDefault { } ).provideShared( EmbeddedRedis.layer, - SingleNodeExecutor.layer, ZLayer.succeed[CodecSupplier](ProtobufCodecSupplier), - Redis.layer + Redis.singleNode ) @@ TestAspect.silentLogging } ``` diff --git a/modules/benchmarks/src/main/scala/zio/redis/benchmarks/BenchmarkRuntime.scala b/modules/benchmarks/src/main/scala/zio/redis/benchmarks/BenchmarkRuntime.scala index 52456a35e..d47e67b4b 100644 --- a/modules/benchmarks/src/main/scala/zio/redis/benchmarks/BenchmarkRuntime.scala +++ b/modules/benchmarks/src/main/scala/zio/redis/benchmarks/BenchmarkRuntime.scala @@ -35,10 +35,9 @@ trait BenchmarkRuntime { object BenchmarkRuntime { private final val Layer = ZLayer.make[Redis]( - SingleNodeExecutor.local, + Redis.local, ZLayer.succeed[CodecSupplier](new CodecSupplier { def get[A: Schema]: BinaryCodec[A] = ProtobufCodec.protobufCodec - }), - Redis.layer + }) ) } diff --git a/modules/embedded/src/test/scala/zio/redis/embedded/EmbeddedRedisSpec.scala b/modules/embedded/src/test/scala/zio/redis/embedded/EmbeddedRedisSpec.scala index d484f805d..c4a31f84e 100644 --- a/modules/embedded/src/test/scala/zio/redis/embedded/EmbeddedRedisSpec.scala +++ b/modules/embedded/src/test/scala/zio/redis/embedded/EmbeddedRedisSpec.scala @@ -43,11 +43,10 @@ object EmbeddedRedisSpec extends ZIOSpecDefault { } ).provideShared( EmbeddedRedis.layer, - SingleNodeExecutor.layer, ZLayer.succeed[CodecSupplier](new CodecSupplier { def get[A: Schema]: BinaryCodec[A] = ProtobufCodec.protobufCodec }), - Redis.layer + Redis.singleNode ) @@ TestAspect.silentLogging } diff --git a/modules/example/src/main/scala/example/Main.scala b/modules/example/src/main/scala/example/Main.scala index b8132b300..c247963b5 100644 --- a/modules/example/src/main/scala/example/Main.scala +++ b/modules/example/src/main/scala/example/Main.scala @@ -33,8 +33,7 @@ object Main extends ZIOAppDefault { AppConfig.layer, ContributorsCache.layer, HttpClientZioBackend.layer(), - Redis.layer, - SingleNodeExecutor.layer, + Redis.singleNode, ZLayer.succeed[CodecSupplier](new CodecSupplier { def get[A: Schema]: BinaryCodec[A] = ProtobufCodec.protobufCodec }) diff --git a/modules/redis/src/main/scala/zio/redis/Redis.scala b/modules/redis/src/main/scala/zio/redis/Redis.scala index 7c261eb5a..892496007 100644 --- a/modules/redis/src/main/scala/zio/redis/Redis.scala +++ b/modules/redis/src/main/scala/zio/redis/Redis.scala @@ -17,6 +17,7 @@ package zio.redis import zio._ +import zio.redis.internal._ trait Redis extends api.Connection @@ -33,7 +34,16 @@ trait Redis with api.Cluster object Redis { - lazy val layer: URLayer[RedisExecutor with CodecSupplier, Redis] = + lazy val cluster: ZLayer[CodecSupplier & RedisClusterConfig, RedisError, Redis] = + ClusterExecutor.layer >>> makeLayer + + lazy val local: ZLayer[CodecSupplier, RedisError.IOError, Redis] = + SingleNodeExecutor.local >>> makeLayer + + lazy val singleNode: ZLayer[CodecSupplier & RedisConfig, RedisError.IOError, Redis] = + SingleNodeExecutor.layer >>> makeLayer + + private def makeLayer: URLayer[CodecSupplier & RedisExecutor, Redis] = ZLayer { for { codecSupplier <- ZIO.service[CodecSupplier] diff --git a/modules/redis/src/main/scala/zio/redis/api/Cluster.scala b/modules/redis/src/main/scala/zio/redis/api/Cluster.scala index 917433a90..00291dde4 100644 --- a/modules/redis/src/main/scala/zio/redis/api/Cluster.scala +++ b/modules/redis/src/main/scala/zio/redis/api/Cluster.scala @@ -20,7 +20,7 @@ import zio.redis.Input._ import zio.redis.Output.{ChunkOutput, ClusterPartitionOutput, UnitOutput} import zio.redis._ import zio.redis.api.Cluster.{AskingCommand, ClusterSetSlots, ClusterSlots} -import zio.redis.internal.{RedisCommand, RedisEnvironment} +import zio.redis.internal.{RedisCommand, RedisEnvironment, RedisExecutor} import zio.redis.options.Cluster.SetSlotSubCommand._ import zio.redis.options.Cluster.{Partition, Slot} import zio.{Chunk, IO} diff --git a/modules/redis/src/main/scala/zio/redis/internal/ClusterConnection.scala b/modules/redis/src/main/scala/zio/redis/internal/ClusterConnection.scala new file mode 100644 index 000000000..28b4b3c3f --- /dev/null +++ b/modules/redis/src/main/scala/zio/redis/internal/ClusterConnection.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2021 John A. De Goes and the ZIO contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.redis.internal + +import zio._ +import zio.redis._ +import zio.redis.options.Cluster.{Partition, Slot} + +private[redis] final case class ClusterConnection( + partitions: Chunk[Partition], + executors: Map[RedisUri, ExecutorScope], + slots: Map[Slot, RedisUri] +) { + def executor(slot: Slot): Option[RedisExecutor] = executors.get(slots(slot)).map(_.executor) + + def addExecutor(uri: RedisUri, es: ExecutorScope): ClusterConnection = + copy(executors = executors + (uri -> es)) +} diff --git a/modules/redis/src/main/scala/zio/redis/ClusterExecutor.scala b/modules/redis/src/main/scala/zio/redis/internal/ClusterExecutor.scala similarity index 92% rename from modules/redis/src/main/scala/zio/redis/ClusterExecutor.scala rename to modules/redis/src/main/scala/zio/redis/internal/ClusterExecutor.scala index 68fbec6c4..d0929e772 100644 --- a/modules/redis/src/main/scala/zio/redis/ClusterExecutor.scala +++ b/modules/redis/src/main/scala/zio/redis/internal/ClusterExecutor.scala @@ -14,22 +14,23 @@ * limitations under the License. */ -package zio.redis +package zio.redis.internal import zio._ -import zio.redis.ClusterExecutor._ +import zio.redis._ import zio.redis.api.Cluster.AskingCommand -import zio.redis.internal.{RedisConnection, RespCommand, RespCommandArgument, RespValue} import zio.redis.options.Cluster._ import java.io.IOException -final class ClusterExecutor private ( +private[redis] final class ClusterExecutor private ( clusterConnection: Ref.Synchronized[ClusterConnection], config: RedisClusterConfig, scope: Scope.Closeable ) extends RedisExecutor { + import ClusterExecutor._ + def execute(command: RespCommand): IO[RedisError, RespValue] = { def execute(keySlot: Slot) = @@ -93,7 +94,7 @@ final class ClusterExecutor private ( } } -object ClusterExecutor { +private[redis] object ClusterExecutor { lazy val layer: ZLayer[RedisClusterConfig, RedisError, RedisExecutor] = ZLayer.scoped { @@ -106,7 +107,7 @@ object ClusterExecutor { } yield executor } - private[redis] def create( + def create( config: RedisClusterConfig, scope: Scope.Closeable ): ZIO[Scope, RedisError, ClusterExecutor] = @@ -148,16 +149,15 @@ object ClusterExecutor { _ <- layerScope.addFinalizerExit(closableScope.close(_)) } yield ExecutorScope(executor, closableScope) - private def redis(address: RedisUri) = { - val executorLayer = ZLayer.succeed(RedisConfig(address.host, address.port)) >>> SingleNodeExecutor.layer - val codecLayer = ZLayer.succeed[CodecSupplier](CodecSupplier.utf8) - val redisLayer = executorLayer ++ codecLayer >>> Redis.layer + private def redis(address: RedisUri) = for { closableScope <- Scope.make + configLayer = ZLayer.succeed(RedisConfig(address.host, address.port)) + supplierLayer = ZLayer.succeed(CodecSupplier.utf8) + redisLayer = ZLayer.make[Redis](configLayer, supplierLayer, Redis.singleNode) layer <- closableScope.extend[Any](redisLayer.memoize) _ <- logScopeFinalizer("Temporary redis connection is closed") } yield (layer, closableScope) - } private def slotAddress(partitions: Chunk[Partition]) = partitions.flatMap { p => @@ -166,6 +166,7 @@ object ClusterExecutor { private final val CusterKeyExecutorError = RedisError.IOError(new IOException("Executor doesn't found. No way to dispatch this command to Redis Cluster")) + private final val CusterConnectionError = RedisError.IOError(new IOException("The connection to cluster has been failed. Can't reach a single startup node.")) } diff --git a/modules/redis/src/main/scala/zio/redis/internal/ExecutorScope.scala b/modules/redis/src/main/scala/zio/redis/internal/ExecutorScope.scala new file mode 100644 index 000000000..7ec64719c --- /dev/null +++ b/modules/redis/src/main/scala/zio/redis/internal/ExecutorScope.scala @@ -0,0 +1,21 @@ +/* + * Copyright 2021 John A. De Goes and the ZIO contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.redis.internal + +import zio.Scope + +private[redis] final case class ExecutorScope(executor: RedisExecutor, scope: Scope.Closeable) diff --git a/modules/redis/src/main/scala/zio/redis/internal/RedisEnvironment.scala b/modules/redis/src/main/scala/zio/redis/internal/RedisEnvironment.scala index c52de87a7..025aae087 100644 --- a/modules/redis/src/main/scala/zio/redis/internal/RedisEnvironment.scala +++ b/modules/redis/src/main/scala/zio/redis/internal/RedisEnvironment.scala @@ -16,7 +16,7 @@ package zio.redis.internal -import zio.redis.{CodecSupplier, RedisExecutor} +import zio.redis.CodecSupplier import zio.schema.Schema import zio.schema.codec.BinaryCodec diff --git a/modules/redis/src/main/scala/zio/redis/RedisExecutor.scala b/modules/redis/src/main/scala/zio/redis/internal/RedisExecutor.scala similarity index 79% rename from modules/redis/src/main/scala/zio/redis/RedisExecutor.scala rename to modules/redis/src/main/scala/zio/redis/internal/RedisExecutor.scala index 7dafc10c8..8dfb0ec8c 100644 --- a/modules/redis/src/main/scala/zio/redis/RedisExecutor.scala +++ b/modules/redis/src/main/scala/zio/redis/internal/RedisExecutor.scala @@ -14,11 +14,11 @@ * limitations under the License. */ -package zio.redis +package zio.redis.internal import zio.IO -import zio.redis.internal.{RespCommand, RespValue} +import zio.redis.RedisError -trait RedisExecutor { - private[redis] def execute(command: RespCommand): IO[RedisError, RespValue] +private[redis] trait RedisExecutor { + def execute(command: RespCommand): IO[RedisError, RespValue] } diff --git a/modules/redis/src/main/scala/zio/redis/SingleNodeExecutor.scala b/modules/redis/src/main/scala/zio/redis/internal/SingleNodeExecutor.scala similarity index 92% rename from modules/redis/src/main/scala/zio/redis/SingleNodeExecutor.scala rename to modules/redis/src/main/scala/zio/redis/internal/SingleNodeExecutor.scala index fa2a6574c..4b13a07db 100644 --- a/modules/redis/src/main/scala/zio/redis/SingleNodeExecutor.scala +++ b/modules/redis/src/main/scala/zio/redis/internal/SingleNodeExecutor.scala @@ -14,13 +14,13 @@ * limitations under the License. */ -package zio.redis +package zio.redis.internal import zio._ -import zio.redis.SingleNodeExecutor._ -import zio.redis.internal.{RedisConnection, RespCommand, RespValue} +import zio.redis.internal.SingleNodeExecutor._ +import zio.redis.{RedisConfig, RedisError} -final class SingleNodeExecutor private ( +private[redis] final class SingleNodeExecutor private ( connection: RedisConnection, requests: Queue[Request], responses: Queue[Promise[RedisError, RespValue]] @@ -70,20 +70,14 @@ final class SingleNodeExecutor private ( } -object SingleNodeExecutor { +private[redis] object SingleNodeExecutor { lazy val layer: ZLayer[RedisConfig, RedisError.IOError, RedisExecutor] = RedisConnection.layer >>> makeLayer lazy val local: ZLayer[Any, RedisError.IOError, RedisExecutor] = RedisConnection.local >>> makeLayer - private final case class Request(command: Chunk[RespValue.BulkString], promise: Promise[RedisError, RespValue]) - - private final val True: Any => Boolean = _ => true - - private final val RequestQueueSize = 16 - - private[redis] def create(connection: RedisConnection): URIO[Scope, SingleNodeExecutor] = + def create(connection: RedisConnection): URIO[Scope, SingleNodeExecutor] = for { requests <- Queue.bounded[Request](RequestQueueSize) responses <- Queue.unbounded[Promise[RedisError, RespValue]] @@ -92,6 +86,12 @@ object SingleNodeExecutor { _ <- logScopeFinalizer(s"$executor Node Executor is closed") } yield executor + private final case class Request(command: Chunk[RespValue.BulkString], promise: Promise[RedisError, RespValue]) + + private final val True: Any => Boolean = _ => true + + private final val RequestQueueSize = 16 + private def makeLayer: ZLayer[RedisConnection, RedisError.IOError, RedisExecutor] = ZLayer.scoped(ZIO.serviceWithZIO[RedisConnection](create)) } diff --git a/modules/redis/src/main/scala/zio/redis/internal/package.scala b/modules/redis/src/main/scala/zio/redis/internal/package.scala new file mode 100644 index 000000000..16aad1fc7 --- /dev/null +++ b/modules/redis/src/main/scala/zio/redis/internal/package.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2021 John A. De Goes and the ZIO contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zio.redis + +import zio._ + +package object internal { + private[redis] def logScopeFinalizer(msg: String): URIO[Scope, Unit] = + for { + scope <- ZIO.scope + _ <- scope.addFinalizerExit { + case Exit.Success(_) => ZIO.logTrace(s"$msg with success") + case Exit.Failure(th) => ZIO.logTraceCause(s"$msg with failure", th) + } + } yield () + +} diff --git a/modules/redis/src/main/scala/zio/redis/options/Cluster.scala b/modules/redis/src/main/scala/zio/redis/options/Cluster.scala index c7b6d21a2..afc79701e 100644 --- a/modules/redis/src/main/scala/zio/redis/options/Cluster.scala +++ b/modules/redis/src/main/scala/zio/redis/options/Cluster.scala @@ -16,30 +16,16 @@ package zio.redis.options -import zio.redis.{RedisExecutor, RedisUri} -import zio.{Chunk, Scope} +import zio.Chunk +import zio.redis.RedisUri object Cluster { - private[redis] final val SlotsAmount = 16384 - final case class ExecutorScope(executor: RedisExecutor, scope: Scope.Closeable) - - final case class ClusterConnection( - partitions: Chunk[Partition], - executors: Map[RedisUri, ExecutorScope], - slots: Map[Slot, RedisUri] - ) { - def executor(slot: Slot): Option[RedisExecutor] = executors.get(slots(slot)).map(_.executor) - - def addExecutor(uri: RedisUri, es: ExecutorScope): ClusterConnection = - copy(executors = executors + (uri -> es)) - } - final case class Slot(number: Long) extends AnyVal object Slot { - val Default: Slot = Slot(1) + final val Default: Slot = Slot(1) } final case class Node(id: String, address: RedisUri) diff --git a/modules/redis/src/main/scala/zio/redis/package.scala b/modules/redis/src/main/scala/zio/redis/package.scala index 948ec1623..cb1131fbe 100644 --- a/modules/redis/src/main/scala/zio/redis/package.scala +++ b/modules/redis/src/main/scala/zio/redis/package.scala @@ -28,13 +28,4 @@ package object redis with options.Scripting { type Id[+A] = A - - private[redis] def logScopeFinalizer(msg: String): URIO[Scope, Unit] = - for { - scope <- ZIO.scope - _ <- scope.addFinalizerExit { - case Exit.Success(_) => ZIO.logTrace(s"$msg with success") - case Exit.Failure(th) => ZIO.logTraceCause(s"$msg with failure", th) - } - } yield () } diff --git a/modules/redis/src/test/scala/zio/redis/ApiSpec.scala b/modules/redis/src/test/scala/zio/redis/ApiSpec.scala index eb8e12223..dd910842d 100644 --- a/modules/redis/src/test/scala/zio/redis/ApiSpec.scala +++ b/modules/redis/src/test/scala/zio/redis/ApiSpec.scala @@ -34,11 +34,7 @@ object ApiSpec hashSuite, streamsSuite, scriptingSpec - ).provideShared( - SingleNodeExecutor.local, - Redis.layer, - ZLayer.succeed(ProtobufCodecSupplier) - ) + ).provideShared(Redis.local, ZLayer.succeed(ProtobufCodecSupplier)) private val clusterSuite = suite("Cluster executor")( @@ -55,8 +51,7 @@ object ApiSpec scriptingSpec, clusterSpec ).provideShared( - ClusterExecutor.layer, - Redis.layer, + Redis.cluster, ZLayer.succeed(ProtobufCodecSupplier), ZLayer.succeed(RedisClusterConfig(Chunk(RedisUri("localhost", 5000)))) ).filterNotTags(_.contains(BaseSpec.ClusterExecutorUnsupported)) diff --git a/modules/redis/src/test/scala/zio/redis/KeysSpec.scala b/modules/redis/src/test/scala/zio/redis/KeysSpec.scala index 57ca511eb..c205cf003 100644 --- a/modules/redis/src/test/scala/zio/redis/KeysSpec.scala +++ b/modules/redis/src/test/scala/zio/redis/KeysSpec.scala @@ -499,9 +499,8 @@ object KeysSpec { ZLayer .make[Redis]( ZLayer.succeed(RedisConfig("localhost", 6380)), - SingleNodeExecutor.layer, ZLayer.succeed[CodecSupplier](ProtobufCodecSupplier), - Redis.layer + Redis.singleNode ) .fresh } diff --git a/modules/redis/src/test/scala/zio/redis/ClusterExecutorSpec.scala b/modules/redis/src/test/scala/zio/redis/internal/ClusterExecutorSpec.scala similarity index 92% rename from modules/redis/src/test/scala/zio/redis/ClusterExecutorSpec.scala rename to modules/redis/src/test/scala/zio/redis/internal/ClusterExecutorSpec.scala index d124db050..16ac7c9b5 100644 --- a/modules/redis/src/test/scala/zio/redis/ClusterExecutorSpec.scala +++ b/modules/redis/src/test/scala/zio/redis/internal/ClusterExecutorSpec.scala @@ -1,7 +1,7 @@ -package zio.redis +package zio.redis.internal import zio._ -import zio.redis.internal.CRC16 +import zio.redis._ import zio.redis.options.Cluster.{Slot, SlotsAmount} import zio.test._ @@ -28,9 +28,7 @@ object ClusterExecutorSpec extends BaseSpec { value2 <- redis.get(key).returning[String] // have to redirect without error ASK value3 <- redis.get(key).returning[String] // have to redirect without creating new connection _ <- ZIO.serviceWithZIO[Redis](_.setSlotStable(keySlot)).provideLayer(destMasterConn) - } yield { - assertTrue(value1 == value2) && assertTrue(value2 == value3) - } + } yield assertTrue(value1 == value2) && assertTrue(value2 == value3) } @@ TestAspect.flaky, test("check client responsiveness when Moved redirect happened") { for { @@ -59,28 +57,25 @@ object ClusterExecutorSpec extends BaseSpec { _ <- ZIO.serviceWithZIO[Redis](_.setSlotNode(keySlot, destMaster.id)).provideLayer(sourceMasterConn) value2 <- redis.get(key).returning[String] // have to refresh connection value3 <- redis.get(key).returning[String] // have to get value without refreshing connection - } yield { - assertTrue(value1 == value2) && assertTrue(value2 == value3) - } + } yield assertTrue(value1 == value2) && assertTrue(value2 == value3) } ).provideLayerShared(ClusterLayer) private final def getRedisNodeLayer(uri: RedisUri): Layer[Any, Redis] = ZLayer.make[Redis]( ZLayer.succeed(RedisConfig(uri.host, uri.port)), - SingleNodeExecutor.layer, ZLayer.succeed(ProtobufCodecSupplier), - Redis.layer + Redis.singleNode ) private val ClusterLayer: Layer[Any, Redis] = { val address1 = RedisUri("localhost", 5010) val address2 = RedisUri("localhost", 5000) + ZLayer.make[Redis]( ZLayer.succeed(RedisClusterConfig(Chunk(address1, address2))), - ClusterExecutor.layer.orDie, ZLayer.succeed(ProtobufCodecSupplier), - Redis.layer + Redis.cluster ) } }