From bb2416a0c9d18ab464bcab5fb7e5b4aec0edc06e Mon Sep 17 00:00:00 2001 From: Pawel Iwanow <7759586+pwliwanow@users.noreply.github.com> Date: Tue, 9 Apr 2024 17:31:10 +0200 Subject: [PATCH] Migrate ZRunnable, fix interruption handling in the measurementHandleInterruption, add provideSomeEnvironment and provideEnvironment to the ZAkkaFlow and ZAkkaSource --- .../scala/dev/chopsticks/fp/ZManageable.scala | 78 ---------- .../scala/dev/chopsticks/fp/ZRunnable.scala | 143 +++++++----------- .../dev/chopsticks/fp/zio_ext/package.scala | 43 +++--- .../GraphQlSubscriptionSource.scala | 1 - .../dev/chopsticks/stream/ZAkkaFlow.scala | 14 +- .../dev/chopsticks/stream/ZAkkaSource.scala | 13 +- version.sbt | 2 +- 7 files changed, 101 insertions(+), 193 deletions(-) delete mode 100644 chopsticks-fp/src/main/scala/dev/chopsticks/fp/ZManageable.scala diff --git a/chopsticks-fp/src/main/scala/dev/chopsticks/fp/ZManageable.scala b/chopsticks-fp/src/main/scala/dev/chopsticks/fp/ZManageable.scala deleted file mode 100644 index faacbfe2..00000000 --- a/chopsticks-fp/src/main/scala/dev/chopsticks/fp/ZManageable.scala +++ /dev/null @@ -1,78 +0,0 @@ -//package dev.chopsticks.fp -// -//import zio.{Scope, Tag, URLayer, ZEnvironment, ZIO, ZLayer} -// -////final class ZManageable1[A, R: Tag, E, V](fn: A => ZIO[R with Scope, E, V]) { -//// def toLayer[S: zio.Tag](serviceFactory: (A => ZIO[Scope, E, V]) => S): URLayer[R, S] = { -//// ZLayer.scoped { -//// for { -//// env <- ZIO.environment[R] -//// } yield { -//// val newFn = (arg: A) => fn(arg).provideSome[Scope](ZLayer.succeed(env.get)) -//// serviceFactory(newFn) -//// } -//// } -//// ZManaged -//// .access[R](env => { -//// val newFn = (arg: A) => fn(arg).provide(env) -//// serviceFactory(newFn) -//// }) -//// .toLayer -//// } -// -//// def toManaged[S: Tag]: ZManaged[R, E, A => ZIO[Any with Scope, E, V]] = { -//// ZManaged.environment[R].map { env => -//// (arg: A) => fn(arg).provideEnvironment(ZEnvironment(env)) -//// } -//// } -// -//// def toManaged[S]: ZManaged[R, E, A => ZManaged[Any, E, V]] = { -//// ZManaged -//// .access[R](env => { -//// (arg: A) => fn(arg).provide(env) -//// }) -//// } -//// } -// -////final class ZManageable2[A1, A2, R, E, V](fn: (A1, A2) => ZManaged[R, E, V]) { -//// def toLayer[S: zio.Tag](serviceFactory: ((A1, A2) => ZManaged[Any, E, V]) => S): URLayer[R, Has[S]] = { -//// ZManaged -//// .access[R](env => { -//// val newFn = (arg1: A1, arg2: A2) => fn(arg1, arg2).provide(env) -//// serviceFactory(newFn) -//// }) -//// .toLayer -//// } -//// -//// def toZManaged: ZManaged[R, E, (A1, A2) => ZManaged[Any, E, V]] = { -//// ZManaged -//// .access[R](env => { -//// (arg1: A1, arg2: A2) => fn(arg1, arg2).provide(env) -//// }) -//// } -////} -//// -////final class ZManageable3[A1, A2, A3, R, E, V](fn: (A1, A2, A3) => ZManaged[R, E, V]) { -//// def toLayer[S: zio.Tag](serviceFactory: ((A1, A2, A3) => ZManaged[Any, E, V]) => S): URLayer[R, Has[S]] = { -//// ZManaged -//// .access[R](env => { -//// val newFn = (arg1: A1, arg2: A2, arg3: A3) => fn(arg1, arg2, arg3).provide(env) -//// serviceFactory(newFn) -//// }) -//// .toLayer -//// } -//// -//// def toZManaged: ZManaged[R, E, (A1, A2, A3) => ZManaged[Any, E, V]] = { -//// ZManaged -//// .access[R](env => { -//// (arg1: A1, arg2: A2, arg3: A3) => fn(arg1, arg2, arg3).provide(env) -//// }) -//// } -////} -// -////object ZManageable { -//// def apply[A, R: Tag, E, V](fn: A => ZIO[R with Scope, E, V]): ZManageable1[A, R, E, V] = new ZManageable1(fn) -////// def apply[A1, A2, R, E, V](fn: (A1, A2) => ZManaged[R, E, V]): ZManageable2[A1, A2, R, E, V] = new ZManageable2(fn) -////// def apply[A1, A2, A3, R, E, V](fn: (A1, A2, A3) => ZManaged[R, E, V]): ZManageable3[A1, A2, A3, R, E, V] = -////// new ZManageable3(fn) -////} diff --git a/chopsticks-fp/src/main/scala/dev/chopsticks/fp/ZRunnable.scala b/chopsticks-fp/src/main/scala/dev/chopsticks/fp/ZRunnable.scala index 1b9a1a4a..ad4f97fc 100644 --- a/chopsticks-fp/src/main/scala/dev/chopsticks/fp/ZRunnable.scala +++ b/chopsticks-fp/src/main/scala/dev/chopsticks/fp/ZRunnable.scala @@ -1,86 +1,57 @@ -//package dev.chopsticks.fp -// -//import zio.{Scope, URIO, URLayer, ZEnvironment, ZIO, ZLayer} -// -//final class ZRunnable1[A, R, E, V](fn: A => ZIO[R with Scope, E, V]) { -// def toLayer[S: zio.Tag](serviceFactory: (A => ZIO[Any, E, V]) => S): URLayer[R, S] = { -// val io: URIO[R with Scope, S] = ZIO -// .environmentWith[R with Scope]((env: ZEnvironment[R with Scope]) => { -// val newFn = (arg: A) => fn(arg).provideEnvironment(env) -// serviceFactory(newFn) -// }) -// val res: ZLayer[R with Scope, E, S] = ZLayer.scoped(io) -// res -// } -// -// def toZIO: ZIO[R, E, A => ZIO[Any, E, V]] = { -// ZIO -// .environmentWith[R](env => { -// (arg: A) => fn(arg).provideEnvironment(env) -// }) -// } -//} -// -////final class ZRunnable2[A1, A2, R, E, V](fn: (A1, A2) => ZIO[R, E, V]) { -//// def toLayer[S: zio.Tag](serviceFactory: ((A1, A2) => ZIO[Any, E, V]) => S): URLayer[R, Has[S]] = { -//// ZManaged -//// .access[R](env => { -//// val newFn = (arg1: A1, arg2: A2) => fn(arg1, arg2).provide(env) -//// serviceFactory(newFn) -//// }) -//// .toLayer -//// } -//// -//// def toZIO: ZIO[R, E, (A1, A2) => ZIO[Any, E, V]] = { -//// ZIO -//// .access[R](env => { -//// (arg1: A1, arg2: A2) => fn(arg1, arg2).provide(env) -//// }) -//// } -////} -//// -////final class ZRunnable3[A1, A2, A3, R, E, V](fn: (A1, A2, A3) => ZIO[R, E, V]) { -//// def toLayer[S: zio.Tag](serviceFactory: ((A1, A2, A3) => ZIO[Any, E, V]) => S): URLayer[R, Has[S]] = { -//// ZManaged -//// .access[R](env => { -//// val newFn = (arg1: A1, arg2: A2, arg3: A3) => fn(arg1, arg2, arg3).provide(env) -//// serviceFactory(newFn) -//// }) -//// .toLayer -//// } -//// -//// def toZIO: ZIO[R, E, (A1, A2, A3) => ZIO[Any, E, V]] = { -//// ZIO -//// .access[R](env => { -//// (arg1: A1, arg2: A2, arg3: A3) => fn(arg1, arg2, arg3).provide(env) -//// }) -//// } -////} -//// -////final class ZRunnable4[A1, A2, A3, A4, R, E, V](fn: (A1, A2, A3, A4) => ZIO[R, E, V]) { -//// def toLayer[S: zio.Tag](serviceFactory: ((A1, A2, A3, A4) => ZIO[Any, E, V]) => S): URLayer[R, Has[S]] = { -//// ZManaged -//// .access[R](env => { -//// val newFn = (arg1: A1, arg2: A2, arg3: A3, arg4: A4) => fn(arg1, arg2, arg3, arg4).provide(env) -//// serviceFactory(newFn) -//// }) -//// .toLayer -//// } -//// -//// def toZIO: ZIO[R, E, (A1, A2, A3, A4) => ZIO[Any, E, V]] = { -//// ZIO -//// .access[R](env => { -//// (arg1: A1, arg2: A2, arg3: A3, arg4: A4) => fn(arg1, arg2, arg3, arg4).provide(env) -//// }) -//// } -////} -//// -////object ZRunnable { -//// def apply[A, R, E, V](fn: A => ZIO[R, E, V]): ZRunnable1[A, R, E, V] = new ZRunnable1[A, R, E, V](fn) -//// def apply[A1, A2, R, E, V](fn: (A1, A2) => ZIO[R, E, V]): ZRunnable2[A1, A2, R, E, V] = -//// new ZRunnable2[A1, A2, R, E, V](fn) -//// def apply[A1, A2, A3, R, E, V](fn: (A1, A2, A3) => ZIO[R, E, V]): ZRunnable3[A1, A2, A3, R, E, V] = -//// new ZRunnable3[A1, A2, A3, R, E, V](fn) -//// def apply[A1, A2, A3, A4, R, E, V](fn: (A1, A2, A3, A4) => ZIO[R, E, V]): ZRunnable4[A1, A2, A3, A4, R, E, V] = -//// new ZRunnable4[A1, A2, A3, A4, R, E, V](fn) -////} +package dev.chopsticks.fp + +import zio.{Scope, URIO, URLayer, ZEnvironment, ZIO, ZLayer} + +final class ZRunnable1[A, R, E, V](fn: A => ZIO[R with Scope, E, V]) { + def toLayer[S: zio.Tag](serviceFactory: (A => ZIO[Any, E, V]) => S): URLayer[R, S] = { + val io: URIO[R with Scope, S] = ZIO + .environmentWith[R with Scope]((env: ZEnvironment[R with Scope]) => { + val newFn = (arg: A) => fn(arg).provideEnvironment(env) + serviceFactory(newFn) + }) + ZLayer.scoped[R](io) + } +} + +final class ZRunnable2[A1, A2, R, E, V](fn: (A1, A2) => ZIO[R, E, V]) { + def toLayer[S: zio.Tag](serviceFactory: ((A1, A2) => ZIO[Any, E, V]) => S): URLayer[R, S] = { + val io = ZIO + .environmentWith[R with Scope](env => { + val newFn = (arg1: A1, arg2: A2) => fn(arg1, arg2).provideEnvironment(env) + serviceFactory(newFn) + }) + ZLayer.scoped[R](io) + } +} + +final class ZRunnable3[A1, A2, A3, R, E, V](fn: (A1, A2, A3) => ZIO[R, E, V]) { + def toLayer[S: zio.Tag](serviceFactory: ((A1, A2, A3) => ZIO[Any, E, V]) => S): URLayer[R, S] = { + val io = ZIO + .environmentWith[R with Scope](env => { + val newFn = (arg1: A1, arg2: A2, arg3: A3) => fn(arg1, arg2, arg3).provideEnvironment(env) + serviceFactory(newFn) + }) + ZLayer.scoped[R](io) + } +} + +final class ZRunnable4[A1, A2, A3, A4, R, E, V](fn: (A1, A2, A3, A4) => ZIO[R, E, V]) { + def toLayer[S: zio.Tag](serviceFactory: ((A1, A2, A3, A4) => ZIO[Any, E, V]) => S): URLayer[R, S] = { + val io = ZIO + .environmentWith[R](env => { + val newFn = (arg1: A1, arg2: A2, arg3: A3, arg4: A4) => fn(arg1, arg2, arg3, arg4).provideEnvironment(env) + serviceFactory(newFn) + }) + ZLayer.scoped[R](io) + } +} + +object ZRunnable { + def apply[A, R, E, V](fn: A => ZIO[R, E, V]): ZRunnable1[A, R, E, V] = new ZRunnable1[A, R, E, V](fn) + def apply[A1, A2, R, E, V](fn: (A1, A2) => ZIO[R, E, V]): ZRunnable2[A1, A2, R, E, V] = + new ZRunnable2[A1, A2, R, E, V](fn) + def apply[A1, A2, A3, R, E, V](fn: (A1, A2, A3) => ZIO[R, E, V]): ZRunnable3[A1, A2, A3, R, E, V] = + new ZRunnable3[A1, A2, A3, R, E, V](fn) + def apply[A1, A2, A3, A4, R, E, V](fn: (A1, A2, A3, A4) => ZIO[R, E, V]): ZRunnable4[A1, A2, A3, A4, R, E, V] = + new ZRunnable4[A1, A2, A3, A4, R, E, V](fn) +} diff --git a/chopsticks-fp/src/main/scala/dev/chopsticks/fp/zio_ext/package.scala b/chopsticks-fp/src/main/scala/dev/chopsticks/fp/zio_ext/package.scala index 14cdb2e1..ec596b6d 100644 --- a/chopsticks-fp/src/main/scala/dev/chopsticks/fp/zio_ext/package.scala +++ b/chopsticks-fp/src/main/scala/dev/chopsticks/fp/zio_ext/package.scala @@ -132,23 +132,32 @@ package object zio_ext { private def measurementHandleInterruption[R1, E2, A2](name: String, startTime: Long)(io: ZIO[R1, E2, A2])(implicit ctx: LogCtx - ) = { - ZIO - .acquireReleaseExitWith(io.interruptible.fork) { (fib, exit: Exit[Any, Any]) => - val log = for { - elapse <- nanoTime.map(endTime => endTime - startTime) - elapsed = Nanoseconds(elapse).inBestUnit.rounded(2) - _ <- IzLogging.loggerWithContext(ctx).map { - _ - .withCustomContext("task" -> name, "elapsed" -> elapsed) - .log(ctx.level)("interrupting...") - } - } yield () - - log.when(exit.isInterrupted) *> fib.interrupt - } { fib => - fib.join - } + ): ZIO[R1 with IzLogging, E2, A2] = { + // impl adopted from zio.ReleaseExit, with the exception that the `io` is also within the `restore` block + ZIO.uninterruptibleMask { restore => + for { + fib <- restore(io).fork + exit <- restore(fib.join).exit + ret <- { + ZIO + .when(exit.isInterrupted) { + for { + elapse <- nanoTime.map(endTime => endTime - startTime) + elapsed = Nanoseconds(elapse).inBestUnit.rounded(2) + _ <- IzLogging.loggerWithContext(ctx).map { + _ + .withCustomContext("task" -> name, "elapsed" -> elapsed) + .log(ctx.level)("interrupting...") + } + } yield () + } + .zipRight(fib.interrupt) + }.foldCauseZIO( + cause2 => ZIO.refailCause(exit.foldExit(_ ++ cause2, _ => cause2)), + _ => ZIO.done(exit) + ) + } yield ret + } } } diff --git a/chopsticks-graphql/src/main/scala/dev/chopsticks/graphql/subscription/GraphQlSubscriptionSource.scala b/chopsticks-graphql/src/main/scala/dev/chopsticks/graphql/subscription/GraphQlSubscriptionSource.scala index 2070be28..5cd33b18 100644 --- a/chopsticks-graphql/src/main/scala/dev/chopsticks/graphql/subscription/GraphQlSubscriptionSource.scala +++ b/chopsticks-graphql/src/main/scala/dev/chopsticks/graphql/subscription/GraphQlSubscriptionSource.scala @@ -27,7 +27,6 @@ import dev.chopsticks.graphql.subscription.GraphQlSubscriptionExchangeModel.{ } import dev.chopsticks.graphql.subscription.GraphQlSubscriptionExchangeModel.GraphQlSubscriptionProtocolServerMessage.GraphQlConnectionData import dev.chopsticks.stream.GraphStageWithActorLogic -import io.circe.Printer import scala.concurrent.duration._ import scala.concurrent.{Future, Promise} diff --git a/chopsticks-stream/src/main/scala/dev/chopsticks/stream/ZAkkaFlow.scala b/chopsticks-stream/src/main/scala/dev/chopsticks/stream/ZAkkaFlow.scala index deb08280..6dbaae24 100644 --- a/chopsticks-stream/src/main/scala/dev/chopsticks/stream/ZAkkaFlow.scala +++ b/chopsticks-stream/src/main/scala/dev/chopsticks/stream/ZAkkaFlow.scala @@ -5,7 +5,7 @@ import org.apache.pekko.stream._ import org.apache.pekko.stream.scaladsl.{Flow, Keep, Source} import dev.chopsticks.fp.pekko_env.PekkoEnv import dev.chopsticks.fp.zio_ext.TaskExtensions -import zio.{RIO, Unsafe, ZIO} +import zio.{IO, RIO, Unsafe, ZEnvironment, ZIO} import scala.annotation.nowarn import scala.annotation.unchecked.uncheckedVariance @@ -26,10 +26,14 @@ final class ZAkkaFlow[-R, +E, -In, +Out, +Mat](val make: ZAkkaScope => ZIO[ E, Flow[In, Out, Mat] ]) { - // todo [migration] -// def provide(r: R)(implicit ev: NeedsEnv[R]): IO[E, ZAkkaFlow[Any, E, In, Out, Mat]] = { -// toZIO.provide(r) -// } + def provideEnvironment(r: => ZEnvironment[R]): IO[E, ZAkkaFlow[Any, E, In, Out, Mat]] = { + toZIO.provideEnvironment(r) + } + + def provideSomeEnvironment[R0](f: ZEnvironment[R0] => ZEnvironment[R]) + : ZIO[R0, E, ZAkkaFlow[Any, E, In, Out, Mat]] = { + toZIO.provideSomeEnvironment(f) + } def toZIO: ZIO[R, E, ZAkkaFlow[Any, E, In, Out, Mat]] = { ZIO.environmentWith[R] { env => diff --git a/chopsticks-stream/src/main/scala/dev/chopsticks/stream/ZAkkaSource.scala b/chopsticks-stream/src/main/scala/dev/chopsticks/stream/ZAkkaSource.scala index 1590bd88..18255fbb 100644 --- a/chopsticks-stream/src/main/scala/dev/chopsticks/stream/ZAkkaSource.scala +++ b/chopsticks-stream/src/main/scala/dev/chopsticks/stream/ZAkkaSource.scala @@ -12,7 +12,7 @@ import eu.timepit.refined.types.numeric.PosInt import org.reactivestreams.Publisher import shapeless.<:!< import zio.stream.ZStream -import zio.{RIO, URIO, Unsafe, ZIO} +import zio.{IO, RIO, URIO, Unsafe, ZEnvironment, ZIO} import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.{ExecutionContextExecutor, Future} @@ -194,10 +194,13 @@ final class ZAkkaSource[-R, +E, +Out, +Mat](val make: ZAkkaScope => ZIO[ E, Source[Out, Mat] ]) { - // todo [migration] -// def provide(r: R)(implicit ev: NeedsEnv[R]): IO[E, ZAkkaSource[Any, E, Out, Mat]] = { -// toZIO.provide(r) -// } + def provideEnvironment(r: => ZEnvironment[R]): IO[E, ZAkkaSource[Any, E, Out, Mat]] = { + toZIO.provideEnvironment(r) + } + + def provideSomeEnvironment[R0](f: ZEnvironment[R0] => ZEnvironment[R]): ZIO[R0, E, ZAkkaSource[Any, E, Out, Mat]] = { + toZIO.provideSomeEnvironment(f) + } def toZIO: ZIO[R, E, ZAkkaSource[Any, E, Out, Mat]] = { ZIO.environmentWith[R] { env => diff --git a/version.sbt b/version.sbt index 43799873..df624bba 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "3.13.0-SNAPSHOT" +ThisBuild / version := "4.0.0-SNAPSHOT"