Skip to content

Commit

Permalink
feat: adds Monitoring queue and event #1638
Browse files Browse the repository at this point in the history
  • Loading branch information
mvanzalu committed Jan 22, 2025
1 parent fb47d47 commit eda1fdb
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@
import org.icij.datashare.Repository;
import org.icij.datashare.asynctasks.TaskManager;
import org.icij.datashare.openmetrics.StatusMapper;
import org.icij.datashare.extract.DocumentCollectionFactory;
import org.icij.datashare.text.indexing.Indexer;
import org.icij.extract.queue.DocumentQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.io.IOException;

@Singleton
@Prefix("/api")
Expand All @@ -46,7 +42,7 @@ public StatusResource(PropertiesProvider propertiesProvider, Repository reposito
@ApiResponse(responseCode = "504", description = "proxy error when elasticsearch is down", useReturnTypeSchema = true)
@ApiResponse(responseCode = "503", description = "service unavailable when other services are down", useReturnTypeSchema = true)
@Get("/status")
public Payload getStatus(Context context) {
public Payload getStatus(Context context) throws IOException {
Status status = new Status(repository.getHealth(), indexer.getHealth(), taskManager.getHealth());
if ("openmetrics".equals(context.request().query().get("format"))) {
return new Payload("text/plain;version=0.0.4",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package org.icij.datashare.asynctasks;

import org.icij.datashare.asynctasks.bus.amqp.CancelledEvent;
import org.icij.datashare.asynctasks.bus.amqp.ErrorEvent;
import org.icij.datashare.asynctasks.bus.amqp.ProgressEvent;
import org.icij.datashare.asynctasks.bus.amqp.ResultEvent;
import org.icij.datashare.asynctasks.bus.amqp.TaskEvent;
import org.icij.datashare.asynctasks.bus.amqp.*;
import org.icij.datashare.batch.WebQueryPagination;
import org.icij.datashare.json.JsonObjectMapper;
import org.icij.datashare.user.User;
Expand Down Expand Up @@ -42,7 +38,7 @@ public interface TaskManager extends Closeable {

void clear() throws IOException;

boolean getHealth();
boolean getHealth() throws IOException;

default List<Task<?>> getTasks(User user) throws IOException {
return getTasks(user, new HashMap<>(), new WebQueryPagination());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package org.icij.datashare.asynctasks;

import org.icij.datashare.asynctasks.bus.amqp.AmqpConsumer;
import org.icij.datashare.asynctasks.bus.amqp.AmqpInterlocutor;
import org.icij.datashare.asynctasks.bus.amqp.AmqpQueue;
import org.icij.datashare.asynctasks.bus.amqp.CancelEvent;
import org.icij.datashare.asynctasks.bus.amqp.ShutdownEvent;
import org.icij.datashare.asynctasks.bus.amqp.TaskEvent;
import com.rabbitmq.client.ShutdownSignalException;
import org.icij.datashare.asynctasks.bus.amqp.*;

import org.icij.datashare.tasks.RoutingStrategy;

Expand Down Expand Up @@ -113,7 +109,14 @@ public void clear() {
}

@Override
public boolean getHealth() {
return amqp.getHealth();
public boolean getHealth() throws IOException {
try {
logger.info("sending monitoring event");
amqp.publish(AmqpQueue.MONITORING, new MonitoringEvent());
} catch (ShutdownSignalException e) {
logger.error("error sending monitoring event", e);
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ public enum AmqpQueue {
"x-consumer-timeout", 3600 * 1000), TASK_DLQ),
MANAGER_EVENT_DLQ ("exchangeDLQManagerEvents", BuiltinExchangeType.DIRECT,"routingKeyDLQManagerEvents"),
MANAGER_EVENT ("exchangeManagerEvents", BuiltinExchangeType.DIRECT,"routingKeyManagerEvents", new HashMap<>(), MANAGER_EVENT_DLQ),
WORKER_EVENT("exchangeWorkerEvents", BuiltinExchangeType.FANOUT, "routingKeyWorkerEvents");
WORKER_EVENT("exchangeWorkerEvents", BuiltinExchangeType.FANOUT, "routingKeyWorkerEvents"),
MONITORING("exchangeMonitoring", BuiltinExchangeType.DIRECT, "routingKeyMonitoring", Map.of(
"x-message-ttl", 5000,
"x-expires", 10000), null);

public final String exchange;
public final String routingKey;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package org.icij.datashare.asynctasks.bus.amqp;

public class MonitoringEvent extends Event { }
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@
import org.icij.extract.redis.RedissonClientFactory;
import org.icij.task.Options;
import org.junit.*;
import org.redisson.Redisson;
import org.redisson.RedissonMap;
import org.redisson.api.RedissonClient;
import org.redisson.command.CommandSyncService;
import org.redisson.liveobject.core.RedissonObjectBuilder;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.fest.assertions.Assertions.assertThat;

Expand Down Expand Up @@ -169,7 +168,7 @@ public void test_task_canceled() throws Exception {
}

@Test
public void test_health_ok() {
public void test_health_ok() throws Exception {
assertThat(taskManager.getHealth()).isTrue();
}

Expand All @@ -180,6 +179,7 @@ public void test_health_ko() throws Exception {
}}));
amqpKo.createAmqpChannelForPublish(AmqpQueue.TASK);
amqpKo.createAmqpChannelForPublish(AmqpQueue.MANAGER_EVENT);
amqpKo.createAmqpChannelForPublish(AmqpQueue.MONITORING);
RedissonClient redissonClientKo = new RedissonClientFactory().withOptions(
Options.from(new PropertiesProvider(Map.of("redisAddress", "redis://redis:6379")).getProperties())).create();
TaskManagerAmqp taskManagerAmqpKo = new TaskManagerAmqp(amqpKo, new TaskRepositoryRedis(redissonClientKo, "tasks:queue:test"), RoutingStrategy.UNIQUE, () -> nextMessage.countDown());
Expand All @@ -196,6 +196,7 @@ public static void beforeClass() throws Exception {
}}));
AMQP.createAmqpChannelForPublish(AmqpQueue.TASK);
AMQP.createAmqpChannelForPublish(AmqpQueue.MANAGER_EVENT);
AMQP.createAmqpChannelForPublish(AmqpQueue.MONITORING);
}

@Before
Expand Down

0 comments on commit eda1fdb

Please sign in to comment.