# Crea el proyecto Spring Boot


## Configura el proyecto

Crea un proyecto Spring Boot desde [Spring Initializr](https://start.spring.io)
seleccionando los iniciadores **Spring for Apache Kafka**, **Spring Data JPA**, **MySQL Driver**
y **Testcontainers**.

Como alternativa, clona el
[repositorio de la guía](https://github.com/testcontainers/tc-guide-testing-spring-boot-kafka-listener).

Después de generar la aplicación, añade la biblioteca Awaitility como una dependencia
de prueba. La usarás más adelante para verificar las expectativas de un flujo de proceso
asíncrono.

Las dependencias clave en `pom.xml` son:

```xml
<properties>
    <java.version>17</java.version>
    <testcontainers.version>2.0.4</testcontainers.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers-junit-jupiter</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers-kafka</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers-mysql</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.awaitility</groupId>
        <artifactId>awaitility</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
```

Se recomienda usar la BOM (Bill of Materials) de Testcontainers para que no
tengas que repetir la versión para cada dependencia de módulo de Testcontainers.

## Crea la entidad JPA

La aplicación escucha en un tema (topic) llamado `product-price-changes`. Cuando llega un
mensaje, extrae el código del producto y el precio del payload del evento
y actualiza el precio de ese producto en la base de datos MySQL.

Crea `Product.java`:

```java
package com.testcontainers.demo;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.math.BigDecimal;

@Entity
@Table(name = "products")
class Product {

  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Long id;

  @Column(nullable = false, unique = true)
  private String code;

  @Column(nullable = false)
  private String name;

  @Column(nullable = false)
  private BigDecimal price;

  public Product() {}

  public Product(Long id, String code, String name, BigDecimal price) {
    this.id = id;
    this.code = code;
    this.name = name;
    this.price = price;
  }

  public Long getId() {
    return id;
  }

  public void setId(Long id) {
    this.id = id;
  }

  public String getCode() {
    return code;
  }

  public void setCode(String code) {
    this.code = code;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public BigDecimal getPrice() {
    return price;
  }

  public void setPrice(BigDecimal price) {
    this.price = price;
  }
}
```

## Crea el repositorio de Spring Data JPA

Crea una interfaz de repositorio para la entidad `Product` con un método para buscar un
producto por código y un método para actualizar el precio de un código de producto determinado:

```java
package com.testcontainers.demo;

import java.math.BigDecimal;
import java.util.Optional;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

interface ProductRepository extends JpaRepository<Product, Long> {
  Optional<Product> findByCode(String code);

  @Modifying
  @Query("update Product p set p.price = :price where p.code = :productCode")
  void updateProductPrice(
    @Param("productCode") String productCode,
    @Param("price") BigDecimal price
  );
}
```

## Añade un script de creación de esquema

Dado que la aplicación no utiliza una base de datos en memoria, necesitas crear
las tablas de MySQL. El enfoque recomendado para producción es una herramienta de migración
como Flyway o Liquibase, pero para esta guía la inicialización de esquema
integrada de Spring Boot es suficiente.

Crea `src/main/resources/schema.sql`:

```sql
create table products (
      id int NOT NULL AUTO_INCREMENT,
      code varchar(255) not null,
      name varchar(255) not null,
      price numeric(5,2) not null,
      PRIMARY KEY (id),
      UNIQUE (code)
);
```

Habilita la inicialización del esquema en `src/main/resources/application.properties`:

```properties
spring.sql.init.mode=always
```

## Crea el payload del evento

Crea un record llamado `ProductPriceChangedEvent` que represente la estructura
del payload del evento recibido desde el tema de Kafka:

```java
package com.testcontainers.demo;

import java.math.BigDecimal;

record ProductPriceChangedEvent(String productCode, BigDecimal price) {}
```

El emisor y el receptor acuerdan el siguiente formato JSON:

```json
{
  "productCode": "P100",
  "price": 25.0
}
```

## Implementa el listener de Kafka

Crea `ProductPriceChangedEventHandler.java`, que maneja los mensajes del
tema `product-price-changes` y actualiza el precio del producto en la base de datos:

```java
package com.testcontainers.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@Transactional
class ProductPriceChangedEventHandler {

  private static final Logger log = LoggerFactory.getLogger(
    ProductPriceChangedEventHandler.class
  );

  private final ProductRepository productRepository;

  ProductPriceChangedEventHandler(ProductRepository productRepository) {
    this.productRepository = productRepository;
  }

  @KafkaListener(topics = "product-price-changes", groupId = "demo")
  public void handle(ProductPriceChangedEvent event) {
    log.info(
      "Received a ProductPriceChangedEvent with productCode:{}: ",
      event.productCode()
    );
    productRepository.updateProductPrice(event.productCode(), event.price());
  }
}
```

La anotación `@KafkaListener` especifica el nombre del tema a escuchar. Spring
Kafka maneja la serialización y deserialización basándose en las propiedades
configuradas en `application.properties`.

## Configura la serialización de Kafka

Añade las siguientes propiedades de Kafka a
`src/main/resources/application.properties`:

```properties
######## Configuración de Kafka  #########
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.consumer.group-id=demo
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.testcontainers.demo
```

La clave `productCode` se (des)serializa usando `StringSerializer`/`StringDeserializer`,
y el valor `ProductPriceChangedEvent` se (des)serializa usando
`JsonSerializer`/`JsonDeserializer`.

