Skip to content

Commit

Permalink
Documented Worker C8 example.
Browse files Browse the repository at this point in the history
  • Loading branch information
pme123 committed Jan 23, 2025
1 parent fb4bfed commit 225e7e1
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 81 deletions.
39 changes: 0 additions & 39 deletions 03-worker/src/main/scala/camundala/worker/JobWorker.scala

This file was deleted.

16 changes: 9 additions & 7 deletions 03-worker/src/main/scala/camundala/worker/WorkerApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@ import zio.ZIO.*
import scala.compiletime.uninitialized

trait WorkerApp extends ZIOAppDefault:
// a list of registries for each worker implementation
def workerRegistries: Seq[WorkerRegistry[?]]
protected var theWorkers: Set[JobWorker] = uninitialized

override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] = ZioLogger.logger

def workers(dWorkers: (JobWorker | Seq[JobWorker])*): Unit =
// list all the workers you want to register
def workers(dWorkers: (WorkerDsl[?, ?] | Seq[WorkerDsl[?, ?]])*): Unit =
theWorkers = dWorkers
.flatMap:
case d: JobWorker => Seq(d)
case s: Seq[?] => s.collect{case d: JobWorker => d}
case d: WorkerDsl[?, ?] => Seq(d)
case s: Seq[?] => s.collect{case d: WorkerDsl[?, ?] => d}
.toSet

protected var theWorkers: Set[WorkerDsl[?, ?]] = uninitialized

override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] = ZioLogger.logger

override def run: ZIO[Any, Any, Any] =
for
_ <- logInfo("Starting WorkerApp")
Expand Down
35 changes: 33 additions & 2 deletions 03-worker/src/main/scala/camundala/worker/WorkerDsl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ package worker
import camundala.bpmn.*
import camundala.domain.*
import camundala.worker.CamundalaWorkerError.*

import scala.concurrent.duration.*
import scala.reflect.ClassTag

trait WorkerDsl[In <: Product: InOutCodec, Out <: Product: InOutCodec]
extends JobWorker:
trait WorkerDsl[In <: Product: InOutCodec, Out <: Product: InOutCodec]:

// needed that it can be called from CSubscriptionPostProcessor
def worker: Worker[In, Out, ?]
def topic: String = worker.topic
def timeout: Duration = 10.seconds

def runWorkFromWorker(in: In)(using EngineRunContext): Option[Either[RunWorkError, Out]] =
worker.runWorkHandler
Expand All @@ -22,6 +24,35 @@ trait WorkerDsl[In <: Product: InOutCodec, Out <: Product: InOutCodec]
runWorkFromWorker(in)
.get // only if you are sure that there is a handler

protected def errorHandled(error: CamundalaWorkerError, handledErrors: Seq[String]): Boolean =
error.isMock || // if it is mocked, it is handled in the error, as it also could be a successful output
handledErrors.contains(error.errorCode.toString) || handledErrors.map(
_.toLowerCase
).contains("catchall")

protected def regexMatchesAll(
errorHandled: Boolean,
error: CamundalaWorkerError,
regexHandledErrors: Seq[String]
) =
val errorMsg = error.errorMsg.replace("\n", "")
errorHandled && regexHandledErrors.forall(regex =>
errorMsg.matches(s".*$regex.*")
)

protected def filteredOutput(
outputVariables: Seq[String],
allOutputs: Map[String, Any]
): Map[String, Any] =
outputVariables match
case filter if filter.isEmpty => allOutputs
case filter =>
allOutputs
.filter:
case k -> _ => filter.contains(k)

end filteredOutput

