an experiment by ben davis that went WAY too far...
Warning
As I said, this is alpha software that's gonna change. A new branch has been made and is in progress (ben/river-alpha-v2). It's a very heavy refactor of all the APIs that will result in the ability to do resumable streams, have a unified API for all the agents, and support more frameworks than SvelteKit...
```svelte
<script lang="ts">
	import { myRiverClient } from '$lib/river/client';

	// ALL of this is type safe, feels just like TRPC
	const { start, stop } = myRiverClient.basicExample({
		onStart: () => {
			console.log('starting basic example');
		},
		onChunk: (chunk) => {
			// full type safety on the chunks
		},
		onCancel: () => {
			console.warn('you cancelled the basic example');
		},
		onError: (error) => {
			console.error('error in basic example', error);
		},
		onComplete: ({ totalChunks, duration }) => {
			console.log(`basic example completed in ${duration}ms with ${totalChunks} chunks`);
		}
	});
</script>
```
```sh
bun i @davis7dotsh/river-alpha
```
this is alpha software, use it at your own risk. APIs will change, bugs will be fixed, features will be added, etc...
- full type safety
- rpc-like function calling
- trpc-mutation-like interface for consuming the streams
- ai sdk streaming support with full stack type safety
- custom stream support with zod validation on chunks
this project does actually work right now, but it is very early in development and NOT recommended for production use. it is in alpha, the APIs will change a lot...
- get an openrouter api key
- add it to your `.env.local` file (see `.env.example`)
- install dependencies and run the dev server:

```sh
bun i
bun dev
```
if you want to try this out, it's now available on npm!
i've built out two examples, one using the ai-sdk and one using a custom stream.
they're both fully type safe, pleasant to work in, and work great: check them out
- create a new sveltekit project (if you don't have one already)

```sh
bunx sv create river-demo
```
- install dependencies

```sh
bun i @davis7dotsh/river-alpha zod
```
- setup your first agent (this looks slightly different for the ai-sdk agents)

```ts
// src/lib/river/agents.ts
import { RIVER_SERVER } from '@davis7dotsh/river-alpha';
import { z } from 'zod';

export const basicExampleAgent = RIVER_SERVER.createCustomAgent({
	inputSchema: z.object({
		message: z.string()
	}),
	streamChunkSchema: z.object({
		letter: z.string(),
		isVowel: z.boolean()
	}),
	// a stream will automatically be created for you when you call this agent
	// first param is the input, second param is a function to append chunks to the stream
	// the stream will close when the agent returns
	agent: async ({ message }, appendChunk) => {
		const letters = message.split('');
		const onlyLetters = letters.filter((letter) => /^[a-zA-Z]$/.test(letter));
		for (let i = 0; i < onlyLetters.length; i++) {
			const letter = onlyLetters[i];
			const isVowel = /^[aeiou]$/i.test(letter);
			appendChunk({ letter, isVowel });
			await new Promise((resolve) => setTimeout(resolve, 20));
		}
	}
});
```
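To see exactly what this agent streams, here is its letter/vowel logic extracted into a standalone function (same logic as the agent body above, minus the River wiring, so you can run it anywhere):

```typescript
// the agent body's chunking logic, collected into an array instead of a stream
const chunksFor = async (message: string) => {
	const chunks: { letter: string; isVowel: boolean }[] = [];
	const onlyLetters = message.split('').filter((letter) => /^[a-zA-Z]$/.test(letter));
	for (const letter of onlyLetters) {
		chunks.push({ letter, isVowel: /^[aeiou]$/i.test(letter) });
		// same artificial delay as the agent, so the stream is visibly incremental
		await new Promise((resolve) => setTimeout(resolve, 20));
	}
	return chunks;
};

// 'Hi!' drops the '!', then emits two chunks: H (consonant) and i (vowel)
console.log(await chunksFor('Hi!'));
```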
- setup your router

```ts
// src/lib/river/router.ts
import { RIVER_SERVER } from '@davis7dotsh/river-alpha';
import { basicExampleAgent } from './agents';

export const myRiverRouter = RIVER_SERVER.createAgentRouter({
	// I recommend having the key not be the name of the agent; this makes the go-to-definition experience much better
	basicExample: basicExampleAgent
});

// this is to get type inference on the client
export type MyRiverRouter = typeof myRiverRouter;
```
- setup the endpoint

```ts
// src/routes/api/river/+server.ts
import { myRiverRouter } from '$lib/river/router';
import { RIVER_SERVER } from '@davis7dotsh/river-alpha';

// this is all it takes, nothing else needed
// NOTE: this is sveltekit specific, more frameworks coming eventually...
export const { POST } = RIVER_SERVER.createServerEndpointHandler(myRiverRouter);
```
- setup the client

```ts
// src/lib/river/client.ts
import { RIVER_CLIENT } from '@davis7dotsh/river-alpha';
import type { MyRiverRouter } from './router';

// similar to a trpc client, this is how we call the agents from the client
export const myRiverClient = RIVER_CLIENT.createClientCaller<MyRiverRouter>('/api/river');
```
- use your agent on the client with a client side caller

```svelte
<!-- src/routes/+page.svelte -->
<script lang="ts">
	import { myRiverClient } from '$lib/river/client';

	// this works just like mutations in trpc, it will not actually run until you call start
	// the callbacks are optional, and will fire when they are defined and the agent starts
	const basicExampleCaller = myRiverClient.basicExample({
		onStart: () => {
			// fires when the agent starts
			console.log('Starting basic example');
		},
		onChunk: ({ letter, isVowel }) => {
			// fires when a chunk is received
			// will always just have one chunk and is fully type safe
			console.log(`${letter} is ${isVowel ? 'a vowel' : 'a consonant'}`);
		},
		onCancel: () => {
			// fires when the agent is cancelled/stopped
			console.log('You cancelled the basic example');
		},
		onError: (error) => {
			// fires when the agent errors
			console.error('Error in basic example', error);
		},
		onComplete: ({ totalChunks, duration }) => {
			// fires when the agent completes
			// this will ALWAYS fire last, even if the agent was cancelled or errored
			console.log(`Basic example completed in ${duration}ms with ${totalChunks} chunks`);
		}
	});

	const handleStart = async () => {
		// actually starts the agent
		await basicExampleCaller.start({
			message: 'This is in fact a message'
		});
	};

	const handleCancel = () => {
		// stops the agent (uses an abort controller under the hood)
		basicExampleCaller.stop();
	};
</script>

<!-- some UI to consume and start the stream -->
```
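That placeholder UI can be as small as two buttons wired to the handlers above (a sketch assuming Svelte 5's `onclick` syntax; use `on:click` on Svelte 4):

```svelte
<!-- minimal sketch: wire the caller's start/stop to two buttons -->
<button onclick={handleStart}>start stream</button>
<button onclick={handleCancel}>cancel stream</button>
```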
- streams went from something you touch every once in a while to something we're using all the time
- i want typesafety
- mutations are awesome in tanstack query, i want them for streams
- rpc >>>>>>
- streams are a pain to consume out of the box (readers and encoders and raw fetch and type casting and more annoying shit)
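On that last point, here is roughly the boilerplate you end up writing without a wrapper (a sketch using an in-memory `ReadableStream` standing in for a fetch response body):

```typescript
// stand-in for `(await fetch(...)).body`: a stream of encoded JSON lines
const body = new ReadableStream<Uint8Array>({
	start(controller) {
		const encoder = new TextEncoder();
		controller.enqueue(encoder.encode('{"letter":"a","isVowel":true}\n'));
		controller.enqueue(encoder.encode('{"letter":"b","isVowel":false}\n'));
		controller.close();
	}
});

// readers, decoders, manual buffering, and a type cast: the stuff a wrapper hides
const readChunks = async () => {
	const reader = body.getReader();
	const decoder = new TextDecoder();
	let buffer = '';
	while (true) {
		const { done, value } = await reader.read();
		if (done) break;
		if (value) buffer += decoder.decode(value, { stream: true });
	}
	return buffer
		.split('\n')
		.filter(Boolean)
		.map((line) => JSON.parse(line) as { letter: string; isVowel: boolean });
};

const chunks = await readChunks();
console.log(chunks.length); // 2
```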
- cleanup package deps, currently too many are bundled in
- more robust error handling on both client and server. want to do something similar to trpc's `TRPCError`
- stream resumability support. need to figure out a good way to dump the stream to a persistent store so we can easily resume later; will require api changes
- "waitUntil" support. this pretty much goes hand in hand with stream resumability
see the examples for more detailed usage, these APIs will change...
- agents: these come in two flavors, `AiSdkAgent` and `CustomAgent`. The ai-sdk agent is for when you want to use the `streamText` function from the `ai` package. The custom agent is for when you want to do custom stuff and just need a type safe stream (validated with zod)
- agent router: this is the thing you create on the server which will allow you to call agents. VERY similar to a TRPC router.
- agent client: this is the client side primitive for actually calling agents. It's fully type safe (grabs types from the router) and feels like the trpc client.
- endpoint handler: this is something you will basically never touch. it's just a function that returns a POST handler for actually processing your requests
these are a few helper types I made that really help with getting good type safety in your clients. the names are a bit verbose, but at least they're descriptive...
// gets the "tool set" type (a record of tool names to their tool types) for an ai-sdk agent
type AiSdkAgentToolSet = RiverClientCallerAiSdkToolSetType<typeof riverClient.exampleAiSdkAgent>;
// gets the input type for a tool call for an ai-sdk agent. pass in the tool set type and the tool name
type ImposterToolCallInputType = RiverClientCallerToolCallInputType<
AiSdkAgentToolSet,
'imposterCheck'
>;
// gets the output type for a tool call for an ai-sdk agent. pass in the tool set type and the tool name
type ImposterToolCallOutputType = RiverClientCallerToolCallOutputType<
AiSdkAgentToolSet,
'imposterCheck'
>;
// gets the chunk type for an agent (the thing passed to the onChunk callback)
type AiSdkAgentChunk = RiverClientCallerChunkType<typeof riverClient.exampleAiSdkAgent>;
// gets the input type for an agent (the thing passed to the start function)
type AiSdkAgentInputType = RiverClientCallerInputType<typeof riverClient.exampleAiSdkAgent>;
if you have feedback or want to contribute, don't hesitate. best place to reach out is on my twitter @bmdavis419