# Escribe pruebas con Testcontainers


Para probar el listener de Kafka, necesitas un broker de Kafka en ejecución y una base de datos MySQL, además de un contexto de aplicación de Micronaut iniciado. Testcontainers levanta ambos servicios en contenedores Docker y la interfaz `TestPropertyProvider` los conecta a Micronaut.

## Crear un cliente de Kafka para pruebas

Primero, crea una interfaz `@KafkaClient` para publicar eventos en la prueba:

```java
package com.testcontainers.demo;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient
public interface ProductPriceChangesClient {

    @Topic("product-price-changes")
    void send(@KafkaKey String productCode, ProductPriceChangedEvent event);
}
```

Detalles clave:

- La anotación `@KafkaClient` designa esta interfaz como un productor de Kafka.
- La anotación `@Topic` especifica el tema de destino.
- La anotación `@KafkaKey` marca el parámetro utilizado como la clave del mensaje de Kafka. Si no existe tal parámetro, Micronaut envía el registro con una clave nula.

## Escribir la prueba

Crea `ProductPriceChangedEventHandlerTest.java`:

```java
package com.testcontainers.demo;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import io.micronaut.context.annotation.Property;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.micronaut.test.support.TestPropertyProvider;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@MicronautTest(transactional = false)
@Property(name = "datasources.default.driver-class-name", value = "org.testcontainers.jdbc.ContainerDatabaseDriver")
@Property(name = "datasources.default.url", value = "jdbc:tc:mysql:8.0.32:///db")
@Testcontainers(disabledWithoutDocker = true)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ProductPriceChangedEventHandlerTest implements TestPropertyProvider {

    @Container
    static final ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.8.0");

    @Override
    public @NonNull Map<String, String> getProperties() {
        if (!kafka.isRunning()) {
            kafka.start();
        }
        return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());
    }

    @Test
    void shouldHandleProductPriceChangedEvent(
            ProductPriceChangesClient productPriceChangesClient, ProductRepository productRepository) {
        Product product = new Product(null, "P100", "Product One", BigDecimal.TEN);
        Long id = productRepository.save(product).getId();

        ProductPriceChangedEvent event = new ProductPriceChangedEvent("P100", new BigDecimal("14.50"));

        productPriceChangesClient.send(event.productCode(), event);

        await().pollInterval(Duration.ofSeconds(3)).atMost(10, SECONDS).untilAsserted(() -> {
            Optional<Product> optionalProduct = productRepository.findByCode("P100");
            assertThat(optionalProduct).isPresent();
            assertThat(optionalProduct.get().getCode()).isEqualTo("P100");
            assertThat(optionalProduct.get().getPrice()).isEqualTo(new BigDecimal("14.50"));
        });

        productRepository.deleteById(id);
    }
}
```

Esto es lo que hace la prueba:

- `@MicronautTest` inicializa el contexto de la aplicación Micronaut y el servidor embebido. Establecer `transactional` en `false` evita que cada método de prueba se ejecute dentro de una transacción con rollback, lo cual es necesario porque el listener de Kafka procesa los mensajes en un hilo separado.
- Las anotaciones `@Property` sobrescriben el controlador del origen de datos (datasource driver) y la URL para usar la URL JDBC especial de Testcontainers (`jdbc:tc:mysql:8.0.32:///db`). Esto levanta un contenedor de MySQL y lo configura automáticamente como origen de datos.
- `@Testcontainers` y `@Container` gestionan el ciclo de vida del contenedor de Kafka. La interfaz `TestPropertyProvider` registra los servidores de arranque (bootstrap servers) de Kafka con Micronaut para que el productor y el consumidor se conecten al contenedor de prueba.
- `@TestInstance(TestInstance.Lifecycle.PER_CLASS)` crea una única instancia de prueba para todos los métodos de prueba, lo cual es obligatorio al implementar `TestPropertyProvider`.
- La prueba crea un registro `Product` en la base de datos, luego envía un `ProductPriceChangedEvent` al tema `product-price-changes` utilizando `ProductPriceChangesClient`.
- Debido a que el procesamiento de mensajes de Kafka es asíncrono, la prueba utiliza [Awaitility](http://www.awaitility.org/) para realizar consultas (poll) cada 3 segundos (hasta un máximo de 10 segundos) hasta que el precio del producto en la base de datos coincida con el valor esperado.

