Skip to content

Commit

Permalink
add redis service that include codec and executor
Browse files Browse the repository at this point in the history
all redis commands depend on redis service
remove redis executor dependency from codec
  • Loading branch information
anatolysergeev committed Feb 21, 2022
1 parent 17bc2ca commit 915a517
Show file tree
Hide file tree
Showing 32 changed files with 350 additions and 311 deletions.
10 changes: 6 additions & 4 deletions benchmarks/src/main/scala/zio/redis/BenchmarksUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@
package zio.redis

import cats.effect.{IO => CatsIO}

import zio.{BootstrapRuntime, Has, ZIO, ZLayer}
import zio.logging.Logging
import zio.redis.codec.StringUtf8Codec
import zio.schema.codec.Codec
import zio.{BootstrapRuntime, ZIO, ZLayer}

trait BenchmarksUtils {
self: RedisClients with BootstrapRuntime =>

def unsafeRun[CL](f: CL => CatsIO[Unit])(implicit unsafeRunner: QueryUnsafeRunner[CL]): Unit =
unsafeRunner.unsafeRun(f)

def zioUnsafeRun(source: ZIO[RedisExecutor, RedisError, Unit]): Unit =
def zioUnsafeRun(source: ZIO[Has[Redis], RedisError, Unit]): Unit =
unsafeRun(source.provideLayer(BenchmarksUtils.Layer))
}

object BenchmarksUtils {
private final val Layer = Logging.ignore ++ ZLayer.succeed[Codec](StringUtf8Codec) >>> RedisExecutor.local.orDie
private final val Layer = {
val executor = Logging.ignore >>> RedisExecutor.local.orDie
executor ++ ZLayer.succeed[Codec](StringUtf8Codec) >>> Redis.live
}
}
8 changes: 4 additions & 4 deletions example/src/main/scala/example/ContributorsCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,31 @@ object ContributorsCache {
def fetchAll(repository: Repository): IO[ApiError, Contributors]
}

lazy val live: ZLayer[RedisExecutor with SttpClient, Nothing, ContributorsCache] =
lazy val live: ZLayer[Has[Redis] with SttpClient, Nothing, ContributorsCache] =
ZLayer.fromFunction { env =>
new Service {
def fetchAll(repository: Repository): IO[ApiError, Contributors] =
(read(repository) <> retrieve(repository)).provide(env)
}
}

private[this] def read(repository: Repository): ZIO[RedisExecutor, ApiError, Contributors] =
private[this] def read(repository: Repository): ZIO[Has[Redis], ApiError, Contributors] =
get(repository.key)
.returning[String]
.someOrFail(ApiError.CacheMiss(repository.key))
.map(_.fromJson[Contributors])
.rightOrFail(ApiError.CorruptedData)
.refineToOrDie[ApiError]

private[this] def retrieve(repository: Repository): ZIO[RedisExecutor with SttpClient, ApiError, Contributors] =
private[this] def retrieve(repository: Repository): ZIO[Has[Redis] with SttpClient, ApiError, Contributors] =
for {
req <- ZIO.succeed(basicRequest.get(urlOf(repository)).response(asJson[Chunk[Contributor]]))
res <- send(req).orElseFail(GithubUnreachable)
contributors <- res.body.fold(_ => ZIO.fail(UnknownProject(urlOf(repository).toString)), ZIO.succeed(_))
_ <- cache(repository, contributors)
} yield Contributors(contributors)

private def cache(repository: Repository, contributors: Chunk[Contributor]): URIO[RedisExecutor, Any] =
private def cache(repository: Repository, contributors: Chunk[Contributor]): URIO[Has[Redis], Any] =
ZIO
.fromOption(NonEmptyChunk.fromChunk(contributors))
.map(Contributors(_).toJson)
Expand Down
14 changes: 7 additions & 7 deletions example/src/main/scala/example/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@ import com.typesafe.config.ConfigFactory
import example.api.Api
import example.config.{AppConfig, ServerConfig}
import sttp.client3.asynchttpclient.zio.AsyncHttpClientZioBackend
import zhttp.service.server.ServerChannelFactory
import zhttp.service.{EventLoopGroup, Server}

import zhttp.service.server.ServerChannelFactory
import zio._
import zio.config.getConfig
import zio.config.syntax._
import zio.config.typesafe.TypesafeConfig
import zio.console._
import zio.logging.Logging
import zio.magic._
import zio.redis.RedisExecutor
import zio.redis.{Redis, RedisExecutor}
import zio.redis.codec.StringUtf8Codec
import zio.schema.codec.Codec

