Gemini Abstract
This is an idea to extend go-micro to support agents. It aligns with the evolving landscape of AI-powered distributed systems, where autonomous and goal-oriented components are becoming increasingly crucial.
Here's an outline of what `micro.NewAgent` and an `Agent` interface could encompass, what functionality they would offer, how developers would interact with them, and an example:
The `micro.NewAgent` and `Agent` Interface: Empowering Autonomous Components
The core idea is to provide a higher-level abstraction for building intelligent, self-managing, and often AI-driven components within a distributed system. While `micro.NewService` focuses on exposing functionality (RPC, Pub/Sub), `micro.NewAgent` would focus on defining an agent's behavior and its interaction with the environment and other services.
What would this encompass?
A `micro.NewAgent` would essentially wrap the capabilities of a `micro.Service` with additional features and a lifecycle suited to autonomous operation. It would encapsulate:
- Identity and Discovery: Like services, agents need to be discoverable within the go-micro ecosystem.
- State Management: Agents often maintain internal state, which might need to be persistent or accessible across agent instances.
- Perception: The ability to receive and interpret information from its environment (e.g., events, messages from other services, external data sources).
- Reasoning/Decision Making: This is where the `genai` package becomes central. Agents would use AI models to process perceived information, formulate goals, and decide on actions.
- Action/Execution: The ability to perform actions, which could involve calling other microservices, publishing events, interacting with external APIs, or modifying its own state.
- Learning/Adaptation (Optional but powerful): Agents might learn from their experiences, updating their internal models or decision-making processes over time.
- Lifecycle Management: How an agent starts, stops, rebalances, and recovers from failures in a distributed environment.
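As a rough illustration of how perception, reasoning, action, and state fit together, the sketch below runs one perceive-reason-act step per event. It is a toy under stated assumptions: `Event`, `Brain`, `keywordBrain`, and `toyAgent` are hypothetical names rather than go-micro APIs, the keyword-based brain stands in for a `genai` model, and a plain map stands in for a persistent store.

```go
package main

import (
	"fmt"
	"strings"
)

// Event is a hypothetical perceived input, such as a message from the bus.
type Event struct {
	Topic string
	Body  string
}

// Brain is a hypothetical reasoning component; a real agent would back it with an LLM.
type Brain interface {
	Decide(input string, state map[string]string) string
}

// keywordBrain is a stub that escalates inputs mentioning "error" and drafts a reply otherwise.
type keywordBrain struct{}

func (keywordBrain) Decide(input string, _ map[string]string) string {
	if strings.Contains(strings.ToLower(input), "error") {
		return "escalate"
	}
	return "draft"
}

// toyAgent keeps internal state (memory) across events.
type toyAgent struct {
	brain  Brain
	memory map[string]string // stands in for a persistent store
	outbox []string          // records actions instead of calling real services
}

func newToyAgent(b Brain) *toyAgent {
	return &toyAgent{brain: b, memory: map[string]string{}}
}

// Step runs one perceive -> reason -> act cycle and returns the chosen action.
func (a *toyAgent) Step(ev Event) string {
	// Perception: remember the latest input for this topic.
	a.memory["last:"+ev.Topic] = ev.Body
	// Reasoning: delegate the decision to the brain.
	action := a.brain.Decide(ev.Body, a.memory)
	// Action: only recorded here; a real agent would publish an event or call a service.
	a.outbox = append(a.outbox, action+":"+ev.Topic)
	return action
}

func main() {
	a := newToyAgent(keywordBrain{})
	fmt.Println(a.Step(Event{Topic: "tickets", Body: "API returns a 500 error"}))
	fmt.Println(a.Step(Event{Topic: "tickets", Body: "How do I change my plan?"}))
	fmt.Println(a.outbox)
}
```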
What kind of functionality would it have?
The `Agent` interface would expose methods for building and managing intelligent agents:
- `Init(...Option) error`: Similar to `Service.Init`; configures and initializes agent-specific components (e.g., AI model clients, state stores).
- `HandleEvent(ctx context.Context, event micro.Event) error`: The primary mechanism for agents to react to asynchronous events published on the message bus. This is where the agent's "perception" loop would often begin.
- `HandleRequest(ctx context.Context, req Request, rsp interface{}) error`: Although agents are more autonomous, they might still expose RPC-like endpoints for direct queries or control from other services or human operators.
- `Run() error`: Starts the agent's main loop, which involves listening for events, performing internal reasoning, and executing actions.
- `Stop() error`: Gracefully shuts down the agent.
- `SetBrain(brain genai.Model)`: Injects a generative AI model into the agent for its reasoning capabilities.
- `SetMemory(memory datastore.Store)`: Attaches a persistent memory store.
- `Plan(ctx context.Context, goal string, contextData map[string]interface{}) ([]Action, error)`: Formulates a series of actions from a high-level goal and the provided context. This would rely heavily on the `genai` package.
- `Execute(ctx context.Context, action Action) error`: Executes a planned action.
How would developers use and interact with it?
Developers would interact with `micro.NewAgent` much as they do with `micro.NewService`, but with a shift in mindset toward defining intelligent behaviors rather than just API endpoints.
- Defining Agent Behavior: Developers would implement the `Agent` interface, providing the concrete logic for how their agent perceives, reasons, and acts.
- Integrating with GenAI: The `genai` package would be used within the agent's logic to interact with LLMs for tasks such as:
  - Goal Interpretation: Translating natural-language goals into structured plans.
  - Contextual Understanding: Analyzing incoming data and events to build a rich understanding of the current situation.
  - Decision Making: Choosing the best course of action based on the current state and objectives.
  - Content Generation: Producing human-readable responses, summaries, or new data.
- Tooling Integration: Agents would likely expose "tools" (e.g., RPC methods on other services, external APIs) that the LLM can decide to use as part of its plan. Function calling, which LLM APIs such as Gemini support, would be a natural fit here if the `genai` package exposes it.
- Observability: Monitoring agent performance, decision paths, and interactions would be crucial. Go-micro's existing observability features (tracing, metrics) could be extended to cover agents.
Outline of Main Features and Functionality
`micro.NewAgent` Functionality:
- Agent Lifecycle Management:
  - `NewAgent(opts ...Option) Agent`: Creates a new agent instance.
  - `Run()`: Starts the agent's event loop and processing.
  - `Stop()`: Gracefully stops the agent.
- Core Agent Capabilities:
  - Perception: Ability to subscribe to and process messages from the go-micro message bus (Pub/Sub), or receive direct RPC requests.
  - Reasoning Engine Integration: Seamless integration with the `genai` package for large language model (LLM) interactions (text generation, chat, function calling).
  - Memory Management: Pluggable interfaces for persistent and short-term memory stores (e.g., key-value stores, vector databases for RAG).
  - Tool Usage: Mechanism for agents to discover and invoke other microservices or external APIs as "tools" to achieve their goals.
  - Autonomous Operation: Handles continuous processing based on internal state, perceived events, and reasoned actions.
- Distributed System Integration:
  - Service Discovery: Agents register themselves with the go-micro registry.
  - Load Balancing: Client-side load balancing for agent-to-service communication.
  - Configuration: Dynamic configuration loading for agent parameters (e.g., AI model parameters, memory settings).
  - Authentication/Authorization: Secure communication between agents and other services.
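The `Run`/`Stop` lifecycle above maps naturally onto a goroutine fed by a channel. This sketch assumes an in-process channel in place of a go-micro broker subscription; `LoopAgent` and `Deliver` are hypothetical names. `Stop` closes the input and waits until every queued event has been handled, which is one reasonable reading of "gracefully stops".

```go
package main

import (
	"fmt"
	"sync"
)

// LoopAgent is a hypothetical agent whose Run starts an event loop and whose
// Stop drains pending events before returning.
type LoopAgent struct {
	events  chan string
	wg      sync.WaitGroup
	mu      sync.Mutex
	handled []string
}

func NewLoopAgent(buffer int) *LoopAgent {
	return &LoopAgent{events: make(chan string, buffer)}
}

// Deliver enqueues an event; in go-micro a broker subscription would play this role.
// It must not be called after Stop, because sending on a closed channel panics.
func (a *LoopAgent) Deliver(ev string) { a.events <- ev }

// Run starts the event loop in a background goroutine and returns immediately.
func (a *LoopAgent) Run() {
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		// The range loop exits once events is closed and fully drained.
		for ev := range a.events {
			a.mu.Lock()
			a.handled = append(a.handled, "handled:"+ev)
			a.mu.Unlock()
		}
	}()
}

// Stop closes the input channel and blocks until every queued event is processed.
func (a *LoopAgent) Stop() {
	close(a.events)
	a.wg.Wait()
}

// Handled returns a copy of the processed events.
func (a *LoopAgent) Handled() []string {
	a.mu.Lock()
	defer a.mu.Unlock()
	return append([]string(nil), a.handled...)
}

func main() {
	a := NewLoopAgent(8)
	a.Run()
	a.Deliver("ticket-1")
	a.Deliver("ticket-2")
	a.Stop()
	fmt.Println(a.Handled()) // [handled:ticket-1 handled:ticket-2]
}
```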
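`Agent` Interface (Conceptual):
// Conceptual sketch: micro.Event (as an incoming event), Request, genai.Model and
// datastore.Store are proposed placeholder types, not a confirmed go-micro API.
type Agent interface {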
// Init initializes the agent with provided options.
Init(...Option) error
// Options returns the current agent options.
Options() Options
// Run starts the agent's main loop, processing events and executing actions.
Run() error
// Stop gracefully stops the agent.
Stop() error
// HandleEvent processes an incoming event from the message bus.
HandleEvent(ctx context.Context, event micro.Event) error
// HandleRequest processes a direct RPC-like request to the agent.
HandleRequest(ctx context.Context, req Request, rsp interface{}) error
// SetBrain sets the generative AI model for the agent's reasoning.
SetBrain(brain genai.Model)
// GetBrain returns the generative AI model used by the agent.
GetBrain() genai.Model
// SetMemory sets the memory store for the agent.
SetMemory(memory datastore.Store)
// GetMemory returns the memory store used by the agent.
GetMemory() datastore.Store
// Plan generates a plan of actions based on a goal and context.
Plan(ctx context.Context, goal string, contextData map[string]interface{}) ([]Action, error)
// Execute performs a specific action.
Execute(ctx context.Context, action Action) error
}
// Action represents a discrete action an agent can take.
type Action struct {
Name string `json:"name"`
Payload map[string]interface{} `json:"payload"`
}
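Because `Action` carries JSON tags, a planner could ask the model to reply with a JSON array of actions and decode it directly. The sketch assumes that reply format (nothing in the proposal guarantees it) and checks each action name against an allow-list before anything runs; `parsePlan` and the action names are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Action matches the conceptual Action struct above.
type Action struct {
	Name    string                 `json:"name"`
	Payload map[string]interface{} `json:"payload"`
}

// parsePlan decodes a model reply into actions and rejects unknown action names.
func parsePlan(reply string, allowed map[string]bool) ([]Action, error) {
	var plan []Action
	if err := json.Unmarshal([]byte(reply), &plan); err != nil {
		return nil, fmt.Errorf("model reply is not a JSON action list: %w", err)
	}
	for i, a := range plan {
		if !allowed[a.Name] {
			return nil, fmt.Errorf("action %d: %q is not an allowed action", i, a.Name)
		}
	}
	return plan, nil
}

func main() {
	allowed := map[string]bool{"search_kb": true, "draft_reply": true}
	reply := `[{"name":"search_kb","payload":{"query":"billing cycle"}},
	           {"name":"draft_reply","payload":{"tone":"friendly"}}]`
	plan, err := parsePlan(reply, allowed)
	if err != nil {
		fmt.Println("rejected:", err)
		return
	}
	for _, a := range plan {
		fmt.Println(a.Name, a.Payload)
	}
}
```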
Example Usage: A Customer Support Agent
Let's imagine a "Customer Support Agent" that listens to incoming support tickets (events), uses a GenAI model to understand the issue, looks up knowledge base articles (via another service), and then drafts a response or escalates the ticket.
1. Define the Agent's Core Logic (`support_agent.go`)
package main
import (
"context"
"fmt"
"strings"
"time"
"go-micro.dev/v5"
"go-micro.dev/v5/client"
"go-micro.dev/v5/datastore" // Assumed package providing the Store interface used for agent memory
"go-micro.dev/v5/genai" // Assumed genai package exposing Model, Text and GenerateContent
"go-micro.dev/v5/logger"
)
// SupportTicket represents an incoming support ticket.
type SupportTicket struct {
ID string `json:"id"`
Customer string `json:"customer"`
Subject string `json:"subject"`
Issue string `json:"issue"`
Timestamp int64 `json:"timestamp"`
}
// SupportResponse represents the agent's drafted response or action.
type SupportResponse struct {
TicketID string `json:"ticket_id"`
AgentID string `json:"agent_id"`
ResponseType string `json:"response_type"` // e.g., "draft", "escalate", "resolve"
Content string `json:"content"`
SuggestedKBID string `json:"suggested_kb_id,omitempty"`
EscalatedTo string `json:"escalated_to,omitempty"`
}
// KnowledgeBaseService is an interface for interacting with a knowledge base service.
type KnowledgeBaseService interface {
Search(ctx context.Context, query string) (*KBArticle, error)
}
// KBArticle is a placeholder for a knowledge base article.
type KBArticle struct {
ID string `json:"id"`
Title string `json:"title"`
Content string `json:"content"`
}
// supportAgent implements the micro.Agent interface.
type supportAgent struct {
agent micro.Agent
brain genai.Model
kbSvc KnowledgeBaseService
// Other internal state like datastore, etc.
memory datastore.Store
}
func (sa *supportAgent) Init(opts ...micro.Option) error {
// Initialize agent, brain, and other dependencies
// The micro.NewAgent would internally call this.
return nil
}
func (sa *supportAgent) Options() micro.Options {
return sa.agent.Options()
}
func (sa *supportAgent) Run() error {
logger.Infof("Support Agent [%s] running...", sa.agent.Options().Name)
// The agent's main loop would be managed by micro.Agent.Run()
// We just need to define how it handles events.
return sa.agent.Run()
}
func (sa *supportAgent) Stop() error {
logger.Infof("Support Agent [%s] stopping...", sa.agent.Options().Name)
return sa.agent.Stop()
}
func (sa *supportAgent) HandleEvent(ctx context.Context, event micro.Event) error {
var ticket SupportTicket
if err := event.Decode(&ticket); err != nil {
logger.Errorf("Failed to decode support ticket event: %v", err)
return err
}
logger.Infof("Agent %s received ticket %s for customer %s: %s",
sa.agent.Options().Name, ticket.ID, ticket.Customer, ticket.Subject)
// 1. Perception & Reasoning (using genai)
prompt := fmt.Sprintf("Analyze the following customer support ticket and suggest a response or action. If a knowledge base article is relevant, suggest its ID. If escalation is needed, state the team to escalate to.\n\nTicket Subject: %s\nTicket Issue: %s\n\nSuggested Action:", ticket.Subject, ticket.Issue)
// In a real scenario, you'd use genai's chat session or function calling
// Here, we're simulating a simple text generation response
genaiResponse, err := sa.brain.GenerateContent(ctx, genai.Text(prompt))
if err != nil {
logger.Errorf("Failed to generate content with AI: %v", err)
return err
}
// Assuming the genaiResponse contains the structured action or text
// For simplicity, let's just take the first candidate's text
responseContent := "Unable to process with AI."
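// Candidate text lives in Candidates[i].Content.Parts, as in the Gemini Go SDK
if len(genaiResponse.Candidates) > 0 && genaiResponse.Candidates[0].Content != nil && len(genaiResponse.Candidates[0].Content.Parts) > 0 {
if text, ok := genaiResponse.Candidates[0].Content.Parts[0].(genai.Text); ok {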
responseContent = string(text)
}
}
// 2. Action (simulated based on AI response)
supportResp := SupportResponse{
TicketID: ticket.ID,
AgentID: sa.agent.Options().Name,
ResponseType: "draft", // Default
Content: responseContent,
}
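// Enrich the draft with a relevant knowledge base article, if a KB service is configured
if sa.kbSvc != nil {
// Search subject and issue together so subject keywords (e.g. "billing") are considered
kbArticle, err := sa.kbSvc.Search(ctx, ticket.Subject+" "+ticket.Issue)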
if err != nil {
logger.Errorf("Failed to search knowledge base: %v", err)
} else if kbArticle != nil {
supportResp.SuggestedKBID = kbArticle.ID
supportResp.Content = fmt.Sprintf("Here's a draft response based on your issue and relevant knowledge base article:\n\n%s\n\nKnowledge Base Article: %s - %s", responseContent, kbArticle.Title, kbArticle.Content)
}
}
// Simple keyword-based action detection for demonstration; a production agent
// would ideally use function calling to obtain structured actions from the model.
if containsIgnoreCase(responseContent, "escalate to technical team") {
supportResp.ResponseType = "escalate"
supportResp.EscalatedTo = "Technical Support"
} else if containsIgnoreCase(responseContent, "resolve automatically") {
supportResp.ResponseType = "resolve"
}
logger.Infof("Agent %s processed ticket %s. Suggested action: %s", sa.agent.Options().Name, ticket.ID, supportResp.ResponseType)
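// Publish the response for a downstream "response handler" service. go-micro's
// broker.Publish takes a topic and a *broker.Message (no context), so publish the
// struct through the client with micro.NewEvent, which encodes it with the client's codec.
return micro.NewEvent("support.responses", sa.agent.Options().Client).Publish(ctx, &supportResp)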
}
// HandleRequest is not used in this example but would be part of the Agent interface.
func (sa *supportAgent) HandleRequest(ctx context.Context, req client.Request, rsp interface{}) error {
logger.Infof("Support Agent [%s] received direct request for service %s, method %s", sa.agent.Options().Name, req.Service(), req.Method())
// Implement logic for direct queries to the agent, e.g., "What is your current status?"
return nil
}
func (sa *supportAgent) SetBrain(brain genai.Model) {
sa.brain = brain
}
func (sa *supportAgent) GetBrain() genai.Model {
return sa.brain
}
func (sa *supportAgent) SetMemory(memory datastore.Store) {
sa.memory = memory
}
func (sa *supportAgent) GetMemory() datastore.Store {
return sa.memory
}
// Plan and Execute would be more complex and depend on the agent's capabilities.
// For this example, we directly handle in HandleEvent.
func (sa *supportAgent) Plan(ctx context.Context, goal string, contextData map[string]interface{}) ([]micro.Action, error) {
// Not implemented for this simple example
return nil, nil
}
func (sa *supportAgent) Execute(ctx context.Context, action micro.Action) error {
// Not implemented for this simple example
return nil
}
// containsIgnoreCase reports whether substr appears anywhere in s, ignoring case.
func containsIgnoreCase(s, substr string) bool {
return strings.Contains(strings.ToLower(s), strings.ToLower(substr))
}
// Placeholder for a simple KnowledgeBaseServiceClient
type knowledgeBaseServiceClient struct {
cl client.Client
}
func (kbc *knowledgeBaseServiceClient) Search(ctx context.Context, query string) (*KBArticle, error) {
// Simulate RPC call to a knowledge base service
fmt.Printf("Searching KB for: %s\n", query)
time.Sleep(100 * time.Millisecond) // Simulate network delay
if containsIgnoreCase(query, "billing") {
return &KBArticle{
ID: "KB001",
Title: "Understanding Billing Cycles",
Content: "This article explains our billing cycles and payment options.",
}, nil
}
return nil, nil
}
// NewSupportAgent creates a new supportAgent instance.
// This would be wrapped by micro.NewAgent internally.
func NewSupportAgent(a micro.Agent, brain genai.Model, kbSvc KnowledgeBaseService) micro.Agent {
sa := &supportAgent{
agent: a,
brain: brain,
kbSvc: kbSvc,
}
// micro.NewAgent would likely expose options to set the brain and memory
// and then call Init on the concrete agent implementation.
return sa
}
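The keyword matching in `HandleEvent` depends on the model's exact wording. A sturdier option, sketched here under the assumption that the prompt asks the model to answer with a small JSON object, is to decode that object and fall back to a draft when it does not parse. `triageReply` and `classify` are hypothetical helpers; where the model supports function calling, that would be the more robust route.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// triageReply is the JSON shape the prompt would ask the model to produce (an assumption).
type triageReply struct {
	Action string `json:"action"` // "draft", "escalate" or "resolve"
	Team   string `json:"team"`
	Reply  string `json:"reply"`
}

// SupportResponse mirrors the fields used by the support agent example.
type SupportResponse struct {
	TicketID     string `json:"ticket_id"`
	ResponseType string `json:"response_type"`
	Content      string `json:"content"`
	EscalatedTo  string `json:"escalated_to,omitempty"`
}

// classify turns raw model text into a SupportResponse, defaulting to a draft.
func classify(ticketID, modelText string) SupportResponse {
	resp := SupportResponse{TicketID: ticketID, ResponseType: "draft", Content: modelText}
	var tr triageReply
	if err := json.Unmarshal([]byte(modelText), &tr); err != nil {
		return resp // not JSON: keep the raw text as a draft
	}
	switch tr.Action {
	case "escalate":
		resp.ResponseType = "escalate"
		resp.EscalatedTo = tr.Team
	case "resolve":
		resp.ResponseType = "resolve"
	}
	if tr.Reply != "" {
		resp.Content = tr.Reply
	}
	return resp
}

func main() {
	fmt.Printf("%+v\n", classify("TICKET-002", `{"action":"escalate","team":"Technical Support","reply":"Escalating your API issue."}`))
	fmt.Printf("%+v\n", classify("TICKET-003", "Thanks for asking about streaming!"))
}
```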
2. Main Go File (`main.go`)
package main
import (
"context"
"os"
"time"
"go-micro.dev/v5"
"go-micro.dev/v5/broker"
"go-micro.dev/v5/client"
"go-micro.dev/v5/logger"
"go-micro.dev/v5/server"
"github.com/google/generative-ai-go/genai"
genai_option "google.golang.org/api/option"
)
// main.go
func main() {
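// Use go-micro's package-level default broker so that the test publisher in init(),
// which publishes to broker.DefaultBroker, reaches the same broker instance.
b := broker.DefaultBroker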
if err := b.Init(); err != nil {
logger.Fatalf("Broker Init error: %v", err)
}
if err := b.Connect(); err != nil {
logger.Fatalf("Broker Connect error: %v", err)
}
defer b.Disconnect()
// Initialize GenAI client
apiKey := os.Getenv("GEMINI_API_KEY")
if apiKey == "" {
logger.Fatal("GEMINI_API_KEY environment variable not set")
}
genaiCtx := context.Background()
genaiClient, err := genai.NewClient(genaiCtx, genai_option.WithAPIKey(apiKey))
if err != nil {
logger.Fatalf("Failed to create GenAI client: %v", err)
}
defer genaiClient.Close()
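// Use a specific model (e.g., "gemini-pro"). This assumes *genai.GenerativeModel would
// satisfy the proposed genai.Model interface that supportAgent expects.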
model := genaiClient.GenerativeModel("gemini-pro")
if model == nil {
logger.Fatalf("Failed to get GenAI model 'gemini-pro'")
}
// Create a mock/placeholder for the KnowledgeBaseService client
kbServiceClient := &knowledgeBaseServiceClient{
cl: client.NewClient(), // Use default go-micro client
}
// Create a new agent instance using micro.NewAgent
// The micro.NewAgent would handle the core service registration and lifecycle.
// We'd then pass our custom agent implementation to it.
agentSvc := micro.NewAgent(
micro.Name("customer.support.agent"),
micro.Version("1.0.0"),
micro.Broker(b),
micro.WrapHandler(func(next server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
logger.Infof("Agent received RPC: %s.%s", req.Service(), req.Method())
return next(ctx, req, rsp)
}
}),
)
// Create our concrete supportAgent handler
supportAgentHandler := &supportAgent{
agent: agentSvc, // The underlying micro.Agent instance
brain: model,
kbSvc: kbServiceClient,
memory: nil, // Placeholder for memory
}
// Binding the custom agent logic: ideally micro.NewAgent would accept an Agent
// implementation and register its HandleEvent and HandleRequest methods internally, e.g.:
//   agent := micro.NewAgent(...)
//   agent.Handle(myAgentImplementation)
//   agent.Run()
// For this mock-up, subscribe to the broker directly to simulate event handling.
// Note that go-micro's broker.Handler has the signature func(broker.Event) error, so a
// real implementation would need an adapter from broker.Event to the proposed micro.Event.
_, err = b.Subscribe("customer.tickets", supportAgentHandler.HandleEvent)
if err != nil {
logger.Fatalf("Failed to subscribe to customer.tickets: %v", err)
}
// Initialize the agent
if err := agentSvc.Init(); err != nil {
logger.Fatalf("Agent initialization failed: %v", err)
}
// Illustrates the SetBrain setter; the brain was already set in the struct literal above.
// In a real implementation, micro.NewAgent options would inject the brain and memory.
supportAgentHandler.SetBrain(model)
// SetMemory would be called here once a memory store is configured.
// Run the agent (which internally starts the service, broker, etc.)
if err := agentSvc.Run(); err != nil {
logger.Fatalf("Agent failed to run: %v", err)
}
}
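// Helper to simulate publishing a ticket for testing.
// broker.Publish expects a topic and a raw *broker.Message, so publish the struct through
// a client bound to the given broker; micro.NewEvent encodes it with the client's codec.
func publishTicket(b broker.Broker, ticket SupportTicket) {
ctx := context.Background()
pub := micro.NewEvent("customer.tickets", client.NewClient(client.Broker(b)))
if err := pub.Publish(ctx, &ticket); err != nil {
logger.Errorf("Failed to publish ticket: %v", err)
} else {
logger.Infof("Published ticket: %s", ticket.ID)
}
}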
// You could also publish tickets from a separate program or a test; Go allows only one
// main function per package.
// To run this demonstration, you would typically:
// 1. Set the GEMINI_API_KEY environment variable.
// 2. Run `go run main.go support_agent.go`.
// 3. In another terminal, publish a message to "customer.tickets" (requires a go-micro client or similar).
// For a self-contained demo, the init function below starts a goroutine that publishes test tickets.
func init() {
go func() {
// Wait for the agent to start and subscribe
time.Sleep(5 * time.Second)
testTicket := SupportTicket{
ID: "TICKET-001",
Customer: "Alice Wonderland",
Subject: "Problem with my billing statement",
Issue: "My last bill seems incorrect. I was charged for an extra month.",
Timestamp: time.Now().Unix(),
}
publishTicket(broker.DefaultBroker, testTicket)
testTicket2 := SupportTicket{
ID: "TICKET-002",
Customer: "Bob The Builder",
Subject: "Technical issue with API integration",
Issue: "Our system is failing to connect to your payment gateway API. Getting 500 errors.",
Timestamp: time.Now().Unix(),
}
publishTicket(broker.DefaultBroker, testTicket2)
testTicket3 := SupportTicket{
ID: "TICKET-003",
Customer: "Charlie Chaplin",
Subject: "General inquiry about service features",
Issue: "Can you tell me more about the new streaming features?",
Timestamp: time.Now().Unix(),
}
publishTicket(broker.DefaultBroker, testTicket3)
}()
}
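Since `micro.NewAgent` is only proposed, the example above cannot run as written. The stand-alone sketch below simulates the same event path (`customer.tickets` in, `support.responses` out) with a tiny synchronous in-memory pub/sub and a keyword-based stand-in for the LLM, so the flow can be exercised without go-micro or an API key. `memBus`, `stubTriage`, and `wireAgent` are hypothetical, not go-micro types.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

type SupportTicket struct {
	ID, Subject, Issue string
}

type SupportResponse struct {
	TicketID, ResponseType, Content string
}

// memBus is a minimal synchronous in-memory pub/sub keyed by topic.
type memBus struct {
	mu   sync.RWMutex
	subs map[string][]func(interface{})
}

func newMemBus() *memBus { return &memBus{subs: map[string][]func(interface{}){}} }

func (b *memBus) Subscribe(topic string, h func(interface{})) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[topic] = append(b.subs[topic], h)
}

// Publish copies the handler list before calling it, so handlers may publish in turn.
func (b *memBus) Publish(topic string, msg interface{}) {
	b.mu.RLock()
	hs := append([]func(interface{}){}, b.subs[topic]...)
	b.mu.RUnlock()
	for _, h := range hs {
		h(msg)
	}
}

// stubTriage stands in for the LLM: escalate API/500 issues, otherwise draft a reply.
func stubTriage(t SupportTicket) SupportResponse {
	text := strings.ToLower(t.Subject + " " + t.Issue)
	if strings.Contains(text, "api") || strings.Contains(text, "500") {
		return SupportResponse{TicketID: t.ID, ResponseType: "escalate", Content: "Escalated to Technical Support"}
	}
	return SupportResponse{TicketID: t.ID, ResponseType: "draft", Content: "Thanks, we are looking into: " + t.Subject}
}

// wireAgent subscribes the agent to tickets and publishes its decisions.
func wireAgent(bus *memBus) {
	bus.Subscribe("customer.tickets", func(m interface{}) {
		t, ok := m.(SupportTicket)
		if !ok {
			return // ignore unexpected payloads
		}
		bus.Publish("support.responses", stubTriage(t))
	})
}

func main() {
	bus := newMemBus()
	var got []SupportResponse
	bus.Subscribe("support.responses", func(m interface{}) {
		got = append(got, m.(SupportResponse))
	})
	wireAgent(bus)
	bus.Publish("customer.tickets", SupportTicket{ID: "TICKET-001", Subject: "Problem with my billing statement", Issue: "I was charged for an extra month."})
	bus.Publish("customer.tickets", SupportTicket{ID: "TICKET-002", Subject: "Technical issue with API integration", Issue: "Getting 500 errors."})
	for _, r := range got {
		fmt.Printf("%s -> %s: %s\n", r.TicketID, r.ResponseType, r.Content)
	}
}
```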
This outline and example demonstrate how `micro.NewAgent` and the `Agent` interface could provide a powerful abstraction for building intelligent, autonomous components within a go-micro distributed system, leveraging the new `genai` package for their reasoning capabilities. The key is to shift from purely request-response services to more proactive, event-driven, AI-powered agents.