Notifications
Cinatra ships a Postgres-backed per-user notifications layer. Inserts trigger a Postgres pg_notify; a single process-level LISTEN connection fans events out to per-tab SSE handlers; the browser flyout subscribes via EventSource with a 30s poll backstop. Background jobs route their lifecycle events through a recipient policy that picks the right user (or admin fanout for system jobs).
What goes where
| Concern | Module |
|---|---|
| Schema (typed columns + 3 partial indexes + trigger fn) | src/lib/drizzle-store.ts — bundled in buildCreateStoreSchemaQueries(), applied by ensurePostgresSchema() |
| Server-only service (CRUD, fan-out, dedupe) | src/lib/notifications/service.ts |
| Recipient discriminated union + job→recipient policy | src/lib/notifications/recipient-policy.ts |
| Request-scope ActorContext helper | src/lib/notifications/request-actor.ts |
| Process-level LISTEN singleton + EventEmitter fanout | src/lib/notifications/realtime.ts |
| Legacy 5-function compat surface (preserves pre-2026-05 callers) | src/lib/notifications.ts |
| SSE endpoint | src/app/api/notifications/stream/route.ts |
| API CRUD endpoint | src/app/api/notifications/route.ts |
| Full-page list view | src/app/notifications/page.tsx |
| Flyout subscription + poll backstop | src/components/app-shell.tsx |
| Pure SSE dedupe-prepend helper (applySseNotification) | src/lib/notifications/flyout-state.ts (extracted in PR #174 so Phase 284 SC #5 can test the contract without mounting the component; the SC #1 flyout rebuild must reuse this helper) |
| Import-guard regression test pinning helper usage in app-shell.tsx | src/components/__tests__/app-shell-flyout-state-import.test.ts (PR #180) |
| BullMQ worker lifecycle hook | src/lib/background-jobs.ts (notifyJobLifecycle private helper) |
Recipient policy
```typescript
type NotificationRecipient =
  | { kind: "user"; userId: string }
  | { kind: "team"; teamId: string }
  | { kind: "organization"; organizationId: string }
  | { kind: "project"; projectId: string }
  | { kind: "admins" };
```
Resolution happens at write time: a team recipient becomes one row per team member, and admins becomes one row per platform admin. There is no nullable user_id and no join table; every notification belongs to exactly one user.
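A minimal sketch of write-time resolution follows. It assumes a hypothetical MembershipLookup interface (the real service resolves members with SQL), and the page only describes the team and admins cases, so the organization and project branches are assumptions. Each distinct user gets exactly one row.

```typescript
// Mirrors the NotificationRecipient union documented above.
type NotificationRecipient =
  | { kind: "user"; userId: string }
  | { kind: "team"; teamId: string }
  | { kind: "organization"; organizationId: string }
  | { kind: "project"; projectId: string }
  | { kind: "admins" };

// Hypothetical membership lookups; the real service resolves these with SQL.
interface MembershipLookup {
  teamMembers(teamId: string): string[];
  organizationMembers(organizationId: string): string[];
  projectMembers(projectId: string): string[];
  platformAdmins(): string[];
}

// Expand one recipient into the user IDs that each receive their own row.
function candidateUserIds(recipient: NotificationRecipient, lookup: MembershipLookup): string[] {
  switch (recipient.kind) {
    case "user":
      return [recipient.userId];
    case "team":
      return lookup.teamMembers(recipient.teamId);
    case "organization":
      return lookup.organizationMembers(recipient.organizationId);
    case "project":
      return lookup.projectMembers(recipient.projectId);
    case "admins":
      return lookup.platformAdmins();
    default: {
      // Compile-time exhaustiveness check: a new union member breaks the build here.
      const unreachable: never = recipient;
      throw new Error(`Unknown recipient: ${JSON.stringify(unreachable)}`);
    }
  }
}

interface NotificationRowInput {
  userId: string; // never null: every row belongs to exactly one user
  title: string;
}

// Build one row per distinct user; duplicates in a membership list collapse.
function rowsForRecipient(
  recipient: NotificationRecipient,
  title: string,
  lookup: MembershipLookup,
): NotificationRowInput[] {
  const userIds = Array.from(new Set(candidateUserIds(recipient, lookup)));
  return userIds.map((userId) => ({ userId, title }));
}
```

Resolving to concrete users before the insert is what lets every row carry a single, non-null owner, which the realtime and dedupe layers rely on.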
The job→recipient map lives in recipient-policy.ts. User-initiated jobs route to the initiator (resolved from the job’s __actorContext ALS frame); system jobs are silent on success and route failures to { kind: "admins" }. Unknown job names log a warning and return null — register new job names in USER_INITIATED_JOBS or SYSTEM_JOBS when adding to BACKGROUND_JOB_NAMES.
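The routing rules above can be sketched as a small pure function. The function name and signature are hypothetical (the real entry point is getRecipientForJob in recipient-policy.ts), and the set memberships are inferred from the examples on this page, not copied from the source.

```typescript
type NotificationRecipient = { kind: "user"; userId: string } | { kind: "admins" };
type JobOutcome = "success" | "failure";

// Illustrative, non-exhaustive memberships; the authoritative sets live in recipient-policy.ts.
const USER_INITIATED_JOBS: ReadonlySet<string> = new Set(["AGENT_BUILDER_EXECUTION"]);
const SYSTEM_JOBS: ReadonlySet<string> = new Set([
  "LITELLM_PRICING_SYNC",
  "GRAPHITI_PROJECTION_REPAIR",
  "REGISTRY_POLL",
  "SKILL_MATCH_BATCH_POLL",
]);

// Hypothetical name and signature; sketches the policy described above.
function recipientForJob(
  jobName: string,
  outcome: JobOutcome,
  initiatorUserId: string | undefined, // from the job's __actorContext frame, if any
): NotificationRecipient | null {
  if (USER_INITIATED_JOBS.has(jobName)) {
    // Success and failure both go to the initiator; no initiator means no notification.
    return initiatorUserId ? { kind: "user", userId: initiatorUserId } : null;
  }
  if (SYSTEM_JOBS.has(jobName)) {
    // System jobs are silent on success; failures fan out to platform admins.
    return outcome === "failure" ? { kind: "admins" } : null;
  }
  console.warn(`[notifications] job "${jobName}" is not registered in recipient-policy`);
  return null;
}
```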
Enqueue-time auto-attribution
enqueueBackgroundJob() runs a cascade when options.actorContext is unset:
1. The active getActorContext() ALS frame, but only for HumanUser principals.
2. resolveRequestActorContext(): the better-auth session, through buildActorContext().
3. undefined (system context).
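The cascade, including the explicit opt-out, can be sketched as below. The Principal and ActorContext shapes and the injected ActorSources interface are hypothetical simplifications, and both sources are synchronous here for brevity (a real session lookup is likely async).

```typescript
// Hypothetical, simplified shapes; the real ActorContext type lives in the app.
type Principal =
  | { type: "HumanUser"; userId: string }
  | { type: "System"; name: string };

interface ActorContext {
  principal: Principal;
}

interface EnqueueOptions {
  actorContext?: ActorContext;
  inheritActorContext?: boolean; // treated as true when omitted
}

// Injected so the cascade can be exercised without ALS or a live session.
interface ActorSources {
  alsFrame(): ActorContext | undefined; // stands in for getActorContext()
  requestActor(): ActorContext | undefined; // stands in for resolveRequestActorContext()
}

function resolveEnqueueActor(options: EnqueueOptions, sources: ActorSources): ActorContext | undefined {
  // An explicit actorContext wins; the cascade only runs when it is unset.
  if (options.actorContext) return options.actorContext;
  // Explicit opt-out: enqueue in system context.
  if (options.inheritActorContext === false) return undefined;

  // Tier 1: the active ALS frame, but only for human principals.
  const frame = sources.alsFrame();
  if (frame && frame.principal.type === "HumanUser") return frame;

  // Tier 2: the better-auth session of the current request, if any.
  const fromRequest = sources.requestActor();
  if (fromRequest) return fromRequest;

  // Tier 3: no attribution (system context).
  return undefined;
}
```

Note that a non-human ALS frame falls through to the session tier rather than stopping the cascade; inside a worker there is no request session, so the result is system context.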
Set inheritActorContext: false to opt out. Current opt-out sites (every worker-internal or self-rescheduling enqueue of a SYSTEM_JOBS job):
- src/instrumentation.node.ts: LITELLM_PRICING_SYNC weekly bootstrap and GRAPHITI_PROJECTION_REPAIR repair-loop bootstrap.
- src/lib/background-jobs.ts worker dispatcher: LITELLM_PRICING_SYNC and GRAPHITI_PROJECTION_REPAIR self-rescheduling re-enqueues.
- src/lib/registry-poll-job.ts: REGISTRY_POLL self-reschedule via the shared reschedule() helper (covers every 200/302/4xx/5xx branch). Adopted in Phase 284 PR #171.
- packages/skills/src/llm-matching/jobs.ts: SKILL_MATCH_BATCH_POLL (a) initial enqueue from handleBatchSubmit, (b) stale-poll-guard reschedule in handleBatchPoll, (c) in-progress reschedule in handleBatchPoll. Adopted in Phase 284 PR #171.
When to opt out (inheritActorContext: false)
Rule of thumb: if you’re calling enqueueBackgroundJob() from src/instrumentation.node.ts, or from a self-rescheduling re-enqueue inside the worker handler of a job listed in SYSTEM_JOBS (recipient-policy.ts), pass inheritActorContext: false defensively. The worker’s ALS frame should already be non-HumanUser, but the explicit opt-out makes the system-context intent visible at the call site and is robust to upstream refactors.
The “Current opt-out sites” list above enumerates the canonical sites.
Counterexamples: do NOT opt out for
- User-initiated server actions (anything triggered from /api/*/route.ts or a "use server" form handler). Auto-attribution to the calling HumanUser is the entire point of the cascade. Forcing inheritActorContext: false here drops the user attribution: on failure you get only a console.warn (no user notification, no admin fanout, no silent re-route); on success, no notification at all. See getRecipientForJob in recipient-policy.ts: a USER_INITIATED_JOBS entry with no resolved initiator returns null.
- Request-path enqueues from a route handler (e.g. /agents/[agentId]/run triggering an AGENT_BUILDER_EXECUTION). The better-auth session is the recipient signal.
- HITL resume actions. The resuming user’s identity needs to land on any downstream notification.
When in doubt: read USER_INITIATED_JOBS and SYSTEM_JOBS in recipient-policy.ts. If the job name is in USER_INITIATED_JOBS, default-on attribution is correct. If it’s in SYSTEM_JOBS and you’re enqueuing from a non-user context, opt out.
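The decision rule above can be encoded as a small helper that returns an options fragment. Everything here is hypothetical: the CallSite classification, the helper name, and the trimmed job sets (read the real ones in recipient-policy.ts). The enqueueBackgroundJob() options shape beyond inheritActorContext is not documented on this page.

```typescript
// Illustrative, non-exhaustive sets; read the real ones in recipient-policy.ts.
const USER_INITIATED_JOBS: ReadonlySet<string> = new Set(["AGENT_BUILDER_EXECUTION"]);
const SYSTEM_JOBS: ReadonlySet<string> = new Set(["LITELLM_PRICING_SYNC", "REGISTRY_POLL"]);

// Hypothetical classification of where the enqueue call happens.
type CallSite = "user-request" | "worker" | "instrumentation";

// Returns the option fragment to spread into the enqueue options.
function attributionOptionsFor(jobName: string, site: CallSite): { inheritActorContext?: false } {
  if (USER_INITIATED_JOBS.has(jobName)) return {}; // keep default-on attribution
  if (SYSTEM_JOBS.has(jobName) && site !== "user-request") {
    return { inheritActorContext: false }; // explicit system context
  }
  return {}; // outside the rule of thumb: keep the default
}
```

Spreading the fragment into the options object keeps the opt-out visible at the call site while leaving user-initiated enqueues untouched.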
Real-time delivery
All real-time events travel on one channel, cinatra_notifications, with payload {userId, id}. The trigger function fn_notify_notification_insert fires AFTER INSERT for rows with a non-null user_id. The realtime singleton in realtime.ts exposes subscribeUserNotifications(userId, cb), which returns a cleanup function; each per-tab SSE route handler subscribes through it. The listener client auto-reconnects on error/end with exponential backoff (capped at 30s).
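A minimal sketch of the in-process fanout, assuming one EventEmitter with a per-user event name (the event naming, the 500 ms base delay, and the function names are assumptions, not the code in realtime.ts). The LISTEN client would feed raw payloads into the dispatcher.

```typescript
import { EventEmitter } from "node:events";

// Payload shape sent on the cinatra_notifications channel.
interface NotifyPayload {
  userId: string;
  id: string;
}

// One in-process emitter; the per-user event name is an assumption of this sketch.
const fanout = new EventEmitter();
fanout.setMaxListeners(0); // one listener per open tab, so disable the leak warning

// Feed this from the LISTEN client, e.g. node-postgres's
// client.on("notification", (msg) => dispatchNotifyPayload(msg.payload ?? "")).
function dispatchNotifyPayload(raw: string): void {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return; // drop malformed payloads instead of crashing the listener
  }
  const p = parsed as Partial<NotifyPayload> | null;
  if (!p || typeof p.userId !== "string" || typeof p.id !== "string") return;
  fanout.emit(`user:${p.userId}`, { userId: p.userId, id: p.id });
}

// Same contract as subscribeUserNotifications(userId, cb): returns a cleanup function.
function subscribeSketch(userId: string, cb: (payload: NotifyPayload) => void): () => void {
  const event = `user:${userId}`;
  fanout.on(event, cb);
  return () => {
    fanout.off(event, cb);
  };
}

// Exponential reconnect delay for the LISTEN client, capped at 30 s.
function reconnectDelayMs(attempt: number, baseMs = 500): number {
  const capMs = 30_000;
  return Math.min(capMs, baseMs * 2 ** attempt);
}
```

Returning a cleanup closure lets the SSE route detach its listener when the tab disconnects, so the emitter does not accumulate dead handlers.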
Before pushing, the SSE handler re-reads the inserted row via listNotificationsForUser(session.user.id). This is defense-in-depth: even if a future refactor misroutes an event inside the in-process emitter, a row that does not belong to the session user is never sent.
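The re-read check can be sketched as follows. listForUser stands in for listNotificationsForUser and its real signature may differ; the NotificationRow shape and the function name are hypothetical. The frame uses standard SSE framing (a data line followed by a blank line).

```typescript
interface NotificationRow {
  id: string;
  userId: string;
  title: string;
}

// listForUser stands in for listNotificationsForUser; its real signature may differ.
async function sseFrameFor(
  sessionUserId: string,
  notifiedId: string,
  listForUser: (userId: string) => Promise<NotificationRow[]>,
): Promise<string | null> {
  // Re-read through the session user's own, user-scoped list.
  const rows = await listForUser(sessionUserId);
  const row = rows.find((r) => r.id === notifiedId && r.userId === sessionUserId);
  // A misrouted event finds nothing here, so nothing is pushed.
  if (!row) return null;
  // Standard SSE framing: a data line followed by a blank line.
  return `data: ${JSON.stringify(row)}\n\n`;
}
```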
Hybrid client behaviour
src/components/app-shell.tsx runs three triggers in parallel:
- EventSource("/api/notifications/stream"): primary channel, with native reconnect.
- 30-second poll: backstop when SSE drops.
- focus + visibilitychange events: manual refresh.
Notifications are de-duped by id when both SSE push and a poll cycle deliver the same row.
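The id-based dedupe can be sketched with two pure helpers. These are hypothetical illustrations, not the code of applySseNotification in flyout-state.ts; the FlyoutItem shape is assumed, and ordering assumes createdAt is a uniformly formatted ISO-8601 UTC string so that string order matches time order.

```typescript
interface FlyoutItem {
  id: string;
  createdAt: string; // ISO-8601 UTC, same format everywhere, so string order = time order
}

// Prepend an SSE-pushed item unless a row with the same id is already shown.
// Returning the same array reference lets React skip a re-render.
function prependUnique(items: readonly FlyoutItem[], incoming: FlyoutItem): readonly FlyoutItem[] {
  if (items.some((item) => item.id === incoming.id)) return items;
  return [incoming, ...items];
}

// Merge a poll result into the current list: one entry per id, newest first.
// Polled rows replace older copies so server-side changes win.
function mergePoll(current: readonly FlyoutItem[], polled: readonly FlyoutItem[]): FlyoutItem[] {
  const byId = new Map<string, FlyoutItem>();
  for (const item of current) byId.set(item.id, item);
  for (const item of polled) byId.set(item.id, item);
  return Array.from(byId.values()).sort((a, b) =>
    a.createdAt < b.createdAt ? 1 : a.createdAt > b.createdAt ? -1 : 0,
  );
}
```

Keeping both paths keyed on id means an SSE push followed by a poll (or the reverse) converges on a single entry regardless of arrival order.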
Dedupe
Partial unique index notifications_dedupe_job_kind_idx on (user_id, source_job_id, kind) WHERE source_job_id IS NOT NULL AND user_id IS NOT NULL. When a BullMQ retry inserts the same (user, job, kind) tuple again, ON CONFLICT DO NOTHING swallows the duplicate.
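The behaviour of a partial unique index combined with ON CONFLICT DO NOTHING can be modelled in memory, as sketched below. The class and field names are hypothetical, and the SQL in the comment assumes a table named notifications; only rows matching the index predicate take part in uniqueness.

```typescript
// In-memory model of the dedupe behaviour. The SQL it mimics (table name assumed):
//   CREATE UNIQUE INDEX notifications_dedupe_job_kind_idx
//     ON notifications (user_id, source_job_id, kind)
//     WHERE source_job_id IS NOT NULL AND user_id IS NOT NULL;
//   INSERT INTO notifications (...) VALUES (...) ON CONFLICT DO NOTHING;

interface NotificationInsert {
  userId: string | null;
  sourceJobId: string | null;
  kind: string;
  title: string;
}

class NotificationTableModel {
  readonly rows: NotificationInsert[] = [];
  private readonly indexKeys = new Set<string>();

  /** Returns true when a row is written, false when the conflict is swallowed. */
  insert(row: NotificationInsert): boolean {
    // Only rows matching the partial-index predicate participate in uniqueness.
    if (row.sourceJobId !== null && row.userId !== null) {
      // JSON encoding avoids collisions between ids that contain a separator.
      const key = JSON.stringify([row.userId, row.sourceJobId, row.kind]);
      if (this.indexKeys.has(key)) return false; // ON CONFLICT DO NOTHING
      this.indexKeys.add(key);
    }
    this.rows.push(row);
    return true;
  }
}
```

Because notifications without a source_job_id fall outside the predicate, ad-hoc notifications are never collapsed; only job retries are.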
Coexistence with the legacy compat surface
src/lib/notifications.ts keeps the original 5 public functions (createNotification, listNotifications, markNotificationRead, markNotificationsReadByHrefPrefix, markAllNotificationsRead) at their pre-2026 signatures. User resolution flows through:
1. getAuthSession() (server actions, route handlers, server components).
2. The getActorContext() ALS frame (worker context).
3. A platform-admins fan-out when neither is present, so system-context legacy callers in packages/asset-blog/src/generation.ts keep producing visible notifications.
When you have an explicit recipient (BullMQ hook, scheduler, system event), use createNotificationForRecipient(recipient, input) directly instead.
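The legacy resolution order can be sketched as a pure function. The LegacyContext interface and function name are hypothetical, and both lookups are synchronous stand-ins for getAuthSession() and getActorContext() (the real calls may be async and return richer objects).

```typescript
// Subset of NotificationRecipient used by the legacy path.
type LegacyRecipient = { kind: "user"; userId: string } | { kind: "admins" };

// Hypothetical, synchronous stand-ins; the real lookups may be async.
interface LegacyContext {
  sessionUserId(): string | undefined; // from getAuthSession()
  actorUserId(): string | undefined; // from the getActorContext() ALS frame
}

function resolveLegacyRecipient(ctx: LegacyContext): LegacyRecipient {
  const fromSession = ctx.sessionUserId();
  if (fromSession) return { kind: "user", userId: fromSession };
  const fromActor = ctx.actorUserId();
  if (fromActor) return { kind: "user", userId: fromActor };
  // Neither present: system-context caller, so fan out to platform admins.
  return { kind: "admins" };
}
```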
Test coverage:
- src/lib/notifications/__tests__/service.test.ts: CRUD + dedupe (SQL shape mocked)
- src/lib/notifications/__tests__/recipient-policy.test.ts: job routing + admin SQL
- src/lib/notifications/__tests__/realtime.test.ts: listener + EventEmitter fanout + reconnect backoff
- src/lib/__tests__/background-jobs-auto-attribution.test.ts: cascade tiers
- src/app/api/notifications/__tests__/route.test.ts: session gate
- src/app/api/notifications/__tests__/stream.test.ts: SSE session gate + scoped fanout
Operator runbook
- Set SENTRY_DSN (and NEXT_PUBLIC_SENTRY_DSN for browser-side capture) when standing up the Sentry/GlitchTip backend.
- The schema migration runs automatically on the next pnpm dev boot via ensurePostgresSchema(). It is idempotent: ADD COLUMN IF NOT EXISTS + CREATE OR REPLACE FUNCTION + DROP TRIGGER IF EXISTS.
- No additional infra is needed. Postgres LISTEN/NOTIFY is built into the existing Postgres instance, and the EventEmitter is in-process.
- For a multi-process production deployment (multiple Node servers behind a load balancer), each server keeps its own LISTEN connection and routes events only to its own SSE clients; that is correct. A future Redis pub/sub bridge becomes useful only when an event needs to reach a client on a different server than the one that wrote the row.
Known gaps
Section titled “Known gaps”- The flyout DOM in
app-shell.tsxis unchanged from pre-2026-05 — a rebuild on a cleaner component model is queued as Phase 284. confirm()dialogs (yes/no) still usewindow.confirm()— an<AlertDialog>wrapper lands with Phase 284.- Multi-channel (email, SMS, push) is not yet wired. The recipient policy is channel-agnostic; the service simply doesn’t emit to anything except the in-app inbox today. Phase 284 includes an ADR for the Novu vs. in-house provider abstraction decision.