
fix: add circuit breaker to table-diff to prevent OOM on node failure #111

Merged
mason-sharp merged 1 commit into main from fix/ACE-184/oom-circuit-breaker
Apr 17, 2026

Conversation

@rasifr rasifr (Member) commented Apr 16, 2026

Summary

When a node starts timing out during a table diff, workers had no mechanism to stop — they continued iterating through every remaining sub-range (each waiting up to 60 s on a context deadline), accumulating error objects until OOM.

  • Add shouldStop() combining the existing row-limit check with a new error-recorded flag, replacing all shouldStopDueToLimit() calls in the recursive diff path so all workers bail out once any node error is recorded.
  • Extend the circuit breaker to the initial hash phase: worker goroutines drain the task queue without doing work once shouldStop() is true.
  • Change continue → return on hash errors in the sub-range loop so a goroutine stops immediately rather than grinding through remaining sub-ranges.
  • Back hasError() with an atomic.Bool to avoid mutex acquisition on every sub-range iteration in the common no-error path.

Fixes ACE-184.

@coderabbitai coderabbitai bot commented Apr 16, 2026

📝 Walkthrough

Modified table_diff.go to add an atomic errorRecorded flag to TableDiffTask, introduce hasError() and shouldStop() methods, refactor control flow to use shouldStop() in place of shouldStopDueToLimit(), change sub-range hash error handling to return early, and reset error state on ExecuteTask invocation.

Changes

Error State & Circuit-Breaker Logic (internal/consistency/diff/table_diff.go)
  Added atomic errorRecorded flag, a hasError() method to check flag status, and a shouldStop() method that returns true when either the diff-row limit is reached or an error is recorded. State is reset at the start of ExecuteTask.

Control Flow Refactoring (internal/consistency/diff/table_diff.go)
  Updated all diff execution paths (hash worker loop, mismatch-task processing, recursiveDiff early-exit conditions, per-row processing, sub-range recursion gating, and mismatch recursion gating) to use shouldStop() instead of shouldStopDueToLimit().

Recursion Error Handling (internal/consistency/diff/table_diff.go)
  Modified recursiveDiff to terminate the current recursion path (return) instead of continuing to the next sub-range (continue) when a sub-range hash operation fails for either node.

Poem

🐰 Through warren paths of diff so deep,
Error flags their vigil keep,
shouldStop() hops from branch to bough,
Recursion halts when errors bow,
Early returns save the way,
Cleaner logic wins the day!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Docstring Coverage ⚠️ Warning
  Docstring coverage is 11.11%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Title check ✅ Passed
  The title directly describes the main change: adding a circuit breaker mechanism to table-diff to prevent out-of-memory errors when nodes fail.
Description check ✅ Passed
  The description is related to the changeset, explaining the problem, solution approach, and specific implementation details of the circuit breaker mechanism.



@codacy-production codacy-production bot commented Apr 16, 2026

Up to standards ✅

🟢 Issues: 0 new issues
🟢 Duplication: 0

View in Codacy


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
internal/consistency/mtree/merkle.go (1)

708-743: Consider adding direct tests for the fetch-row SQL builders.

This PR updates the schema/table contract for buildFetchRowsSQLSimple and buildFetchRowsSQLComposite too, but the visible test coverage only locks in buildRowHashQuery. A small unit test around qualified-table generation and placeholder ordering here would make this harder to regress.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/mtree/merkle.go` around lines 708 - 743, Add unit tests
for buildFetchRowsSQLSimple and buildFetchRowsSQLComposite that assert the
generated SQL string includes the sanitized qualified table name, the selectCols
prefix (commit_ts and node_origin), the ORDER BY token with the provided orderBy
value, and that placeholder numbering and args ordering match the input keys;
specifically test: (1) buildFetchRowsSQLSimple with one and multiple scalar keys
to verify placeholders $1..$N and args ordering, and (2)
buildFetchRowsSQLComposite with multi-column PKs and multiple key tuples to
verify tuple column order, placeholder sequence ($1..$M across tuples) and
corresponding args flattening. Ensure tests check both the returned query string
and the args slice to guard against regressions in identifier qualification and
placeholder mapping.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/consistency/diff/table_diff.go`:
- Around line 113-115: Reset the circuit-breaker state at the start of
ExecuteTask: clear firstError (guarded by firstErrorMu) and unset errorRecorded
(atomic.Bool) in the same place you reset totalDiffRows and diffLimitTriggered
so a reused TableDiffTask won't short-circuit subsequent runs; ensure
ExecuteTask initializes firstError under firstErrorMu and calls
errorRecorded.Store(false) before worker paths that call shouldStop() execute.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 558a3e26-5de7-4a29-a71f-095acfadf764

📥 Commits

Reviewing files that changed from the base of the PR and between f087dbb and 5b2c9d6.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (12)
  • .github/workflows/test.yml
  • Dockerfile
  • db/queries/queries.go
  • go.mod
  • internal/consistency/diff/table_diff.go
  • internal/consistency/diff/table_rerun.go
  • internal/consistency/mtree/merkle.go
  • internal/consistency/mtree/merkle_test.go
  • internal/consistency/repair/stale_repair.go
  • internal/consistency/repair/table_repair.go
  • internal/infra/cdc/listen.go
  • internal/infra/db/auth.go