Expand All @@ -41,10 +40,11 @@ object Main extends App {
private val serverConfig = config.narrow(_.server)
private val redisConfig = config.narrow(_.redis)

private val codec = ZLayer.succeed[Codec](StringUtf8Codec)
private val redis = Logging.ignore ++ redisConfig ++ codec >>> RedisExecutor.live
private val sttp = AsyncHttpClientZioBackend.layer()
private val cache = redis ++ sttp >>> ContributorsCache.live
private val codec = ZLayer.succeed[Codec](StringUtf8Codec)
private val redisExecutor = Logging.ignore ++ redisConfig >>> RedisExecutor.live
private val redis = redisExecutor ++ codec >>> Redis.live
private val sttp = AsyncHttpClientZioBackend.layer()
private val cache = redis ++ sttp >>> ContributorsCache.live

def run(args: List[String]): URIO[ZEnv, ExitCode] =
getConfig[ServerConfig]
Expand Down
42 changes: 42 additions & 0 deletions redis/src/main/scala/zio/redis/Redis.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2021 John A. De Goes and the ZIO contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package zio.redis

import zio.{Has, URLayer, ZIO, ZLayer}
import zio.schema.codec.Codec

trait Redis {
def codec: Codec
def executor: RedisExecutor
}

object Redis {

lazy val live: URLayer[Has[RedisExecutor] with Has[Codec], Has[Redis]] =
ZLayer.identity[Has[Codec]] ++ ZLayer.identity[Has[RedisExecutor]] >>> RedisService

private[this] final val RedisService: ZLayer[Has[Codec] with Has[RedisExecutor], Nothing, Has[Redis]] =
ZIO
.services[Codec, RedisExecutor]
.map { env =>
new Redis {
val codec: Codec = env._1
val executor: RedisExecutor = env._2
}
}
.toLayer
}
14 changes: 7 additions & 7 deletions redis/src/main/scala/zio/redis/RedisCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@

package zio.redis

import zio.ZIO
import zio._
import zio.redis.Input.{StringInput, Varargs}

final class RedisCommand[-In, +Out] private (val name: String, val input: Input[In], val output: Output[Out]) {
private[redis] def run(in: In): ZIO[RedisExecutor, RedisError, Out] =
private[redis] def run(in: In): ZIO[Has[Redis], RedisError, Out] =
ZIO
.accessM[RedisExecutor] { executor =>
val service = executor.get
val codec = service.codec
val command = Varargs(StringInput).encode(name.split(" "))(codec) ++ input.encode(in)(codec)
service.execute(command).flatMap[Any, Throwable, Out](out => ZIO.effect(output.unsafeDecode(out)(codec)))
.serviceWith[Redis] { service =>
val executor = service.executor
val codec = service.codec
val command = Varargs(StringInput).encode(name.split(" "))(codec) ++ input.encode(in)(codec)
executor.execute(command).flatMap[Any, Throwable, Out](out => ZIO.effect(output.unsafeDecode(out)(codec)))
}
.refineToOrDie[RedisError]
}
Expand Down
32 changes: 15 additions & 17 deletions redis/src/main/scala/zio/redis/RedisExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,21 @@ package zio.redis
import zio._
import zio.clock.Clock
import zio.logging._
import zio.schema.codec.Codec

object RedisExecutor {
trait Service {
def codec: Codec
trait RedisExecutor {
def execute(command: Chunk[RespValue.BulkString]): IO[RedisError, RespValue]
}

def execute(command: Chunk[RespValue.BulkString]): IO[RedisError, RespValue]
}
object RedisExecutor {

lazy val live: ZLayer[Logging with Has[RedisConfig] with Has[Codec], RedisError.IOError, RedisExecutor] =
ZLayer.identity[Logging] ++ ByteStream.live ++ ZLayer.identity[Has[Codec]] >>> StreamedExecutor
lazy val live: ZLayer[Logging with Has[RedisConfig], RedisError.IOError, Has[RedisExecutor]] =
ZLayer.identity[Logging] ++ ByteStream.live >>> StreamedExecutor

lazy val local: ZLayer[Logging with Has[Codec], RedisError.IOError, RedisExecutor] =
ZLayer.identity[Logging] ++ ByteStream.default ++ ZLayer.identity[Has[Codec]] >>> StreamedExecutor
lazy val local: ZLayer[Logging, RedisError.IOError, Has[RedisExecutor]] =
ZLayer.identity[Logging] ++ ByteStream.default >>> StreamedExecutor

lazy val test: URLayer[zio.random.Random with Clock, RedisExecutor] = TestExecutor.live
lazy val test: URLayer[zio.random.Random with Clock, Has[RedisExecutor]] =
TestExecutor.live

private[this] final case class Request(command: Chunk[RespValue.BulkString], promise: Promise[RedisError, RespValue])

Expand All @@ -44,12 +43,12 @@ object RedisExecutor {

private[this] final val StreamedExecutor =
ZLayer
.fromServicesManaged[ByteStream.Service, Logger[String], Codec, Any, RedisError.IOError, RedisExecutor.Service] {
(byteStream, logging, codec) =>
.fromServicesManaged[ByteStream.Service, Logger[String], Any, RedisError.IOError, RedisExecutor] {
(byteStream, logging) =>
for {
reqQueue <- Queue.bounded[Request](RequestQueueSize).toManaged_
resQueue <- Queue.unbounded[Promise[RedisError, RespValue]].toManaged_
live = new Live(reqQueue, resQueue, byteStream, logging, codec)
live = new Live(reqQueue, resQueue, byteStream, logging)
_ <- live.run.forkManaged
} yield live
}
Expand All @@ -58,9 +57,8 @@ object RedisExecutor {
reqQueue: Queue[Request],
resQueue: Queue[Promise[RedisError, RespValue]],
byteStream: ByteStream.Service,
logger: Logger[String],
override val codec: Codec
) extends Service {
logger: Logger[String]
) extends RedisExecutor {

def execute(command: Chunk[RespValue.BulkString]): IO[RedisError, RespValue] =
Promise
Expand Down
10 changes: 5 additions & 5 deletions redis/src/main/scala/zio/redis/ResultBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package zio.redis

import zio.ZIO
import zio.{Has, ZIO}
import zio.redis.ResultBuilder.NeedsReturnType
import zio.schema.Schema

Expand All @@ -33,18 +33,18 @@ object ResultBuilder {
final abstract class NeedsReturnType

trait ResultBuilder1[+F[_]] extends ResultBuilder {
def returning[R: Schema]: ZIO[RedisExecutor, RedisError, F[R]]
def returning[R: Schema]: ZIO[Has[Redis], RedisError, F[R]]
}

trait ResultBuilder2[+F[_, _]] extends ResultBuilder {
def returning[R1: Schema, R2: Schema]: ZIO[RedisExecutor, RedisError, F[R1, R2]]
def returning[R1: Schema, R2: Schema]: ZIO[Has[Redis], RedisError, F[R1, R2]]
}

trait ResultBuilder3[+F[_, _, _]] extends ResultBuilder {
def returning[R1: Schema, R2: Schema, R3: Schema]: ZIO[RedisExecutor, RedisError, F[R1, R2, R3]]
def returning[R1: Schema, R2: Schema, R3: Schema]: ZIO[Has[Redis], RedisError, F[R1, R2, R3]]
}

trait ResultOutputBuilder extends ResultBuilder {
def returning[R: Output]: ZIO[RedisExecutor, RedisError, R]
def returning[R: Output]: ZIO[Has[Redis], RedisError, R]
}
}
8 changes: 2 additions & 6 deletions redis/src/main/scala/zio/redis/TestExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import zio.clock.Clock
import zio.duration._
import zio.redis.RedisError.ProtocolError
import zio.redis.RespValue.{bulkString, BulkString}
import zio.redis.codec.StringUtf8Codec
import zio.redis.TestExecutor.{KeyInfo, KeyType}
import zio.schema.codec.Codec
import zio.stm.{random => _, _}

import scala.annotation.tailrec
Expand All @@ -45,9 +43,7 @@ private[redis] final class TestExecutor private (
hashes: TMap[String, Map[String, String]],
sortedSets: TMap[String, Map[String, Double]],
clock: Clock
) extends RedisExecutor.Service {

override val codec: Codec = StringUtf8Codec
) extends RedisExecutor {

override def execute(command: Chunk[RespValue.BulkString]): IO[RedisError, RespValue] =
for {
Expand Down Expand Up @@ -3931,7 +3927,7 @@ private[redis] object TestExecutor {
lazy val redisType: RedisType = KeyType.toRedisType(`type`)
}

lazy val live: URLayer[zio.random.Random with Clock, RedisExecutor] = {
lazy val live: URLayer[zio.random.Random with Clock, Has[RedisExecutor]] = {
val executor = for {
seed <- random.nextInt
clock <- ZIO.identity[Clock]
Expand Down
Loading

0 comments on commit 915a517

Please sign in to comment.