diff --git a/.github/workflows/lacework.yml b/.github/workflows/lacework.yml
index 615dd4f31..77b6d0135 100644
--- a/.github/workflows/lacework.yml
+++ b/.github/workflows/lacework.yml
@@ -74,20 +74,6 @@ jobs:
           LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }}
         run: ./lw-scanner image evaluate snowplow/snowplow-enrich-kafka ${{ steps.ver.outputs.tag }}-distroless --build-id ${{ github.run_id }} --no-pull
 
-      - name: Scan enrich-rabbitmq
-        env:
-          LW_ACCESS_TOKEN: ${{ secrets.LW_ACCESS_TOKEN }}
-          LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }}
-          LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }}
-        run: ./lw-scanner image evaluate snowplow/snowplow-enrich-rabbitmq ${{ steps.ver.outputs.tag }} --build-id ${{ github.run_id }} --no-pull
-
-      - name: Scan enrich-rabbitmq distroless
-        env:
-          LW_ACCESS_TOKEN: ${{ secrets.LW_ACCESS_TOKEN }}
-          LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }}
-          LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }}
-        run: ./lw-scanner image evaluate snowplow/snowplow-enrich-rabbitmq ${{ steps.ver.outputs.tag }}-distroless --build-id ${{ github.run_id }} --no-pull
-
       - name: Scan enrich-nsq
         env:
           LW_ACCESS_TOKEN: ${{ secrets.LW_ACCESS_TOKEN }}
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 30a694673..003118d26 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -25,7 +25,6 @@ jobs:
         run: |
           sbt "project pubsub; set assembly / test := {}; assembly" \
             "project kinesis; set assembly / test := {}; assembly" \
-            "project rabbitmq; set assembly / test := {}; assembly" \
             "project kafka; set assembly / test := {}; assembly" \
             "project nsq; set assembly / test := {}; assembly"
       - name: Create GitHub release and attach artifacts
@@ -38,7 +37,6 @@ jobs:
           files: |
             modules/pubsub/target/scala-2.12/snowplow-enrich-pubsub-${{ steps.ver.outputs.tag }}.jar
             modules/kinesis/target/scala-2.12/snowplow-enrich-kinesis-${{ steps.ver.outputs.tag }}.jar
-            modules/rabbitmq/target/scala-2.12/snowplow-enrich-rabbitmq-${{ steps.ver.outputs.tag }}.jar
             modules/kafka/target/scala-2.12/snowplow-enrich-kafka-${{ steps.ver.outputs.tag }}.jar
             modules/nsq/target/scala-2.12/snowplow-enrich-nsq-${{ steps.ver.outputs.tag }}.jar
         env:
@@ -54,11 +52,7 @@ jobs:
           - kinesis
           - kafka
           - nsq
-          - rabbitmq
         include:
-          - suffix: ""
-          - suffix: -experimental
-            app: rabbitmq
           - app: kinesis
             run_snyk: ${{ !contains(github.ref, 'rc') }}
           - app: pubsub
@@ -94,7 +88,7 @@ jobs:
       - name: Get app package name
         id: packageName
         run: |
-          export PACKAGE_NAME=$(sbt "project ${{ matrix.app }}" dockerAlias -Dsbt.log.noformat=true | sed -n '/\[info\]/ s/\[info\] //p' | tail -1 | tr -d '\n' | cut -d":" -f1)${{ matrix.suffix }}
+          export PACKAGE_NAME=$(sbt "project ${{ matrix.app }}" dockerAlias -Dsbt.log.noformat=true | sed -n '/\[info\]/ s/\[info\] //p' | tail -1 | tr -d '\n' | cut -d":" -f1)
           echo "::set-output name=package_name::$PACKAGE_NAME"
       - name: Get app base directory
         id: baseDirectory
diff --git a/build.sbt b/build.sbt
index dbd4d6a01..072cd428a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -23,7 +23,7 @@ lazy val root = project.in(file("."))
   .settings(projectSettings)
   .settings(compilerSettings)
   .settings(resolverSettings)
-  .aggregate(common, commonFs2, pubsub, kinesis, kafka, rabbitmq, nsq)
+  .aggregate(common, commonFs2, pubsub, kinesis, kafka, nsq)
 
 lazy val common = project
   .in(file("modules/common"))
@@ -113,25 +113,6 @@ lazy val kafkaDistroless = project
   .settings(addCompilerPlugin(betterMonadicFor))
   .dependsOn(commonFs2)
 
