Crea el proyecto Spring Boot
Configura el proyecto
Crea un proyecto Spring Boot desde Spring Initializr seleccionando los iniciadores Spring for Apache Kafka, Spring Data JPA, MySQL Driver y Testcontainers.
Como alternativa, clona el repositorio de la guía.
Después de generar la aplicación, añade la biblioteca Awaitility como una dependencia de prueba. La usarás más adelante para verificar las expectativas de un flujo de proceso asíncrono.
Las dependencias clave en pom.xml son:
<properties>
<java.version>17</java.version>
<testcontainers.version>2.0.4</testcontainers.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-mysql</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>Se recomienda usar la BOM (Bill of Materials) de Testcontainers para que no tengas que repetir la versión para cada dependencia de módulo de Testcontainers.
Crea la entidad JPA
La aplicación escucha en un tema (topic) llamado product-price-changes. Cuando llega un
mensaje, extrae el código del producto y el precio del payload del evento
y actualiza el precio de ese producto en la base de datos MySQL.
Crea Product.java:
package com.testcontainers.demo;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.math.BigDecimal;
@Entity
@Table(name = "products")
class Product {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String code;
@Column(nullable = false)
private String name;
@Column(nullable = false)
private BigDecimal price;
public Product() {}
public Product(Long id, String code, String name, BigDecimal price) {
this.id = id;
this.code = code;
this.name = name;
this.price = price;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public BigDecimal getPrice() {
return price;
}
public void setPrice(BigDecimal price) {
this.price = price;
}
}Crea el repositorio de Spring Data JPA
Crea una interfaz de repositorio para la entidad Product con un método para buscar un
producto por código y un método para actualizar el precio de un código de producto determinado:
package com.testcontainers.demo;
import java.math.BigDecimal;
import java.util.Optional;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
interface ProductRepository extends JpaRepository<Product, Long> {
Optional<Product> findByCode(String code);
@Modifying
@Query("update Product p set p.price = :price where p.code = :productCode")
void updateProductPrice(
@Param("productCode") String productCode,
@Param("price") BigDecimal price
);
}Añade un script de creación de esquema
Dado que la aplicación no utiliza una base de datos en memoria, necesitas crear las tablas de MySQL. El enfoque recomendado para producción es una herramienta de migración como Flyway o Liquibase, pero para esta guía la inicialización de esquema integrada de Spring Boot es suficiente.
Crea src/main/resources/schema.sql:
create table products (
id int NOT NULL AUTO_INCREMENT,
code varchar(255) not null,
name varchar(255) not null,
price numeric(5,2) not null,
PRIMARY KEY (id),
UNIQUE (code)
);Habilita la inicialización del esquema en src/main/resources/application.properties:
spring.sql.init.mode=alwaysCrea el payload del evento
Crea un record llamado ProductPriceChangedEvent que represente la estructura
del payload del evento recibido desde el tema de Kafka:
package com.testcontainers.demo;
import java.math.BigDecimal;
record ProductPriceChangedEvent(String productCode, BigDecimal price) {}El emisor y el receptor acuerdan el siguiente formato JSON:
{
"productCode": "P100",
"price": 25.0
}Implementa el listener de Kafka
Crea ProductPriceChangedEventHandler.java, que maneja los mensajes del
tema product-price-changes y actualiza el precio del producto en la base de datos:
package com.testcontainers.demo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Component
@Transactional
class ProductPriceChangedEventHandler {
private static final Logger log = LoggerFactory.getLogger(
ProductPriceChangedEventHandler.class
);
private final ProductRepository productRepository;
ProductPriceChangedEventHandler(ProductRepository productRepository) {
this.productRepository = productRepository;
}
@KafkaListener(topics = "product-price-changes", groupId = "demo")
public void handle(ProductPriceChangedEvent event) {
log.info(
"Received a ProductPriceChangedEvent with productCode:{}: ",
event.productCode()
);
productRepository.updateProductPrice(event.productCode(), event.price());
}
}La anotación @KafkaListener especifica el nombre del tema a escuchar. Spring
Kafka maneja la serialización y deserialización basándose en las propiedades
configuradas en application.properties.
Configura la serialización de Kafka
Añade las siguientes propiedades de Kafka a
src/main/resources/application.properties:
######## Configuración de Kafka #########
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.group-id=demo
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.testcontainers.demoLa clave productCode se (des)serializa usando StringSerializer/StringDeserializer,
y el valor ProductPriceChangedEvent se (des)serializa usando
JsonSerializer/JsonDeserializer.