Use Redis Streams to offload project.usage updates #2864
Conversation
Walkthrough

Introduces a Redis Streams click-event publish path with a DB-update fallback on publish failure, adds a Redis Streams utility with types and APIs, schedules a Vercel cron, and adds a Next.js cron route that batches and applies workspace usage updates from the stream.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant U as User
    participant Web as Web App
    participant RS as Redis Streams
    participant DB as Database
    U->>Web: Click link
    Web->>RS: publishClickEvent({linkId, workspaceId, timestamp})
    alt publish succeeds
        RS-->>Web: ack
    else publish fails
        RS-->>Web: error
        Web->>DB: Fallback - increment usage/totalClicks
        DB-->>Web: ack
    end
```
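A minimal sketch of this publish-with-fallback path. publishClickEvent comes from apps/web/lib/upstash/redis-streams.ts per the code graph below; the Prisma import path and fallback shape are assumptions for illustration, not the PR's actual fallback code:

```ts
import { publishClickEvent } from "@/lib/upstash/redis-streams";
import { prisma } from "@/lib/prisma"; // assumed import path

export async function recordClickUsage(linkId: string, workspaceId: string) {
  try {
    // Happy path: enqueue the click event for the cron consumer to aggregate.
    await publishClickEvent({ linkId, workspaceId, timestamp: Date.now() });
  } catch (error) {
    // Fallback: apply the usage update synchronously when the publish fails.
    await prisma.project.update({
      where: { id: workspaceId },
      data: { usage: { increment: 1 }, totalClicks: { increment: 1 } },
    });
  }
}
```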
```mermaid
sequenceDiagram
    autonumber
    participant Cron as Vercel Cron
    participant Route as /api/cron/usage/update-workspace-clicks
    participant RS as Redis Streams
    participant DB as Database
    Cron->>Route: Invoke (every minute)
    Route->>RS: workspaceUsageStream.processBatch (xrange)
    RS-->>Route: ClickEvent entries[]
    Route->>Route: aggregate by workspace (clicks, first/last ts, entryIds)
    loop per sub-batch (<=50)
        Route->>DB: batched workspace usage updates
        DB-->>Route: per-workspace results
    end
    Route->>RS: xdel processed entry IDs (cleanup)
    Route-->>Cron: JSON summary (processed, errors, lastProcessedId, streamInfo)
```
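A rough sketch of the aggregation and sub-batching steps from the second diagram; the type names mirror those in the review below (RedisStreamEntry, ClickEvent, WorkspaceAggregateUsage), while the chunk helper is hypothetical:

```ts
type ClickEvent = { linkId: string; workspaceId: string; timestamp: number };
type RedisStreamEntry<T> = { id: string; data: T };
type WorkspaceAggregateUsage = {
  workspaceId: string;
  clicks: number;
  entryIds: string[];
};

// Collapse raw stream entries into one pending update per workspace.
function aggregateByWorkspace(
  entries: RedisStreamEntry<ClickEvent>[],
): WorkspaceAggregateUsage[] {
  const byWorkspace = new Map<string, WorkspaceAggregateUsage>();
  for (const { id, data } of entries) {
    if (!data.workspaceId) continue; // see the review note on tracking these IDs
    const agg = byWorkspace.get(data.workspaceId) ?? {
      workspaceId: data.workspaceId,
      clicks: 0,
      entryIds: [],
    };
    agg.clicks += 1;
    agg.entryIds.push(id);
    byWorkspace.set(data.workspaceId, agg);
  }
  return [...byWorkspace.values()];
}

// Split the updates into sub-batches of at most 50, matching the loop above.
const chunk = <T>(items: T[], size: number): T[][] =>
  Array.from({ length: Math.ceil(items.length / size) }, (_, i) =>
    items.slice(i * size, (i + 1) * size),
  );
```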
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~55 minutes
Pre-merge checks: ✅ Passed checks (3 passed)
Deployment failed with the following error. Learn More: https://vercel.link/4kmZrhf
The branch was force-pushed from b210ca0 to 7474553.
Actionable comments posted: 4
🧹 Nitpick comments (4)
apps/web/app/(ee)/api/cron/projects/usage-updates-queue/route.ts (3)
81-88: Use array length check.

entries is an array; Object.keys(entries).length is unnecessary and confusing.

```diff
- if (!entries || Object.keys(entries).length === 0) {
+ if (!entries || entries.length === 0) {
```
113-116: Reduce noisy batch logging.

Dumping full batches (including entryIds) can flood logs. Log counts only.

```diff
- console.log(`BATCH: ${JSON.stringify(batch)}`);
+ console.log(`Processing sub-batch size=${batch.length}`);
```
121-124: Escape reserved column name usage.

If MySQL treats usage as reserved, quote it to avoid edge cases.

```diff
- await conn.execute(
-   "UPDATE Project p SET p.usage = p.usage + ?, p.totalClicks = p.totalClicks + ? WHERE id = ?",
-   [update.usage, update.clicks, update.projectId],
- );
+ await conn.execute(
+   "UPDATE Project p SET p.`usage` = p.`usage` + ?, p.totalClicks = p.totalClicks + ? WHERE id = ?",
+   [update.usage, update.clicks, update.projectId],
+ );
```

apps/web/lib/upstash/redis-streams.ts (1)
86-144: Use XLEN for O(1) length; avoid scanning entire stream.

Current implementation reads the whole stream to compute length. Replace with XLEN and single-entry range calls.

```diff
   async getStreamInfo(): Promise<{
     length: number;
     firstEntryId: string | null;
     lastEntryId: string | null;
   }> {
     try {
-      // Get stream length - check if stream exists first
-      let length = 0;
-      let firstEntryId: string | null = null;
-      let lastEntryId: string | null = null;
-
-      try {
-        // Try to get first entry to check if stream exists
-        const firstEntry = await redis.xrange(this.streamKey, "-", "+", 1);
-
-        if (firstEntry && Object.keys(firstEntry).length > 0) {
-          firstEntryId = Object.keys(firstEntry)[0];
-
-          // If stream exists, get its length and last entry
-          const entries = await redis.xrange(this.streamKey, "-", "+");
-          length = Object.keys(entries || {}).length;
-
-          if (length > 0) {
-            // Use "$" to get the last entry instead of "+"
-            const lastEntry = await redis.xrevrange(
-              this.streamKey,
-              "+",
-              "-",
-              1,
-            );
-
-            if (lastEntry && Object.keys(lastEntry).length > 0) {
-              const entryId = Object.keys(lastEntry)[0];
-
-              if (entryId !== firstEntryId) {
-                lastEntryId = entryId;
-              }
-            }
-          }
-        }
-      } catch (streamError) {
-        // Stream might not exist yet, which is fine
-        console.log("Stream does not exist or is empty:", streamError);
-        console.log(streamError.message);
-        console.log(streamError.type);
-        console.log(JSON.stringify(streamError, null, 2));
-      }
-
-      return {
-        length,
-        firstEntryId,
-        lastEntryId,
-      };
+      const length = await redis.xlen(this.streamKey).catch(() => 0);
+      let firstEntryId: string | null = null;
+      let lastEntryId: string | null = null;
+
+      if (length > 0) {
+        const first = await redis.xrange(this.streamKey, "-", "+", 1);
+        if (first && Object.keys(first).length > 0) {
+          firstEntryId = Object.keys(first)[0];
+        }
+        const last = await redis.xrevrange(this.streamKey, "+", "-", 1);
+        if (last && Object.keys(last).length > 0) {
+          lastEntryId = Object.keys(last)[0];
+        }
+      }
+
+      return { length, firstEntryId, lastEntryId };
     } catch (error) {
       console.error("Failed to get stream info:", error);
       throw error;
     }
   }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- apps/web/app/(ee)/api/cron/projects/usage-updates-queue/route.ts (1 hunks)
- apps/web/lib/tinybird/record-click.ts (2 hunks)
- apps/web/lib/upstash/redis-streams.ts (1 hunks)
- apps/web/vercel.json (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
apps/web/app/(ee)/api/cron/projects/usage-updates-queue/route.ts (3)
- apps/web/lib/upstash/redis-streams.ts (3)
  - Entry (13-16)
  - ClickEvent (6-11)
  - projectUsageStream (148-150)
- apps/web/lib/cron/verify-vercel.ts (1)
  - verifyVercelSignature (3-19)
- apps/web/lib/api/errors.ts (1)
  - handleAndReturnErrorResponse (175-178)

apps/web/lib/tinybird/record-click.ts (1)
- apps/web/lib/upstash/redis-streams.ts (1)
  - publishClickEvent (153-172)

apps/web/lib/upstash/redis-streams.ts (1)
- apps/web/lib/upstash/redis.ts (1)
  - redis (4-7)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (1)
apps/web/lib/upstash/redis-streams.ts (1)
49-62: Confirm XRANGE return shape.

This code assumes XRANGE returns a map object keyed by ID. Some clients return arrays. Verify the Upstash TS client's return type to avoid runtime shape mismatches.
Run a quick check against the client API docs and adjust mapping accordingly (Object.entries vs array mapping).
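For instance, a defensive normalizer could accept both shapes; this is a hypothetical helper sketched for illustration, not code from the PR:

```ts
type StreamFields = Record<string, string>;

// Accept either the object-map shape ({ id: fields }) or the
// array-of-pairs shape ([id, fields][]) and return a uniform list.
function normalizeXrangeResult(
  result: Record<string, StreamFields> | [string, StreamFields][],
): { id: string; fields: StreamFields }[] {
  if (Array.isArray(result)) {
    return result.map(([id, fields]) => ({ id, fields }));
  }
  return Object.entries(result).map(([id, fields]) => ({ id, fields }));
}
```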
Actionable comments posted: 5
♻️ Duplicate comments (1)
apps/web/app/(ee)/api/cron/usage/update-workspace-clicks/route.ts (1)
35-40: Entries without workspaceId are dropped and never deleted.

You continue on missing workspaceId, so these IDs aren't included in processedEntryIds and remain forever. This causes reprocessing loops.

Apply this diff to track invalid entries for deletion:

```diff
 const aggregateWorkspaceUsage = (
   entries: RedisStreamEntry<ClickEvent>[],
-): { updates: WorkspaceAggregateUsage[]; lastProcessedId: string | null } => {
+): {
+  updates: WorkspaceAggregateUsage[];
+  lastProcessedId: string | null;
+  invalidEntryIds: string[];
+} => {
   // Aggregate usage by workspaceId
   const aggregatedUsage = new Map<string, WorkspaceAggregateUsage>();
   let lastId: string | null = null;
+  const invalidEntryIds: string[] = [];

   for (const entry of entries) {
     const workspaceId = entry.data.workspaceId;
     if (!workspaceId) {
+      invalidEntryIds.push(entry.id);
       continue;
     }
```

And include invalid IDs in the return:

```diff
   return {
     updates: Array.from(aggregatedUsage.values()),
     lastProcessedId: lastId,
+    invalidEntryIds,
   };
```

Then update the caller at Line 82:

```diff
- const { updates, lastProcessedId } = aggregateWorkspaceUsage(entries);
+ const { updates, lastProcessedId, invalidEntryIds } = aggregateWorkspaceUsage(entries);
```

And include invalid IDs in processedEntryIds at Line 103:

```diff
- const processedEntryIds: string[] = [];
+ const processedEntryIds: string[] = [...invalidEntryIds];
```
🧹 Nitpick comments (5)
apps/web/lib/upstash/redis-streams.ts (3)
79-82: Error message hardcoded for workspace usage in generic method.

The error message "Failed to read workspace usage updates from stream:" is specific to workspace usage, but processBatch is a generic method used for any stream type. Use a generic message or make the error context configurable.

Apply this diff:

```diff
 console.error(
-  "Failed to read workspace usage updates from stream:",
+  `Failed to read from stream ${this.streamKey}:`,
   error,
 );
```
101-136: Reduce excessive debug logging.

Lines 132-135 log the same error information multiple times (streamError, streamError.message, streamError.type, and JSON.stringify(streamError)). This clutters logs for a non-critical path (stream not existing is normal).

Apply this diff to simplify:

```diff
 } catch (streamError) {
   // Stream might not exist yet, which is fine
-  console.log("Stream does not exist or is empty:", streamError);
-  console.log(streamError.message);
-  console.log(streamError.type);
-  console.log(JSON.stringify(streamError, null, 2));
+  console.log("Stream does not exist or is empty");
 }
```
109-110: Fetching all entries to compute length may be inefficient.

Line 109 calls xrange(this.streamKey, "-", "+") without a count limit, loading all entries into memory just to count them. For large streams, this is inefficient. Consider using XINFO STREAM if available, or fetching with a count parameter and computing an approximate length.

If the Redis client supports XINFO STREAM, use that command to get stream metadata directly. Otherwise, document that this method is best-effort and may be slow for large streams.

apps/web/app/(ee)/api/cron/usage/update-workspace-clicks/route.ts (2)
106-106: Verbose logging of entire batch JSON.

Line 106 logs JSON.stringify(batch) for each sub-batch of up to 50 updates. This will produce very large log entries and clutter logs. Consider logging only a summary (e.g., count and first few workspace IDs).

Apply this diff:

```diff
- console.log(`BATCH: ${JSON.stringify(batch)}`);
+ console.log(`Processing sub-batch of ${batch.length} updates`);
```
202-213: Consider getStreamInfo performance impact.

Line 202 calls workspaceUsageStream.getStreamInfo() after processing, which fetches all entries to compute length (as noted in the redis-streams.ts review). For large streams, this could add significant latency to the cron response. Consider whether stream info is essential for the response or if it can be computed asynchronously or cached.

If stream info is primarily for monitoring/logging, consider (see the sketch after this list):
- Making it optional and skipping on timeout risk
- Computing it asynchronously and logging separately
- Using approximate metrics instead of exact counts
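As one option, a hedged sketch that bounds getStreamInfo with a timeout so it cannot stall the cron response; withTimeout is a hypothetical helper, not part of the PR:

```ts
// Resolve to null if the wrapped promise takes longer than `ms` milliseconds.
const withTimeout = <T>(promise: Promise<T>, ms: number): Promise<T | null> =>
  Promise.race([
    promise,
    new Promise<null>((resolve) => setTimeout(() => resolve(null), ms)),
  ]);

// In the route handler: include streamInfo in the summary only when it
// arrives within one second; otherwise report it as unavailable.
const streamInfo = await withTimeout(
  workspaceUsageStream.getStreamInfo(),
  1_000,
);
```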
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- apps/web/app/(ee)/api/cron/usage/update-workspace-clicks/route.ts (1 hunks)
- apps/web/lib/tinybird/record-click.ts (2 hunks)
- apps/web/lib/upstash/redis-streams.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- apps/web/lib/tinybird/record-click.ts
🧰 Additional context used
🧬 Code graph analysis (2)
apps/web/lib/upstash/redis-streams.ts (1)
- apps/web/lib/upstash/redis.ts (1)
  - redis (4-7)

apps/web/app/(ee)/api/cron/usage/update-workspace-clicks/route.ts (3)
- apps/web/lib/upstash/redis-streams.ts (3)
  - RedisStreamEntry (12-15)
  - ClickEvent (6-10)
  - workspaceUsageStream (150-152)
- apps/web/lib/cron/verify-vercel.ts (1)
  - verifyVercelSignature (3-19)
- apps/web/lib/api/errors.ts (1)
  - handleAndReturnErrorResponse (175-178)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (10)
apps/web/lib/upstash/redis-streams.ts (6)
1-4: LGTM! Clean imports and well-named stream key constant.

6-10: LGTM! The ClickEvent interface is well-structured for Redis stream serialization.

12-15: LGTM! Appropriate generic type for stream entries.

17-22: LGTM! Simple and correct constructor implementation.

150-152: LGTM! Correctly instantiates the workspace usage stream.
162-163: TODO: Clarify future plans for link click updates stream.

The commented-out xadd for LINK_CLICK_UPDATES_STREAM_KEY suggests a future feature. Document whether this is a near-term plan or can be removed.

Do you want to track this TODO in a separate issue or remove it if it's not on the roadmap?
apps/web/app/(ee)/api/cron/usage/update-workspace-clicks/route.ts (4)
1-13: LGTM! Imports and constants are correctly defined for the cron route.

15-22: LGTM! Well-structured type for aggregated workspace usage.

184-218: LGTM with caveats! The GET handler correctly verifies the Vercel signature, processes the batch, and returns a structured response. Assuming the critical issues with entry deletion and timestamp validation are addressed, the overall flow is sound.
112-115: Remove incorrect schema concern. The Project table models workspaces throughout the codebase (e.g., Pick<Project, 'id'> as workspace), so id aligns with workspaceId.

Likely an incorrect or invalid review comment.
Summary by CodeRabbit
New Features
Chores