-lazy val rabbitmq = project
-  .in(file("modules/rabbitmq"))
-  .enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)
-  .settings(rabbitmqBuildSettings)
-  .settings(libraryDependencies ++= rabbitmqDependencies)
-  .settings(excludeDependencies ++= exclusions)
-  .settings(addCompilerPlugin(betterMonadicFor))
-  .dependsOn(commonFs2)
-
-lazy val rabbitmqDistroless = project
-  .in(file("modules/distroless/rabbitmq"))
-  .enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDistrolessDockerPlugin)
-  .settings(sourceDirectory := (rabbitmq / sourceDirectory).value)
-  .settings(rabbitmqDistrolessBuildSettings)
-  .settings(libraryDependencies ++= rabbitmqDependencies)
-  .settings(excludeDependencies ++= exclusions)
-  .settings(addCompilerPlugin(betterMonadicFor))
-  .dependsOn(commonFs2)
-
 lazy val nsq = project
   .in(file("modules/nsq"))
   .enablePlugins(BuildInfoPlugin, JavaAppPackaging, SnowplowDockerPlugin)
diff --git a/config/config.rabbitmq.extended.hocon b/config/config.rabbitmq.extended.hocon
deleted file mode 100644
index 94c419a2e..000000000
--- a/config/config.rabbitmq.extended.hocon
+++ /dev/null
@@ -1,284 +0,0 @@
-{
-  # Where to read collector payloads from
-  "input": {
-    "type": "RabbitMQ"
-
-    "cluster": {
-      # Nodes of RabbitMQ cluster
-      "nodes": [
-        {
-          "host": "localhost"
-          "port": 5672
-        }
-      ]
-      # Username to connect to the cluster
-      "username": "guest"
-      # Password to connect to the cluster
-      "password": "guest"
-      # Virtual host to use when connecting to the cluster
-      "virtualHost": "/"
-
-      # Optional. Whether to use SSL or not to communicate with the cluster
-      "ssl": false
-      # Optional. Timeout for the connection to the cluster (in seconds)
-      "connectionTimeout": 5
-      # Optional. Size of the fs2’s bounded queue used internally to communicate with the AMQP Java driver
-      "internalQueueSize": 1000
-      # Optional. Whether the AMQP Java driver should try to recover broken connections
-      "automaticRecovery": true
-      # Optional. Interval to check that the TCP connection to the cluster is still alive
-      "requestedHeartbeat": 100
-    }
-
-    # Queue to read collector payloads from
-    "queue": "raw"
-
-    # Optional. Settings for backoff policy for checkpointing.
-    # Records are checkpointed after all the records of the same chunk have been enriched
-    "checkpointBackoff": {
-      "minBackoff": 100 milliseconds
-      "maxBackoff": 10 seconds
-      "maxRetries": 10
-    }
-  }
-
-  "output": {
-    # Enriched events output
-    "good": {
-      "type": "RabbitMQ"
-
-      "cluster": {
-        # Nodes of RabbitMQ cluster
-        "nodes": [
-          {
-            "host": "localhost"
-            "port": 5672
-          }
-        ]
-        # Username to connect to the cluster
-        "username": "guest"
-        # Password to connect to the cluster
-        "password": "guest"
-        # Virtual host to use when connecting to the cluster
-        "virtualHost": "/"
-
-        # Optional. Whether to use SSL or not to communicate with the cluster
-        "ssl": false
-        # Optional. Timeout for the connection to the cluster (in seconds)
-        "connectionTimeout": 5
-        # Optional. Size of the fs2’s bounded queue used internally to communicate with the AMQP Java driver
-        "internalQueueSize": 1000
-        # Optional. Whether the AMQP Java driver should try to recover broken connections
-        "automaticRecovery": true
-        # Optional. Interval to check that the TCP connection to the cluster is still alive
-        "requestedHeartbeat": 100
-      }
-
-      # Exchange to send the enriched events to
-      "exchange": "enriched"
-      # Routing key to use when sending the enriched events to the exchange
-      "routingKey": "enriched"
-
-      # Optional. Policy to retry if writing to RabbitMQ fails
-      "backoffPolicy": {
-        "minBackoff": 100 milliseconds
-        "maxBackoff": 10 seconds
-        "maxRetries": 10
-      }
-    }
-
-    # Bad rows output
-    "bad": {
-      "type": "RabbitMQ"
-
-      "cluster": {
-        # Nodes of RabbitMQ cluster
-        "nodes": [
-          {
-            "host": "localhost"
-            "port": 5672
-          }
-        ]
-        # Username to connect to the cluster
-        "username": "guest"
-        # Password to connect to the cluster
-        "password": "guest"
-        # Virtual host to use when connecting to the cluster
-        "virtualHost": "/"
-
-        # Optional. Whether to use SSL or not to communicate with the cluster
-        "ssl": false
-        # Optional. Timeout for the connection to the cluster (in seconds)
-        "connectionTimeout": 5
-        # Optional. Size of the fs2’s bounded queue used internally to communicate with the AMQP Java driver
-        "internalQueueSize": 1000
-        # Optional. Whether the AMQP Java driver should try to recover broken connections
-        "automaticRecovery": true
-        # Optional. Interval to check that the TCP connection to the cluster is still alive
-        "requestedHeartbeat": 100
-      }
-
-      # Exchange to send the bad rows to
-      "exchange": "bad-1"
-      # Routing key to use when sending the bad rows to the exchange
-      "routingKey": "bad-1"
-
-      # Optional. Policy to retry if writing to RabbitMQ fails
-      "backoffPolicy": {
-        "minBackoff": 100 milliseconds
-        "maxBackoff": 10 seconds
-        "maxRetries": 10
-      }
-    }
-  }
-
-  # Optional. Concurrency of the app
-  "concurrency" : {
-    # Number of events that can get enriched at the same time within a chunk
-    "enrich": 256
-    # Number of chunks that can get sunk at the same time
-    "sink": 3
-  }
-
-  # Optional, period after which enrich assets should be checked for updates
-  # no assets will be updated if the key is absent
-  "assetsUpdatePeriod": "7 days"
-
-  # Optional, configuration of remote adapters
-  "remoteAdapters": {
-    # how long enrich waits to establish a connection to remote adapters
-    "connectionTimeout": "10 seconds"
-    # how long enrich waits to get a response from remote adapters
-    "readTimeout": "45 seconds"
-    # how many connections enrich opens at maximum for remote adapters
-    # increasing this could help with throughput in case of adapters with high latency
-    "maxConnections": 10
-    # a list of remote adapter configs
-    "configs": [
-      {
-        "vendor": "com.example"
-        "version": "v1"
-        "url": "https://remote-adapter.com"
-      }
-    ]
-  }
-
-  "monitoring": {
-
-    # Optional, for tracking runtime exceptions
-    "sentry": {
-      "dsn": "http://sentry.acme.com"
-    }
-
-    # Optional, configure how metrics are reported
-    "metrics": {
-
-      # Optional. Send metrics to a StatsD server on localhost
-      "statsd": {
-        "hostname": "localhost"
-        "port": 8125
-
-        # Required, how frequently to report metrics
-        "period": "10 seconds"
-
-        # Any key-value pairs to be tagged on every StatsD metric
-        "tags": {
-          "app": enrich
-        }
-
-        # Optional, override the default metric prefix
-        # "prefix": "snowplow.enrich."
-      }
-
-      # Optional. Log to stdout using Slf4j
-      "stdout": {
-        "period": "10 seconds"
-
-        # Optional, override the default metric prefix
-        # "prefix": "snowplow.enrich."
-      }
-
-      # Optional. Send KCL and KPL metrics to Cloudwatch
-      "cloudwatch": true
-    }
-  }
-
-  # Optional, configure telemetry
-  # All the fields are optional
-  "telemetry": {
-
-    # Set to true to disable telemetry
-    "disable": false
-
-    # Interval for the heartbeat event
-    "interval": 15 minutes
-
-    # HTTP method used to send the heartbeat event
-    "method": POST
-
-    # URI of the collector receiving the heartbeat event
-    "collectorUri": collector-g.snowplowanalytics.com
-
-    # Port of the collector receiving the heartbeat event
-    "collectorPort": 443
-
-    # Whether to use https or not
-    "secure": true
-
-    # Identifier intended to tie events together across modules,
-    # infrastructure and apps when used consistently
-    "userProvidedId": my_pipeline
-
-    # ID automatically generated upon running a modules deployment script
-    # Intended to identify each independent module, and the infrastructure it controls
-    "autoGeneratedId": hfy67e5ydhtrd
-
-    # Unique identifier for the VM instance
-    # Unique for each instance of the app running within a module
-    "instanceId": 665bhft5u6udjf
-
-    # Name of the terraform module that deployed the app
-    "moduleName": enrich-rabbitmq-ce
-
-    # Version of the terraform module that deployed the app
-    "moduleVersion": 1.0.0
-  }
-
-  # Optional. To activate/deactive enrich features that are still in beta
-  # or that are here for transition.
-  # This section might change in future versions
-  "featureFlags" : {
-
-    # Enrich 3.0.0 introduces the validation of the enriched events against atomic schema
-    # before emitting.
-    # If set to false, a bad row will be emitted instead of the enriched event
-    # if validation fails.
-    # If set to true, invalid enriched events will be emitted, as before.
-    # WARNING: this feature flag will be removed in a future version
-    # and it will become impossible to emit invalid enriched events.
-    # More details: https://github.com/snowplow/enrich/issues/517#issuecomment-1033910690
-    "acceptInvalid": false
-
-    # In early versions of enrich-kinesis and enrich-pubsub (pre-3.1.4), the Javascript enrichment
-    # incorrectly ran before the currency, weather, and IP Lookups enrichments. Set this flag to true
-    # to keep the erroneous behaviour of those previous versions. This flag will be removed in a
-    # future version.
-    # More details: https://github.com/snowplow/enrich/issues/619
-    "legacyEnrichmentOrder": false
-
-    # Try to base64 decode event if initial Thrift serialization fail
-    "tryBase64Decoding": false
-  }
-
-  # Optional. Configuration for experimental/preview features
-  "experimental": {
-    # Whether to export metadata using a webhook URL.
-    # Follows iglu-webhook protocol.
- "metadata": { - "endpoint": "https://my_pipeline.my_domain.com/iglu" - "interval": 5 minutes - "organizationId": "c5f3a09f-75f8-4309-bec5-fea560f78455" - "pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784" - } - } -} diff --git a/config/config.rabbitmq.minimal.hocon b/config/config.rabbitmq.minimal.hocon deleted file mode 100644 index 35eb287b9..000000000 --- a/config/config.rabbitmq.minimal.hocon +++ /dev/null @@ -1,48 +0,0 @@ -{ - "input": { - "cluster": { - "nodes": [ - { - "host": "localhost" - "port": 5672 - } - ] - "username": "guest" - "password": "guest" - "virtualHost": "/" - } - "queue": "raw" - } - - "output": { - "good": { - "cluster": { - "nodes": [ - { - "host": "localhost" - "port": 5672 - } - ] - "username": "guest" - "password": "guest" - "virtualHost": "/" - } - "exchange": "enriched" - } - - "bad": { - "cluster": { - "nodes": [ - { - "host": "localhost" - "port": 5672 - } - ] - "username": "guest" - "password": "guest" - "virtualHost": "/" - } - "exchange": "bad-1" - } - } -} diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala index eb07870a9..6cef4f1df 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022 Snowplow Analytics Ltd. All rights reserved. + * Copyright (c) 2020-2023 Snowplow Analytics Ltd. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. 
@@ -18,7 +18,6 @@ import java.net.URI
 import java.util.UUID
 
 import cats.syntax.either._
