# Kickoff Java Project — Kafka CDC Module Kafka CDC module (conditional: Kafka=Yes). Scaffolds the module skeleton only. > **Important**: This file scaffolds the kafka module structure and configuration. To add actual CDC consumers (event models, listeners, services, DLT handling, integration tests), use the **`/add-java-kafka-cdc-consumer`** skill after the project is scaffolded. Reference: Janus `janus-kafka/` module. --- ## POM dependencies ```xml ${project.groupId}-common ${project.groupId}-model ${project.groupId}-core org.springframework.bootspring-boot-starter org.springframework.bootspring-boot-starter-validation org.springframework.bootspring-boot-starter-kafka org.springframework.bootspring-boot-starter-json com.fasterxml.jackson.corejackson-databind org.springframework.retryspring-retry org.springframework.bootspring-boot-starter-aspectj software.amazon.mskaws-msk-iam-auth com.datadoghqdd-trace-api org.springframework.bootspring-boot-starter-testtest org.springframework.bootspring-boot-starter-kafka-testtest org.testcontainerstestcontainers-kafkatest org.testcontainerstestcontainers-junit-jupitertest ``` --- ## KafkaConfiguration skeleton Must be **public** for SB4 cross-package imports. Discovered by `scanBasePackages` on Application class. ```java @Configuration @EnableRetry public class KafkaConfiguration { @Bean ProducerFactory dltProducerFactory(final KafkaProperties kafkaProperties) { final Map props = kafkaProperties.buildProducerProperties(); return new DefaultKafkaProducerFactory<>(props, new StringSerializer(), new JacksonJsonSerializer<>()); } @Bean KafkaTemplate dltTemplate(final ProducerFactory dltProducerFactory) { return new KafkaTemplate<>(dltProducerFactory); } } ``` `KafkaProperties` import in SB4: `org.springframework.boot.kafka.autoconfigure.KafkaProperties` (changed from `org.springframework.boot.autoconfigure.kafka.KafkaProperties` in SB3). Consumer/producer/DLT factories for each CDC entity are added by `/add-java-kafka-cdc-consumer`. --- ## application.yml kafka block ```yaml spring: kafka: admin: auto-create: true bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} consumer: group-id: client-id: -client-id key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JacksonJsonDeserializer properties: spring: json: trusted: packages: co.humand..kafka.model auto-offset-reset: earliest enable-auto-commit: true auto-commit-interval: 1000 heartbeat-interval: 3000 max-poll-records: 10 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JacksonJsonSerializer listener: concurrency: 1 topics: placeholder-cdc: humand.cdc.placeholder placeholder-dlt: humand.cdc.placeholder-dlt ``` --- ## application-test.yml kafka overrides ```yaml spring: kafka: admin: auto-create: true consumer: group-id: -test client-id: -test-client auto-offset-reset: earliest enable-auto-commit: true topics: placeholder-cdc: humand.cdc.placeholder.test placeholder-dlt: humand.cdc.placeholder-dlt.test testcontainers: kafka: enabled: true ``` --- ## TestContainersConfiguration additions ```java @Bean @ServiceConnection(name = "kafka") @ConditionalOnProperty(name = "testcontainers.kafka.enabled", havingValue = "true") ConfluentKafkaContainer kafkaContainer() { return new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.8.0")); } @Bean @ConditionalOnProperty(name = "testcontainers.kafka.enabled", havingValue = "true") DynamicPropertyRegistrar kafkaPropertiesRegistrar(final ConfluentKafkaContainer kafkaContainer) { return registry -> registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers); } ``` --- ## KafkaDLTTestListenerConfig skeleton ```java @TestConfiguration public class KafkaDLTTestListenerConfig { private final BlockingQueue> placeholderQueue = new LinkedBlockingQueue<>(); @KafkaListener(topics = "${spring.kafka.topics.placeholder-dlt}") public void placeholderDLTListener(final ConsumerRecord record) { placeholderQueue.add(record); } @Bean BlockingQueue> placeholderDLTMessagesQueue() { return placeholderQueue; } } ``` Additional DLT listeners are added per-entity by `/add-java-kafka-cdc-consumer`. --- ## Worker toggle (Terraform) In the project module's `type_environments` local: ```hcl type_environments = { api = [{ name = "HUMAND__KAFKA_CDC_LISTENER_ENABLED", value = "false" }] worker = [{ name = "HUMAND__KAFKA_CDC_LISTENER_ENABLED", value = "true" }] } ``` --- ## Next step After scaffolding, run **`/add-java-kafka-cdc-consumer`** to add your first CDC consumer. It will analyze reference repos, scan your Liquibase migrations, and generate all required artifacts (event models, listeners, services, DLT handling, integration tests, Terraform SSM/IAM).