Skip to content

Commit

Permalink
Migrate ZRunnable, fix interruption handling in the measurementHandle…
Browse files Browse the repository at this point in the history
…Interruption, add provideSomeEnvironment and provideEnvironment to the ZAkkaFlow and ZAkkaSource
  • Loading branch information
pwliwanow committed Apr 9, 2024
1 parent 7c0c788 commit 395d38e
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 191 deletions.
78 changes: 0 additions & 78 deletions chopsticks-fp/src/main/scala/dev/chopsticks/fp/ZManageable.scala

This file was deleted.

143 changes: 57 additions & 86 deletions chopsticks-fp/src/main/scala/dev/chopsticks/fp/ZRunnable.scala
Original file line number Diff line number Diff line change
@@ -1,86 +1,57 @@
//package dev.chopsticks.fp
//
//import zio.{Scope, URIO, URLayer, ZEnvironment, ZIO, ZLayer}
//
//final class ZRunnable1[A, R, E, V](fn: A => ZIO[R with Scope, E, V]) {
// def toLayer[S: zio.Tag](serviceFactory: (A => ZIO[Any, E, V]) => S): URLayer[R, S] = {
// val io: URIO[R with Scope, S] = ZIO
// .environmentWith[R with Scope]((env: ZEnvironment[R with Scope]) => {
// val newFn = (arg: A) => fn(arg).provideEnvironment(env)
// serviceFactory(newFn)
// })
// val res: ZLayer[R with Scope, E, S] = ZLayer.scoped(io)
// res
// }
//
// def toZIO: ZIO[R, E, A => ZIO[Any, E, V]] = {
// ZIO
// .environmentWith[R](env => {
// (arg: A) => fn(arg).provideEnvironment(env)
// })
// }
//}
//
////final class ZRunnable2[A1, A2, R, E, V](fn: (A1, A2) => ZIO[R, E, V]) {
//// def toLayer[S: zio.Tag](serviceFactory: ((A1, A2) => ZIO[Any, E, V]) => S): URLayer[R, Has[S]] = {
//// ZManaged
//// .access[R](env => {
//// val newFn = (arg1: A1, arg2: A2) => fn(arg1, arg2).provide(env)
//// serviceFactory(newFn)
//// })
//// .toLayer
//// }
////
//// def toZIO: ZIO[R, E, (A1, A2) => ZIO[Any, E, V]] = {
//// ZIO
//// .access[R](env => {
//// (arg1: A1, arg2: A2) => fn(arg1, arg2).provide(env)
//// })
//// }
////}
////
////final class ZRunnable3[A1, A2, A3, R, E, V](fn: (A1, A2, A3) => ZIO[R, E, V]) {
//// def toLayer[S: zio.Tag](serviceFactory: ((A1, A2, A3) => ZIO[Any, E, V]) => S): URLayer[R, Has[S]] = {
//// ZManaged
//// .access[R](env => {
//// val newFn = (arg1: A1, arg2: A2, arg3: A3) => fn(arg1, arg2, arg3).provide(env)
//// serviceFactory(newFn)
//// })
//// .toLayer
//// }
////
//// def toZIO: ZIO[R, E, (A1, A2, A3) => ZIO[Any, E, V]] = {
//// ZIO
//// .access[R](env => {
//// (arg1: A1, arg2: A2, arg3: A3) => fn(arg1, arg2, arg3).provide(env)
//// })
//// }
////}
////
////final class ZRunnable4[A1, A2, A3, A4, R, E, V](fn: (A1, A2, A3, A4) => ZIO[R, E, V]) {
//// def toLayer[S: zio.Tag](serviceFactory: ((A1, A2, A3, A4) => ZIO[Any, E, V]) => S): URLayer[R, Has[S]] = {
//// ZManaged
//// .access[R](env => {
//// val newFn = (arg1: A1, arg2: A2, arg3: A3, arg4: A4) => fn(arg1, arg2, arg3, arg4).provide(env)
//// serviceFactory(newFn)
//// })
//// .toLayer
//// }
////
//// def toZIO: ZIO[R, E, (A1, A2, A3, A4) => ZIO[Any, E, V]] = {
//// ZIO
//// .access[R](env => {
//// (arg1: A1, arg2: A2, arg3: A3, arg4: A4) => fn(arg1, arg2, arg3, arg4).provide(env)
//// })
//// }
////}
////
////object ZRunnable {
//// def apply[A, R, E, V](fn: A => ZIO[R, E, V]): ZRunnable1[A, R, E, V] = new ZRunnable1[A, R, E, V](fn)
//// def apply[A1, A2, R, E, V](fn: (A1, A2) => ZIO[R, E, V]): ZRunnable2[A1, A2, R, E, V] =
//// new ZRunnable2[A1, A2, R, E, V](fn)
//// def apply[A1, A2, A3, R, E, V](fn: (A1, A2, A3) => ZIO[R, E, V]): ZRunnable3[A1, A2, A3, R, E, V] =
//// new ZRunnable3[A1, A2, A3, R, E, V](fn)
//// def apply[A1, A2, A3, A4, R, E, V](fn: (A1, A2, A3, A4) => ZIO[R, E, V]): ZRunnable4[A1, A2, A3, A4, R, E, V] =
//// new ZRunnable4[A1, A2, A3, A4, R, E, V](fn)
////}
package dev.chopsticks.fp