-import cats.data.NonEmptyList
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
@@ -76,31 +75,6 @@ object io {
     val checkpointBackoff: BackoffPolicy
   }
 
-  case class RabbitMQNode(
-    host: String,
-    port: Int
-  )
-  object RabbitMQNode {
-    implicit val rabbitMQNodeDecoder: Decoder[RabbitMQNode] = deriveConfiguredDecoder[RabbitMQNode]
-    implicit val rabbitMQNodeEncoder: Encoder[RabbitMQNode] = deriveConfiguredEncoder[RabbitMQNode]
-  }
-
-  case class RabbitMQConfig(
-    nodes: NonEmptyList[RabbitMQNode],
-    username: String,
-    password: String,
-    virtualHost: String,
-    connectionTimeout: Int,
-    ssl: Boolean,
-    internalQueueSize: Int,
-    requestedHeartbeat: Int,
-    automaticRecovery: Boolean
-  )
-  object RabbitMQConfig {
-    implicit val rabbitMQConfigDecoder: Decoder[RabbitMQConfig] = deriveConfiguredDecoder[RabbitMQConfig]
-    implicit val rabbitMQConfigEncoder: Encoder[RabbitMQConfig] = deriveConfiguredEncoder[RabbitMQConfig]
-  }
-
   sealed trait Input
 
   object Input {
@@ -151,12 +125,6 @@ object io {
       cloudwatchCustomEndpoint: Option[URI]
     ) extends Input
         with RetryCheckpointing
-    case class RabbitMQ(
-      cluster: RabbitMQConfig,
-      queue: String,
-      checkpointBackoff: BackoffPolicy
-    ) extends Input
-        with RetryCheckpointing
 
     object Kinesis {
       sealed trait InitPosition
@@ -310,12 +278,6 @@ object io {
       byteLimit: Int,
      customEndpoint: Option[URI]
    ) extends Output
-    case class RabbitMQ(
-      cluster: RabbitMQConfig,
-      exchange: String,
-      routingKey: String,
-      backoffPolicy: BackoffPolicy
-    ) extends Output
 
     implicit val outputDecoder: Decoder[Output] =
       deriveConfiguredDecoder[Output]
diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala
index 32fddd28d..152a269b5 100644
--- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala
+++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2022 Snowplow Analytics Ltd. All rights reserved.
+ * Copyright (c) 2020-2023 Snowplow Analytics Ltd. All rights reserved.
  *
  * This program is licensed to you under the Apache License Version 2.0,
  * and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -19,7 +19,6 @@ import java.nio.file.Paths
 
 import scala.concurrent.duration._
 
 import cats.syntax.either._
-import cats.data.NonEmptyList
 import cats.effect.IO
 import cats.effect.testing.specs2.CatsIO
@@ -138,118 +137,6 @@ class ConfigFileSpec extends Specification with CatsIO {
       ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected))
     }
 
