Skip to content

Commit

Permalink
feat: adds getHealth function in TaskManagerAmqp #1638
Browse files Browse the repository at this point in the history
  • Loading branch information
mvanzalu committed Jan 27, 2025
1 parent 0ff4b35 commit f858417
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,9 @@ public void close() throws IOException {
public void clear() {
tasks.clear();
}

@Override
public boolean getHealth() {
return amqp.getHealth();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,18 @@ public void deleteQueues(AmqpQueue... amqpQueues) throws IOException {
}
}

public boolean getHealth() {
if (!connection.isOpen()) {
logger.error("TaskManager AMQP Health error : Connection is closed");
return false;
}
if (connection.getHeartbeat() <= 0) {
logger.error("TaskManager AMQP Health error : heartbeat equals 0");
return false;
}
return true;
}

private static class UnknownChannelException extends RuntimeException {
public UnknownChannelException(AmqpQueue queue) {
super("Unknown channel for queue " + queue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,27 @@ public void test_task_canceled() throws Exception {
assertThat(taskManager.getTask(task.id).getState()).isEqualTo(Task.State.CANCELLED);
}

@Test
public void test_health_ok() {
assertThat(taskManager.getHealth()).isTrue();
}

@Test
public void test_health_ko() throws Exception {
AmqpInterlocutor amqpKo = new AmqpInterlocutor(new PropertiesProvider(new HashMap<>() {{
put("messageBusAddress", "amqp://admin:admin@localhost?rabbitMq=false");
}}));
amqpKo.createAmqpChannelForPublish(AmqpQueue.TASK);
amqpKo.createAmqpChannelForPublish(AmqpQueue.MANAGER_EVENT);
RedissonClient redissonClientKo = new RedissonClientFactory().withOptions(
Options.from(new PropertiesProvider(Map.of("redisAddress", "redis://redis:6379")).getProperties())).create();
TaskManagerAmqp taskManagerAmqpKo = new TaskManagerAmqp(amqpKo, new TaskRepositoryRedis(redissonClientKo, "tasks:queue:test"), RoutingStrategy.UNIQUE, () -> nextMessage.countDown());

amqpKo.close();

assertThat(taskManagerAmqpKo.getHealth()).isFalse();
}

@BeforeClass
public static void beforeClass() throws Exception {
AMQP = new AmqpInterlocutor(new PropertiesProvider(new HashMap<>() {{
Expand Down

0 comments on commit f858417

Please sign in to comment.