Skip to content

Commit

Permalink
Switch to Pekko and ZIO 2
Browse files Browse the repository at this point in the history
  • Loading branch information
pwliwanow committed Apr 9, 2024
1 parent 996bb0f commit 91d1271
Show file tree
Hide file tree
Showing 174 changed files with 3,156 additions and 3,062 deletions.
33 changes: 17 additions & 16 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,26 @@ lazy val integrationTestSettings = inConfig(Build.ITest)(Defaults.testTasks)
lazy val util = Build
.defineProject("util")
.settings(
libraryDependencies ++= akkaSlf4jDeps ++ squantsDeps ++ loggingDeps ++
libraryDependencies ++= pekkoSlf4jDeps ++ squantsDeps ++ loggingDeps ++
pureconfigDeps ++ microlibsDeps ++ prometheusClientDeps ++ refinedDeps
)

lazy val testkit = Build
.defineProject("testkit")
.settings(
libraryDependencies ++= akkaTestDeps ++ scalatestDeps ++ janinoDeps ++ zioTestDeps,
libraryDependencies ++= pekkoTestDeps ++ scalatestDeps ++ janinoDeps ++ zioTestDeps,
Compile / packageBin / mappings ~= { _.filter(_._1.name != "logback-test.xml") }
)
.dependsOn(fp, util)

lazy val fp = Build
.defineProject("fp")
.settings(
libraryDependencies ++= akkaStreamDeps ++ zioDeps ++ logstageDeps ++ sourcecodeDeps ++ pprintDeps ++ zioMagicDeps.map(
libraryDependencies ++= pekkoStreamDeps ++ zioDeps ++ logstageDeps ++ sourcecodeDeps ++ pprintDeps ++ zioMagicDeps.map(
_ % "test"
)
),
// todo remove it after bumping logstage dependencies
dependencyOverrides ++= circeDeps
)
.dependsOn(util)

Expand All @@ -75,13 +77,13 @@ lazy val stream = Build

