+
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions crates/fluent-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,15 +417,13 @@ pub mod credentials {

/// Parse a line from amber print output
fn parse_amber_line(line: &str) -> Option<(String, String)> {
if line.contains('=') {
let parts: Vec<&str> = line.splitn(2, '=').collect();
if parts.len() == 2 {
let key = parts[0].trim().to_string();
let value = parts[1].trim().trim_matches('"').to_string();
return Some((key, value));
}
if let Some((key, value)) = fluent_core::config::parse_key_value_pair(line) {
let key = key.trim().to_string();
let value = value.trim().trim_matches('"').to_string();
Some((key, value))
} else {
None
}
None
}

/// Validate that required credentials are available
Expand Down
6 changes: 4 additions & 2 deletions crates/fluent-agent/src/mcp_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,8 @@ mod tests {
async fn test_mcp_adapter_creation() {
let tool_registry = Arc::new(ToolRegistry::new());
let memory_system =
Arc::new(SqliteMemoryStore::new(":memory:").unwrap()) as Arc<dyn LongTermMemory>;
Arc::new(SqliteMemoryStore::new(":memory:")
.expect("Failed to create in-memory SQLite store for test")) as Arc<dyn LongTermMemory>;

let adapter = FluentMcpAdapter::new(tool_registry, memory_system);
let info = adapter.get_info();
Expand All @@ -522,7 +523,8 @@ mod tests {
async fn test_tool_conversion() {
let tool_registry = Arc::new(ToolRegistry::new());
let memory_system =
Arc::new(SqliteMemoryStore::new(":memory:").unwrap()) as Arc<dyn LongTermMemory>;
Arc::new(SqliteMemoryStore::new(":memory:")
.expect("Failed to create in-memory SQLite store for test")) as Arc<dyn LongTermMemory>;

let adapter = FluentMcpAdapter::new(tool_registry, memory_system);
let tool = adapter.convert_tool_to_mcp("test_tool", "Test tool description");
Expand Down
5 changes: 3 additions & 2 deletions crates/fluent-agent/src/performance/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,9 @@ fn get_current_process_memory() -> Result<u64, anyhow::Error> {
}

#[cfg(target_os = "linux")]
fn get_process_memory_linux() -> Result<u64, anyhow::Error> {
let status = std::fs::read_to_string("/proc/self/status")
async fn get_process_memory_linux() -> Result<u64, anyhow::Error> {
let status = tokio::fs::read_to_string("/proc/self/status")
.await
.map_err(|e| anyhow::anyhow!("Failed to read /proc/self/status: {}", e))?;

for line in status.lines() {
Expand Down
38 changes: 27 additions & 11 deletions crates/fluent-agent/src/profiling/memory_profiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,19 +195,34 @@ impl ReflectionMemoryProfiler {
report
}

/// Save the report to a file
pub fn save_report(&self, filename: &str) -> Result<()> {
/// Save the report to a file asynchronously
pub async fn save_report(&self, filename: &str) -> Result<()> {
let report = self.generate_report();
std::fs::write(filename, report)?;
tokio::fs::write(filename, report).await?;
Ok(())
}

/// Get current memory usage (cross-platform implementation)
fn get_current_memory_usage() -> usize {
get_process_memory_usage().unwrap_or_else(|_| {
// Fallback: return a reasonable estimate
std::mem::size_of::<Self>() * 1000
})
// Use a blocking approach for constructor compatibility
// In a real implementation, you might want to use a different approach
match std::thread::spawn(|| {
tokio::runtime::Handle::try_current()
.map(|handle| {
handle.block_on(get_process_memory_usage())
})
.unwrap_or_else(|_| {
// If no tokio runtime, create a minimal one
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(get_process_memory_usage())
})
}).join() {
Ok(Ok(memory)) => memory,
_ => {
// Fallback: return a reasonable estimate
std::mem::size_of::<Self>() * 1000
}
}
}
}

Expand All @@ -218,10 +233,10 @@ impl Default for ReflectionMemoryProfiler {
}

/// Get current process memory usage in bytes (cross-platform)
fn get_process_memory_usage() -> Result<usize> {
async fn get_process_memory_usage() -> Result<usize> {
#[cfg(target_os = "linux")]
{
get_process_memory_usage_linux()
get_process_memory_usage_linux().await
}
#[cfg(target_os = "macos")]
{
Expand All @@ -239,8 +254,9 @@ fn get_process_memory_usage() -> Result<usize> {
}

#[cfg(target_os = "linux")]
fn get_process_memory_usage_linux() -> Result<usize> {
let status = std::fs::read_to_string("/proc/self/status")
async fn get_process_memory_usage_linux() -> Result<usize> {
let status = tokio::fs::read_to_string("/proc/self/status")
.await
.map_err(|e| anyhow!("Failed to read /proc/self/status: {}", e))?;

for line in status.lines() {
Expand Down
6 changes: 4 additions & 2 deletions crates/fluent-agent/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,10 @@ mod tests {
retry_config: RetryConfig::default(),
};

let serialized = serde_json::to_string(&config).unwrap();
let deserialized: TransportConfig = serde_json::from_str(&serialized).unwrap();
let serialized = serde_json::to_string(&config)
.expect("Failed to serialize TransportConfig for test");
let deserialized: TransportConfig = serde_json::from_str(&serialized)
.expect("Failed to deserialize TransportConfig for test");

assert!(matches!(deserialized.transport_type, TransportType::Http));
}
Expand Down
10 changes: 2 additions & 8 deletions crates/fluent-cli/src/cli_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,5 @@ pub fn build_cli() -> Command {
)
}

/// Parse key-value pairs from command line arguments
pub fn parse_key_value_pair(s: &str) -> Option<(String, String)> {
if let Some((key, value)) = s.split_once('=') {
Some((key.to_string(), value.to_string()))
} else {
None
}
}
// Re-export the centralized parse_key_value_pair function
pub use fluent_core::config::parse_key_value_pair;
6 changes: 6 additions & 0 deletions crates/fluent-cli/src/commands/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ impl EngineCommand {
}

/// Validate request payload
#[allow(dead_code)]
fn validate_request_payload(payload: &str, _context: &str) -> Result<String> {
if payload.trim().is_empty() {
return Err(anyhow!("Request payload cannot be empty"));
Expand All @@ -34,6 +35,7 @@ impl EngineCommand {
}

/// Process request with file upload
#[allow(dead_code)]
async fn process_request_with_file(
engine: &dyn Engine,
request_content: &str,
Expand All @@ -49,6 +51,7 @@ impl EngineCommand {
}

/// Process simple request
#[allow(dead_code)]
async fn process_request(engine: &dyn Engine, request_content: &str) -> Result<Response> {
let request = Request {
flowname: "default".to_string(),
Expand All @@ -59,6 +62,7 @@ impl EngineCommand {
}

/// Format response for output
#[allow(dead_code)]
fn format_response(response: &Response, parse_code: bool, markdown: bool) -> String {
let mut output = response.content.clone();

Expand All @@ -76,6 +80,7 @@ impl EngineCommand {
}

/// Extract code blocks from response
#[allow(dead_code)]
fn extract_code_blocks(content: &str) -> String {
// Simplified code block extraction
let mut result = String::new();
Expand Down Expand Up @@ -108,6 +113,7 @@ impl EngineCommand {
}

/// Execute engine request with all options
#[allow(dead_code)]
async fn execute_engine_request(
engine_name: &str,
request: &str,
Expand Down
3 changes: 2 additions & 1 deletion crates/fluent-cli/src/commands/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ impl PipelineCommand {
json_output: bool,
) -> Result<CommandResult> {
// Read and validate pipeline file
let yaml_str = std::fs::read_to_string(pipeline_file)
let yaml_str = tokio::fs::read_to_string(pipeline_file)
.await
.map_err(|e| anyhow!("Failed to read pipeline file '{}': {}", pipeline_file, e))?;

Self::validate_pipeline_yaml(&yaml_str)
Expand Down
3 changes: 2 additions & 1 deletion crates/fluent-cli/src/commands/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ impl ToolsCommand {
serde_json::from_str::<HashMap<String, Value>>(json_str)
.map_err(|e| anyhow!("Invalid JSON parameters: {}", e))?
} else if let Some(file_path) = params_file {
let file_content = std::fs::read_to_string(file_path)
let file_content = tokio::fs::read_to_string(file_path)
.await
.map_err(|e| anyhow!("Failed to read params file: {}", e))?;
serde_json::from_str::<HashMap<String, Value>>(&file_content)
.map_err(|e| anyhow!("Invalid JSON in params file: {}", e))?
Expand Down
5 changes: 4 additions & 1 deletion crates/fluent-cli/src/engine_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ pub fn create_test_engine_config(engine_type: &str) -> EngineConfig {
parameters.insert("api_key".to_string(), serde_json::Value::String("test-key".to_string()));
parameters.insert("model".to_string(), serde_json::Value::String("test-model".to_string()));
parameters.insert("max_tokens".to_string(), serde_json::Value::Number(serde_json::Number::from(1000)));
parameters.insert("temperature".to_string(), serde_json::Value::Number(serde_json::Number::from_f64(0.7).unwrap()));
parameters.insert("temperature".to_string(), serde_json::Value::Number(
serde_json::Number::from_f64(0.7)
.ok_or_else(|| anyhow!("Failed to create temperature number from f64"))?
));

EngineConfig {
name: format!("test-{}", engine_type),
Expand Down
4 changes: 4 additions & 0 deletions crates/fluent-cli/src/mcp_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use fluent_core::config::Config;
/// Run MCP server
pub async fn run_mcp_server(_sub_matches: &ArgMatches) -> Result<()> {
use fluent_agent::mcp_adapter::FluentMcpServer;
#[allow(deprecated)]
use fluent_agent::memory::SqliteMemoryStore;
use fluent_agent::tools::ToolRegistry;
use std::sync::Arc;
Expand All @@ -20,6 +21,7 @@ pub async fn run_mcp_server(_sub_matches: &ArgMatches) -> Result<()> {
let tool_registry = Arc::new(ToolRegistry::new());

// Initialize memory system
#[allow(deprecated)]
let memory_system = Arc::new(SqliteMemoryStore::new(":memory:")?);

// Create MCP server
Expand Down Expand Up @@ -70,6 +72,7 @@ pub async fn run_agent_with_mcp(
config: &Config,
) -> Result<()> {
use fluent_agent::agent_with_mcp::AgentWithMcp;
#[allow(deprecated)]
use fluent_agent::memory::SqliteMemoryStore;
use fluent_agent::reasoning::LLMReasoningEngine;

Expand All @@ -88,6 +91,7 @@ pub async fn run_agent_with_mcp(

// Create memory system
let memory_path = format!("agent_memory_{}.db", engine_name);
#[allow(deprecated)]
let memory = std::sync::Arc::new(SqliteMemoryStore::new(&memory_path)?);

// Create agent
Expand Down
4 changes: 2 additions & 2 deletions crates/fluent-cli/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,12 +420,12 @@ impl ResourceGuard {
}

/// Create a temporary file and add it to cleanup list
pub fn create_temp_file(&mut self, prefix: &str) -> Result<std::fs::File> {
pub async fn create_temp_file(&mut self, prefix: &str) -> Result<tokio::fs::File> {
use std::time::{SystemTime, UNIX_EPOCH};
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)
.unwrap_or_default().as_nanos();
let temp_path = format!("/tmp/{}_{}", prefix, timestamp);
let file = std::fs::File::create(&temp_path)?;
let file = tokio::fs::File::create(&temp_path).await?;
self.add_temp_file(&temp_path);
Ok(file)
}
Expand Down
7 changes: 5 additions & 2 deletions crates/fluent-cli/src/pipeline_builder.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use anyhow::Result;
use dialoguer::{Confirm, Input, Select};
use fluent_core::centralized_config::ConfigManager;
use fluent_engines::pipeline_executor::{FileStateStore, Pipeline, PipelineExecutor, PipelineStep};
use std::io::stdout;
use std::path::PathBuf;
use termimad::crossterm::{
execute,
terminal::{Clear, ClearType},
Expand Down Expand Up @@ -87,7 +87,10 @@ pub async fn build_interactively() -> Result<()> {

if Confirm::new().with_prompt("Run pipeline now?").interact()? {
let input: String = Input::new().with_prompt("Pipeline input").interact_text()?;
let state_store_dir = PathBuf::from("./pipeline_states");

// Use centralized configuration for pipeline state directory
let config = ConfigManager::get();
let state_store_dir = config.get_pipeline_state_dir();
tokio::fs::create_dir_all(&state_store_dir).await?;
let state_store = FileStateStore {
directory: state_store_dir,
Expand Down
6 changes: 3 additions & 3 deletions crates/fluent-cli/src/request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ pub async fn read_file_content(file_path: &str) -> Result<String> {
}

/// Validate file size and type for upload
pub fn validate_file_for_upload(file_path: &str) -> Result<()> {
pub async fn validate_file_for_upload(file_path: &str) -> Result<()> {
let path = Path::new(file_path);

if !path.exists() {
return Err(anyhow::anyhow!("File does not exist: {}", file_path));
}

let metadata = std::fs::metadata(path)?;
let metadata = tokio::fs::metadata(path).await?;
let file_size = metadata.len();

// Check file size (limit to 10MB)
Expand Down
18 changes: 9 additions & 9 deletions crates/fluent-cli/src/response_formatter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ pub fn print_success(message: &str, no_color: bool) {
}
}

/// Write response to a file
pub fn write_response_to_file(
/// Write response to a file asynchronously
pub async fn write_response_to_file(
response: &Response,
file_path: &str,
format: &str,
Expand Down Expand Up @@ -247,7 +247,7 @@ pub fn write_response_to_file(
_ => response.content.clone(),
};

std::fs::write(file_path, content)
tokio::fs::write(file_path, content).await
}

#[cfg(test)]
Expand Down Expand Up @@ -284,15 +284,15 @@ mod tests {
assert!(options.show_cost);
}

#[test]
fn test_write_response_to_file() {
#[tokio::test]
async fn test_write_response_to_file() {
let response = create_test_response();
let temp_file = "/tmp/test_response.txt";
let result = write_response_to_file(&response, temp_file, "text");

let result = write_response_to_file(&response, temp_file, "text").await;
assert!(result.is_ok());

// Clean up
let _ = std::fs::remove_file(temp_file);
let _ = tokio::fs::remove_file(temp_file).await;
}
}
10 changes: 2 additions & 8 deletions crates/fluent-cli/src/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,8 @@ pub fn validate_engine_name(engine_name: &str) -> FluentResult<String> {
Ok(engine_name.to_string())
}

/// Parse key-value pairs from command line arguments
pub fn parse_key_value_pair(s: &str) -> Option<(String, String)> {
if let Some((key, value)) = s.split_once('=') {
Some((key.to_string(), value.to_string()))
} else {
None
}
}
// Re-export the centralized parse_key_value_pair function
pub use fluent_core::config::parse_key_value_pair;

#[cfg(test)]
mod tests {
Expand Down
1 change: 1 addition & 0 deletions crates/fluent-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ sled = { workspace = true }
sha2 = { workspace = true }
which = "6.0"
serde_yaml.workspace = true
toml = "0.8"
Loading
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载