From eda1fdbbe7f2bff273e42c3f830a30e1598be48e Mon Sep 17 00:00:00 2001 From: mvanzalu Date: Wed, 22 Jan 2025 14:35:56 +0000 Subject: [PATCH] feat: adds Monitoring queue and event #1638 --- .../icij/datashare/web/StatusResource.java | 8 ++------ .../datashare/asynctasks/TaskManager.java | 8 ++------ .../datashare/asynctasks/TaskManagerAmqp.java | 19 +++++++++++-------- .../asynctasks/bus/amqp/AmqpQueue.java | 5 ++++- .../asynctasks/bus/amqp/MonitoringEvent.java | 3 +++ .../asynctasks/TaskManagerAmqpTest.java | 13 +++++++------ 6 files changed, 29 insertions(+), 27 deletions(-) create mode 100644 datashare-tasks/src/main/java/org/icij/datashare/asynctasks/bus/amqp/MonitoringEvent.java diff --git a/datashare-app/src/main/java/org/icij/datashare/web/StatusResource.java b/datashare-app/src/main/java/org/icij/datashare/web/StatusResource.java index ee1d4d59e..e6f7d8be5 100644 --- a/datashare-app/src/main/java/org/icij/datashare/web/StatusResource.java +++ b/datashare-app/src/main/java/org/icij/datashare/web/StatusResource.java @@ -16,13 +16,9 @@ import org.icij.datashare.Repository; import org.icij.datashare.asynctasks.TaskManager; import org.icij.datashare.openmetrics.StatusMapper; -import org.icij.datashare.extract.DocumentCollectionFactory; import org.icij.datashare.text.indexing.Indexer; -import org.icij.extract.queue.DocumentQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.nio.file.Path; +import java.io.IOException; @Singleton @Prefix("/api") @@ -46,7 +42,7 @@ public StatusResource(PropertiesProvider propertiesProvider, Repository reposito @ApiResponse(responseCode = "504", description = "proxy error when elasticsearch is down", useReturnTypeSchema = true) @ApiResponse(responseCode = "503", description = "service unavailable when other services are down", useReturnTypeSchema = true) @Get("/status") - public Payload getStatus(Context context) { + public Payload getStatus(Context context) throws IOException { Status status = new Status(repository.getHealth(), indexer.getHealth(), taskManager.getHealth()); if ("openmetrics".equals(context.request().query().get("format"))) { return new Payload("text/plain;version=0.0.4", diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManager.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManager.java index 0b14b5c30..473738c4d 100644 --- a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManager.java +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManager.java @@ -1,10 +1,6 @@ package org.icij.datashare.asynctasks; -import org.icij.datashare.asynctasks.bus.amqp.CancelledEvent; -import org.icij.datashare.asynctasks.bus.amqp.ErrorEvent; -import org.icij.datashare.asynctasks.bus.amqp.ProgressEvent; -import org.icij.datashare.asynctasks.bus.amqp.ResultEvent; -import org.icij.datashare.asynctasks.bus.amqp.TaskEvent; +import org.icij.datashare.asynctasks.bus.amqp.*; import org.icij.datashare.batch.WebQueryPagination; import org.icij.datashare.json.JsonObjectMapper; import org.icij.datashare.user.User; @@ -42,7 +38,7 @@ public interface TaskManager extends Closeable { void clear() throws IOException; - boolean getHealth(); + boolean getHealth() throws IOException; default List> getTasks(User user) throws IOException { return getTasks(user, new HashMap<>(), new WebQueryPagination()); diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerAmqp.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerAmqp.java index 7fadf4cf4..51a2ae570 100644 --- a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerAmqp.java +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskManagerAmqp.java @@ -1,11 +1,7 @@ package org.icij.datashare.asynctasks; -import org.icij.datashare.asynctasks.bus.amqp.AmqpConsumer; -import org.icij.datashare.asynctasks.bus.amqp.AmqpInterlocutor; -import org.icij.datashare.asynctasks.bus.amqp.AmqpQueue; -import org.icij.datashare.asynctasks.bus.amqp.CancelEvent; -import org.icij.datashare.asynctasks.bus.amqp.ShutdownEvent; -import org.icij.datashare.asynctasks.bus.amqp.TaskEvent; +import com.rabbitmq.client.ShutdownSignalException; +import org.icij.datashare.asynctasks.bus.amqp.*; import org.icij.datashare.tasks.RoutingStrategy; @@ -113,7 +109,14 @@ public void clear() { } @Override - public boolean getHealth() { - return amqp.getHealth(); + public boolean getHealth() throws IOException { + try { + logger.info("sending monitoring event"); + amqp.publish(AmqpQueue.MONITORING, new MonitoringEvent()); + } catch (ShutdownSignalException e) { + logger.error("error sending monitoring event", e); + return false; + } + return true; } } diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/bus/amqp/AmqpQueue.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/bus/amqp/AmqpQueue.java index 80d7fbee4..ab7248df8 100644 --- a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/bus/amqp/AmqpQueue.java +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/bus/amqp/AmqpQueue.java @@ -19,7 +19,10 @@ public enum AmqpQueue { "x-consumer-timeout", 3600 * 1000), TASK_DLQ), MANAGER_EVENT_DLQ ("exchangeDLQManagerEvents", BuiltinExchangeType.DIRECT,"routingKeyDLQManagerEvents"), MANAGER_EVENT ("exchangeManagerEvents", BuiltinExchangeType.DIRECT,"routingKeyManagerEvents", new HashMap<>(), MANAGER_EVENT_DLQ), - WORKER_EVENT("exchangeWorkerEvents", BuiltinExchangeType.FANOUT, "routingKeyWorkerEvents"); + WORKER_EVENT("exchangeWorkerEvents", BuiltinExchangeType.FANOUT, "routingKeyWorkerEvents"), + MONITORING("exchangeMonitoring", BuiltinExchangeType.DIRECT, "routingKeyMonitoring", Map.of( + "x-message-ttl", 5000, + "x-expires", 10000), null); public final String exchange; public final String routingKey; diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/bus/amqp/MonitoringEvent.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/bus/amqp/MonitoringEvent.java new file mode 100644 index 000000000..803198c7f --- /dev/null +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/bus/amqp/MonitoringEvent.java @@ -0,0 +1,3 @@ +package org.icij.datashare.asynctasks.bus.amqp; + +public class MonitoringEvent extends Event { } diff --git a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerAmqpTest.java b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerAmqpTest.java index 4e74a3c4e..d4ef21739 100644 --- a/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerAmqpTest.java +++ b/datashare-tasks/src/test/java/org/icij/datashare/asynctasks/TaskManagerAmqpTest.java @@ -10,17 +10,16 @@ import org.icij.extract.redis.RedissonClientFactory; import org.icij.task.Options; import org.junit.*; -import org.redisson.Redisson; -import org.redisson.RedissonMap; import org.redisson.api.RedissonClient; -import org.redisson.command.CommandSyncService; -import org.redisson.liveobject.core.RedissonObjectBuilder; import java.io.IOException; import java.io.Serializable; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import static org.fest.assertions.Assertions.assertThat; @@ -169,7 +168,7 @@ public void test_task_canceled() throws Exception { } @Test - public void test_health_ok() { + public void test_health_ok() throws Exception { assertThat(taskManager.getHealth()).isTrue(); } @@ -180,6 +179,7 @@ public void test_health_ko() throws Exception { }})); amqpKo.createAmqpChannelForPublish(AmqpQueue.TASK); amqpKo.createAmqpChannelForPublish(AmqpQueue.MANAGER_EVENT); + amqpKo.createAmqpChannelForPublish(AmqpQueue.MONITORING); RedissonClient redissonClientKo = new RedissonClientFactory().withOptions( Options.from(new PropertiesProvider(Map.of("redisAddress", "redis://redis:6379")).getProperties())).create(); TaskManagerAmqp taskManagerAmqpKo = new TaskManagerAmqp(amqpKo, new TaskRepositoryRedis(redissonClientKo, "tasks:queue:test"), RoutingStrategy.UNIQUE, () -> nextMessage.countDown()); @@ -196,6 +196,7 @@ public static void beforeClass() throws Exception { }})); AMQP.createAmqpChannelForPublish(AmqpQueue.TASK); AMQP.createAmqpChannelForPublish(AmqpQueue.MANAGER_EVENT); + AMQP.createAmqpChannelForPublish(AmqpQueue.MONITORING); } @Before