lazy val dstream = Build
.defineProject("dstream")
.enablePlugins(AkkaGrpcPlugin)
.enablePlugins(PekkoGrpcPlugin)
.settings(
dependencyOverrides ++= akkaDiscoveryOverrideDeps,
libraryDependencies ++= akkaGrpcRuntimeDeps ++ enumeratumDeps ++ (zioMagicDeps ++ akkaTestDeps ++ zioTestDeps).map(
dependencyOverrides ++= pekkoDiscoveryOverrideDeps,
libraryDependencies ++= pekkoGrpcRuntimeDeps ++ enumeratumDeps ++ (zioMagicDeps ++ pekkoTestDeps ++ zioTestDeps).map(
_ % "test"
),
akkaGrpcCodeGeneratorSettings += "server_power_apis",
pekkoGrpcCodeGeneratorSettings += "server_power_apis",
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework"))
)
.dependsOn(metric, stream)
Expand Down Expand Up @@ -130,7 +132,7 @@ lazy val kvdbFdb = Build
lazy val graphql = Build
.defineProject("graphql")
.settings(
libraryDependencies ++= calibanDeps
libraryDependencies ++= calibanDeps ++ circeDeps ++ jsoniterDeps
)
.dependsOn(fp, stream)

Expand Down Expand Up @@ -168,7 +170,7 @@ lazy val kvdbCodecPrimitiveValue = Build
lazy val metric = Build
.defineProject("metric")
.settings(
libraryDependencies ++= prometheusClientDeps ++ pureconfigDeps ++ zioCoreDeps ++ akkaStreamDeps ++ scalatestDeps.map(
libraryDependencies ++= prometheusClientDeps ++ pureconfigDeps ++ zioCoreDeps ++ pekkoStreamDeps ++ scalatestDeps.map(
_ % "test"
)
)
Expand All @@ -184,7 +186,7 @@ lazy val prometheus = Build
.defineProject("prometheus")
.settings(Build.createScalapbSettings(withGrpc = false))
.settings(
dependencyOverrides ++= akkaDiscoveryOverrideDeps,
dependencyOverrides ++= pekkoDiscoveryOverrideDeps,
libraryDependencies ++= scalapbRuntimeDeps
)

Expand All @@ -206,12 +208,12 @@ lazy val jwt = Build

lazy val sample = Build
.defineProject("sample")
.enablePlugins(AkkaGrpcPlugin)
.enablePlugins(PekkoGrpcPlugin)
.settings(
dependencyOverrides ++= akkaDiscoveryOverrideDeps,
dependencyOverrides ++= pekkoDiscoveryOverrideDeps,
libraryDependencies ++= janinoDeps ++ pprintDeps ++ zioMagicDeps,
publish / skip := true,
akkaGrpcCodeGeneratorSettings += "server_power_apis",
pekkoGrpcCodeGeneratorSettings += "server_power_apis",
scalacOptions ++= Seq(
s"-Wconf:src=${(Compile / sourceManaged).value.getCanonicalPath}/dev/chopsticks/sample/app/proto/.*&cat=deprecation:s"
),
Expand Down Expand Up @@ -256,7 +258,7 @@ lazy val avro4s = Build
lazy val openapi = Build
.defineProject("openapi")
.settings(
libraryDependencies ++= tapirDeps ++ zioSchemaDeps
libraryDependencies ++= circeDeps ++ tapirDeps ++ zioSchemaDeps
)
.dependsOn(util)

Expand All @@ -272,7 +274,6 @@ lazy val root = (project in file("."))
name := "chopsticks",
publish / skip := true,
dependencyUpdatesFilter -= moduleFilter(organization = "org.scala-lang")
// Build.ossPublishSettings
)
.aggregate(
util,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,39 @@
package dev.chopsticks.dstream

import akka.NotUsed
import akka.grpc.GrpcClientSettings
import akka.grpc.scaladsl.{AkkaGrpcClient, StreamResponseRequestBuilder}
import akka.stream.scaladsl.Source
import dev.chopsticks.fp.akka_env.AkkaEnv
import zio.{UIO, URIO, URLayer, ZIO}
import org.apache.pekko.NotUsed
import org.apache.pekko.grpc.GrpcClientSettings
import org.apache.pekko.grpc.scaladsl.{PekkoGrpcClient, StreamResponseRequestBuilder}
import org.apache.pekko.stream.scaladsl.Source
import dev.chopsticks.fp.pekko_env.PekkoEnv
import zio.{UIO, URIO, URLayer, ZIO, ZLayer}

object DstreamClient {
trait Service[Assignment, Result] {
def requestBuilder(settings: GrpcClientSettings)
: UIO[Int => StreamResponseRequestBuilder[Source[Result, NotUsed], Assignment]]
}
trait DstreamClient[Assignment, Result] {
def requestBuilder(settings: GrpcClientSettings)
: UIO[Int => StreamResponseRequestBuilder[Source[Result, NotUsed], Assignment]]
}

object DstreamClient {
final class DstreamClientApiBuilder[Assignment, Result] {
def apply[R, Client <: AkkaGrpcClient](
def apply[R, Client <: PekkoGrpcClient](
makeClient: GrpcClientSettings => URIO[R, Client]
)(makeRequest: (Client, Int) => StreamResponseRequestBuilder[Source[Result, NotUsed], Assignment])(implicit
t1: zio.Tag[Assignment],
t2: zio.Tag[Result]
): URLayer[AkkaEnv with R, DstreamClient[Assignment, Result]] = {
val effect: ZIO[R with AkkaEnv, Nothing, Service[Assignment, Result]] = for {
): URLayer[PekkoEnv with R, DstreamClient[Assignment, Result]] = {
val effect: ZIO[R with PekkoEnv, Nothing, DstreamClient[Assignment, Result]] = for {
env <- ZIO.environment[R]
} yield {
new Service[Assignment, Result] {
new DstreamClient[Assignment, Result] {
override def requestBuilder(settings: GrpcClientSettings)
: UIO[Int => StreamResponseRequestBuilder[Source[Result, NotUsed], Assignment]] = {
makeClient(settings)
.provide(env)
.provideEnvironment(env)
.map(client => workerId => makeRequest(client, workerId))
}
}
}

effect.toLayer
ZLayer(effect)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package dev.chopsticks.dstream

import akka.NotUsed
import akka.stream.scaladsl.Source
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Source
import grpc.health.v1.HealthCheckResponse.ServingStatus
import grpc.health.v1.{Health, HealthCheckRequest, HealthCheckResponse}

Expand Down
Loading

0 comments on commit 91d1271

Please sign in to comment.