-    "parse reference example for RabbitMQ" in {
-      val configPath = Paths.get(getClass.getResource("/config.rabbitmq.extended.hocon").toURI)
-      val expected = ConfigFile(
-        io.Input.RabbitMQ(
-          io.RabbitMQConfig(
-            NonEmptyList.one(
-              io.RabbitMQNode("localhost", 5672)
-            ),
-            "guest",
-            "guest",
-            "/",
-            5,
-            false,
-            1000,
-            100,
-            true
-          ),
-          "raw",
-          io.BackoffPolicy(100.millis, 10.seconds, Some(10))
-        ),
-        io.Outputs(
-          io.Output.RabbitMQ(
-            io.RabbitMQConfig(
-              NonEmptyList.one(
-                io.RabbitMQNode("localhost", 5672)
-              ),
-              "guest",
-              "guest",
-              "/",
-              5,
-              false,
-              1000,
-              100,
-              true
-            ),
-            "enriched",
-            "enriched",
-            io.BackoffPolicy(100.millis, 10.seconds, Some(10))
-          ),
-          None,
-          io.Output.RabbitMQ(
-            io.RabbitMQConfig(
-              NonEmptyList.one(
-                io.RabbitMQNode("localhost", 5672)
-              ),
-              "guest",
-              "guest",
-              "/",
-              5,
-              false,
-              1000,
-              100,
-              true
-            ),
-            "bad-1",
-            "bad-1",
-            io.BackoffPolicy(100.millis, 10.seconds, Some(10))
-          )
-        ),
-        io.Concurrency(256, 3),
-        Some(7.days),
-        io.RemoteAdapterConfigs(
-          10.seconds,
-          45.seconds,
-          10,
-          List(
-            io.RemoteAdapterConfig("com.example", "v1", "https://remote-adapter.com")
-          )
-        ),
-        io.Monitoring(
-          Some(Sentry(URI.create("http://sentry.acme.com"))),
-          io.MetricsReporters(
-            Some(io.MetricsReporters.StatsD("localhost", 8125, Map("app" -> "enrich"), 10.seconds, None)),
-            Some(io.MetricsReporters.Stdout(10.seconds, None)),
-            true
-          )
-        ),
-        io.Telemetry(
-          false,
-          15.minutes,
-          "POST",
-          "collector-g.snowplowanalytics.com",
-          443,
-          true,
-          Some("my_pipeline"),
-          Some("hfy67e5ydhtrd"),
-          Some("665bhft5u6udjf"),
-          Some("enrich-rabbitmq-ce"),
-          Some("1.0.0")
-        ),
-        io.FeatureFlags(
-          false,
-          false,
-          false
-        ),
-        Some(
-          io.Experimental(
-            Some(
-              io.Metadata(
-                Uri.uri("https://my_pipeline.my_domain.com/iglu"),
-                5.minutes,
-                UUID.fromString("c5f3a09f-75f8-4309-bec5-fea560f78455"),
-                UUID.fromString("75a13583-5c99-40e3-81fc-541084dfc784")
-              )
-            )
-          )
-        ),
-        adaptersSchemas
-      )
-      ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected))
-    }
-
     "parse reference example for NSQ" in {
       val configPath = Paths.get(getClass.getResource("/config.nsq.extended.hocon").toURI)
       val expected = ConfigFile(
diff --git a/modules/rabbitmq/src/main/resources/application.conf b/modules/rabbitmq/src/main/resources/application.conf
deleted file mode 100644
index 933f2adbf..000000000
--- a/modules/rabbitmq/src/main/resources/application.conf
+++ /dev/null
@@ -1,86 +0,0 @@
-{
-  "input": {
-    "type": "RabbitMQ"
-    "cluster": {
-      "ssl": false
-      "connectionTimeout": 5
-      "internalQueueSize": 1000
-      "automaticRecovery": true
-      "requestedHeartbeat": 100
-    }
-    "checkpointBackoff": {
-      "minBackoff": 100 milliseconds
-      "maxBackoff": 10 seconds
-      "maxRetries": 10
-    }
-  }
-
-  "output": {
-    "good": {
-      "type": "RabbitMQ"
-      "cluster": {
-        "ssl": false
-        "connectionTimeout": 5
-        "internalQueueSize": 1000
-        "automaticRecovery": true
-        "requestedHeartbeat": 100
-      }
-      "routingKey": "enriched"
-      "backoffPolicy": {
-        "minBackoff": 100 milliseconds
-        "maxBackoff": 10 seconds
-        "maxRetries": 10
-      }
-    }
-
-    "bad": {
-      "type": "RabbitMQ"
-      "cluster": {
-        "ssl": false
-        "connectionTimeout": 5
- "internalQueueSize": 1000 - "automaticRecovery": true - "requestedHeartbeat": 100 - } - "routingKey": "bad-1" - "backoffPolicy": { - "minBackoff": 100 milliseconds - "maxBackoff": 10 seconds - "maxRetries": 10 - } - } - } - - "concurrency" : { - "enrich": 256 - "sink": 3 - } - - "remoteAdapters" : { - "connectionTimeout": 10 seconds - "readTimeout": 45 seconds - "maxConnections": 10 - "configs" : [] - } - - "monitoring": { - "metrics": { - "cloudwatch": false - } - } - - "telemetry": { - "disable": false - "interval": 15 minutes - "method": POST - "collectorUri": collector-g.snowplowanalytics.com - "collectorPort": 443 - "secure": true - } - - "featureFlags" : { - "acceptInvalid": false - "legacyEnrichmentOrder": false - "tryBase64Decoding": false - } -} diff --git a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Main.scala b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Main.scala deleted file mode 100644 index 44584653e..000000000 --- a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Main.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) 2022-2022 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -package com.snowplowanalytics.snowplow.enrich.rabbitmq - -import cats.Parallel -import cats.implicits._ - -import cats.effect.{ExitCode, IO, IOApp, Resource, Sync, SyncIO} - -import java.util.concurrent.{Executors, TimeUnit} - -import scala.concurrent.ExecutionContext - -import com.snowplowanalytics.snowplow.enrich.common.fs2.Run - -import com.snowplowanalytics.snowplow.enrich.rabbitmq.generated.BuildInfo - -object Main extends IOApp.WithContext { - - private val MaxRecordSize = 128000000 - - /** - * An execution context matching the cats effect IOApp default. We create it explicitly so we can - * also use it for our Blaze client. 
-   */
-  override protected val executionContextResource: Resource[SyncIO, ExecutionContext] = {
-    val poolSize = math.max(2, Runtime.getRuntime().availableProcessors())
-    Resource
-      .make(SyncIO(Executors.newFixedThreadPool(poolSize)))(pool =>
-        SyncIO {
-          pool.shutdown()
-          pool.awaitTermination(10, TimeUnit.SECONDS)
-          ()
-        }
-      )
-      .map(ExecutionContext.fromExecutorService)
-  }
-
-  def run(args: List[String]): IO[ExitCode] =
-    Run.run[IO, Record[IO]](
-      args,
-      BuildInfo.name,
-      """(\d.\d.\d(-\w*\d*)?)""".r.findFirstIn(BuildInfo.version).getOrElse(BuildInfo.version),
-      BuildInfo.description,
-      executionContext,
-      (_, cliConfig) => IO(cliConfig),
-      (blocker, input, _) => Source.init(blocker, input),
-      (blocker, out) => Sink.initAttributed(blocker, out),
-      (blocker, out) => Sink.initAttributed(blocker, out),
-      (blocker, out) => Sink.init(blocker, out),
-      checkpoint,
-      Nil,
-      _.data,
-      MaxRecordSize,
-      None,
-      None
-    )
-
-  private def checkpoint[F[_]: Parallel: Sync](records: List[Record[F]]): F[Unit] =
-    records.parTraverse_(_.ack)
-}
diff --git a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Record.scala b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Record.scala
deleted file mode 100644
index 302d60850..000000000
--- a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Record.scala
+++ /dev/null
@@ -1,15 +0,0 @@
-/*
- * Copyright (c) 2022-2022 Snowplow Analytics Ltd. All rights reserved.
- *
- * This program is licensed to you under the Apache License Version 2.0,
- * and you may not use this file except in compliance with the Apache License Version 2.0.
- * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the Apache License Version 2.0 is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
- */
-package com.snowplowanalytics.snowplow.enrich.rabbitmq
-
-final case class Record[F[_]](data: Array[Byte], ack: F[Unit])
diff --git a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Sink.scala b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Sink.scala
deleted file mode 100644
index d6bf282b4..000000000
--- a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Sink.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (c) 2022-2022 Snowplow Analytics Ltd. All rights reserved.
- *
- * This program is licensed to you under the Apache License Version 2.0,
- * and you may not use this file except in compliance with the Apache License Version 2.0.
- * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the Apache License Version 2.0 is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
- */
-package com.snowplowanalytics.snowplow.enrich.rabbitmq
-
-import cats.implicits._
-import cats.Parallel
-
-import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Resource, Sync, Timer}
-
-import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig
-import dev.profunktor.fs2rabbit.model._
-
-import org.typelevel.log4cats.Logger
-import org.typelevel.log4cats.slf4j.Slf4jLogger
-
-import retry.syntax.all._
-
-import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, AttributedData, ByteSink}
-import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output
-import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Retries
-
-object Sink {
-
-  private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
-    Slf4jLogger.getLogger[F]
-
-  def init[F[_]: ConcurrentEffect: ContextShift: Parallel: Sync: Timer](
-    blocker: Blocker,
-    output: Output
-  ): Resource[F, ByteSink[F]] =
-    for {
-      sink <- initAttributed(blocker, output)
-    } yield records => sink(records.map(AttributedData(_, "", Map.empty)))
-
-  def initAttributed[F[_]: ConcurrentEffect: ContextShift: Parallel: Sync: Timer](
-    blocker: Blocker,
-    output: Output
-  ): Resource[F, AttributedByteSink[F]] =
-    output match {
-      case o: Output.RabbitMQ =>
-        val mapped = mapConfig(o.cluster)
-        initSink[F](blocker, o, mapped)
-      case o =>
-        Resource.eval(Sync[F].raiseError(new IllegalArgumentException(s"Output $o is not RabbitMQ")))
-    }
-
-  private def initSink[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer](
-    blocker: Blocker,
-    rawConfig: Output.RabbitMQ,
-    config: Fs2RabbitConfig
-  ): Resource[F, AttributedByteSink[F]] =
-    for {
-      client <- Resource.eval(createClient[F](blocker, config))
-      channel <- client.createConnectionChannel
-      publisher <- Resource.eval {
-                     implicit val ch = channel
-                     val exchangeName = ExchangeName(rawConfig.exchange)
-                     client.declareExchangePassive(exchangeName) *>
-                       client.createPublisher[String](exchangeName, RoutingKey(rawConfig.routingKey))
-                   }
-      sink = (records: List[AttributedData[Array[Byte]]]) =>
-               records
-                 .map(_.data)
-                 .parTraverse_ { bytes =>
-                   publisher(new String(bytes))
-                     .retryingOnAllErrors(
-                       policy = Retries.fullJitter[F](rawConfig.backoffPolicy),
-                       onError = (exception, retryDetails) =>
-                         Logger[F]
-                           .error(exception)(
-                             s"Writing to ${rawConfig.exchange} errored (${retryDetails.retriesSoFar} retries from cats-retry)"
-                           )
-                     )
-                 }
-    } yield sink
-
-}
diff --git a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Source.scala b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Source.scala
deleted file mode 100644
index 78264ef49..000000000
--- a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Source.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) 2022-2022 Snowplow Analytics Ltd. All rights reserved.
- *
- * This program is licensed to you under the Apache License Version 2.0,
- * and you may not use this file except in compliance with the Apache License Version 2.0.
- * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the Apache License Version 2.0 is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
- */
-package com.snowplowanalytics.snowplow.enrich.rabbitmq
-
-import cats.Applicative
-import cats.data.Kleisli
-import cats.implicits._
-
-import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Sync}
-
-import fs2.Stream
-
-import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig
-import dev.profunktor.fs2rabbit.model._
-import dev.profunktor.fs2rabbit.interpreter.RabbitClient
-import dev.profunktor.fs2rabbit.effects.EnvelopeDecoder
-
-import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input
-
-object Source {
-
-  def init[F[_]: ConcurrentEffect: ContextShift](
-    blocker: Blocker,
-    input: Input
-  ): Stream[F, Record[F]] =
-    input match {
-      case r: Input.RabbitMQ =>
-        val mapped = mapConfig(r.cluster)
-        initSource[F](blocker, r, mapped)
-      case i =>
-        Stream.raiseError[F](new IllegalArgumentException(s"Input $i is not RabbitMQ"))
-    }
-
-  private def initSource[F[_]: ConcurrentEffect: ContextShift](
-    blocker: Blocker,
-    rawConfig: Input.RabbitMQ,
-    config: Fs2RabbitConfig
-  ): Stream[F, Record[F]] =
-    for {
-      client <- Stream.eval[F, RabbitClient[F]](createClient[F](blocker, config))
-      records <- createStreamFromClient(client, rawConfig)
-    } yield records
-
-  private def createStreamFromClient[F[_]: Sync](
-    client: RabbitClient[F],
-    rawConfig: Input.RabbitMQ
-  ): Stream[F, Record[F]] =
-    Stream.resource(client.createConnectionChannel).flatMap { implicit channel =>
-      val queueName = QueueName(rawConfig.queue)
-      for {
-        _ <- Stream.eval(client.declareQueuePassive(queueName))
-        (acker, stream) <- Stream.eval(client.createAckerConsumer[Array[Byte]](queueName))
-        records <- stream.map(envelope => Record(envelope.payload, acker(AckResult.Ack(envelope.deliveryTag))))
-      } yield records
-    }
-
-  implicit def bytesDecoder[F[_]: Applicative]: EnvelopeDecoder[F, Array[Byte]] =
-    Kleisli(_.payload.pure[F])
-}
diff --git a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/package.scala b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/package.scala
deleted file mode 100644
index c30c07e12..000000000
--- a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/package.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2022-2022 Snowplow Analytics Ltd. All rights reserved.
- *
- * This program is licensed to you under the Apache License Version 2.0,
- * and you may not use this file except in compliance with the Apache License Version 2.0.
- * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the Apache License Version 2.0 is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
- */
-package com.snowplowanalytics.snowplow.enrich
-
-import cats.effect.{Blocker, ConcurrentEffect, ContextShift}
-
-import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig}
-import dev.profunktor.fs2rabbit.interpreter.RabbitClient
-
-import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.RabbitMQConfig
-
-package object rabbitmq {
-
-  def mapConfig(raw: RabbitMQConfig): Fs2RabbitConfig =
-    Fs2RabbitConfig(
-      nodes = raw.nodes.map(node => Fs2RabbitNodeConfig(node.host, node.port)),
-      username = Some(raw.username),
-      password = Some(raw.password),
-      virtualHost = raw.virtualHost,
-      ssl = raw.ssl,
-      connectionTimeout = raw.connectionTimeout,
-      requeueOnNack = false, // nack is never used in the app
-      requeueOnReject = false, // reject is never used in the app
-      internalQueueSize = Some(raw.internalQueueSize),
-      automaticRecovery = raw.automaticRecovery,
-      requestedHeartbeat = raw.requestedHeartbeat
-    )
-
-  def createClient[F[_]: ConcurrentEffect: ContextShift](
-    blocker: Blocker,
-    config: Fs2RabbitConfig
-  ): F[RabbitClient[F]] =
-    RabbitClient[F](config, blocker)
-}
diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala
index 823933f43..7812d1ada 100644
--- a/project/BuildSettings.scala
+++ b/project/BuildSettings.scala
@@ -66,14 +66,6 @@ object BuildSettings {
     buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.kinesis.generated"
   )
 