extension [T](option: Option[T])
def toEither[E <: CamundalaWorkerError](
error: E
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package camundala.worker
import zio.ZIO
import zio.ZIO.*

trait WorkerRegistry[T <: JobWorker]:
def register(workers: Set[JobWorker]): ZIO[Any, Any, Any] =
trait WorkerRegistry[T <: WorkerDsl[?, ?]]:
def register(workers: Set[WorkerDsl[?, ?]]): ZIO[Any, Any, Any] =
logInfo(s"Registering Workers for ${getClass.getSimpleName}") *>
registerWorkers(workers.collect { case w: T => w })

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ import zio.ZIO.*
import java.util.Date
import scala.jdk.CollectionConverters.*

trait C7Worker[In: InOutDecoder, Out: InOutEncoder] extends JobWorker, camunda.ExternalTaskHandler:
trait C7Worker[In <: Product: InOutCodec, Out <: Product: InOutCodec]
extends WorkerDsl[In, Out], camunda.ExternalTaskHandler:

protected def c7Context: C7Context

private lazy val runtime = Runtime.default


def logger: WorkerLogger = Slf4JLogger.logger(getClass.getName)

override def execute(
Expand Down Expand Up @@ -154,5 +153,6 @@ trait C7Worker[In: InOutDecoder, Out: InOutEncoder] extends JobWorker, camunda.E
end handleError

end extension
private lazy val runtime = Runtime.default

end C7Worker
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import java.time
import scala.jdk.CollectionConverters.*
import java.util.Date

trait C8Worker[In: InOutDecoder, Out: InOutEncoder] extends JobWorker, JobHandler:
trait C8Worker[In <: Product: InOutCodec, Out <: Product: InOutCodec] extends WorkerDsl[In, Out], JobHandler:
protected def c8Context: C8Context
private lazy val runtime = Runtime.default

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package camundala.worker.c8zio

import camundala.worker.{JobWorker, WorkerRegistry}
import camundala.worker.WorkerRegistry
import io.camunda.zeebe.client.ZeebeClient
import io.camunda.zeebe.client.impl.oauth.OAuthCredentialsProviderBuilder
import zio.ZIO.*
import zio.{Console, *}

import java.net.URI

class C8WorkerRegistry(client: C8Client)
extends WorkerRegistry[C8Worker[?, ?]]:

Expand Down
83 changes: 83 additions & 0 deletions 05-examples/demos/03-worker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# PoC Worker C7/C8
## Setup
```scala
03-worker
^ ^
04-worker-c7zio 04-worker-c8zio
^ ^
05-examples / demos / 03-worker
```
### 03-worker
Interfaces like:

#### WorkerApp
Register the Workers in the configured WorkerRegistries.
There is a registry for each Worker Implementation (different Camunda versions).

```scala
trait WorkerApp extends ZIOAppDefault:
// a list of registries for each worker implementation
def workerRegistries: Seq[WorkerRegistry[?]]
// function that registers all the workers
def workers(dWorkers: (WorkerDsl[?, ?] | Seq[WorkerDsl[?, ?]])*): Unit = ...
...
```
#### WorkerRegistry
Interface for a registry to register Workers for a certain Worker implementation.

```scala
trait WorkerRegistry[T <: WorkerDsl[?, ?]]:
def register(workers: Set[WorkerDsl[?, ?]]) =
logInfo(s"Registering Workers for ${getClass.getSimpleName}") *>
registerWorkers(workers.collect { case w: T => w })

protected def registerWorkers(workers: Set[T]): ZIO[Any, Any, Any]
```
#### WorkerDsl[In, Out]
Interface for a Worker that does the work.

```scala
trait WorkerDsl[In <: Product: InOutCodec, Out <: Product: InOutCodec]:
// needed that it can be called from CSubscriptionPostProcessor
def worker: Worker[In, Out, ?]
def topic: String = worker.topic
...
```

### 04-worker-c7zio / 04-worker-c8zio
Implementations like:

#### C7WorkerRegistry / C8WorkerRegistry
```scala
class C7WorkerRegistry(client: C7Client)
extends WorkerRegistry[C7Worker[?, ?]]:

def registerWorkers(workers: Set[C7Worker[?, ?]]): ZIO[Any, Any, Any] = ???
...
```
#### C7Worker[In, Out] / C8Worker[In, Out]
Implementation of the Worker Client of the BPMN Engine. In this case, Camunda 7 or 8.

```scala
trait C7Worker[In <: Product: InOutCodec, Out <: Product: InOutCodec]
extends WorkerDsl[In, Out], camunda.ExternalTaskHandler:

protected def c7Context: C7Context

def logger: WorkerLogger = Slf4JLogger.logger(getClass.getName)

override def execute(
externalTask: camunda.ExternalTask,
externalTaskService: camunda.ExternalTaskService
): Unit = ???
...
```
### 05-examples / demos / 03-worker
Example for the new Worker implementation.

### Usage
- Run twitter-auto.bpmn on Camunda 8 -> I use Saas community account.
- Run twitter-auto-c7.bpmn on Camunda 7 -> I use _DemosExampleApplication_.
- Run the TestWorkerApp.

You find the test data in _testdataC7.json_ and _testdataC8.json_.
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,3 @@ object ExampleJob extends CompanyBpmnCustomTaskDsl:
Out()
)
end ExampleJob
/*
{
"approved" : true,
"clientKey" : 74854564837991,
"myMessage" : "hello",
"myTypes" : [
{
"doit" : "no",
"why" : 12
},
{
"doit" : "yes",
"why" : 42
},
{
"doit" : "yes",
"why" : 42
}
],
"businessKey": "MY_BUSINESS_KEY"
}
*/

0 comments on commit 225e7e1

Please sign in to comment.