Compartir comentarios
Las respuestas se generan en base a la documentación.

Escribe pruebas con Testcontainers

Para probar el listener de Kafka, necesitas un broker de Kafka en ejecución y una base de datos MySQL, además de un contexto de aplicación de Micronaut iniciado. Testcontainers levanta ambos servicios en contenedores Docker y la interfaz TestPropertyProvider los conecta a Micronaut.

Crear un cliente de Kafka para pruebas

Primero, crea una interfaz @KafkaClient para publicar eventos en la prueba:

package com.testcontainers.demo;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient
public interface ProductPriceChangesClient {

    @Topic("product-price-changes")
    void send(@KafkaKey String productCode, ProductPriceChangedEvent event);
}

Detalles clave:

  • La anotación @KafkaClient designa esta interfaz como un productor de Kafka.
  • La anotación @Topic especifica el tema de destino.
  • La anotación @KafkaKey marca el parámetro utilizado como la clave del mensaje de Kafka. Si no existe tal parámetro, Micronaut envía el registro con una clave nula.

Escribir la prueba

Crea ProductPriceChangedEventHandlerTest.java:

package com.testcontainers.demo;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import io.micronaut.context.annotation.Property;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.micronaut.test.support.TestPropertyProvider;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@MicronautTest(transactional = false)
@Property(name = "datasources.default.driver-class-name", value = "org.testcontainers.jdbc.ContainerDatabaseDriver")
@Property(name = "datasources.default.url", value = "jdbc:tc:mysql:8.0.32:///db")
@Testcontainers(disabledWithoutDocker = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ProductPriceChangedEventHandlerTest implements TestPropertyProvider {

    @Container
    static final ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.8.0");

    @Override
    public @NonNull Map<String, String> getProperties() {
        if (!kafka.isRunning()) {
            kafka.start();
        }
        return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());
    }

    @Test
    void shouldHandleProductPriceChangedEvent(
            ProductPriceChangesClient productPriceChangesClient, ProductRepository productRepository) {
        Product product = new Product(null, "P100", "Product One", BigDecimal.TEN);
        Long id = productRepository.save(product).getId();

        ProductPriceChangedEvent event = new ProductPriceChangedEvent("P100", new BigDecimal("14.50"));

        productPriceChangesClient.send(event.productCode(), event);

        await().pollInterval(Duration.ofSeconds(3)).atMost(10, SECONDS).untilAsserted(() -> {
            Optional<Product> optionalProduct = productRepository.findByCode("P100");
            assertThat(optionalProduct).isPresent();
            assertThat(optionalProduct.get().getCode()).isEqualTo("P100");
            assertThat(optionalProduct.get().getPrice()).isEqualTo(new BigDecimal("14.50"));
        });

        productRepository.deleteById(id);
    }
}

Esto es lo que hace la prueba:

  • @MicronautTest inicializa el contexto de la aplicación Micronaut y el servidor embebido. Establecer transactional en false evita que cada método de prueba se ejecute dentro de una transacción con rollback, lo cual es necesario porque el listener de Kafka procesa los mensajes en un hilo separado.
  • Las anotaciones @Property sobrescriben el controlador del origen de datos (datasource driver) y la URL para usar la URL JDBC especial de Testcontainers (jdbc:tc:mysql:8.0.32:///db). Esto levanta un contenedor de MySQL y lo configura automáticamente como origen de datos.
  • @Testcontainers y @Container gestionan el ciclo de vida del contenedor de Kafka. La interfaz TestPropertyProvider registra los servidores de arranque (bootstrap servers) de Kafka con Micronaut para que el productor y el consumidor se conecten al contenedor de prueba.
  • @TestInstance(TestInstance.Lifecycle.PER_CLASS) crea una única instancia de prueba para todos los métodos de prueba, lo cual es obligatorio al implementar TestPropertyProvider.
  • La prueba crea un registro Product en la base de datos, luego envía un ProductPriceChangedEvent al tema product-price-changes utilizando ProductPriceChangesClient.
  • Debido a que el procesamiento de mensajes de Kafka es asíncrono, la prueba utiliza Awaitility para realizar consultas (poll) cada 3 segundos (hasta un máximo de 10 segundos) hasta que el precio del producto en la base de datos coincida con el valor esperado.