-  lazy val rabbitmqProjectSettings = projectSettings ++ Seq(
-    name := "snowplow-enrich-rabbitmq",
-    moduleName := "snowplow-enrich-rabbitmq",
-    description := "High-performance streaming enrich app for RabbitMQ, built on top of functional streams",
-    buildInfoKeys := Seq[BuildInfoKey](organization, name, version, description),
-    buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.rabbitmq.generated"
-  )
-
   lazy val kafkaProjectSettings = projectSettings ++ Seq(
     name := "snowplow-enrich-kafka",
     moduleName := "snowplow-enrich-kafka",
@@ -243,16 +235,6 @@ object BuildSettings {
 
   lazy val kinesisDistrolessBuildSettings = kinesisBuildSettings.diff(dockerSettingsFocal) ++ dockerSettingsDistroless
 
-  lazy val rabbitmqBuildSettings = {
-    // Project
-    rabbitmqProjectSettings ++ buildSettings ++
-      // Build and publish
-      assemblySettings ++ dockerSettingsFocal ++
-      Seq(Docker / packageName := "snowplow-enrich-rabbitmq")
-  }
-
-  lazy val rabbitmqDistrolessBuildSettings = rabbitmqBuildSettings.diff(dockerSettingsFocal) ++ dockerSettingsDistroless
-
   lazy val kafkaBuildSettings = {
     // Project
     kafkaProjectSettings ++ buildSettings ++
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 601a83fe9..b3e116a48 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -91,7 +91,6 @@ object Dependencies {
     val catsRetry = "2.1.0"
     val specsDiff = "0.6.0"
    val eventGen = "0.2.0"
-    val fs2RabbitMQ = "3.0.1" // latest version without CE3
 
     val snowplowTracker = "1.0.0"
 
@@ -203,7 +202,6 @@
     val http4sServer = "org.http4s" %% "http4s-blaze-server" % V.http4s % Test
     val trackerCore = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.snowplowTracker
     val emitterHttps = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-http4s" % V.snowplowTracker
-    val fs2RabbitMQ = "dev.profunktor" %% "fs2-rabbit" % V.fs2RabbitMQ
 
     // compiler plugins
     val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % V.betterMonadicFor
@@ -306,9 +304,6 @@
     sts,
     specs2
   )
 
-  val rabbitmqDependencies = Seq(
-    fs2RabbitMQ
-  )
   val kafkaDependencies = Seq(
     fs2Kafka,