Conversation
Replace the previous recursive reprocessing on database duplicate-key errors with a safer flow: log the duplicate, invalidate the event cache, wait briefly, fetch the event inserted by the competing worker and treat it as a repetition. If the event cannot be read back, raise a DatabaseReadWriteError. This avoids recursive calls to handle(), ensures fresh DB reads (cache coherence), and adds logging for diagnostics; repetition processing is resumed using the fetched event.
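For illustration, here is a minimal sketch of what the described catch-block flow could look like inside GrouperWorker.handle() (this is not the exact diff; the constant DUPLICATE_EVENT_READBACK_DELAY_MS and surrounding variables such as newEvent are assumed names):

```typescript
// Sketch of the non-recursive duplicate-key path described above (assumed names, not the exact diff)
const DUPLICATE_EVENT_READBACK_DELAY_MS = 10;

try {
  await this.saveEvent(task.projectId, newEvent);
} catch (e) {
  if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) {
    this.logger.info(`[handle] Duplicate key detected for groupHash=${uniqueEventHash}, fetching created event as repetition`);

    /**
     * Invalidate the cached event so the read below hits the database
     */
    const eventCacheKey = await this.getEventCacheKey(task.projectId, uniqueEventHash);

    this.cache.del(eventCacheKey);

    /**
     * Give the competing worker's insert a moment to become visible, then read it back
     */
    await new Promise(resolve => setTimeout(resolve, DUPLICATE_EVENT_READBACK_DELAY_MS));

    existedEvent = await this.getEvent(task.projectId, uniqueEventHash);

    if (!existedEvent) {
      throw new DatabaseReadWriteError('Event not found after duplicate key error');
    }

    /**
     * Continue below as a repetition instead of recursing into handle()
     */
    isFirstOccurrence = false;
  } else {
    throw e;
  }
}
```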
Pull request overview
This PR updates GrouperWorker.handle() to avoid recursive re-processing on MongoDB duplicate-key inserts by invalidating the event cache, reading back the event created by a competing worker, and proceeding with repetition handling.
Changes:
- Replaces recursive handle() retry on duplicate-key with cache invalidation + read-back flow.
- Adds diagnostic logging around duplicate-key detection and read-back.
- Moves repetition-processing logic into a separate post-insert conditional block.
```typescript
this.cache.del(eventCacheKey);

/**
```
await new Promise(resolve => setTimeout(resolve, 10)) introduces a magic number in production code. This repo commonly either defines numeric constants or explicitly disables @typescript-eslint/no-magic-numbers for such cases (e.g. lib/memoize/index.ts:43 and workers/release/src/index.ts:228-229). Please extract the delay into a named constant (or add a targeted eslint-disable) so linting and future tuning are straightforward.
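For example, either of these shapes would satisfy the lint rule (the constant name mirrors the DUPLICATE_KEY_RETRY_DELAY_MS used later in the diff; the choice between them is a judgment call):

```typescript
// Option 1: named constant instead of an inline magic number
const DUPLICATE_KEY_RETRY_DELAY_MS = 10;

await new Promise(resolve => setTimeout(resolve, DUPLICATE_KEY_RETRY_DELAY_MS));

// Option 2: targeted disable, as the review comment notes is done elsewhere in the repo
// eslint-disable-next-line @typescript-eslint/no-magic-numbers
await new Promise(resolve => setTimeout(resolve, 10));
```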
```diff
      } catch (e) {
        /**
         * If we caught Database duplication error, then another worker thread has already saved it to the database
         * and we need to process this event as repetition
+        * Clear the cache and fetch the event that was just inserted, then process it as a repetition
         */
        if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) {
-         await this.handle(task);
+         this.logger.info(`[handle] Duplicate key detected for groupHash=${uniqueEventHash}, fetching created event as repetition`);
+
+         const eventCacheKey = await this.getEventCacheKey(task.projectId, uniqueEventHash);
+
-         return;
+         /**
+          * Invalidate cache to force fresh fetch from database
+          */
+         this.cache.del(eventCacheKey);
+
+         /**
+          * Fetch the event that was just inserted by the competing worker
+          * Add small delay to ensure the event is persisted
```
The new duplicate-key handling path (invalidate cache → delay → read-back → continue as repetition) doesn’t appear to have explicit test coverage. There are extensive handle() tests in workers/grouper/tests/index.test.ts, but none that asserts the duplicate-key branch increments totalCount / creates a repetition / marks notifier payload as isNew: false when two workers race on the first insert. Please add or extend a test to cover this branch deterministically (e.g., by forcing saveEvent to throw code 11000 once, or by asserting outcomes in the existing simultaneous-handles test).
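A rough sketch of the deterministic variant (Jest-style; the worker instance, collection handles, and fixture names are assumptions about the existing test harness):

```typescript
// Sketch only: simulate the race deterministically by letting the first handle()
// insert the event, then making the second handle() miss the stored event so its
// insert hits the real unique index (duplicate key, code 11000).
it('processes a duplicate-key insert as a repetition', async () => {
  await worker.handle(task);

  // Second delivery "doesn't see" the stored event and attempts another insert
  jest.spyOn(worker as any, 'getEvent').mockResolvedValueOnce(null);

  await worker.handle(task);

  const savedEvent = await eventsCollection.findOne({ groupHash });

  expect(savedEvent?.totalCount).toBe(2);
  // ...plus assertions on the repetition record and the notifier payload (isNew: false)
});
```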
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 2 comments.
```typescript
 * Now continue processing as if this was not the first occurrence
 * This avoids recursion and properly handles the event as a repetition
 */
```
The existing test coverage for simultaneous events (line 292-301 in tests/index.test.ts) may not adequately verify that duplicate key scenarios are processed as repetitions. Specifically, the test should verify that:
- totalCount is incremented for each duplicate event
- Repetition records are created in the database
- Daily event counts are correctly updated
Without these assertions, the critical bug at line 271 (incorrect condition) would not be caught by tests.
Suggested change:

```diff
- * Now continue processing as if this was not the first occurrence
- * This avoids recursion and properly handles the event as a repetition
- */
+ * Mark this occurrence as a repetition to ensure repetition-processing logic runs
+ * after handling the duplicate key error.
+ */
+ isFirstOccurrence = false;
```
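For the missing assertions called out above, a sketch of what the simultaneous-handles test could check (collection handles and field names are assumptions about the test setup):

```typescript
// Sketch: assertions for the simultaneous-handles case (collection and field names assumed)
await Promise.all([worker.handle(task), worker.handle(task)]);

const savedEvent = await eventsCollection.findOne({ groupHash });
const repetitions = await repetitionsCollection.find({ groupHash }).toArray();
const dailyEvent = await dailyEventsCollection.findOne({ groupHash });

expect(savedEvent?.totalCount).toBe(2); // totalCount incremented for the duplicate
expect(repetitions).toHaveLength(1);    // a repetition record was created
expect(dailyEvent?.count).toBe(2);      // daily event count reflects both occurrences
```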
```typescript
 * Fetch the event that was just inserted by the competing worker
 * Add small delay to ensure the event is persisted
 */
await new Promise(resolve => setTimeout(resolve, DUPLICATE_KEY_RETRY_DELAY_MS));

existedEvent = await this.getEvent(task.projectId, uniqueEventHash);
```
A 10ms delay may not be sufficient for all scenarios, particularly under high load or with slow disk I/O. MongoDB's write concern and replication lag could mean the event isn't immediately visible to other queries. Consider:
- Using a configurable delay value
- Implementing retry logic with exponential backoff
- Or better yet, using MongoDB's read concern "majority" to ensure we read the committed write
The current implementation may still result in the error "Event not found after duplicate key error" in edge cases.
Suggested change:

```diff
- * Fetch the event that was just inserted by the competing worker
- * Add small delay to ensure the event is persisted
- */
- await new Promise(resolve => setTimeout(resolve, DUPLICATE_KEY_RETRY_DELAY_MS));
- existedEvent = await this.getEvent(task.projectId, uniqueEventHash);
+ * Fetch the event that was just inserted by the competing worker.
+ * Use bounded retry with exponential backoff to tolerate replication / visibility lag.
+ */
+ const maxDuplicateKeyRetries = 5;
+ let duplicateKeyRetryDelayMs = DUPLICATE_KEY_RETRY_DELAY_MS;
+ for (let attempt = 0; attempt < maxDuplicateKeyRetries && !existedEvent; attempt++) {
+   if (attempt > 0) {
+     await new Promise(resolve => setTimeout(resolve, duplicateKeyRetryDelayMs));
+     duplicateKeyRetryDelayMs *= 2;
+   }
+   existedEvent = await this.getEvent(task.projectId, uniqueEventHash);
+ }
```
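On the read concern alternative mentioned above, one way to express it with the Node MongoDB driver is to open the collection with a "majority" read concern (a sketch only; the database and collection names are assumptions):

```typescript
import { MongoClient, type Document } from 'mongodb';

/**
 * Sketch: read the event back with "majority" read concern so the competing
 * worker's committed insert is visible to this query.
 */
async function readBackEvent(client: MongoClient, groupHash: string): Promise<Document | null> {
  const events = client
    .db('hawk_events')
    .collection('events', { readConcern: { level: 'majority' } });

  return events.findOne({ groupHash });
}
```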
neSpecc left a comment
I thought about using Redis to store group hashes that have already been accepted.
All Grouper instances could access this key to check isFirstOccurrence.
The problem is how to support old events that were accepted before the Redis solution is introduced.
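A sketch of that idea with an atomic SET ... NX (ioredis-style; the key format and TTL are assumptions):

```typescript
import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');

/**
 * Atomically claim a group hash. Only the first Grouper instance to call this for a
 * given hash gets `true`; every other instance treats the event as a repetition.
 * Key format and TTL are illustrative.
 */
async function claimGroupHash(projectId: string, groupHash: string): Promise<boolean> {
  const result = await redis.set(`grouper:accepted:${projectId}:${groupHash}`, '1', 'EX', 60 * 60 * 24, 'NX');

  return result === 'OK';
}
```

For events accepted before such a key exists, the check could fall back to the current database lookup whenever the Redis key is missing.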