Comment thread on internal/consistency/diff/table_diff.go

@rasifr rasifr force-pushed the fix/ACE-184/oom-circuit-breaker branch from 5b2c9d6 to 05b4099 on April 17, 2026 12:18
@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
internal/consistency/diff/table_diff.go (1)

1846-1889: Optional: consider per-query timeouts for split-path queries to fully close the OOM loop.

The initial hash phase uses context.WithTimeout(t.Ctx, 60*time.Second) (L1518) and the sub-range hashes do the same (L2098), but countQuery (L1848) and medianQuery (L1878, L1885) in generateSubRanges still run on raw t.Ctx. If a node is degraded but not outright closing connections, these can block longer than intended before any error is recorded to trip the circuit breaker. In practice the new shouldStop() gate at L2050 prevents new entries into this path once any error is recorded, so this PR doesn't regress anything — just flagging as a follow-up for full symmetry with the hash phase.

Possible follow-up
-	err := pool.QueryRow(t.Ctx, countQuery, args...).Scan(&count) // nosemgrep
+	countCtx, cancelCount := context.WithTimeout(t.Ctx, 60*time.Second)
+	err := pool.QueryRow(countCtx, countQuery, args...).Scan(&count) // nosemgrep
+	cancelCount()

Same treatment for the two medianQuery scans.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/diff/table_diff.go` around lines 1846 - 1889,
generateSubRanges uses pool.QueryRow with the parent context for countQuery and
medianQuery (and the two medianQuery scans); wrap each of those calls in a short
per-query context timeout (e.g. ctx, cancel := context.WithTimeout(t.Ctx,
60*time.Second); defer cancel()) and pass that ctx into pool.QueryRow instead of
t.Ctx. Update the countQuery QueryRow call and both medianQuery QueryRow.Scan
branches (single-PK and multi-PK scan) to use the new timeout context so
long-running or blocked queries return promptly. Ensure you import context if
not already present.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 93c07043-2d06-4691-b951-598b2d9f7b4e

📥 Commits

Reviewing files that changed from the base of the PR and between 5b2c9d6 and 05b4099.

📒 Files selected for processing (1)
  • internal/consistency/diff/table_diff.go

When a node starts timing out during recursive diff, workers had no
mechanism to stop — they continued iterating through every remaining
sub-range, each waiting up to 60 s for a context deadline, accumulating
error objects and log output until OOM.

- Add shouldStop() combining the existing row-limit check with a new
  error-recorded check, so all workers bail out once any node error is
  recorded.
- Replace all shouldStopDueToLimit() calls in the recursive diff path
  (recursiveDiff + mismatch dispatch loop) with shouldStop().
- Extend the circuit breaker to the initial hash phase: worker goroutines
  now drain hashTaskQueue without doing work once shouldStop() is true.
- Change continue → return on hash errors inside the sub-range loop so
  a goroutine stops immediately rather than grinding through remaining
  sub-ranges.
- Back hasError() with an atomic.Bool (errorRecorded) set in recordError,
  eliminating mutex acquisition on every sub-range iteration in the
  common no-error path.
- Reset errorRecorded and firstError at the start of ExecuteTask so a
  reused TableDiffTask does not short-circuit subsequent runs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@rasifr rasifr force-pushed the fix/ACE-184/oom-circuit-breaker branch from 05b4099 to 7b0e63a on April 17, 2026 14:50
@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
internal/consistency/diff/table_diff.go (1)

1514-1518: Optional: return instead of draining the remaining hash queue.

When shouldStop() flips to true, each worker still iterates every remaining task just to call bar.Increment() and continue. Because hashTaskQueue is buffered to totalHashTasks (line 1508), the dispatcher at lines 1541–1546 will never block even if all workers exit, and initialHashWg.Wait() still completes after close(hashTaskQueue). Swapping continue for return lets workers exit immediately on trip, which is marginally tidier for very wide range sets. The progress bar being left partially filled is inconsequential since mpb.BarRemoveOnComplete() is set on this bar.

Not a correctness issue — feel free to defer.

Suggested change
 		for task := range hashTaskQueue {
 			if t.shouldStop() {
-				bar.Increment()
-				continue
+				return
 			}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/diff/table_diff.go` around lines 1514 - 1518, The worker
loop reading from hashTaskQueue currently calls bar.Increment() and continue
when t.shouldStop() is true; change that behavior to call bar.Increment() and
then return so the goroutine exits immediately (locate the loop that ranges over
hashTaskQueue and the t.shouldStop() check in table_diff.go), ensuring workers
stop without draining the remaining buffered tasks.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: abb111de-463e-42dc-8ddd-ae114727745b

📥 Commits

Reviewing files that changed from the base of the PR and between 05b4099 and 7b0e63a.

📒 Files selected for processing (1)
  • internal/consistency/diff/table_diff.go

@mason-sharp mason-sharp merged commit b6d5a30 into main Apr 17, 2026
3 checks passed