
Conversation

steven-tey
Collaborator

@steven-tey steven-tey commented Sep 19, 2025

References:

Summary by CodeRabbit

  • New Features

    • Adds Redis stream–based publishing and asynchronous processing for link click events to improve reliability and scalability.
    • Introduces a new scheduled endpoint that aggregates workspace click usage every minute for timelier analytics.
    • Adds a safe fallback to ensure click counts are preserved if event publishing fails.
  • Chores

    • Configures a new cron schedule for automated workspace click aggregation.

Contributor

vercel bot commented Sep 19, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

| Project | Deployment | Preview | Updated (UTC) |
| --- | --- | --- | --- |
| dub | Ready | Ready (Preview) | Oct 3, 2025 0:08am |

Contributor

coderabbitai bot commented Sep 19, 2025

Walkthrough

Introduces a Redis Streams click-event publish path with DB-update fallback on publish failure, adds a Redis-streams utility with types and APIs, schedules a Vercel cron, and adds a Next.js cron route that batches and applies workspace usage updates from the stream.

Changes

  • Click event publish + fallback (apps/web/lib/tinybird/record-click.ts):
    Replaces the direct DB increment for link clicks with an asynchronous publishClickEvent({ linkId, workspaceId, timestamp }) call and chains a fallback to the previous DB update when publishing fails.
  • Redis streams utility (apps/web/lib/upstash/redis-streams.ts):
    New Redis streams module: exports WORKSPACE_USAGE_UPDATES_STREAM_KEY, the ClickEvent interface, the RedisStreamEntry<T> type, the RedisStream class (processBatch, getStreamInfo), the workspaceUsageStream instance, and publishClickEvent(event) to enqueue click events.
  • Cron route + schedule (apps/web/app/(ee)/api/cron/usage/update-workspace-clicks/route.ts, apps/web/vercel.json):
    Adds a Next.js GET route (dynamic = "force-dynamic") that reads ClickEvent entries via RedisStream.processBatch, aggregates them by workspace, applies DB updates in batched sub-requests (with per-update tracking), deletes processed stream entries, and returns a JSON summary; adds a Vercel cron entry invoking the route every minute.

Sequence Diagram(s)

```mermaid
sequenceDiagram
  autonumber
  participant U as User
  participant Web as Web App
  participant RS as Redis Streams
  participant DB as Database

  U->>Web: Click link
  Web->>RS: publishClickEvent({linkId, workspaceId, timestamp})
  alt publish succeeds
    RS-->>Web: ack
  else publish fails
    RS-->>Web: error
    Web->>DB: Fallback — increment usage/totalClicks
    DB-->>Web: ack
  end
```
```mermaid
sequenceDiagram
  autonumber
  participant Cron as Vercel Cron
  participant Route as /api/cron/usage/update-workspace-clicks
  participant RS as Redis Streams
  participant DB as Database

  Cron->>Route: Invoke (every minute)
  Route->>RS: workspaceUsageStream.processBatch (xrange)
  RS-->>Route: ClickEvent entries[]
  Route->>Route: aggregate by workspace (clicks, first/last ts, entryIds)
  loop per sub-batch (<=50)
    Route->>DB: batched workspace usage updates
    DB-->>Route: per-workspace results
  end
  Route->>RS: xdel processed entry IDs (cleanup)
  Route-->>Cron: JSON summary (processed, errors, lastProcessedId, streamInfo)
```
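The aggregation step in the diagram above can be sketched as a pure function. The type and field names mirror the walkthrough summary but are assumptions about the actual route code, not a copy of it:

```typescript
// Group stream entries by workspaceId, summing clicks, tracking
// first/last timestamps, and collecting entry IDs for later XDEL cleanup.
interface ClickEvent {
  linkId: string;
  workspaceId: string;
  timestamp: string;
}

interface RedisStreamEntry<T> {
  id: string;
  data: T;
}

interface WorkspaceAggregateUsage {
  workspaceId: string;
  clicks: number;
  firstTimestamp: string;
  lastTimestamp: string;
  entryIds: string[];
}

function aggregateByWorkspace(
  entries: RedisStreamEntry<ClickEvent>[],
): WorkspaceAggregateUsage[] {
  const byWorkspace = new Map<string, WorkspaceAggregateUsage>();
  for (const { id, data } of entries) {
    if (!data.workspaceId) continue; // invalid entries handled separately
    const agg = byWorkspace.get(data.workspaceId) ?? {
      workspaceId: data.workspaceId,
      clicks: 0,
      firstTimestamp: data.timestamp,
      lastTimestamp: data.timestamp,
      entryIds: [],
    };
    agg.clicks += 1;
    agg.lastTimestamp = data.timestamp;
    agg.entryIds.push(id);
    byWorkspace.set(data.workspaceId, agg);
  }
  return Array.from(byWorkspace.values());
}
```

Aggregating before writing means N click events collapse into one UPDATE per workspace per cron tick, which is the point of offloading the hot path to a stream.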

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~55 minutes

Possibly related PRs

Suggested reviewers

  • devkiran

Poem

I hop and queue each tiny click with care,
Into streams they tumble, whispering in air.
Cron wakes each minute to count the tracks,
If streams look shy, I nibble DB snacks.
A rabbit's ledger — neat, robust, and fair. 🐰✨

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)

  • Description Check (✅ Passed): check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): the title succinctly describes the primary change (using Redis Streams to offload usage updates) and aligns with the modifications introducing a stream-based publish path for click events and cron-driven processing of project/workspace usage metrics.
  • Docstring Coverage (✅ Passed): docstring coverage is 100.00%, above the required threshold of 80.00%.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ab39482 and 923284b.

📒 Files selected for processing (1)
  • apps/web/app/(ee)/api/cron/usage/update-workspace-clicks/route.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
apps/web/app/(ee)/api/cron/usage/update-workspace-clicks/route.ts (3)
apps/web/lib/upstash/redis-streams.ts (3)
  • RedisStreamEntry (12-15)
  • ClickEvent (6-10)
  • workspaceUsageStream (150-152)
apps/web/lib/cron/verify-vercel.ts (1)
  • verifyVercelSignature (3-19)
apps/web/lib/api/errors.ts (1)
  • handleAndReturnErrorResponse (175-178)

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

Contributor

vercel bot commented Oct 2, 2025

Deployment failed with the following error:

Error while validating your Cron Jobs expressions:

            - Expected 5 values, but got 6. (Input cron: '*/5 * * * * *')

Learn More: https://vercel.link/4kmZrhf
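The failure above comes from the six-field expression `*/5 * * * * *`; Vercel cron schedules use standard five-field syntax. A minimal vercel.json entry for the every-minute schedule described in the walkthrough (path assumed from this PR's route) would look like:

```json
{
  "crons": [
    {
      "path": "/api/cron/usage/update-workspace-clicks",
      "schedule": "* * * * *"
    }
  ]
}
```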

@steven-tey steven-tey marked this pull request as ready for review October 2, 2025 23:25
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (4)
apps/web/app/(ee)/api/cron/projects/usage-updates-queue/route.ts (3)

81-88: Use array length check.

entries is an array; Object.keys(entries).length is unnecessary and confusing.

-      if (!entries || Object.keys(entries).length === 0) {
+      if (!entries || entries.length === 0) {

113-116: Reduce noisy batch logging.

Dumping full batches (including entryIds) can flood logs. Log counts only.

-        console.log(`BATCH: ${JSON.stringify(batch)}`);
+        console.log(`Processing sub-batch size=${batch.length}`);

121-124: Escape reserved column name usage.

If MySQL treats usage as reserved, quote it to avoid edge cases.

-              await conn.execute(
-                "UPDATE Project p SET p.usage = p.usage + ?, p.totalClicks = p.totalClicks + ? WHERE id = ?",
-                [update.usage, update.clicks, update.projectId],
-              );
+              await conn.execute(
+                "UPDATE Project p SET p.`usage` = p.`usage` + ?, p.totalClicks = p.totalClicks + ? WHERE id = ?",
+                [update.usage, update.clicks, update.projectId],
+              );
apps/web/lib/upstash/redis-streams.ts (1)

86-144: Use XLEN for O(1) length; avoid scanning entire stream.

Current implementation reads the whole stream to compute length. Replace with XLEN and single-entry range calls.

   async getStreamInfo(): Promise<{
     length: number;
     firstEntryId: string | null;
     lastEntryId: string | null;
   }> {
     try {
-      // Get stream length - check if stream exists first
-      let length = 0;
-      let firstEntryId: string | null = null;
-      let lastEntryId: string | null = null;
-
-      try {
-        // Try to get first entry to check if stream exists
-        const firstEntry = await redis.xrange(this.streamKey, "-", "+", 1);
-
-        if (firstEntry && Object.keys(firstEntry).length > 0) {
-          firstEntryId = Object.keys(firstEntry)[0];
-
-          // If stream exists, get its length and last entry
-          const entries = await redis.xrange(this.streamKey, "-", "+");
-          length = Object.keys(entries || {}).length;
-
-          if (length > 0) {
-            // Use "$" to get the last entry instead of "+"
-            const lastEntry = await redis.xrevrange(
-              this.streamKey,
-              "+",
-              "-",
-              1,
-            );
-
-            if (lastEntry && Object.keys(lastEntry).length > 0) {
-              const entryId = Object.keys(lastEntry)[0];
-
-              if (entryId !== firstEntryId) {
-                lastEntryId = entryId;
-              }
-            }
-          }
-        }
-      } catch (streamError) {
-        // Stream might not exist yet, which is fine
-        console.log("Stream does not exist or is empty:", streamError);
-        console.log(streamError.message);
-        console.log(streamError.type);
-        console.log(JSON.stringify(streamError, null, 2));
-      }
-
-      return {
-        length,
-        firstEntryId,
-        lastEntryId,
-      };
+      const length = await redis.xlen(this.streamKey).catch(() => 0);
+      let firstEntryId: string | null = null;
+      let lastEntryId: string | null = null;
+
+      if (length > 0) {
+        const first = await redis.xrange(this.streamKey, "-", "+", 1);
+        if (first && Object.keys(first).length > 0) {
+          firstEntryId = Object.keys(first)[0];
+        }
+        const last = await redis.xrevrange(this.streamKey, "+", "-", 1);
+        if (last && Object.keys(last).length > 0) {
+          lastEntryId = Object.keys(last)[0];
+        }
+      }
+
+      return { length, firstEntryId, lastEntryId };
     } catch (error) {
       console.error("Failed to get stream info:", error);
       throw error;
     }
   }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c5e4882 and fad2884.

📒 Files selected for processing (4)
  • apps/web/app/(ee)/api/cron/projects/usage-updates-queue/route.ts (1 hunks)
  • apps/web/lib/tinybird/record-click.ts (2 hunks)
  • apps/web/lib/upstash/redis-streams.ts (1 hunks)
  • apps/web/vercel.json (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
apps/web/app/(ee)/api/cron/projects/usage-updates-queue/route.ts (3)
apps/web/lib/upstash/redis-streams.ts (3)
  • Entry (13-16)
  • ClickEvent (6-11)
  • projectUsageStream (148-150)
apps/web/lib/cron/verify-vercel.ts (1)
  • verifyVercelSignature (3-19)
apps/web/lib/api/errors.ts (1)
  • handleAndReturnErrorResponse (175-178)
apps/web/lib/tinybird/record-click.ts (1)
apps/web/lib/upstash/redis-streams.ts (1)
  • publishClickEvent (153-172)
apps/web/lib/upstash/redis-streams.ts (1)
apps/web/lib/upstash/redis.ts (1)
  • redis (4-7)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (1)
apps/web/lib/upstash/redis-streams.ts (1)

49-62: Confirm XRANGE return shape.

This code assumes XRANGE returns a map object keyed by ID. Some clients return arrays. Verify the Upstash TS client’s return type to avoid runtime shape mismatches.

Run a quick check against the client API docs and adjust mapping accordingly (Object.entries vs array mapping).
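One hedged way to make the mapping robust to either shape, regardless of what the client returns, is a small normalization helper. This is an illustrative sketch, not Upstash API code, and the exact return type of the client's `xrange` should still be verified against its documentation:

```typescript
// Accept either an object keyed by entry ID ({ "1-0": {...} }) or an
// array of [id, fields] pairs, and return a uniform entry list.
type StreamFields = Record<string, unknown>;

function normalizeXrange(
  raw: Record<string, StreamFields> | [string, StreamFields][],
): { id: string; fields: StreamFields }[] {
  if (Array.isArray(raw)) {
    return raw.map(([id, fields]) => ({ id, fields }));
  }
  return Object.entries(raw).map(([id, fields]) => ({ id, fields }));
}
```

Centralizing the shape check in one place keeps the rest of the batch-processing code free of `Object.keys` vs array-index assumptions.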

@steven-tey steven-tey changed the title Use Redis Streams to offload link.clicks and project.usage updates Use Redis Streams to offload project.usage updates Oct 2, 2025
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

♻️ Duplicate comments (1)
apps/web/app/(ee)/api/cron/usage/update-workspace-clicks/route.ts (1)

35-40: Entries without workspaceId are dropped and never deleted.

You continue on missing workspaceId, so these IDs aren't included in processedEntryIds and remain forever. This causes reprocessing loops.

Apply this diff to track invalid entries for deletion:

 const aggregateWorkspaceUsage = (
   entries: RedisStreamEntry<ClickEvent>[],
-): { updates: WorkspaceAggregateUsage[]; lastProcessedId: string | null } => {
+): {
+  updates: WorkspaceAggregateUsage[];
+  lastProcessedId: string | null;
+  invalidEntryIds: string[];
+} => {
   // Aggregate usage by workspaceId
   const aggregatedUsage = new Map<string, WorkspaceAggregateUsage>();
 
   let lastId: string | null = null;
+  const invalidEntryIds: string[] = [];
 
   for (const entry of entries) {
     const workspaceId = entry.data.workspaceId;
 
     if (!workspaceId) {
+      invalidEntryIds.push(entry.id);
       continue;
     }

And include invalid IDs in the return:

   return {
     updates: Array.from(aggregatedUsage.values()),
     lastProcessedId: lastId,
+    invalidEntryIds,
   };

Then update the caller at Line 82:

-      const { updates, lastProcessedId } = aggregateWorkspaceUsage(entries);
+      const { updates, lastProcessedId, invalidEntryIds } = aggregateWorkspaceUsage(entries);

And include invalid IDs in processedEntryIds at Line 103:

-      const processedEntryIds: string[] = [];
+      const processedEntryIds: string[] = [...invalidEntryIds];
🧹 Nitpick comments (5)
apps/web/lib/upstash/redis-streams.ts (3)

79-82: Error message hardcoded for workspace usage in generic method.

The error message "Failed to read workspace usage updates from stream:" is specific to workspace usage, but processBatch is a generic method used for any stream type. Use a generic message or make the error context configurable.

Apply this diff:

       console.error(
-        "Failed to read workspace usage updates from stream:",
+        `Failed to read from stream ${this.streamKey}:`,
         error,
       );

101-136: Reduce excessive debug logging.

Lines 132-135 log the same error information multiple times (streamError, streamError.message, streamError.type, and JSON.stringify(streamError)). This clutters logs for a non-critical path (stream not existing is normal).

Apply this diff to simplify:

       } catch (streamError) {
         // Stream might not exist yet, which is fine
-        console.log("Stream does not exist or is empty:", streamError);
-        console.log(streamError.message);
-        console.log(streamError.type);
-        console.log(JSON.stringify(streamError, null, 2));
+        console.log("Stream does not exist or is empty");
       }

109-110: Fetching all entries to compute length may be inefficient.

Line 109 calls xrange(this.streamKey, "-", "+") without a count limit, loading all entries into memory just to count them. For large streams, this is inefficient. Consider using XINFO STREAM if available or fetching with a count parameter and computing approximate length.

If the Redis client supports XINFO STREAM, use that command to get stream metadata directly. Otherwise, document that this method is best-effort and may be slow for large streams.

apps/web/app/(ee)/api/cron/usage/update-workspace-clicks/route.ts (2)

106-106: Verbose logging of entire batch JSON.

Line 106 logs JSON.stringify(batch) for each sub-batch of up to 50 updates. This will produce very large log entries and clutter logs. Consider logging only a summary (e.g., count and first few workspace IDs).

Apply this diff:

-        console.log(`BATCH: ${JSON.stringify(batch)}`);
+        console.log(`Processing sub-batch of ${batch.length} updates`);

202-213: Consider getStreamInfo performance impact.

Line 202 calls workspaceUsageStream.getStreamInfo() after processing, which fetches all entries to compute length (as noted in the redis-streams.ts review). For large streams, this could add significant latency to the cron response. Consider whether stream info is essential for the response or if it can be computed asynchronously or cached.

If stream info is primarily for monitoring/logging, consider:

  1. Making it optional and skipping on timeout risk
  2. Computing it asynchronously and logging separately
  3. Using approximate metrics instead of exact counts
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9e1305f and ab39482.

📒 Files selected for processing (3)
  • apps/web/app/(ee)/api/cron/usage/update-workspace-clicks/route.ts (1 hunks)
  • apps/web/lib/tinybird/record-click.ts (2 hunks)
  • apps/web/lib/upstash/redis-streams.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • apps/web/lib/tinybird/record-click.ts
🧰 Additional context used
🧬 Code graph analysis (2)
apps/web/lib/upstash/redis-streams.ts (1)
apps/web/lib/upstash/redis.ts (1)
  • redis (4-7)
apps/web/app/(ee)/api/cron/usage/update-workspace-clicks/route.ts (3)
apps/web/lib/upstash/redis-streams.ts (3)
  • RedisStreamEntry (12-15)
  • ClickEvent (6-10)
  • workspaceUsageStream (150-152)
apps/web/lib/cron/verify-vercel.ts (1)
  • verifyVercelSignature (3-19)
apps/web/lib/api/errors.ts (1)
  • handleAndReturnErrorResponse (175-178)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (10)
apps/web/lib/upstash/redis-streams.ts (6)

1-4: LGTM!

Clean imports and well-named stream key constant.


6-10: LGTM!

The ClickEvent interface is well-structured for Redis stream serialization.


12-15: LGTM!

Appropriate generic type for stream entries.


17-22: LGTM!

Simple and correct constructor implementation.


150-152: LGTM!

Correctly instantiates the workspace usage stream.


162-163: TODO: Clarify future plans for link click updates stream.

The commented-out xadd for LINK_CLICK_UPDATES_STREAM_KEY suggests a future feature. Document whether this is a near-term plan or can be removed.

Do you want to track this TODO in a separate issue or remove it if it's not on the roadmap?

apps/web/app/(ee)/api/cron/usage/update-workspace-clicks/route.ts (4)

1-13: LGTM!

Imports and constants are correctly defined for the cron route.


15-22: LGTM!

Well-structured type for aggregated workspace usage.


184-218: LGTM with caveats!

The GET handler correctly verifies the Vercel signature, processes the batch, and returns a structured response. Assuming the critical issues with entry deletion and timestamp validation are addressed, the overall flow is sound.


112-115: Remove incorrect schema concern. The Project table models workspaces throughout the codebase (e.g., Pick<Project, 'id'> as workspace), so id aligns with workspaceId.

Likely an incorrect or invalid review comment.

@steven-tey steven-tey merged commit 12aff5d into main Oct 3, 2025
8 checks passed