
Hub fanout: drop-counter is dead, disconnect path untested, buffer size unconfigurable #94

@v0lkan

Summary

The fanout broadcaster at internal/hub/fanout.go partly addressed the "silent entry loss to slow listeners" concern from TASKS.md (line 393, since deleted) — the original "drop and lose" pattern is now "disconnect and prevent loss" (lines 62-64). But three sub-items from the original ask remain open, and the partial fix introduced a silent metric that nothing surfaces.

What's done

broadcast() no longer drops entries to a full channel. It disconnects the slow listener:

// internal/hub/fanout.go:54-68
func (f *fanOut) broadcast(entries []Entry) {
    f.mu.Lock()
    defer f.mu.Unlock()
    for ch := range f.subs {
        select {
        case ch <- entries:
        default:
            // Slow listener: disconnect to prevent loss.
            delete(f.subs, ch)
            close(ch)
            f.dropped++
        }
    }
}

What's open

1. The dropped counter is dead

f.dropped (declared at types.go:149-153) increments at fanout.go:65 and is never read anywhere else in the codebase. grep -r dropped internal/hub | grep -v _test.go returns only the declaration and the increment site. Operators have no way to know slow listeners are being kicked.

Fix options (probably more than one of these):

  • Log on disconnect: emit a structured warning via internal/log/event or similar at the moment the disconnect happens. Include the cumulative dropped count in the log line so log aggregators can compute a drop rate from it.
  • Expose via Status RPC: add DroppedListeners uint64 to StatusResponse so ctx hub status can report cumulative drops.
  • Metrics endpoint: if/when the hub gets a /metrics Prometheus endpoint, this is a natural counter to export.
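A minimal sketch of the first two options together, under assumptions: the struct layout, newFanOut, subscribe, and the Dropped accessor are reconstructed or hypothetical, and the standard library's log/slog stands in for internal/log/event, whose API is not shown in this issue. It only illustrates the shape: increment atomically, log at the disconnect site, and give a Status handler a race-free read.

```go
package main

import (
	"fmt"
	"log/slog"
	"sync"
	"sync/atomic"
)

// Entry is a stand-in for the hub's entry type.
type Entry struct{ ID string }

const fanOutBuffer = 64

// fanOut approximates the real type; the field layout is an assumption.
type fanOut struct {
	dropped uint64 // kept first so 64-bit atomics stay aligned on 32-bit platforms
	mu      sync.Mutex
	subs    map[chan []Entry]struct{}
}

func newFanOut() *fanOut {
	return &fanOut{subs: make(map[chan []Entry]struct{})}
}

func (f *fanOut) subscribe() chan []Entry {
	ch := make(chan []Entry, fanOutBuffer)
	f.mu.Lock()
	f.subs[ch] = struct{}{}
	f.mu.Unlock()
	return ch
}

func (f *fanOut) broadcast(entries []Entry) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for ch := range f.subs {
		select {
		case ch <- entries:
		default:
			// Slow listener: disconnect, count it, and say so.
			delete(f.subs, ch)
			close(ch)
			total := atomic.AddUint64(&f.dropped, 1)
			slog.Warn("hub fanout: disconnected slow listener",
				"dropped_total", total, "subscribers", len(f.subs))
		}
	}
}

// Dropped (hypothetical accessor) is safe to call from any goroutine,
// for example a Status RPC handler filling a DroppedListeners field.
func (f *fanOut) Dropped() uint64 { return atomic.LoadUint64(&f.dropped) }

func main() {
	f := newFanOut()
	f.subscribe() // never read from, so the buffer eventually overflows
	for i := 0; i < fanOutBuffer+1; i++ {
		f.broadcast([]Entry{{ID: fmt.Sprintf("e%d", i)}})
	}
	fmt.Println("dropped:", f.Dropped())
}
```

Logging the cumulative total rather than a bare event means the counter can be recovered from logs alone if the Status RPC field lands later or not at all.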

2. The disconnect path has no test coverage

internal/hub/fanout_test.go has three tests:

  • TestFanOut_SubscribeAndBroadcast — happy path
  • TestFanOut_Unsubscribe — explicit unsubscribe
  • TestFanOut_BroadcastToNone — no listeners

None exercises the slow-listener path. A future refactor could silently delete the delete(f.subs, ch); close(ch); f.dropped++ block and all three would still pass.

Test to add (regression-pin the contract):

// Needs imports: fmt, sync/atomic, testing, time.
func TestFanOut_DisconnectsSlowListener(t *testing.T) {
    f := newFanOut()
    slow := f.subscribe()
    // Never read from `slow`: fanOutBuffer broadcasts fill its buffer,
    // and the next one must disconnect it.
    for i := 0; i < fanOutBuffer+1; i++ {
        f.broadcast([]Entry{{ID: fmt.Sprintf("e%d", i)}})
    }
    // Drain the buffered batches until the close is observed. The timeout
    // guards every receive, so a channel that is never closed fails the
    // test instead of hanging it.
    timeout := time.After(time.Second)
drain:
    for {
        select {
        case _, ok := <-slow:
            if !ok {
                break drain
            }
        case <-timeout:
            t.Fatal("disconnected channel never closed")
        }
    }
    // The dropped counter incremented.
    if got := atomic.LoadUint64(&f.dropped); got < 1 {
        t.Errorf("dropped = %d, want >= 1", got)
    }
    // The slow listener is gone from f.subs.
    if f.count() != 0 {
        t.Errorf("count = %d, want 0 after disconnect", f.count())
    }
}

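(atomic.LoadUint64 is the right primitive for reading f.dropped once #1 lands. The increment site should become atomic.AddUint64 as well: broadcasts are already serialized by f.mu, but a reader on another goroutine that does not take the lock would otherwise race with the plain f.dropped++.)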

3. fanOutBuffer = 64 is hardcoded

The original task observed "Buffer of 64 is too small for busy hubs." Without (1) we can't actually measure this — operators have no signal to even know they're hitting the limit. Once the drop-counter is observable, a follow-up can either:

  • Expose buffer size as an RC field (hub.fanout_buffer) with a sensible default and the metric to justify changes.
  • Leave it constant and use the drop-counter to inform a future static bump (e.g. 256, 1024) based on real workload data.

Either is fine, but don't tune the constant before the observability exists; that would be tuning blind.
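If the first option is chosen later, the resolution logic could look roughly like the sketch below. The hubRC struct, its FanoutBuffer field, and resolveFanOutBuffer are hypothetical names for illustration; the real RC types live in internal/rc/types.go and are not shown in this issue.

```go
package main

import "fmt"

// defaultFanOutBuffer matches the constant currently hardcoded in the hub.
const defaultFanOutBuffer = 64

// hubRC is a hypothetical slice of the RC file holding only the field
// discussed here (hub.fanout_buffer).
type hubRC struct {
	FanoutBuffer int `json:"fanout_buffer"`
}

// resolveFanOutBuffer returns the configured buffer size, falling back to
// the default when the field is unset (zero) or nonsensical (negative).
func resolveFanOutBuffer(rc hubRC) int {
	if rc.FanoutBuffer <= 0 {
		return defaultFanOutBuffer
	}
	return rc.FanoutBuffer
}

func main() {
	fmt.Println(resolveFanOutBuffer(hubRC{}))                  // unset: default applies
	fmt.Println(resolveFanOutBuffer(hubRC{FanoutBuffer: 256})) // explicit override
}
```

Treating zero as "unset" keeps existing RC files valid without a migration, at the cost of not being able to request an unbuffered channel.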

Scope for a "good first issue" pick

Items #1 (log on disconnect) and #2 (regression test) are small, well-scoped, and have clear acceptance criteria. A newcomer can land both in one PR without needing deep hub knowledge:

  • Read internal/log/event/event.go (the structured log surface) and adapt one of its existing call sites — there's prior art for warn-level structured logs.
  • Add the test next to the existing three in fanout_test.go.

Item #3 (configurable buffer) is more involved (touches internal/rc/types.go, internal/assets/schema/ctxrc.schema.json, and the embed_test.go symmetry mirror) and is better deferred until #1 produces real signal.

Acceptance

  • f.dropped becomes observable (log line on increment OR Status RPC field OR both).
  • TestFanOut_DisconnectsSlowListener (or equivalent) added to fanout_test.go.
  • f.dropped reads use atomic.LoadUint64 / increments use atomic.AddUint64 if the counter is read from a different goroutine than the broadcaster (e.g. Status RPC handler).

Metadata


Labels

bug, good first issue