import zio.{Scope, URIO, URLayer, ZEnvironment, ZIO, ZLayer}

final class ZRunnable1[A, R, E, V](fn: A => ZIO[R with Scope, E, V]) {
def toLayer[S: zio.Tag](serviceFactory: (A => ZIO[Any, E, V]) => S): URLayer[R, S] = {
val io: URIO[R with Scope, S] = ZIO
.environmentWith[R with Scope]((env: ZEnvironment[R with Scope]) => {
val newFn = (arg: A) => fn(arg).provideEnvironment(env)
serviceFactory(newFn)
})
ZLayer.scoped[R](io)
}
}

final class ZRunnable2[A1, A2, R, E, V](fn: (A1, A2) => ZIO[R, E, V]) {
def toLayer[S: zio.Tag](serviceFactory: ((A1, A2) => ZIO[Any, E, V]) => S): URLayer[R, S] = {
val io = ZIO
.environmentWith[R with Scope](env => {
val newFn = (arg1: A1, arg2: A2) => fn(arg1, arg2).provideEnvironment(env)
serviceFactory(newFn)
})
ZLayer.scoped[R](io)
}
}

final class ZRunnable3[A1, A2, A3, R, E, V](fn: (A1, A2, A3) => ZIO[R, E, V]) {
def toLayer[S: zio.Tag](serviceFactory: ((A1, A2, A3) => ZIO[Any, E, V]) => S): URLayer[R, S] = {
val io = ZIO
.environmentWith[R with Scope](env => {
val newFn = (arg1: A1, arg2: A2, arg3: A3) => fn(arg1, arg2, arg3).provideEnvironment(env)
serviceFactory(newFn)
})
ZLayer.scoped[R](io)
}
}

final class ZRunnable4[A1, A2, A3, A4, R, E, V](fn: (A1, A2, A3, A4) => ZIO[R, E, V]) {
def toLayer[S: zio.Tag](serviceFactory: ((A1, A2, A3, A4) => ZIO[Any, E, V]) => S): URLayer[R, S] = {
val io = ZIO
.environmentWith[R](env => {
val newFn = (arg1: A1, arg2: A2, arg3: A3, arg4: A4) => fn(arg1, arg2, arg3, arg4).provideEnvironment(env)
serviceFactory(newFn)
})
ZLayer.scoped[R](io)
}
}

