---
name: add-java-kafka-cdc-consumer
description: Add a Kafka CDC consumer to a Java/Spring Boot service. Analyzes reference repos, scans migrations, generates event models, listener, service, repository, KafkaConfiguration, Terraform, and integration tests.
---

# /add-java-kafka-cdc-consumer — Add Kafka CDC Consumer

Interactive workflow to add a new Debezium CDC consumer to a Java/Spring Boot service. Discovers reference patterns from sibling repos, scans target migrations for table definitions, and generates all required code, configuration, and infrastructure.

## Known Backend Services

The following Java/Spring Boot services are candidates for CDC consumers:

| Repo | Description | GitHub |
|------|-------------|--------|
| humand-cerberus | Authorization: RBAC, permissions, user segmentation (REST + gRPC) | HumandDev/humand-cerberus |
| humand-janus | Auth data replication: jOOQ, Kafka CDC, Redis caching | HumandDev/humand-janus |
| humand-argus | Analytics/auditing backend service | HumandDev/humand-argus |
| humand-audiences-service | Audiences API: audience calculation and segmentation | HumandDev/humand-audiences-service |
| humand-marty | Communications: chat, messaging | HumandDev/humand-marty |

Libraries (not services, excluded from targets): `humand-backend-java-libraries`.

New services not in this list can also be used — the skill only needs a local path to any Maven project.
## Workflow Overview

```
Phase 0: Discovery      → Ask user for local repo paths
Phase 1: Target Select  → User picks which repo gets the new consumer
Phase 2: Analysis       → Parallel agents scan target + reference repos for CDC patterns
Phase 3: Table Select   → Scan target migrations, user picks the table
Phase 4: User Input     → Confirm topic name, entity name, consumer group
Phase 5: Code Gen       → Generate all artifacts using matched reference patterns
Phase 6: Verification   → spotless:apply + mvn verify
```

---

## Phase 0: Discovery

Ask the user which backend repos are available locally and where they live.

1. Present the Known Backend Services table and ask: **"Which of these repos do you have locally? Provide the local path for each (e.g. `../humand-cerberus` or `/Users/me/repos/humand-cerberus`)."**
2. The user may also provide repos not in the table — any Maven project with Kafka CDC patterns works.
3. Verify each provided path exists and contains a `pom.xml`.
4. Keep only repos with valid local paths. At least one repo (the target) is required.

## Phase 1: Target Selection

Present the repos discovered in Phase 0 to the user via `AskQuestion`:

**"Which repo should receive the new CDC consumer?"**

List only repos with verified local paths from Phase 0.

## Phase 2: Analysis (Parallel Agents)

Launch `repo-cdc-analyzer` agents on the **target repo** and all **other Maven repos** in parallel (max 4 concurrent). Each agent receives `repoName` and `localPath`.

Use `subagent_type="explore"` with model `fast`. Pass a prompt that includes:
- The full content of `agents/repo-cdc-analyzer.md` as instructions
- `repoName` and `localPath` for that repo

When all agents return, aggregate results:

### Multi-Dimensional Pattern Matching

Do NOT pick a single "best reference" repo.

Match per dimension:

| Dimension | What to take | Match criteria |
|-----------|-------------|----------------|
| **Structure** | Module layout, `@Import` patterns, POM config | Same structure type (monorepo vs single-module) as target |
| **ORM** | Repository pattern, upsert strategy, persistence code | Same ORM (jOOQ vs Hibernate) as target |
| **CDC/Kafka** | KafkaConfiguration, event model, listener, DLT patterns | Any repo with CDC consumers (largely ORM/structure-agnostic) |
| **Infrastructure** | Terraform SSM, IAM, env var patterns | Any repo with Kafka infra (largely structure-agnostic) |

Example: target = monorepo + jOOQ → take monorepo structure from Cerberus (monorepo + Hibernate), jOOQ persistence from Argus (single-module + jOOQ), and CDC patterns from whichever repo has the most consumers.

Store the matched references for use in Phase 5.

## Phase 3: Table Selection

Launch `migration-table-scanner` agent on the **target repo only**.

Use `subagent_type="explore"` with model `fast`. Pass a prompt that includes:
- The full content of `agents/migration-table-scanner.md` as instructions
- `repoName` and `localPath` for the target repo

When the agent returns, present the table list to the user via `AskQuestion`:

```
Which table should the CDC consumer target?

1. instances (15 columns, PK: uuid, has deleted_at)
2. users (22 columns, PK: uuid, has deleted_at)
3. departments (8 columns, PK: bigint, no deleted_at)
...
```

## Phase 4: User Input

Ask the user to confirm or adjust the derived configuration via `AskQuestion`:

| Field | Default / Derivation | Required |
|-------|---------------------|----------|
| **Topic name** | `monolith.cdc.{TableName}` — source is the monolith, table name in PascalCase (e.g. `monolith.cdc.Users`, `monolith.cdc.Instances`) | Yes |
| **Entity name** | Singular of table name (e.g. `instances` → `Instance`) | Confirm |
| **Consumer group** | `{repo-short-name}-{table_name}` (e.g. `janus-instances`) | Confirm |
| **Concurrency** | `1` | No (advanced) |

Auto-derived (do NOT ask the user):
- **Soft delete handling**: auto-detect from `deleted_at` column in table definition
- **DLT topic name**: `{repo-short-name}_{topic_name}_dlt` with underscores (e.g. `janus_monolith.cdc.Instances_dlt`) — consistent with Terraform pattern `${msk_group_id}_${msk_topic_value}_dlt` used in all reference repos
- **Java class names**: from entity name:
  - `{Entity}CdcEvent` — CDC event wrapper
  - `{Entity}Data` — data payload (before/after)
  - `{Entity}CdcEventService` — Kafka listener
  - `{Entity}Service` — core business logic
  - `{Entity}Repository` (+ `{Entity}RepositoryImpl` for jOOQ)

## Phase 5: Code Generation

Using the matched reference patterns from Phase 2 + user input from Phase 4, generate all artifacts. Read the actual reference files from the matched repos to use as templates.

### First Consumer vs Additional Consumer

Check the target repo's Phase 2 analysis:

**If NO existing Kafka setup (first consumer):** Generate everything from scratch — base classes, Kafka module (if monorepo), KafkaConfiguration, `@Import`, plus per-consumer files.

**If existing Kafka setup (additional consumer):** Extend existing — add new factory beans to KafkaConfiguration, add per-consumer files only.

### 5.0 Nullability Convention

Before generating code, check which nullability framework the target repo uses. Look at existing `package-info.java` files in the target (from the Target Kit, Phase 2 section 8):

**JSpecify** (`@NullMarked` from `org.jspecify.annotations`):
- The convention is a single `@NullMarked` on the module's root package, relied on to cover all subpackages.
- **Do NOT create `package-info.java` in subpackages** when the target's existing subpackages don't have one either.
- Only create `package-info.java` at the root package of each new module (e.g. `co.humand.janus.kafka`).
- Note: per the JSpecify specification, a package-level `@NullMarked` has no effect on subpackages by itself. Root-only coverage works when the project's tooling treats the whole package tree as null-marked (for example NullAway's `AnnotatedPackages` option, which includes subpackages) or when `@NullMarked` is on the module declaration. If the target instead has a `package-info.java` in every package, follow that.
- Use `@Nullable` from `org.jspecify.annotations` for nullable fields/params/returns.

**Spring** (`@NonNullApi` + `@NonNullFields` from `org.springframework.lang`):
- Each package needs its own `package-info.java` with both annotations.
- Create `package-info.java` in every new package (e.g. `model/`, `service/`, `config/`).
- Use `@Nullable` from `org.springframework.lang` for nullable fields/params/returns.

Follow whichever framework the target already uses. Do NOT mix them.

### 5.1 Base Classes (only if first consumer)

Generate in the kafka module/package:

#### `BaseCdcEvent`

Generic CDC event wrapper with fields: `before` (T, nullable), `after` (T, nullable), `op` (String), `ts_ms` (Long). Do NOT add `@AtLeastOnePresent` or any validation annotations — see Known Pitfalls #1. Read the actual class from the CDC reference repo to match the exact pattern, but skip the validation annotation.

#### `CdcOperationType`

Enum mapping Debezium operation codes to operations:
- `c` → `CREATE`
- `r` → `READ`
- `u` → `UPDATE`
- `d` → `DELETE`

With a `fromCode(String)` method returning `Optional<CdcOperationType>`, empty for a null, empty, or unrecognized code.

#### `BaseCrudCdcEventListener`

Abstract class with `consumeEvent(BaseCdcEvent)` that dispatches to `handleCreate`, `handleRead`, `handleUpdate`, `handleDelete` based on `CdcOperationType`.

The `consumeEvent` method must include two guard clauses:
1. **Null before/after check**: if both `before` and `after` are null, throw an `IllegalArgumentException` (caught by error handler → DLT).
2. **Unknown/missing op check**: if `CdcOperationType.fromCode(event.getOp())` returns empty (op is null, empty, or unrecognized), **throw an exception** — do NOT just log a warning and continue. A swallowed unknown op means the event is silently dropped and permanently lost (the Kafka offset is committed and the event never reaches the DLT). Throwing lets the `DefaultErrorHandler` route it to the DLT for investigation.

Do NOT use `@Valid` on the method parameter — see Known Pitfalls #1. Read the reference to match exact method signatures and dispatch logic.
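A dependency-free sketch of the three base classes, assuming Java 17+, plain accessors in place of Lombok, and no Jackson annotations (a real `BaseCdcEvent` would map `tsMs` to the `ts_ms` JSON key). The exception type and messages are illustrative; match the reference repo's exact signatures.

```java
import java.util.Arrays;
import java.util.Optional;

// Generic Debezium envelope. Plain accessors stand in for Lombok.
class BaseCdcEvent<T> {
    private T before;   // null for create/read events
    private T after;    // null for delete events
    private String op;  // Debezium operation code: c, r, u, d
    private Long tsMs;  // JSON key "ts_ms"

    BaseCdcEvent() {}

    BaseCdcEvent(T before, T after, String op, Long tsMs) {
        this.before = before;
        this.after = after;
        this.op = op;
        this.tsMs = tsMs;
    }

    T getBefore() { return before; }
    T getAfter() { return after; }
    String getOp() { return op; }
    Long getTsMs() { return tsMs; }
}

enum CdcOperationType {
    CREATE("c"), READ("r"), UPDATE("u"), DELETE("d");

    private final String code;

    CdcOperationType(String code) { this.code = code; }

    // Empty for null, empty, or unrecognized codes; the caller decides how to fail.
    static Optional<CdcOperationType> fromCode(String code) {
        if (code == null || code.isEmpty()) {
            return Optional.empty();
        }
        return Arrays.stream(values()).filter(t -> t.code.equals(code)).findFirst();
    }
}

abstract class BaseCrudCdcEventListener<T> {

    // Entry point a @KafkaListener method would delegate to. Any exception thrown
    // here propagates to the container's error handler (DefaultErrorHandler -> DLT).
    public void consumeEvent(BaseCdcEvent<T> event) {
        if (event.getBefore() == null && event.getAfter() == null) {
            throw new IllegalArgumentException("CDC event has neither 'before' nor 'after'");
        }
        CdcOperationType type = CdcOperationType.fromCode(event.getOp())
                .orElseThrow(() -> new IllegalArgumentException("Unknown CDC op: " + event.getOp()));
        switch (type) {
            case CREATE -> handleCreate(event);
            case READ -> handleRead(event);
            case UPDATE -> handleUpdate(event);
            case DELETE -> handleDelete(event);
        }
    }

    protected abstract void handleCreate(BaseCdcEvent<T> event);
    protected abstract void handleRead(BaseCdcEvent<T> event);
    protected abstract void handleUpdate(BaseCdcEvent<T> event);
    protected abstract void handleDelete(BaseCdcEvent<T> event);
}
```

Both guard clauses throw instead of returning. The container's `DefaultErrorHandler` only sees a failure if the listener throws, so this is what lets it route the record to the DLT.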
#### Kafka Module POM (if monorepo and first consumer)

Create `{prefix}-kafka/pom.xml` with dependencies: spring-kafka, spring-retry, jackson, aws-msk-iam-auth, spring-boot-starter-validation, spring-boot-starter-aop. Match the reference repo's POM structure.

#### Module Configuration Class with `@ComponentScan` (monorepo only)

Each external module (kafka, core, model) that contains Spring-managed beans (`@Service`, `@Repository`, `@Component`) needs a **configuration bridge class** so that `@Import` on the Application class triggers component scanning for that module's entire package tree.

**Critical: the bridge class MUST live in the module's ROOT package** — not in a subpackage like `config/`. `@ComponentScan(basePackageClasses = X.class)` scans the package where `X` lives and all its subpackages. If the class is in `co.humand.janus.kafka.config`, it only scans `config/**` and misses `service/`, `model/`, `validation/` etc. If it's in `co.humand.janus.kafka` (the root), it scans everything in the module.

**Two-class pattern** (Cerberus reference):

1. **Bridge class in module root package** (`co.humand.{project}.kafka`):

   ```java
   package co.humand.{project}.kafka;

   import org.springframework.context.annotation.ComponentScan;
   import org.springframework.context.annotation.Configuration;

   @Configuration
   @ComponentScan(basePackageClasses = {Prefix}KafkaConfiguration.class)
   public class {Prefix}KafkaConfiguration {}
   ```

2. **Actual KafkaConfiguration with beans** in `config/` subpackage (`co.humand.{project}.kafka.config`):

   ```java
   package co.humand.{project}.kafka.config;

   import org.springframework.context.annotation.Configuration;

   @Configuration
   public class KafkaConfiguration {
       // consumer factories, container factories, producer factory, DLT config...
   }
   ```

The bridge class (`{Prefix}KafkaConfiguration`) is what the webapp imports via `@Import`. Its `@ComponentScan` covers the entire `co.humand.{project}.kafka.**` tree, which discovers `KafkaConfiguration` in `config/`, listener services in `service/`, etc.

**Do NOT** put the bridge class in a subpackage — it will silently miss beans in sibling packages.
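To make the scoping rule concrete, here is a small model of the package check that `basePackageClasses` implies. `ScanScope` and `isScanned` are hypothetical helpers written for illustration, not Spring API. Spring itself turns the base package into a classpath resource pattern, but the "this package and below" semantics are the same.

```java
final class ScanScope {
    private ScanScope() {}

    // True if a class in candidatePackage would be picked up by
    // @ComponentScan(basePackageClasses = X.class), where X lives in basePackage.
    static boolean isScanned(String basePackage, String candidatePackage) {
        return candidatePackage.equals(basePackage)
                || candidatePackage.startsWith(basePackage + ".");
    }
}
```

With the bridge class in `co.humand.janus.kafka.config`, `isScanned("co.humand.janus.kafka.config", "co.humand.janus.kafka.service")` is false, which is exactly the silent miss described above. Moving the bridge class to `co.humand.janus.kafka` makes it true.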

**Do the same for core and model modules** if they don't already have a configuration class with `@ComponentScan` at the module's root package. Check what the target repo already has before adding.

**If the target project already uses `scanBasePackages` on `@SpringBootApplication`** (e.g. projects scaffolded by the kickoff skill), keep that pattern — the kafka module's beans will be discovered automatically and no `@Import` is needed. **If it uses explicit `@Import` instead**, follow the `@Import` + per-module `@ComponentScan` pattern described above. Always match what the project already does.

#### `@Import` on Application class (monorepo only)

Add the kafka module's bridge configuration class (`{Prefix}KafkaConfiguration`) to the `@Import` annotation on the main `@SpringBootApplication` class. In monorepos, the kafka module is outside the webapp's component scan — without `@Import`, Spring won't discover it.

In single-module projects (like Argus), component scanning discovers `KafkaConfiguration` automatically — no `@Import` needed.

### 5.2 KafkaConfiguration

**If first consumer:** Create a new `KafkaConfiguration` class. Must be `public` (Spring Boot 4 cross-package requirement). Use `@Value("${spring.kafka.bootstrap-servers}")` for bootstrap servers (not `KafkaProperties`).

**If additional consumer:** Read the existing `KafkaConfiguration` and add new beans for the new consumer.

For each consumer, generate:
- **ConsumerFactory bean**: `{entity}CdcConsumerFactory()` — returns `DefaultKafkaConsumerFactory` with deserializer config for the event type, consumer group from properties.
- **ConcurrentKafkaListenerContainerFactory bean**: `{entity}CdcContainerFactory()` — uses the consumer factory, configures retry (via `DefaultErrorHandler` with `DeadLetterPublishingRecoverer`), backoff, and DLT publishing.
- **KafkaTemplate/ProducerFactory bean** (shared, one per KafkaConfiguration): for DLT publishing. Only generate if not already present.
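One possible shape for these beans, sketched for a hypothetical `instances` consumer against the Spring Kafka 3.x API. The property keys `app.kafka.instances.group-id` and `app.kafka.instances.dlt-topic`, the nested `InstanceCdcEvent` placeholder, and the retry values are assumptions. MSK IAM auth settings and deserialization-error handling (`ErrorHandlingDeserializer`) are left out, and newer Spring Kafka releases may favor different JSON serde classes, so the reference repo's configuration remains the template.

```java
package co.humand.janus.kafka.config; // hypothetical package, following the Janus example

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaConfiguration {

    // Placeholder for the generated InstanceCdcEvent (extends BaseCdcEvent<InstanceData>).
    public static class InstanceCdcEvent {}

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Hypothetical property keys; reuse whatever the infra generator writes.
    @Value("${app.kafka.instances.group-id}")
    private String instancesGroupId;

    @Value("${app.kafka.instances.dlt-topic}")
    private String instancesDltTopic;

    @Bean
    public ConsumerFactory<String, InstanceCdcEvent> instanceCdcConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, instancesGroupId);
        // Debezium records carry no Spring type headers, so bind to the target class.
        JsonDeserializer<InstanceCdcEvent> valueDeserializer =
                new JsonDeserializer<InstanceCdcEvent>(InstanceCdcEvent.class).ignoreTypeHeaders();
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }

    // Shared producer used only to publish failed records to DLTs.
    @Bean
    public KafkaTemplate<String, Object> dltKafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, InstanceCdcEvent> instanceCdcContainerFactory(
            KafkaTemplate<String, Object> dltKafkaTemplate) {
        // After retries are exhausted, publish the failed record to the DLT;
        // a negative partition lets the producer choose the partition.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                dltKafkaTemplate, (rec, ex) -> new TopicPartition(instancesDltTopic, -1));
        // 3 retries, 1 s apart (illustrative values).
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
        // Optional: malformed events can never succeed, so skip retries for them.
        errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);

        ConcurrentKafkaListenerContainerFactory<String, InstanceCdcEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(instanceCdcConsumerFactory());
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}
```

Marking `IllegalArgumentException` as not retryable is a design choice, not a requirement. The guard clauses in `BaseCrudCdcEventListener` throw it for events that can never succeed, so retrying them only delays the DLT hand-off.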

Read the actual KafkaConfiguration from the CDC reference repo to match the exact bean patterns, error handler setup, and DLT configuration.

### 5.3 Per-Consumer Artifacts

In monorepos, CDC data flows through two separate classes to keep module boundaries clean:

```
Kafka module   {Entity}Data (CDC DTO)       Jackson annotations, lives in kafka/model/
                    │ toEntity() in {Entity}CdcEventService
                    ▼
Core module    {Entity}Service              upsert / deleteById
                    │
                    ▼
Model module   {Entity} (domain model)      No Jackson deps, lives in model/{entity}/
               {Entity}Repository
```

**Do NOT use a single class for both deserialization and persistence.** This creates a circular dependency: `janus-model` would need to know `{Entity}Data` to persist it, but `{Entity}Data` lives in `janus-kafka` (which already depends on `janus-model` through `core`). Even if you move the class to `model`, it drags Jackson dependencies (`jackson-databind`, `@JsonProperty`, `@JsonIgnoreProperties`) into the model module, contaminating the persistence layer.

#### Event Models (kafka module: `model/` package)

**`{Entity}CdcEvent`**: Extends `BaseCdcEvent<{Entity}Data>`. Minimal — just the type parameter binding.

**`{Entity}Data`**: CDC deserialization DTO. POJO with fields matching the table columns from Phase 3. This class lives in the **kafka module only** — it is the Jackson-aware representation of the Debezium payload.

**Important**: The source tables live in the monolith, which uses **camelCase column names** (e.g. `instanceId`, `firstName`, `deletedAt`). Debezium mirrors column names as-is, so the CDC JSON payload uses camelCase. Therefore, `{Entity}Data` fields must use camelCase to match, and **no `@JsonProperty` is needed** in most cases (by default Jackson matches JSON keys to Java property names as-is). Do not convert to snake_case.

Use `@JsonProperty` only when the field name differs from the JSON key. The common case is boolean columns like `isAgent`: with Lombok, a primitive `boolean isAgent` field gets the accessors `isAgent()` and `setAgent()`, from which Jackson typically infers the property name `agent`, so the `isAgent` JSON key is not matched (and `ignoreUnknown = true` silently drops it). Either name the field `agent` and annotate it with `@JsonProperty("isAgent")`, or keep `isAgent` and annotate it explicitly to avoid the Lombok getter ambiguity.

Use `@JsonIgnoreProperties(ignoreUnknown = true)` on the class to tolerate schema evolution (new columns won't break deserialization).

Field types:
- `uuid` / `UUID` columns → `UUID`
- `integer` / `int` → `int` or `Integer`
- `bigint` → `long` or `Long`
- `text` / `varchar` → `String`
- `boolean` → `boolean` or `Boolean`
- `timestamp` / `timestamptz` → `Instant`
- `jsonb` / `json` → `String` (or `JsonNode` if reference uses it)

Use the wrapper type (`Integer`, `Long`, `Boolean`) whenever the column is nullable. For timestamp columns, confirm against a reference consumer that the payload deserializes into `Instant` correctly: Debezium encodes temporal columns as epoch-based numbers or ISO-8601 strings depending on the column type and connector settings.

Include `deletedAt` field if table has `deleted_at`.

#### Domain Model (model module: `{entity}/` package)

**`{Entity}`**: Clean domain model class. No Jackson annotations, no Jackson dependency. This is what the core service, repository, and tests work with.

Fields mirror `{Entity}Data` but use the project's persistence conventions:
- **jOOQ**: plain POJO/record. Field types match the jOOQ-generated table columns.
- **Hibernate/JPA**: `@Entity`, `@Table`, `@Column` annotations. Field types match JPA column types.

#### Listener (kafka module: `service/` package)

**`{Entity}CdcEventService`**: Extends `BaseCrudCdcEventListener<{Entity}Data>`. Annotated with `@KafkaListener` targeting the entity's container factory. Injects `{Entity}Service` from core.

**Includes a `toEntity({Entity}Data)` conversion method** that maps from the CDC DTO to the domain model. This is the bridge between the kafka module and the core/model modules — it keeps Jackson concerns in the kafka layer.

Implement the four handler methods:
- `handleCreate(event)` → call `service.upsert(toEntity(event.getAfter()))`
- `handleRead(event)` → call `service.upsert(toEntity(event.getAfter()))`
- `handleUpdate(event)` → always call `service.upsert(toEntity(event.getAfter()))`. The upsert writes all fields including `deletedAt` — if the source row was soft-deleted, `deletedAt` will have the original monolith timestamp and the upsert persists it as-is. No special soft-delete branching needed (see Known Pitfalls #5).
- `handleDelete(event)` → call `service.deleteById(id from event.getBefore())` (hard delete — the source row was physically deleted)

#### Core Service (core module: `{entity}/` package)

**`{Entity}Service`**: Business logic for persisting CDC events. Methods:
- `upsert({Entity} entity)` — insert or update all fields including `deletedAt` (takes the domain model, NOT the CDC DTO). The upsert handles both normal updates and soft-deletes naturally — no separate `softDeleteById` needed.
- `deleteById(primaryKeyType id)` — hard delete (for Debezium `d` operations where the source row was physically removed)

Annotated with `@Service`, `@RequiredArgsConstructor`. Uses `@Transactional` for writes. Injects `{Entity}Repository`.

#### Repository (model module: `{entity}/` package)

**Hibernate (JPA):**
- `{Entity}Repository` extends the project's base repository or `JpaRepository<{Entity}, PkType>`.
- Works with the `{Entity}` JPA entity class.

**jOOQ:**
- `{Entity}Repository` interface with `upsert` and `deleteById` methods: `upsert` takes the domain model `{Entity}` (not the CDC DTO) and `deleteById` takes the primary key.
- `{Entity}RepositoryImpl` using `DSLContext` with jOOQ DSL: `dslContext.insertInto(TABLE).set(...).onConflict(TABLE.ID).doUpdate().set(...).execute()` for upsert.

Read the persistence reference repo to match the exact pattern.

### 5.4 Infrastructure & Configuration (delegated to kafka-infra-generator agent)

Delegate all Terraform and application.yml work to the **kafka-infra-generator** agent (`agents/kafka-infra-generator.md`). This agent handles: application.yml, application-test.yml, Terraform SSM parameters, IAM policies, ECS environment variables, and `terraform fmt`.

Launch the agent with `subagent_type="generalPurpose"`, model `fast`.
Pass a prompt that includes:
- The full content of `agents/kafka-infra-generator.md` as instructions
- All required inputs: `targetRepoPath`, `referenceRepoPaths` (from Phase 2 matched repos that have Kafka infra), `entityName`, `tableName`, `topicName`, `dltTopicName`, `consumerGroup`, `repoShortName`, `projectTfModuleName`, `isFirstConsumer`, `concurrency`

When the agent returns, capture:
- **Environment variable names** — needed to understand which Spring properties are overridden by Terraform
- **IAM policy status** — verify it was created (first consumer) or confirmed sufficient (additional)
- **Warnings** — act on any issues reported

**Do NOT generate Terraform or application.yml configuration in the main agent thread.** Always delegate to the kafka-infra-generator agent.

#### Concurrency Rule

The default Kafka listener concurrency is **always 1**. There must be exactly ONE `concurrency` entry under `spring.kafka.listener`, shared across all topics; never add per-topic concurrency entries. Never set concurrency > 1 unless the user explicitly requested it for a specific consumer in Phase 4, and in that case apply it to that consumer only (for example via the `concurrency` attribute of its `@KafkaListener`) instead of changing the shared default.

### 5.6 Tests

#### DLT Test Listener (if first consumer)

Add `KafkaDLTTestListenerConfig` in test-integration config — reads DLT messages into an in-memory queue for assertion. Read from reference.

#### Integration Tests

Create `Kafka{Entity}IntegrationTest.java` in `src/test-integration/java/.../kafka/` (or equivalent location from reference).

Generate these test cases:

| Test | Description |
|------|-------------|
| `shouldProcess{Entity}CreatedEvent` | op=c, after present → verify upserted in DB |
| `shouldProcess{Entity}UpdatedEvent` | op=u, before+after present → verify updated in DB |
| `shouldProcess{Entity}ReadEvent` | op=r, after present → verify upserted in DB |
| `shouldProcess{Entity}DeletedEvent` | op=d, before present → verify deleted from DB |
| `shouldSoftDelete{Entity}OnUpdateWithDeletedAt` | *(only if `deleted_at` exists)* op=u, after.deletedAt not null → verify soft-deleted with the source's `deletedAt` timestamp |
| `shouldRevertSoftDeleteOnUpdateWithNullDeletedAt` | *(only if `deleted_at` exists)* op=u, after.deletedAt null on previously soft-deleted record → verify reverted |
| `shouldSendInvalidEventToDLT` | No before, no after → verify message sent to DLT |

Read the integration test reference to match:
- Base class (`BaseIntegrationTest` or similar)
- How Kafka test messages are produced (`KafkaTemplate` or `KafkaProducer`)
- How assertions wait for async processing (polling, `Awaitility`, `Thread.sleep`)
- How DLT messages are verified

### 5.7 Package Info

Follow the nullability convention detected in section 5.0:
- **JSpecify projects** (`@NullMarked`): Only create `package-info.java` at the **root package of each new module** (e.g. `co.humand.janus.kafka`) and do NOT create it in subpackages (`model/`, `service/`, `config/`), provided the target's existing subpackages also omit it. A package-level `@NullMarked` does not itself cover subpackages under the JSpecify spec, so this relies on the project's tooling; if the target has a `package-info.java` in every package, do the same.
- **Spring projects** (`@NonNullApi` + `@NonNullFields`): Create `package-info.java` in **every new package**, including subpackages.

## Phase 6: Verification

1. Run `mvn spotless:apply` in the target repo to fix formatting.
2. Run `mvn verify` to compile and run all tests (unit + integration).
3. Report results — if tests fail, analyze the error and attempt to fix.

## Error Handling

- If the user provides no valid repo paths: stop and inform the user that at least one local Maven repo is required.
- If the target repo has no Liquibase migrations: stop — cannot determine table schema.
- If the selected table has no primary key: warn the user and ask how to identify records.
- If `mvn verify` fails: read the error output, attempt to fix common issues (imports, formatting, missing beans), and re-run.

## Notes

- The skill is distributed via the `humand-backend` plugin.
- Agent definitions live at `agents/repo-cdc-analyzer.md`, `agents/migration-table-scanner.md`, and `agents/kafka-infra-generator.md`.
- `repo-cdc-analyzer` and `migration-table-scanner` are **read-only** — they analyze and report.
- `kafka-infra-generator` **writes files** — it generates Terraform and application.yml configuration directly in the target repo.
- Java code generation (event models, listeners, services, repositories, tests) stays in the main agent thread for user visibility and error handling.
- When generating Java code, always read the actual reference files from the matched repos — do not rely solely on the agent reports. The reports tell you *where* to look; the actual file content is the template.

## Known Pitfalls

These are issues encountered in past runs. Avoid them when generating code.

### 1. Do NOT use `@Valid` on `@KafkaListener` payload or `@AtLeastOnePresent` on CDC events

The `@AtLeastOnePresent` annotation and its `AtLeastOnePresentValidator` exist in some reference repos (Cerberus, Janus) on `BaseCdcEvent`, but they are effectively dead code — Spring Kafka does not invoke `@Valid` on `@Payload` unless the `MessageHandlerMethodFactory` has a `Validator` explicitly configured, which these projects don't do. The annotation is decorative.

Additionally, the validator implementation has a known bug: it uses `getDeclaredField()`, which does not see inherited fields, causing `NoSuchFieldException` at runtime if validation ever does get triggered.
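A minimal, dependency-free illustration of why that lookup fails. The class names are hypothetical stand-ins: `DemoBaseEvent` plays `BaseCdcEvent` and `DemoEntityEvent` plays a concrete `{Entity}CdcEvent`. `Class.getDeclaredField` only searches the class it is called on, so asking the concrete event for `before`/`after` fails.

```java
// Hypothetical stand-ins for BaseCdcEvent and an {Entity}CdcEvent.
class DemoBaseEvent {
    private Object before; // declared on the superclass only
    private Object after;
}

class DemoEntityEvent extends DemoBaseEvent {} // declares no fields of its own

final class DeclaredFieldLookup {
    private DeclaredFieldLookup() {}

    // Mirrors the buggy validator: getDeclaredField only sees fields declared
    // directly on the given class, never inherited ones.
    static boolean hasDeclaredField(Class<?> type, String name) {
        try {
            type.getDeclaredField(name);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }
}
```

Since the recommendation below is to drop the validator entirely, there is no need to fix the lookup.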

**Do NOT**:
- Add `@AtLeastOnePresent` to generated `BaseCdcEvent` or entity CDC events
- Add `@Valid` to `@Payload` parameters in `@KafkaListener` methods
- Generate or copy `AtLeastOnePresentValidator`

**Instead**: Handle missing `before`/`after` in the listener's `consumeEvent` method with a simple null check — if both are null, throw an `IllegalArgumentException`. The `DefaultErrorHandler` catches it and routes the event to DLT automatically. This is explicit, testable, and actually works.

### 2. `@ComponentScan` bridge class MUST be in module root package

When creating the `{Prefix}KafkaConfiguration` bridge class with `@ComponentScan(basePackageClasses = ...)`, the class **must** live in the module's root package (e.g. `co.humand.janus.kafka`), not in a subpackage like `config/`.

`@ComponentScan` scans the package of the referenced class and below. If the bridge class is in `co.humand.janus.kafka.config`, it only scans `config/**` and silently misses `service/`, `model/`, etc. This causes `NoSuchBeanDefinitionException` at startup for any of those beans that another class injects, while undiscovered `@KafkaListener` beans simply never consume.

This is a common mistake when copying the Cerberus pattern without checking that Cerberus places its bridge class (`CerberusKafkaConfiguration`) at the module root (`co.humand.authorization.cerberus.kafka`), separate from `KafkaConfiguration` which is in the `config/` subpackage.

### 3. Separate CDC DTO from domain model in monorepos

In monorepos (kafka → core → model), do NOT use a single class for both Jackson deserialization and persistence. `{Entity}Data` (CDC DTO with `@JsonProperty`, `@JsonIgnoreProperties`) lives in the **kafka module** only. `{Entity}` (clean domain model) lives in the **model module**. The listener has a `toEntity()` method that converts between them.

Using one class for both creates a circular dependency (`model` needs the class to persist, `kafka` needs it to deserialize, `kafka` depends on `model` through `core`) and drags Jackson into the model module.
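A dependency-free sketch of the `toEntity()` bridge for a hypothetical `instances` table with `id`, `name`, and `deletedAt` columns. Records stand in for the Lombok POJOs, the Jackson annotations `InstanceData` would carry in the kafka module are omitted, and the mapping is shown as a static method on a hypothetical `InstanceCdcMapper` class for brevity (in the generated code it lives on `InstanceCdcEventService`). The point is that the domain type never references the CDC DTO.

```java
import java.time.Instant;
import java.util.UUID;

// Kafka module: CDC DTO (would carry @JsonIgnoreProperties(ignoreUnknown = true)).
record InstanceData(UUID id, String name, Instant deletedAt) {}

// Model module: domain model, no Jackson annotations or imports.
record Instance(UUID id, String name, Instant deletedAt) {}

// Kafka module: the listener side owns the mapping, so core/model never see InstanceData.
final class InstanceCdcMapper {
    private InstanceCdcMapper() {}

    static Instance toEntity(InstanceData data) {
        // Copy deletedAt as-is so a soft-delete keeps the monolith's original timestamp.
        return new Instance(data.id(), data.name(), data.deletedAt());
    }
}
```

Because `Instance` lives in the model module and only `InstanceCdcMapper` imports both types, the dependency arrow stays kafka → model and Jackson stays out of the persistence layer.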

In single-module projects this separation is optional since there are no module boundaries.

### 4. Lombok boolean getters

Lombok generates getters for `boolean` fields following JavaBeans convention. For a primitive field `boolean isAgent`, the getter is `isAgent()`, NOT `isIsAgent()`, and the setter is `setAgent()`: Lombok does not add a second `is` when the field name already starts with `is`. This applies only to primitive `boolean`; a `Boolean isAgent` wrapper field gets `getIsAgent()`.

When generating repository or service code that calls getters on `{Entity}Data` fields, use `data.isAgent()` (not `data.isIsAgent()`), `data.isBot()` (not `data.isIsBot()`), etc. This applies to any primitive `boolean` field with an `is` prefix.

### 5. Do NOT use `softDeleteById` for CDC soft-deletes — just upsert

When a source row is soft-deleted (its `deletedAt` goes from null to a timestamp), the CDC event's `after` payload contains ALL fields including the original `deletedAt` timestamp.

Using a dedicated `softDeleteById(id)` method is wrong because:
1. It typically sets `deletedAt = OffsetDateTime.now()` instead of preserving the monolith's original timestamp, causing data divergence between source and replica.
2. It only updates the `deletedAt` column, silently dropping any other field changes in the same CDC event.

The correct approach: always `upsert(toEntity(event.getAfter()))` for ALL update operations. The upsert writes every field including `deletedAt` — if the source soft-deleted the row, `deletedAt` has the original timestamp and gets persisted as-is. No special branching, no data loss, no timestamp divergence.

### 6. Unknown/missing `op` field must throw, not log-and-continue

In `BaseCrudCdcEventListener.consumeEvent()`, if `CdcOperationType.fromCode(event.getOp())` returns empty (op is null, empty string, or unrecognized code), the handler **must throw an exception**. Do NOT just log a warning and continue processing.

A swallowed unknown op means the Kafka offset is committed, the event is permanently lost, and it never reaches DLT.
Throwing an exception lets the `DefaultErrorHandler` catch it and route the event to the Dead Letter Topic for investigation.
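The difference between the two behaviors can be simulated without Kafka. In this sketch, `OpHandlingDemo` and its `process` method are hypothetical: `process` stands in for the listener container, treating a handler that returns normally as successfully processed (its offset moves on) and routing a handler that throws to a DLT list, which is roughly what `DefaultErrorHandler` with a `DeadLetterPublishingRecoverer` does once retries are exhausted.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

final class OpHandlingDemo {
    private OpHandlingDemo() {}

    static Optional<String> fromCode(String op) {
        return switch (op == null ? "" : op) {
            case "c", "r", "u", "d" -> Optional.of(op);
            default -> Optional.empty();
        };
    }

    // Anti-pattern: logs and returns, so the record looks successfully processed.
    static void logAndContinue(String op) {
        if (fromCode(op).isEmpty()) {
            System.err.println("WARN unknown CDC op: " + op);
            return;
        }
        // ... dispatch to the handler for this op ...
    }

    // Correct: fail loudly so the error handler can route the record to the DLT.
    static void throwOnUnknown(String op) {
        fromCode(op).orElseThrow(() -> new IllegalArgumentException("Unknown CDC op: " + op));
        // ... dispatch to the handler for this op ...
    }

    // Simulated container: returns normally -> processed; throws -> sent to DLT.
    static void process(List<String> ops, Consumer<String> handler,
                        List<String> processed, List<String> dlt) {
        for (String op : ops) {
            try {
                handler.accept(op);
                processed.add(op);
            } catch (RuntimeException e) {
                dlt.add(op);
            }
        }
    }
}
```

With `logAndContinue`, an event with op `x` is counted as processed and disappears. With `throwOnUnknown`, it lands in the DLT list where it can be investigated.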