Compartir comentarios
Las respuestas se generan en base a la documentación.

Crea el proyecto Spring Boot

Configura el proyecto

Crea un proyecto Spring Boot desde Spring Initializr seleccionando los iniciadores Spring for Apache Kafka, Spring Data JPA, MySQL Driver y Testcontainers.

Como alternativa, clona el repositorio de la guía.

Después de generar la aplicación, añade la biblioteca Awaitility como una dependencia de prueba. La usarás más adelante para verificar las expectativas de un flujo de proceso asíncrono.

Las dependencias clave en pom.xml son:

<properties>
    <java.version>17</java.version>
    <testcontainers.version>2.0.4</testcontainers.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers-junit-jupiter</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers-kafka</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers-mysql</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.awaitility</groupId>
        <artifactId>awaitility</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Se recomienda usar la BOM (Bill of Materials) de Testcontainers para que no tengas que repetir la versión para cada dependencia de módulo de Testcontainers.

Crea la entidad JPA

La aplicación escucha en un tema (topic) llamado product-price-changes. Cuando llega un mensaje, extrae el código del producto y el precio del payload del evento y actualiza el precio de ese producto en la base de datos MySQL.

Crea Product.java:

package com.testcontainers.demo;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.math.BigDecimal;

@Entity
@Table(name = "products")
class Product {

  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Long id;

  @Column(nullable = false, unique = true)
  private String code;

  @Column(nullable = false)
  private String name;

  @Column(nullable = false)
  private BigDecimal price;

  public Product() {}

  public Product(Long id, String code, String name, BigDecimal price) {
    this.id = id;
    this.code = code;
    this.name = name;
    this.price = price;
  }

  public Long getId() {
    return id;
  }

  public void setId(Long id) {
    this.id = id;
  }

  public String getCode() {
    return code;
  }

  public void setCode(String code) {
    this.code = code;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public BigDecimal getPrice() {
    return price;
  }

  public void setPrice(BigDecimal price) {
    this.price = price;
  }
}

Crea el repositorio de Spring Data JPA

Crea una interfaz de repositorio para la entidad Product con un método para buscar un producto por código y un método para actualizar el precio de un código de producto determinado:

package com.testcontainers.demo;

import java.math.BigDecimal;
import java.util.Optional;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

interface ProductRepository extends JpaRepository<Product, Long> {
  Optional<Product> findByCode(String code);

  @Modifying
  @Query("update Product p set p.price = :price where p.code = :productCode")
  void updateProductPrice(
    @Param("productCode") String productCode,
    @Param("price") BigDecimal price
  );
}

Añade un script de creación de esquema

Dado que la aplicación no utiliza una base de datos en memoria, necesitas crear las tablas de MySQL. El enfoque recomendado para producción es una herramienta de migración como Flyway o Liquibase, pero para esta guía la inicialización de esquema integrada de Spring Boot es suficiente.

Crea src/main/resources/schema.sql:

create table products (
      id int NOT NULL AUTO_INCREMENT,
      code varchar(255) not null,
      name varchar(255) not null,
      price numeric(5,2) not null,
      PRIMARY KEY (id),
      UNIQUE (code)
);

Habilita la inicialización del esquema en src/main/resources/application.properties:

spring.sql.init.mode=always

Crea el payload del evento

Crea un record llamado ProductPriceChangedEvent que represente la estructura del payload del evento recibido desde el tema de Kafka:

package com.testcontainers.demo;

import java.math.BigDecimal;

record ProductPriceChangedEvent(String productCode, BigDecimal price) {}

El emisor y el receptor acuerdan el siguiente formato JSON:

{
  "productCode": "P100",
  "price": 25.0
}

Implementa el listener de Kafka

Crea ProductPriceChangedEventHandler.java, que maneja los mensajes del tema product-price-changes y actualiza el precio del producto en la base de datos:

package com.testcontainers.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@Transactional
class ProductPriceChangedEventHandler {

  private static final Logger log = LoggerFactory.getLogger(
    ProductPriceChangedEventHandler.class
  );

  private final ProductRepository productRepository;

  ProductPriceChangedEventHandler(ProductRepository productRepository) {
    this.productRepository = productRepository;
  }

  @KafkaListener(topics = "product-price-changes", groupId = "demo")
  public void handle(ProductPriceChangedEvent event) {
    log.info(
      "Received a ProductPriceChangedEvent with productCode:{}: ",
      event.productCode()
    );
    productRepository.updateProductPrice(event.productCode(), event.price());
  }
}

La anotación @KafkaListener especifica el nombre del tema a escuchar. Spring Kafka maneja la serialización y deserialización basándose en las propiedades configuradas en application.properties.

Configura la serialización de Kafka

Añade las siguientes propiedades de Kafka a src/main/resources/application.properties:

######## Configuración de Kafka  #########
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.consumer.group-id=demo
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.testcontainers.demo

La clave productCode se (des)serializa usando StringSerializer/StringDeserializer, y el valor ProductPriceChangedEvent se (des)serializa usando JsonSerializer/JsonDeserializer.