object ZRunnable {
def apply[A, R, E, V](fn: A => ZIO[R, E, V]): ZRunnable1[A, R, E, V] = new ZRunnable1[A, R, E, V](fn)
def apply[A1, A2, R, E, V](fn: (A1, A2) => ZIO[R, E, V]): ZRunnable2[A1, A2, R, E, V] =
new ZRunnable2[A1, A2, R, E, V](fn)
def apply[A1, A2, A3, R, E, V](fn: (A1, A2, A3) => ZIO[R, E, V]): ZRunnable3[A1, A2, A3, R, E, V] =
new ZRunnable3[A1, A2, A3, R, E, V](fn)
def apply[A1, A2, A3, A4, R, E, V](fn: (A1, A2, A3, A4) => ZIO[R, E, V]): ZRunnable4[A1, A2, A3, A4, R, E, V] =
new ZRunnable4[A1, A2, A3, A4, R, E, V](fn)
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,23 +132,32 @@ package object zio_ext {

private def measurementHandleInterruption[R1, E2, A2](name: String, startTime: Long)(io: ZIO[R1, E2, A2])(implicit
ctx: LogCtx
) = {
ZIO
.acquireReleaseExitWith(io.interruptible.fork) { (fib, exit: Exit[Any, Any]) =>
val log = for {
elapse <- nanoTime.map(endTime => endTime - startTime)
elapsed = Nanoseconds(elapse).inBestUnit.rounded(2)
_ <- IzLogging.loggerWithContext(ctx).map {
_
.withCustomContext("task" -> name, "elapsed" -> elapsed)
.log(ctx.level)("interrupting...")
}
} yield ()

log.when(exit.isInterrupted) *> fib.interrupt
} { fib =>
fib.join
}
): ZIO[R1 with IzLogging, E2, A2] = {
// impl adopted from zio.ReleaseExit, with the exception that the `io` is also within the `restore` block
ZIO.uninterruptibleMask { restore =>
for {
fib <- restore(io).fork
exit <- restore(fib.join).exit
ret <- {
ZIO
.when(exit.isInterrupted) {
for {
elapse <- nanoTime.map(endTime => endTime - startTime)
elapsed = Nanoseconds(elapse).inBestUnit.rounded(2)
_ <- IzLogging.loggerWithContext(ctx).map {
_
.withCustomContext("task" -> name, "elapsed" -> elapsed)
.log(ctx.level)("interrupting...")
}
} yield ()
}
.zipRight(fib.interrupt)
}.foldCauseZIO(
cause2 => ZIO.refailCause(exit.foldExit(_ ++ cause2, _ => cause2)),
_ => ZIO.done(exit)
)
} yield ret
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.apache.pekko.stream._
import org.apache.pekko.stream.scaladsl.{Flow, Keep, Source}
import dev.chopsticks.fp.pekko_env.PekkoEnv
import dev.chopsticks.fp.zio_ext.TaskExtensions
import zio.{RIO, Unsafe, ZIO}
import zio.{IO, RIO, Unsafe, ZEnvironment, ZIO}

import scala.annotation.nowarn
import scala.annotation.unchecked.uncheckedVariance
Expand All @@ -26,10 +26,14 @@ final class ZAkkaFlow[-R, +E, -In, +Out, +Mat](val make: ZAkkaScope => ZIO[
E,
Flow[In, Out, Mat]
]) {
// todo [migration]
// def provide(r: R)(implicit ev: NeedsEnv[R]): IO[E, ZAkkaFlow[Any, E, In, Out, Mat]] = {
// toZIO.provide(r)
// }
def provideEnvironment(r: => ZEnvironment[R]): IO[E, ZAkkaFlow[Any, E, In, Out, Mat]] = {
toZIO.provideEnvironment(r)
}

def provideSomeEnvironment[R0](f: ZEnvironment[R0] => ZEnvironment[R])
: ZIO[R0, E, ZAkkaFlow[Any, E, In, Out, Mat]] = {
toZIO.provideSomeEnvironment(f)
}

def toZIO: ZIO[R, E, ZAkkaFlow[Any, E, In, Out, Mat]] = {
ZIO.environmentWith[R] { env =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import eu.timepit.refined.types.numeric.PosInt
import org.reactivestreams.Publisher
import shapeless.<:!<
import zio.stream.ZStream
import zio.{RIO, URIO, Unsafe, ZIO}
import zio.{IO, RIO, URIO, Unsafe, ZEnvironment, ZIO}

import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.{ExecutionContextExecutor, Future}
Expand Down Expand Up @@ -195,10 +195,13 @@ final class ZAkkaSource[-R, +E, +Out, +Mat](val make: ZAkkaScope => ZIO[
E,
Source[Out, Mat]
]) {
// todo [migration]
// def provide(r: R)(implicit ev: NeedsEnv[R]): IO[E, ZAkkaSource[Any, E, Out, Mat]] = {
// toZIO.provide(r)
// }
def provideEnvironment(r: => ZEnvironment[R]): IO[E, ZAkkaSource[Any, E, Out, Mat]] = {
toZIO.provideEnvironment(r)
}

def provideSomeEnvironment[R0](f: ZEnvironment[R0] => ZEnvironment[R]): ZIO[R0, E, ZAkkaSource[Any, E, Out, Mat]] = {
toZIO.provideSomeEnvironment(f)
}

def toZIO: ZIO[R, E, ZAkkaSource[Any, E, Out, Mat]] = {
ZIO.environmentWith[R] { env =>
Expand Down

0 comments on commit 395d38e

Please sign in to comment.