Compartir comentarios
Las respuestas se generan en base a la documentación.

Crear el proyecto Micronaut

Configurar el proyecto

Crea un proyecto Micronaut desde Micronaut Launch seleccionando las características kafka, data-jpa, mysql, awaitility, assertj y testcontainers.

Alternativamente, puedes clonar el repositorio de la guía.

Usarás la biblioteca Awaitility para verificar las expectativas de un flujo de proceso asíncrono.

Las dependencias clave en pom.xml son:

<parent>
    <groupId>io.micronaut.platform</groupId>
    <artifactId>micronaut-parent</artifactId>
    <version>4.1.4</version>
</parent>
<dependencies>
    <dependency>
        <groupId>io.micronaut.data</groupId>
        <artifactId>micronaut-data-hibernate-jpa</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>io.micronaut.kafka</groupId>
        <artifactId>micronaut-kafka</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>io.micronaut.serde</groupId>
        <artifactId>micronaut-serde-jackson</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>io.micronaut.sql</groupId>
        <artifactId>micronaut-jdbc-hikari</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.awaitility</groupId>
        <artifactId>awaitility</artifactId>
        <version>4.2.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers-junit-jupiter</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers-kafka</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers-mysql</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

El POM padre de Micronaut gestiona la BOM de Testcontainers, por lo que no necesitas especificar las versiones de los módulos de Testcontainers de forma individual.

Crear la entidad JPA

La aplicación escucha un tema llamado product-price-changes. Cuando llega un mensaje, extrae el código y el precio del producto de la carga útil (payload) del evento y actualiza el precio de ese producto en la base de datos MySQL.

Crea Product.java:

package com.testcontainers.demo;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.math.BigDecimal;

@Entity
@Table(name = "products")
public class Product {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String code;

    @Column(nullable = false)
    private String name;

    @Column(nullable = false)
    private BigDecimal price;

    public Product() {}

    public Product(Long id, String code, String name, BigDecimal price) {
        this.id = id;
        this.code = code;
        this.name = name;
        this.price = price;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }
}

Crear el repositorio Micronaut Data JPA

Crea una interfaz de repositorio para la entidad Product con un método para buscar un producto por código y otro método para actualizar el precio de un código de producto determinado:

package com.testcontainers.demo;

import io.micronaut.data.annotation.Query;
import io.micronaut.data.annotation.Repository;
import io.micronaut.data.jpa.repository.JpaRepository;
import java.math.BigDecimal;
import java.util.Optional;

@Repository
public interface ProductRepository extends JpaRepository<Product, Long> {

    Optional<Product> findByCode(String code);

    @Query("update Product p set p.price = :price where p.code = :productCode")
    void updateProductPrice(String productCode, BigDecimal price);
}

A diferencia de Spring Data JPA, Micronaut Data utiliza el procesamiento de anotaciones en tiempo de compilación para implementar los métodos del repositorio, evitando la reflexión en tiempo de ejecución.

Crear la carga útil del evento

Crea un record llamado ProductPriceChangedEvent que represente la estructura de la carga útil del evento recibida desde el tema de Kafka:

package com.testcontainers.demo;

import io.micronaut.serde.annotation.Serdeable;
import java.math.BigDecimal;

@Serdeable
public record ProductPriceChangedEvent(String productCode, BigDecimal price) {}

La anotación @Serdeable le indica a Micronaut Serialization que este tipo puede ser serializado y deserializado.

El emisor y el receptor acuerdan el siguiente formato JSON:

{
  "productCode": "P100",
  "price": 25.0
}

Implementar el listener de Kafka

Crea ProductPriceChangedEventHandler.java, que maneja los mensajes del tema product-price-changes y actualiza el precio del producto en la base de datos:

package com.testcontainers.demo;

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import jakarta.inject.Singleton;
import jakarta.transaction.Transactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
@Transactional
class ProductPriceChangedEventHandler {

    private static final Logger LOG = LoggerFactory.getLogger(ProductPriceChangedEventHandler.class);

    private final ProductRepository productRepository;

    ProductPriceChangedEventHandler(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    @Topic("product-price-changes")
    @KafkaListener(offsetReset = EARLIEST, groupId = "demo")
    public void handle(ProductPriceChangedEvent event) {
        LOG.info("Received a ProductPriceChangedEvent with productCode:{}: ", event.productCode());
        productRepository.updateProductPrice(event.productCode(), event.price());
    }
}

Detalles clave:

  • La anotación @KafkaListener marca esta clase como un receptor de mensajes de Kafka (listener). Establecer offsetReset en EARLIEST hace que el listener comience a consumir mensajes desde el principio de la partición, lo cual es útil durante las pruebas.
  • La anotación @Topic especifica a qué tema suscribirse.
  • Micronaut maneja la deserialización JSON de ProductPriceChangedEvent automáticamente usando Micronaut Serialization.

Configurar el origen de datos (datasource)

Añade las siguientes propiedades a src/main/resources/application.properties:

micronaut.application.name=tc-guide-testing-micronaut-kafka-listener
datasources.default.db-type=mysql
datasources.default.dialect=MYSQL
jpa.default.properties.hibernate.hbm2ddl.auto=update
jpa.default.entity-scan.packages=com.testcontainers.demo
datasources.default.driver-class-name=com.mysql.cj.jdbc.Driver

La propiedad de Hibernate hbm2ddl.auto=update crea y updates el esquema de la base de datos automáticamente. Para las pruebas, sobrescribirás esto a create-drop en el archivo de propiedades de prueba.

Crea src/test/resources/application-test.properties:

jpa.default.properties.hibernate.hbm2ddl.auto=create-drop