Skip to content

Commit

Permalink
Upgrade to Cats Effect 3 ecosystem (close #)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Nov 21, 2023
1 parent b18af90 commit 43069e6
Show file tree
Hide file tree
Showing 50 changed files with 555 additions and 552 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@
package com.snowplowanalytics.snowplow.enrich.common.fs2

import java.net.URI
import java.nio.file.{Path, Paths, StandardCopyOption}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.control.NonFatal

import cats.{Applicative, Parallel}
import cats.Applicative
import cats.implicits._

import cats.effect.{Blocker, Concurrent, ConcurrentEffect, ContextShift, Resource, Sync, Timer}
import cats.effect.concurrent.{Ref, Semaphore}
import cats.effect.kernel.{Async, Ref, Resource, Sync}
import cats.effect.std.Semaphore

import retry.{RetryDetails, RetryPolicies, RetryPolicy, retryingOnSomeErrors}
import retry.{RetryDetails, RetryPolicies, RetryPolicy, Sleep, retryingOnSomeErrors}

import fs2.Stream
import fs2.hash.md5
import fs2.io.file.{exists, move, readAll, tempFileResource, writeAll}
import fs2.io.file.{CopyFlag, CopyFlags, Files, Path}

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
Expand Down Expand Up @@ -58,54 +58,52 @@ object Assets {
/**
* Initializes the assets state.
* Tries to find them on local FS and download them if they're missing.
* @param blocker Thread pool for downloading and reading files.
* @param sem Permit shared with the enriching, used while initializing the state.
* @param clients Clients to download the URIS.
* @param enrichments Configurations of the enrichments. Contains the list of assets.
*/
def make[F[_]: ConcurrentEffect: Timer: ContextShift](
blocker: Blocker,
def make[F[_]: Async](
sem: Semaphore[F],
clients: Clients[F],
assets: List[Asset]
): F[State[F]] =
for {
_ <- sem.acquire
map <- build[F](blocker, clients, assets)
map <- build[F](clients, assets)
hashes <- Ref.of[F, Map[URI, Hash]](map)
_ <- sem.release
} yield State(hashes, clients)

def build[F[_]: ConcurrentEffect: Timer: ContextShift](
blocker: Blocker,
def build[F[_]: Async](
clients: Clients[F],
assets: List[Asset]
): F[Map[URI, Hash]] =
for {
_ <- Logger[F].info("Initializing (downloading) enrichments assets")
curDir <- getCurDir
hashOpts <- buildFromLocal(blocker, assets)
curDir <- Files.forAsync[F].currentWorkingDirectory
hashOpts <- buildFromLocal(assets)
hashes <- hashOpts.traverse {
case (uri, path, Some(hash)) =>
Logger[F].info(s"Asset from $uri is found on local system at $path").as(uri -> hash)
case (uri, path, None) =>
download[F](blocker, curDir, clients, (uri, path)).use { a =>
move(blocker, a.tpmPath, a.finalPath, List(StandardCopyOption.REPLACE_EXISTING)).as(uri -> a.hash)
download[F](curDir, clients, (uri, path)).use { a =>
Files.forAsync[F].move(a.tpmPath, a.finalPath, CopyFlags(CopyFlag.ReplaceExisting)).as(uri -> a.hash)
}
}
} yield hashes.toMap

def buildFromLocal[F[_]: Sync: ContextShift](blocker: Blocker, assets: List[Asset]): F[List[(URI, String, Option[Hash])]] =
assets.traverse { case (uri, path) => local[F](blocker, path).map(hash => (uri, path, hash)) }
def buildFromLocal[F[_]: Async](assets: List[Asset]): F[List[(URI, String, Option[Hash])]] =
assets.traverse { case (uri, path) => local[F](Path(path)).map(hash => (uri, path, hash)) }

/** Checks if file already exists on filesystem. */
def local[F[_]: Sync: ContextShift](blocker: Blocker, path: String): F[Option[Hash]] = {
val fpath = Paths.get(path)
exists(blocker, fpath).ifM(
Hash.fromStream(readAll(fpath, blocker, 1024)).map(_.some),
Sync[F].pure(none)
)
}
def local[F[_]: Async](path: Path): F[Option[Hash]] =
Files
.forAsync[F]
.exists(path)
.ifM(
Hash.fromStream(Files.forAsync[F].readAll(path)).map(_.some),
Sync[F].pure(none)
)
}

/** MD5 hash. */
Expand All @@ -121,7 +119,7 @@ object Assets {
stream.through(md5).compile.to(Array).map(fromBytes)
}

/** Pair of a tracked `URI` and destination path on local FS (`java.nio.file.Path` is not serializable). */
/** Pair of a tracked `URI` and destination path on local FS (`fs2.io.file.Path` is not serializable). */
type Asset = (URI, String)

case class Downloaded(
Expand All @@ -132,20 +130,20 @@ object Assets {
)

/** Initializes the [[updateStream]] if refresh period is specified. */
def run[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer, A](
blocker: Blocker,
def run[F[_]: Async, A](
shifter: ShiftExecution[F],
sem: Semaphore[F],
updatePeriod: Option[FiniteDuration],
assetsState: Assets.State[F],
enrichments: Ref[F, Environment.Enrichments[F]]
enrichments: Ref[F, Environment.Enrichments[F]],
blockingEC: ExecutionContext
): Stream[F, Unit] =
updatePeriod match {
case Some(interval) =>
val init = for {
_ <- Logger[F].info(show"Assets will be checked every $interval")
assets <- enrichments.get.map(_.configs.flatMap(_.filesToCache))
} yield updateStream[F](blocker, shifter, sem, assetsState, enrichments, interval, assets)
} yield updateStream[F](shifter, sem, assetsState, enrichments, interval, assets, blockingEC)
Stream.eval(init).flatten
case None =>
Stream.empty.covary[F]
Expand All @@ -155,28 +153,28 @@ object Assets {
* Creates an update stream that periodically checks if new versions of assets are available.
* If that's the case, updates them locally for the enrichments and updates the state.
*/
def updateStream[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer](
blocker: Blocker,
def updateStream[F[_]: Async](
shifter: ShiftExecution[F],
sem: Semaphore[F],
state: State[F],
enrichments: Ref[F, Environment.Enrichments[F]],
interval: FiniteDuration,
assets: List[Asset]
assets: List[Asset],
blockingEC: ExecutionContext
): Stream[F, Unit] =
Stream.fixedDelay[F](interval).evalMap { _ =>
for {
_ <- Logger[F].info(show"Checking if following assets have been updated: ${assets.map(_._1).mkString(", ")}")
curDir <- getCurDir
curDir <- Files.forAsync[F].currentWorkingDirectory
currentHashes <- state.hashes.get
downloaded = downloadAll(blocker, curDir, state.clients, assets)
downloaded = downloadAll(curDir, state.clients, assets)
_ <- downloaded.use { files =>
val newAssets = findUpdates(currentHashes, files)
if (newAssets.isEmpty)
Logger[F].info("All the assets are still the same, no update")
else
sem.withPermit {
update(blocker, shifter, state, enrichments, newAssets)
sem.permit.use { _ =>
update(shifter, state, enrichments, newAssets, blockingEC)
}
}
} yield ()
Expand All @@ -187,23 +185,21 @@ object Assets {
* @return For each URI the temporary path and the hash of the file is returned,
* as well as the asset path on disk.
*/
def downloadAll[F[_]: ConcurrentEffect: ContextShift: Timer](
blocker: Blocker,
def downloadAll[F[_]: Async](
dir: Path,
clients: Clients[F],
assets: List[Asset]
): Resource[F, List[Downloaded]] =
assets.traverse(download(blocker, dir, clients, _))
assets.traverse(download(dir, clients, _))

def download[F[_]: ConcurrentEffect: ContextShift: Timer](
blocker: Blocker,
def download[F[_]: Async](
dir: Path,
clients: Clients[F],
asset: Asset
): Resource[F, Downloaded] =
tempFileResource[F](blocker, dir).evalMap { tmpPath =>
downloadAndHash(blocker, clients, asset._1, tmpPath)
.map(hash => Downloaded(asset._1, tmpPath, Paths.get(asset._2), hash))
Files.forAsync[F].tempFile(Some(dir), "", ".tmp", None).evalMap { tmpPath =>
downloadAndHash(clients, asset._1, tmpPath)
.map(hash => Downloaded(asset._1, tmpPath, Path(asset._2), hash))
}

/**
Expand All @@ -223,17 +219,17 @@ object Assets {
* 2. Updates the state of the assets with new hash(es)
* 3. Updates the enrichments config
*/
def update[F[_]: ConcurrentEffect: ContextShift](
blocker: Blocker,
def update[F[_]: Async](
shifter: ShiftExecution[F],
state: State[F],
enrichments: Ref[F, Environment.Enrichments[F]],
newAssets: List[Downloaded]
newAssets: List[Downloaded],
blockingEC: ExecutionContext
): F[Unit] =
for {
_ <- newAssets.traverse_ { a =>
Logger[F].info(s"Remote ${a.uri} has changed, updating it locally") *>
move(blocker, a.tpmPath, a.finalPath, List(StandardCopyOption.REPLACE_EXISTING))
Files.forAsync[F].move(a.tpmPath, a.finalPath, CopyFlags(CopyFlag.ReplaceExisting))
}

_ <- Logger[F].info("Refreshing the state of assets")
Expand All @@ -243,34 +239,30 @@ object Assets {

_ <- Logger[F].info("Reinitializing enrichments")
old <- enrichments.get
fresh <- old.reinitialize(blocker, shifter)
fresh <- old.reinitialize(blockingEC, shifter)
_ <- enrichments.set(fresh)
} yield ()

def getCurDir[F[_]: Sync]: F[Path] =
Sync[F].delay(Paths.get("").toAbsolutePath)

def downloadAndHash[F[_]: Concurrent: ContextShift: Timer](
blocker: Blocker,
def downloadAndHash[F[_]: Async](
clients: Clients[F],
uri: URI,
destination: Path
): F[Hash] = {
val stream = clients.download(uri).observe(writeAll[F](destination, blocker))
val stream = clients.download(uri).through(Files.forAsync[F].writeAll(destination))
Logger[F].info(s"Downloading $uri") *> retryDownload(Hash.fromStream(stream))
}

def retryDownload[F[_]: Sync: Timer, A](download: F[A]): F[A] =
retryingOnSomeErrors[A](retryPolicy[F], worthRetrying, onError[F])(download)
def retryDownload[F[_]: Sleep: Sync, A](download: F[A]): F[A] =
retryingOnSomeErrors[A](retryPolicy[F], worthRetrying[F], onError[F])(download)

def retryPolicy[F[_]: Applicative]: RetryPolicy[F] =
RetryPolicies.fullJitter[F](1500.milliseconds).join(RetryPolicies.limitRetries[F](5))

def worthRetrying(e: Throwable): Boolean =
def worthRetrying[F[_]: Applicative](e: Throwable): F[Boolean] =
e match {
case _: Clients.RetryableFailure => true
case _: IllegalArgumentException => false
case NonFatal(_) => false
case _: Clients.RetryableFailure => Applicative[F].pure(true)
case _: IllegalArgumentException => Applicative[F].pure(false)
case NonFatal(_) => Applicative[F].pure(false)
}

def onError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] =
Expand Down
Loading

0 comments on commit 43069e6

Please sign in to comment.