# Kickoff Java Project — Kafka CDC Module
Kafka CDC module (conditional: Kafka=Yes). Scaffolds the module skeleton only.
> **Important**: This file scaffolds the Kafka module structure and configuration only. To add actual CDC consumers (event models, listeners, services, DLT handling, integration tests), use the **`/add-java-kafka-cdc-consumer`** skill after the project is scaffolded.
Reference: Janus `janus-kafka/` module.
---
## POM dependencies
```xml
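<!-- Internal modules -->
<dependency><groupId>${project.groupId}</groupId><artifactId>-common</artifactId></dependency>
<dependency><groupId>${project.groupId}</groupId><artifactId>-model</artifactId></dependency>
<dependency><groupId>${project.groupId}</groupId><artifactId>-core</artifactId></dependency>
<!-- Spring Boot, Kafka, JSON, retry -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId></dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-json</artifactId></dependency>
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
<dependency><groupId>org.springframework.retry</groupId><artifactId>spring-retry</artifactId></dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aspectj</artifactId></dependency>
<!-- MSK IAM authentication and tracing -->
<dependency><groupId>software.amazon.msk</groupId><artifactId>aws-msk-iam-auth</artifactId></dependency>
<dependency><groupId>com.datadoghq</groupId><artifactId>dd-trace-api</artifactId></dependency>
<!-- Test scope -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka-test</artifactId><scope>test</scope></dependency>
<dependency><groupId>org.testcontainers</groupId><artifactId>testcontainers-kafka</artifactId><scope>test</scope></dependency>
<dependency><groupId>org.testcontainers</groupId><artifactId>testcontainers-junit-jupiter</artifactId><scope>test</scope></dependency>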
```
---
## KafkaConfiguration skeleton
The class must be **public** so that it can be imported across packages in Spring Boot 4 (SB4). It is discovered through `scanBasePackages` on the Application class.
```java
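import java.util.Map;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JacksonJsonSerializer;
import org.springframework.retry.annotation.EnableRetry;

@Configuration
@EnableRetry
public class KafkaConfiguration {
    // Producer used to publish failed records to dead-letter topics (DLT)
    @Bean
    ProducerFactory<String, Object> dltProducerFactory(final KafkaProperties kafkaProperties) {
        final Map<String, Object> props = kafkaProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(props, new StringSerializer(), new JacksonJsonSerializer<>());
    }
    @Bean
    KafkaTemplate<String, Object> dltTemplate(final ProducerFactory<String, Object> dltProducerFactory) {
        return new KafkaTemplate<>(dltProducerFactory);
    }
}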
```
`KafkaProperties` import in SB4: `org.springframework.boot.kafka.autoconfigure.KafkaProperties` (changed from `org.springframework.boot.autoconfigure.kafka.KafkaProperties` in SB3).
Consumer/producer/DLT factories for each CDC entity are added by `/add-java-kafka-cdc-consumer`.
---
## application.yml kafka block
```yaml
spring:
  kafka:
    admin:
      auto-create: true
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    consumer:
      group-id:
      client-id: -client-id
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JacksonJsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: co.humand..kafka.model
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 1000
      heartbeat-interval: 3000
      max-poll-records: 10
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JacksonJsonSerializer
    listener:
      concurrency: 1
    topics:
      placeholder-cdc: humand.cdc.placeholder
      placeholder-dlt: humand.cdc.placeholder-dlt
```
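The `${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}` placeholder falls back to `localhost:9092` when the environment variable is unset. Only the first colon separates the name from the default, so the default keeps its port. The plain-Java sketch below mimics that rule for illustration. `PlaceholderDefaultSketch`, `resolve`, and the `broker-1:9098` address are hypothetical; this is not Spring's resolver, which handles the real lookup at startup.

```java
import java.util.Map;

/** Illustrative only: mimics the ${NAME:default} fallback rule, not Spring's actual resolver. */
final class PlaceholderDefaultSketch {

    /** Resolves a single "${NAME:default}" expression against the given environment map. */
    static String resolve(final String placeholder, final Map<String, String> env) {
        if (!placeholder.startsWith("${") || !placeholder.endsWith("}")) {
            return placeholder; // not a placeholder, return unchanged
        }
        final String body = placeholder.substring(2, placeholder.length() - 1);
        final int separator = body.indexOf(':'); // only the first colon splits name and default
        if (separator < 0) {
            final String value = env.get(body);
            if (value == null) {
                throw new IllegalArgumentException("No value and no default for " + body);
            }
            return value;
        }
        final String name = body.substring(0, separator);
        final String defaultValue = body.substring(separator + 1);
        return env.getOrDefault(name, defaultValue);
    }

    public static void main(final String[] args) {
        final String expression = "${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}";
        // Variable unset: the default after the first colon is used
        System.out.println(resolve(expression, Map.of()));
        // Variable set (hypothetical broker address): the environment value wins
        System.out.println(resolve(expression, Map.of("KAFKA_BOOTSTRAP_SERVERS", "broker-1:9098")));
    }
}
```

Locally the block therefore points at a broker on `localhost:9092`, while deployed environments are expected to supply `KAFKA_BOOTSTRAP_SERVERS`.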
---
## application-test.yml kafka overrides
```yaml
spring:
  kafka:
    admin:
      auto-create: true
    consumer:
      group-id: -test
      client-id: -test-client
      auto-offset-reset: earliest
      enable-auto-commit: true
    topics:
      placeholder-cdc: humand.cdc.placeholder.test
      placeholder-dlt: humand.cdc.placeholder-dlt.test
testcontainers:
  kafka:
    enabled: true
```
---
## TestContainersConfiguration additions
```java
// Kafka broker container, started only when testcontainers.kafka.enabled=true
@Bean
@ServiceConnection(name = "kafka")
@ConditionalOnProperty(name = "testcontainers.kafka.enabled", havingValue = "true")
ConfluentKafkaContainer kafkaContainer() {
    return new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.8.0"));
}

// Points spring.kafka.bootstrap-servers at the running container
@Bean
@ConditionalOnProperty(name = "testcontainers.kafka.enabled", havingValue = "true")
DynamicPropertyRegistrar kafkaPropertiesRegistrar(final ConfluentKafkaContainer kafkaContainer) {
    return registry -> registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
}
```
---
## KafkaDLTTestListenerConfig skeleton
```java
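import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;

@TestConfiguration
public class KafkaDLTTestListenerConfig {
    // Collects every record that lands on the placeholder DLT so tests can assert on it
    private final BlockingQueue<ConsumerRecord<String, Object>> placeholderQueue = new LinkedBlockingQueue<>();

    @KafkaListener(topics = "${spring.kafka.topics.placeholder-dlt}")
    public void placeholderDLTListener(final ConsumerRecord<String, Object> record) {
        placeholderQueue.add(record);
    }

    @Bean
    BlockingQueue<ConsumerRecord<String, Object>> placeholderDLTMessagesQueue() {
        return placeholderQueue;
    }
}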
```
Additional DLT listeners are added per-entity by `/add-java-kafka-cdc-consumer`.
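A test that injects the queue bean would typically wait on it with a timeout rather than sleeping. The sketch below shows that pattern with plain `java.util.concurrent` types only. The `String` payload stands in for the Kafka `ConsumerRecord`, and `DltQueuePollingSketch` and `awaitMessage` are hypothetical names, not part of the scaffold.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrative only: timeout-based wait on a queue filled by another thread. */
final class DltQueuePollingSketch {

    /** Waits up to timeoutMillis for the next element; empty if nothing arrived in time. */
    static <T> Optional<T> awaitMessage(final BlockingQueue<T> queue, final long timeoutMillis) {
        try {
            return Optional.ofNullable(queue.poll(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
            return Optional.empty();
        }
    }

    public static void main(final String[] args) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Simulates the listener thread adding a dead-lettered message
        final Thread listener = new Thread(() -> queue.add("poison-message"));
        listener.start();
        // The first call receives the message; the second finds the queue empty and times out
        System.out.println(awaitMessage(queue, 2_000).orElse("timed out"));
        System.out.println(awaitMessage(queue, 100).orElse("timed out"));
    }
}
```

Returning an `Optional` lets a test assert both cases explicitly: a message reached the DLT, or nothing arrived within the timeout.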
---
## Worker toggle (Terraform)
In the project module's `type_environments` local:
```hcl
type_environments = {
  api    = [{ name = "HUMAND__KAFKA_CDC_LISTENER_ENABLED", value = "false" }]
  worker = [{ name = "HUMAND__KAFKA_CDC_LISTENER_ENABLED", value = "true" }]
}
```
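The intent of the toggle is that only worker tasks consume CDC topics, while API tasks leave the listeners off. How the application reads the variable is not specified here. The sketch below is one possible reading in plain Java, with two assumptions: an unset variable is treated as disabled, and the value is parsed as a boolean. `CdcListenerToggleSketch` and `listenersEnabled` are hypothetical names.

```java
import java.util.Map;

/** Illustrative only: one way the worker/api toggle could be interpreted. */
final class CdcListenerToggleSketch {

    static final String TOGGLE = "HUMAND__KAFKA_CDC_LISTENER_ENABLED";

    /** Assumption: an unset variable means listeners stay off. */
    static boolean listenersEnabled(final Map<String, String> env) {
        return Boolean.parseBoolean(env.getOrDefault(TOGGLE, "false"));
    }

    public static void main(final String[] args) {
        // Values mirror the Terraform type_environments entries for each task type
        System.out.println("api: " + listenersEnabled(Map.of(TOGGLE, "false")));
        System.out.println("worker: " + listenersEnabled(Map.of(TOGGLE, "true")));
        // To check the real process environment instead: listenersEnabled(System.getenv())
    }
}
```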
---
## Next step
After scaffolding, run **`/add-java-kafka-cdc-consumer`** to add your first CDC consumer. It will analyze reference repos, scan your Liquibase migrations, and generate all required artifacts (event models, listeners, services, DLT handling, integration tests, Terraform SSM/IAM).