diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index ba7d10135d950..80583d8eb8821 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -4,11 +4,7 @@ "dockerfile": "Dockerfile", "context": "." }, - "runArgs": [ - "--cap-add=SYS_PTRACE", - "--security-opt", - "seccomp=unconfined" - ], + "runArgs": ["--cap-add=SYS_PTRACE", "--security-opt", "seccomp=unconfined"], "features": { "ghcr.io/devcontainers/features/github-cli:1": {}, "ghcr.io/devcontainers/features/git:1": {} @@ -26,6 +22,7 @@ ], "settings": { "rust-analyzer.cargo.buildScripts.enable": true, + "rust-analyzer.cargo.features": ["otel"], "rust-analyzer.checkOnSave.command": "clippy", "rust-analyzer.rustfmt.rangeFormatting.enable": true, "editor.defaultFormatter": "esbenp.prettier-vscode", @@ -48,4 +45,4 @@ "source=${localWorkspaceFolder}/.devcontainer/cache,target=/home/vscode/.cache,type=bind,consistency=cached", "source=${localWorkspaceFolder}/.devcontainer/cargo,target=/usr/local/cargo,type=bind,consistency=cached" ] -} \ No newline at end of file +} diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 34c9137adcb01..fa55264793915 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,7 +1,7 @@ Thank you for your interest in contributing to Turborepo! - [General dependencies](#general-dependencies) - - [Optional dependencies](#optional-dependencies) +- [Optional dependencies](#optional-dependencies) - [Structure of the repository](#structure-of-the-repository) - [Building Turborepo](#building-turborepo) - [TLS Implementation](#tls-implementation) @@ -69,6 +69,16 @@ and `rustls-tls` features. By default, the `rustls-tls` feature is selected so that `cargo build` works out of the box. If you wish to select `native-tls`, you may do so by running `cargo build --no-default-features --features native-tls`. +### OpenTelemetry Observability + +Turborepo includes optional OpenTelemetry (OTel) support for exporting metrics. 
OTel is an experimental feature and is **not** included in default builds. If you wish to build `turbo` with OTel support, you can enable it by running: + +```bash +cargo build -p turbo --features otel +``` + +Note that when OTel is disabled at compile time (the default), the `experimentalObservability` configuration in `turbo.json` and related CLI flags will be ignored (they will not error, but no metrics will be exported). IDEs using the devcontainer configuration will have OTel enabled automatically for development purposes. + ## Running tests > [!IMPORTANT] diff --git a/Cargo.lock b/Cargo.lock index 4cc226fe90033..3c458fa2a39d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -533,7 +533,7 @@ dependencies = [ "serde_path_to_error", "serde_urlencoded", "sha1", - "sync_wrapper 1.0.1", + "sync_wrapper 1.0.2", "tokio", "tokio-tungstenite 0.21.0", "tower 0.4.13", @@ -570,7 +570,7 @@ dependencies = [ "serde_path_to_error", "serde_urlencoded", "sha1", - "sync_wrapper 1.0.1", + "sync_wrapper 1.0.2", "tokio", "tokio-tungstenite 0.26.2", "tower 0.5.2", @@ -631,7 +631,7 @@ dependencies = [ "mime", "pin-project-lite", "rustversion", - "sync_wrapper 1.0.1", + "sync_wrapper 1.0.2", "tower-layer", "tower-service", "tracing", @@ -2676,6 +2676,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.4.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -2694,9 +2707,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.7" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" dependencies = [ 
"bytes", "futures-channel", @@ -2707,7 +2720,6 @@ dependencies = [ "pin-project-lite", "socket2 0.5.6", "tokio", - "tower 0.4.13", "tower-service", "tracing", ] @@ -3860,6 +3872,88 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.12", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" +dependencies = [ + "async-trait", + "bytes", + "http 1.1.0", + "opentelemetry", + "reqwest", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" +dependencies = [ + "http 1.1.0", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost 0.14.1", + "reqwest", + "thiserror 2.0.12", + "tokio", + "tonic 0.14.2", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost 0.14.1", + "tonic 0.14.2", + "tonic-prost", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" + +[[package]] +name = "opentelemetry_sdk" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "rand 0.9.1", + "thiserror 2.0.12", + "tokio", + "tokio-stream", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -4389,6 +4483,16 @@ dependencies = [ "prost-derive 0.12.3", ] +[[package]] +name = "prost" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" +dependencies = [ + "bytes", + "prost-derive 0.14.1", +] + [[package]] name = "prost-build" version = "0.11.8" @@ -4437,6 +4541,19 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "prost-derive" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "prost-types" version = "0.11.8" @@ -4771,7 +4888,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper 1.0.1", + "sync_wrapper 1.0.2", "tokio", "tokio-native-tls", "tokio-rustls", @@ -5419,6 +5536,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" +dependencies = [ + "libc", + "windows-sys 0.60.2", +] + [[package]] name = "spin" version = "0.9.8" @@ -5786,9 +5913,9 @@ checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" [[package]] name = "sync_wrapper" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +checksum = 
"0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" dependencies = [ "futures-core", ] @@ -6164,9 +6291,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" dependencies = [ "futures-core", "pin-project-lite", @@ -6271,7 +6398,7 @@ dependencies = [ "http 0.2.11", "http-body 0.4.5", "hyper 0.14.28", - "hyper-timeout", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project", "prost 0.12.3", @@ -6283,6 +6410,35 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" +dependencies = [ + "async-trait", + "axum 0.8.4", + "base64 0.22.1", + "bytes", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-timeout 0.5.2", + "hyper-util", + "percent-encoding", + "pin-project", + "socket2 0.6.1", + "sync_wrapper 1.0.2", + "tokio", + "tokio-stream", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tonic-build" version = "0.8.4" @@ -6296,6 +6452,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "tonic-prost" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +dependencies = [ + "bytes", + "prost 0.14.1", + "tonic 0.14.2", +] + [[package]] name = "tower" version = "0.4.13" @@ -6324,9 +6491,12 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", + "indexmap 2.2.6", "pin-project-lite", - "sync_wrapper 1.0.1", + "slab", + "sync_wrapper 1.0.2", "tokio", + 
"tokio-util", "tower-layer", "tower-service", "tracing", @@ -6959,7 +7129,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tonic", + "tonic 0.11.0", "tonic-build", "tower 0.4.13", "tower-http", @@ -6987,6 +7157,7 @@ dependencies = [ "turborepo-lockfiles", "turborepo-microfrontends", "turborepo-microfrontends-proxy", + "turborepo-otel", "turborepo-process", "turborepo-repository", "turborepo-scm", @@ -7103,6 +7274,22 @@ dependencies = [ "turborepo-scm", ] +[[package]] +name = "turborepo-otel" +version = "0.1.0" +dependencies = [ + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "serde", + "serde_json", + "thiserror 1.0.63", + "tokio", + "tonic 0.14.2", + "tracing", +] + [[package]] name = "turborepo-process" version = "0.1.0" @@ -7825,6 +8012,12 @@ dependencies = [ "windows-targets 0.48.1", ] +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-registry" version = "0.2.0" @@ -7906,6 +8099,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.5", +] + [[package]] name = "windows-targets" version = "0.42.2" @@ -7945,13 +8147,30 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm", + "windows_i686_gnullvm 0.52.6", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm 0.53.1", + "windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -7970,6 +8189,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + [[package]] name = "windows_aarch64_msvc" version = "0.42.2" @@ -7988,6 +8213,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + [[package]] name = "windows_i686_gnu" version = "0.42.2" @@ -8006,12 +8237,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = 
"0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + [[package]] name = "windows_i686_msvc" version = "0.42.2" @@ -8030,6 +8273,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + [[package]] name = "windows_x86_64_gnu" version = "0.42.2" @@ -8048,6 +8297,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" @@ -8066,6 +8321,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + [[package]] name = "windows_x86_64_msvc" version = "0.42.2" @@ -8084,6 +8345,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = 
"winnow" version = "0.5.40" diff --git a/Cargo.toml b/Cargo.toml index d11214162168b..26285b6d9c62a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,7 @@ turborepo-ui = { path = "crates/turborepo-ui" } turborepo-unescape = { path = "crates/turborepo-unescape" } turborepo-scm = { path = "crates/turborepo-scm" } turborepo-signals = { path = "crates/turborepo-signals" } +turborepo-otel = { path = "crates/turborepo-otel" } wax = { path = "crates/turborepo-wax" } turborepo-vercel-api = { path = "crates/turborepo-vercel-api" } turborepo-vercel-api-mock = { path = "crates/turborepo-vercel-api-mock" } diff --git a/crates/turborepo-lib/Cargo.toml b/crates/turborepo-lib/Cargo.toml index 41dd0af80da88..12c8c2a366cb3 100644 --- a/crates/turborepo-lib/Cargo.toml +++ b/crates/turborepo-lib/Cargo.toml @@ -13,6 +13,7 @@ rustls-tls = ["turborepo-api-client/rustls-tls", "turbo-updater/rustls-tls"] daemon-package-discovery = [] daemon-file-hashing = [] +otel = ["dep:turborepo-otel"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dev-dependencies] @@ -138,6 +139,7 @@ turborepo-graph-utils = { path = "../turborepo-graph-utils" } turborepo-lockfiles = { workspace = true } turborepo-microfrontends = { workspace = true } turborepo-microfrontends-proxy = { workspace = true } +turborepo-otel = { workspace = true, optional = true } turborepo-process = { workspace = true } turborepo-repository = { path = "../turborepo-repository" } turborepo-scm = { workspace = true, features = ["git2"] } diff --git a/crates/turborepo-lib/src/cli/mod.rs b/crates/turborepo-lib/src/cli/mod.rs index ff2e963e7cb86..6159ca2340c32 100644 --- a/crates/turborepo-lib/src/cli/mod.rs +++ b/crates/turborepo-lib/src/cli/mod.rs @@ -39,6 +39,8 @@ use crate::{ }; mod error; +mod observability; + // Global turbo sets this environment variable to its cwd so that local // turbo can use it for package inference. 
pub const INVOCATION_DIR_ENV_VAR: &str = "TURBO_INVOCATION_DIR"; @@ -228,6 +230,8 @@ pub struct Args { /// verbosity #[clap(flatten)] pub verbosity: Verbosity, + #[clap(flatten)] + pub experimental_otel_args: observability::ExperimentalOtelCliArgs, /// Force a check for a new version of turbo #[clap(long, global = true, hide = true)] pub check_for_update: bool, @@ -287,6 +291,17 @@ impl From for u8 { } } +fn parse_key_val_pair(s: &str) -> Result<(String, String), String> { + let (key, value) = s + .split_once('=') + .ok_or_else(|| "must be in key=value format".to_string())?; + let key = key.trim(); + if key.is_empty() { + return Err("key cannot be empty".to_string()); + } + Ok((key.to_string(), value.trim().to_string())) +} + #[derive(Subcommand, Copy, Clone, Debug, PartialEq)] pub enum DaemonCommand { /// Restarts the turbo daemon @@ -3369,4 +3384,52 @@ mod test { assert_snapshot!(args.join("-").as_str(), err); } } + + #[test] + fn test_parse_key_val_pair_valid() { + let result = super::parse_key_val_pair("key=value"); + assert_eq!(result.unwrap(), ("key".to_string(), "value".to_string())); + } + + #[test] + fn test_parse_key_val_pair_with_whitespace() { + let result = super::parse_key_val_pair(" key = value "); + assert_eq!(result.unwrap(), ("key".to_string(), "value".to_string())); + } + + #[test] + fn test_parse_key_val_pair_multiple_equals() { + let result = super::parse_key_val_pair("key=value=more"); + assert_eq!( + result.unwrap(), + ("key".to_string(), "value=more".to_string()) + ); + } + + #[test] + fn test_parse_key_val_pair_empty_value() { + let result = super::parse_key_val_pair("key="); + assert_eq!(result.unwrap(), ("key".to_string(), "".to_string())); + } + + #[test] + fn test_parse_key_val_pair_no_equals() { + let result = super::parse_key_val_pair("keyvalue"); + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), "must be in key=value format"); + } + + #[test] + fn test_parse_key_val_pair_empty_key() { + let result = 
super::parse_key_val_pair("=value"); + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), "key cannot be empty"); + } + + #[test] + fn test_parse_key_val_pair_whitespace_only_key() { + let result = super::parse_key_val_pair(" =value"); + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), "key cannot be empty"); + } } diff --git a/crates/turborepo-lib/src/cli/observability.rs b/crates/turborepo-lib/src/cli/observability.rs new file mode 100644 index 0000000000000..9270281d226a5 --- /dev/null +++ b/crates/turborepo-lib/src/cli/observability.rs @@ -0,0 +1,378 @@ +use std::collections::BTreeMap; + +use clap::Parser; + +use crate::{ + cli::parse_key_val_pair, + config::{ExperimentalOtelMetricsOptions, ExperimentalOtelOptions, ExperimentalOtelProtocol}, +}; + +#[derive(Parser, Clone, Debug, Default, PartialEq)] +pub struct ExperimentalOtelCliArgs { + #[clap( + long = "experimental-otel-enabled", + global = true, + num_args = 0..=1, + default_missing_value = "true" + )] + pub enabled: Option, + + #[clap( + long = "experimental-otel-protocol", + value_enum, + global = true, + value_name = "PROTOCOL" + )] + pub protocol: Option, + + #[clap(long = "experimental-otel-endpoint", global = true, value_name = "URL")] + pub endpoint: Option, + + #[clap( + long = "experimental-otel-timeout-ms", + global = true, + value_name = "MILLISECONDS" + )] + pub timeout_ms: Option, + + #[clap( + long = "experimental-otel-header", + global = true, + value_parser = parse_key_val_pair, + value_name = "KEY=VALUE" + )] + pub headers: Vec<(String, String)>, + + #[clap( + long = "experimental-otel-resource", + global = true, + value_parser = parse_key_val_pair, + value_name = "KEY=VALUE" + )] + pub resource_attributes: Vec<(String, String)>, + + #[clap( + long = "experimental-otel-metrics-run-summary", + global = true, + num_args = 0..=1, + default_missing_value = "true" + )] + pub metrics_run_summary: Option, + + #[clap( + long = "experimental-otel-metrics-task-details", + global 
= true, + num_args = 0..=1, + default_missing_value = "true" + )] + pub metrics_task_details: Option, +} + +impl ExperimentalOtelCliArgs { + pub fn to_config(&self) -> Option { + let mut options = ExperimentalOtelOptions::default(); + let mut touched = false; + + if let Some(enabled) = self.enabled { + options.enabled = Some(enabled); + touched = true; + } + if let Some(protocol) = self.protocol { + options.protocol = Some(protocol); + touched = true; + } + if let Some(endpoint) = &self.endpoint { + options.endpoint = Some(endpoint.clone()); + touched = true; + } + if let Some(timeout) = self.timeout_ms { + options.timeout_ms = Some(timeout); + touched = true; + } + if !self.headers.is_empty() { + let mut map = BTreeMap::new(); + for (key, value) in &self.headers { + map.insert(key.clone(), value.clone()); + } + options.headers = Some(map); + touched = true; + } + if !self.resource_attributes.is_empty() { + let mut map = BTreeMap::new(); + for (key, value) in &self.resource_attributes { + map.insert(key.clone(), value.clone()); + } + options.resource = Some(map); + touched = true; + } + if let Some(value) = self.metrics_run_summary { + options + .metrics + .get_or_insert_with(ExperimentalOtelMetricsOptions::default) + .run_summary = Some(value); + touched = true; + } + if let Some(value) = self.metrics_task_details { + options + .metrics + .get_or_insert_with(ExperimentalOtelMetricsOptions::default) + .task_details = Some(value); + touched = true; + } + + touched.then_some(options) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_experimental_otel_cli_args_empty() { + let args = ExperimentalOtelCliArgs::default(); + let result = args.to_config(); + assert_eq!(result, None); + } + + #[test] + fn test_experimental_otel_cli_args_enabled() { + let args = ExperimentalOtelCliArgs { + enabled: Some(true), + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + assert_eq!(result.unwrap().enabled, Some(true)); + } 
+ + #[test] + fn test_experimental_otel_cli_args_disabled() { + let args = ExperimentalOtelCliArgs { + enabled: Some(false), + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + assert_eq!(result.unwrap().enabled, Some(false)); + } + + #[test] + fn test_experimental_otel_cli_args_protocol() { + let args = ExperimentalOtelCliArgs { + protocol: Some(ExperimentalOtelProtocol::Grpc), + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + assert_eq!( + result.unwrap().protocol, + Some(ExperimentalOtelProtocol::Grpc) + ); + } + + #[test] + fn test_experimental_otel_cli_args_protocol_http_protobuf() { + let args = ExperimentalOtelCliArgs { + protocol: Some(ExperimentalOtelProtocol::HttpProtobuf), + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + assert_eq!( + result.unwrap().protocol, + Some(ExperimentalOtelProtocol::HttpProtobuf) + ); + } + + #[test] + fn test_experimental_otel_cli_args_endpoint() { + let args = ExperimentalOtelCliArgs { + endpoint: Some("https://example.com/otel".to_string()), + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + assert_eq!( + result.unwrap().endpoint, + Some("https://example.com/otel".to_string()) + ); + } + + #[test] + fn test_experimental_otel_cli_args_timeout_ms() { + let args = ExperimentalOtelCliArgs { + timeout_ms: Some(5000), + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + assert_eq!(result.unwrap().timeout_ms, Some(5000)); + } + + #[test] + fn test_experimental_otel_cli_args_headers_single() { + let args = ExperimentalOtelCliArgs { + headers: vec![("key1".to_string(), "value1".to_string())], + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + let headers = result.unwrap().headers.unwrap(); + assert_eq!(headers.get("key1"), Some(&"value1".to_string())); + } + + #[test] + fn 
test_experimental_otel_cli_args_headers_multiple() { + let args = ExperimentalOtelCliArgs { + headers: vec![ + ("key1".to_string(), "value1".to_string()), + ("key2".to_string(), "value2".to_string()), + ], + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + let headers = result.unwrap().headers.unwrap(); + assert_eq!(headers.get("key1"), Some(&"value1".to_string())); + assert_eq!(headers.get("key2"), Some(&"value2".to_string())); + } + + #[test] + fn test_experimental_otel_cli_args_headers_empty() { + let args = ExperimentalOtelCliArgs { + headers: vec![], + ..Default::default() + }; + let result = args.to_config(); + assert_eq!(result, None); + } + + #[test] + fn test_experimental_otel_cli_args_resource_single() { + let args = ExperimentalOtelCliArgs { + resource_attributes: vec![("service.name".to_string(), "my-service".to_string())], + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + let resource = result.unwrap().resource.unwrap(); + assert_eq!( + resource.get("service.name"), + Some(&"my-service".to_string()) + ); + } + + #[test] + fn test_experimental_otel_cli_args_resource_multiple() { + let args = ExperimentalOtelCliArgs { + resource_attributes: vec![ + ("service.name".to_string(), "my-service".to_string()), + ("env".to_string(), "production".to_string()), + ], + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + let resource = result.unwrap().resource.unwrap(); + assert_eq!( + resource.get("service.name"), + Some(&"my-service".to_string()) + ); + assert_eq!(resource.get("env"), Some(&"production".to_string())); + } + + #[test] + fn test_experimental_otel_cli_args_metrics_run_summary() { + let args = ExperimentalOtelCliArgs { + metrics_run_summary: Some(true), + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + let metrics = result.unwrap().metrics.unwrap(); + assert_eq!(metrics.run_summary, 
Some(true)); + } + + #[test] + fn test_experimental_otel_cli_args_metrics_task_details() { + let args = ExperimentalOtelCliArgs { + metrics_task_details: Some(true), + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + let metrics = result.unwrap().metrics.unwrap(); + assert_eq!(metrics.task_details, Some(true)); + } + + #[test] + fn test_experimental_otel_cli_args_metrics_both() { + let args = ExperimentalOtelCliArgs { + metrics_run_summary: Some(true), + metrics_task_details: Some(false), + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + let metrics = result.unwrap().metrics.unwrap(); + assert_eq!(metrics.run_summary, Some(true)); + assert_eq!(metrics.task_details, Some(false)); + } + + #[test] + fn test_experimental_otel_cli_args_metrics_run_summary_disabled() { + let args = ExperimentalOtelCliArgs { + metrics_run_summary: Some(false), + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + let metrics = result.unwrap().metrics.unwrap(); + assert_eq!(metrics.run_summary, Some(false)); + } + + #[test] + fn test_experimental_otel_cli_args_metrics_task_details_disabled() { + let args = ExperimentalOtelCliArgs { + metrics_task_details: Some(false), + ..Default::default() + }; + let result = args.to_config(); + assert!(result.is_some()); + let metrics = result.unwrap().metrics.unwrap(); + assert_eq!(metrics.task_details, Some(false)); + } + + #[test] + fn test_experimental_otel_cli_args_combined() { + let args = ExperimentalOtelCliArgs { + enabled: Some(true), + protocol: Some(ExperimentalOtelProtocol::Grpc), + endpoint: Some("https://example.com/otel".to_string()), + timeout_ms: Some(15000), + headers: vec![("auth".to_string(), "token123".to_string())], + resource_attributes: vec![("service.name".to_string(), "test".to_string())], + metrics_run_summary: Some(true), + metrics_task_details: Some(false), + }; + let result = args.to_config(); + 
assert!(result.is_some()); + let opts = result.unwrap(); + assert_eq!(opts.enabled, Some(true)); + assert_eq!(opts.protocol, Some(ExperimentalOtelProtocol::Grpc)); + assert_eq!(opts.endpoint, Some("https://example.com/otel".to_string())); + assert_eq!(opts.timeout_ms, Some(15000)); + assert_eq!( + opts.headers.unwrap().get("auth"), + Some(&"token123".to_string()) + ); + assert_eq!( + opts.resource.unwrap().get("service.name"), + Some(&"test".to_string()) + ); + let metrics = opts.metrics.unwrap(); + assert_eq!(metrics.run_summary, Some(true)); + assert_eq!(metrics.task_details, Some(false)); + } +} diff --git a/crates/turborepo-lib/src/commands/mod.rs b/crates/turborepo-lib/src/commands/mod.rs index 534bda2cb7ee0..c6ce383198876 100644 --- a/crates/turborepo-lib/src/commands/mod.rs +++ b/crates/turborepo-lib/src/commands/mod.rs @@ -10,7 +10,7 @@ use crate::{ cli, config::{ resolve_turbo_config_path, ConfigurationOptions, Error as ConfigError, - TurborepoConfigBuilder, + ExperimentalObservabilityOptions, TurborepoConfigBuilder, }, opts::Opts, Args, @@ -129,6 +129,11 @@ impl CommandBase { args.execution_args() .and_then(|args| args.concurrency.clone()), ) + .with_experimental_observability( + args.experimental_otel_args + .to_config() + .map(|otel| ExperimentalObservabilityOptions { otel: Some(otel) }), + ) .build() } diff --git a/crates/turborepo-lib/src/config/env.rs b/crates/turborepo-lib/src/config/env.rs index 26fb21395e2c1..40879230195aa 100644 --- a/crates/turborepo-lib/src/config/env.rs +++ b/crates/turborepo-lib/src/config/env.rs @@ -9,7 +9,10 @@ use tracing::warn; use turbopath::AbsoluteSystemPathBuf; use turborepo_cache::CacheConfig; -use super::{ConfigurationOptions, Error, ResolvedConfigurationOptions}; +use super::{ + ConfigurationOptions, Error, ExperimentalObservabilityOptions, ExperimentalOtelOptions, + ResolvedConfigurationOptions, +}; use crate::{ cli::{EnvMode, LogOrder}, turbo_json::UIMode, @@ -46,6 +49,38 @@ const TURBO_MAPPING: &[(&str, &str)] = 
[ ("turbo_concurrency", "concurrency"), ("turbo_no_update_notifier", "no_update_notifier"), ("turbo_sso_login_callback_port", "sso_login_callback_port"), + ( + "turbo_experimental_otel_enabled", + "experimental_otel_enabled", + ), + ( + "turbo_experimental_otel_protocol", + "experimental_otel_protocol", + ), + ( + "turbo_experimental_otel_endpoint", + "experimental_otel_endpoint", + ), + ( + "turbo_experimental_otel_timeout_ms", + "experimental_otel_timeout_ms", + ), + ( + "turbo_experimental_otel_headers", + "experimental_otel_headers", + ), + ( + "turbo_experimental_otel_resource", + "experimental_otel_resource", + ), + ( + "turbo_experimental_otel_metrics_run_summary", + "experimental_otel_metrics_run_summary", + ), + ( + "turbo_experimental_otel_metrics_task_details", + "experimental_otel_metrics_task_details", + ), ] .as_slice(); @@ -221,6 +256,10 @@ impl ResolvedConfigurationOptions for EnvVars { .transpose() .map_err(Error::InvalidSsoLoginCallbackPort)?; + let experimental_otel = ExperimentalOtelOptions::from_env_map(&self.output_map)?; + let experimental_observability = + experimental_otel.map(|otel| ExperimentalObservabilityOptions { otel: Some(otel) }); + let output = ConfigurationOptions { api_url: self.output_map.get("api_url").cloned(), login_url: self.output_map.get("login_url").cloned(), @@ -257,6 +296,7 @@ impl ResolvedConfigurationOptions for EnvVars { sso_login_callback_port, // Do not allow future flags to be set by env var future_flags: None, + experimental_observability, }; Ok(output) diff --git a/crates/turborepo-lib/src/config/experimental_otel.rs b/crates/turborepo-lib/src/config/experimental_otel.rs new file mode 100644 index 0000000000000..bc7d579da9eaf --- /dev/null +++ b/crates/turborepo-lib/src/config/experimental_otel.rs @@ -0,0 +1,533 @@ +use std::{collections::BTreeMap, fmt, str::FromStr}; + +use clap::ValueEnum; +use merge::Merge; +use serde::{Deserialize, Serialize}; + +use super::Error; + +#[derive( + Copy, + Clone, + Debug, + 
PartialEq, + Eq, + Serialize, + Deserialize, + Default, + Hash, + PartialOrd, + Ord, + ValueEnum, +)] +#[serde(rename_all = "kebab-case")] +pub enum ExperimentalOtelProtocol { + #[default] + #[serde(alias = "grpc")] + Grpc, + #[serde(alias = "http")] + #[serde(alias = "http/protobuf")] + HttpProtobuf, +} + +impl fmt::Display for ExperimentalOtelProtocol { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ExperimentalOtelProtocol::Grpc => write!(f, "grpc"), + ExperimentalOtelProtocol::HttpProtobuf => write!(f, "http/protobuf"), + } + } +} + +impl FromStr for ExperimentalOtelProtocol { + type Err = (); + + fn from_str(s: &str) -> Result { + match s.to_ascii_lowercase().as_str() { + "grpc" => Ok(Self::Grpc), + "http" | "http/protobuf" | "http_protobuf" => Ok(Self::HttpProtobuf), + _ => Err(()), + } + } +} + +#[derive(Deserialize, Serialize, Default, Debug, Clone, PartialEq, Eq, Merge)] +#[serde(rename_all = "camelCase")] +pub struct ExperimentalOtelMetricsOptions { + pub run_summary: Option, + pub task_details: Option, +} + +#[derive(Deserialize, Serialize, Default, Debug, Clone, PartialEq, Eq, Merge)] +#[serde(rename_all = "camelCase")] +pub struct ExperimentalOtelOptions { + pub enabled: Option, + pub protocol: Option, + pub endpoint: Option, + pub headers: Option>, + pub timeout_ms: Option, + pub resource: Option>, + pub metrics: Option, +} + +impl ExperimentalOtelOptions { + pub fn is_empty(&self) -> bool { + self.enabled.is_none() + && self.protocol.is_none() + && self.endpoint.is_none() + && self.headers.as_ref().map(|m| m.is_empty()).unwrap_or(true) + && self.timeout_ms.is_none() + && self.resource.as_ref().map(|m| m.is_empty()).unwrap_or(true) + && self + .metrics + .as_ref() + .map(|m| m.run_summary.is_none() && m.task_details.is_none()) + .unwrap_or(true) + } + + pub fn from_env_map( + map: &std::collections::HashMap<&'static str, String>, + ) -> Result, Error> { + let mut options = Self::default(); + let mut touched = false; + + 
if let Some(raw) = get_non_empty(map, "experimental_otel_enabled") { + options.enabled = Some(parse_bool_flag(raw, "TURBO_EXPERIMENTAL_OTEL_ENABLED")?); + touched = true; + } + + if let Some(raw) = get_non_empty(map, "experimental_otel_protocol") { + let protocol = ::from_str(raw).map_err(|_| { + Error::InvalidExperimentalOtelConfig { + message: format!( + "Unsupported experimentalObservability.otel protocol `{raw}`. Use `grpc` \ + or `http/protobuf`." + ), + } + })?; + options.protocol = Some(protocol); + touched = true; + } + + if let Some(raw) = get_non_empty(map, "experimental_otel_endpoint") { + options.endpoint = Some(raw.to_string()); + touched = true; + } + + if let Some(raw) = get_non_empty(map, "experimental_otel_timeout_ms") { + let timeout = raw + .parse() + .map_err(|_| Error::InvalidExperimentalOtelConfig { + message: "TURBO_EXPERIMENTAL_OTEL_TIMEOUT_MS must be a number.".to_string(), + })?; + options.timeout_ms = Some(timeout); + touched = true; + } + + if let Some(raw) = get_non_empty(map, "experimental_otel_headers") { + options.headers = Some(parse_key_value_pairs( + raw, + "TURBO_EXPERIMENTAL_OTEL_HEADERS", + )?); + touched = true; + } + + if let Some(raw) = get_non_empty(map, "experimental_otel_resource") { + options.resource = Some(parse_key_value_pairs( + raw, + "TURBO_EXPERIMENTAL_OTEL_RESOURCE", + )?); + touched = true; + } + + touched |= set_metric_flag( + map, + "experimental_otel_metrics_run_summary", + "TURBO_EXPERIMENTAL_OTEL_METRICS_RUN_SUMMARY", + |metrics, value| metrics.run_summary = Some(value), + &mut options, + )?; + + touched |= set_metric_flag( + map, + "experimental_otel_metrics_task_details", + "TURBO_EXPERIMENTAL_OTEL_METRICS_TASK_DETAILS", + |metrics, value| metrics.task_details = Some(value), + &mut options, + )?; + + Ok(touched.then_some(options)) + } +} + +fn get_non_empty<'a>( + map: &'a std::collections::HashMap<&'static str, String>, + key: &'static str, +) -> Option<&'a str> { + map.get(key).map(|s| 
s.as_str()).filter(|s| !s.is_empty()) +} + +fn parse_bool_flag(raw: &str, var: &str) -> Result { + crate::config::env::truth_env_var(raw).ok_or_else(|| Error::InvalidExperimentalOtelConfig { + message: format!("{var} should be either 1 or 0."), + }) +} + +fn parse_key_value_pairs(raw: &str, context: &str) -> Result, Error> { + let mut map = BTreeMap::new(); + for entry in raw.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()) { + let Some((key, value)) = entry.split_once('=') else { + return Err(Error::InvalidExperimentalOtelConfig { + message: format!("{context} entries must be in key=value format."), + }); + }; + if key.trim().is_empty() { + return Err(Error::InvalidExperimentalOtelConfig { + message: format!("{context} keys cannot be empty."), + }); + } + map.insert(key.trim().to_string(), value.trim().to_string()); + } + + Ok(map) +} + +fn set_metric_flag( + map: &std::collections::HashMap<&'static str, String>, + key: &'static str, + env_name: &'static str, + set: impl FnOnce(&mut ExperimentalOtelMetricsOptions, bool), + options: &mut ExperimentalOtelOptions, +) -> Result { + if let Some(raw) = get_non_empty(map, key) { + let value = parse_bool_flag(raw, env_name)?; + set( + options + .metrics + .get_or_insert_with(ExperimentalOtelMetricsOptions::default), + value, + ); + return Ok(true); + } + Ok(false) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::*; + + fn build_env_map(entries: &[(&'static str, &str)]) -> HashMap<&'static str, String> { + entries.iter().map(|(k, v)| (*k, v.to_string())).collect() + } + + #[test] + fn test_from_env_map_empty() { + let map = HashMap::new(); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert_eq!(result, None); + } + + #[test] + fn test_from_env_map_enabled_true() { + let map = build_env_map(&[("experimental_otel_enabled", "1")]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + 
assert_eq!(result.unwrap().enabled, Some(true)); + } + + #[test] + fn test_from_env_map_enabled_false() { + let map = build_env_map(&[("experimental_otel_enabled", "0")]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + assert_eq!(result.unwrap().enabled, Some(false)); + } + + #[test] + fn test_from_env_map_enabled_true_string() { + let map = build_env_map(&[("experimental_otel_enabled", "true")]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + assert_eq!(result.unwrap().enabled, Some(true)); + } + + #[test] + fn test_from_env_map_enabled_invalid() { + let map = build_env_map(&[("experimental_otel_enabled", "invalid")]); + let result = ExperimentalOtelOptions::from_env_map(&map); + assert!(result.is_err()); + match result.unwrap_err() { + Error::InvalidExperimentalOtelConfig { message } => { + assert!(message.contains("TURBO_EXPERIMENTAL_OTEL_ENABLED")); + } + _ => panic!("Expected InvalidExperimentalOtelConfig"), + } + } + + #[test] + fn test_from_env_map_protocol_grpc() { + let map = build_env_map(&[("experimental_otel_protocol", "grpc")]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + assert_eq!( + result.unwrap().protocol, + Some(ExperimentalOtelProtocol::Grpc) + ); + } + + #[test] + fn test_from_env_map_protocol_http_protobuf() { + for protocol_str in ["http/protobuf", "http", "http_protobuf"] { + let map = build_env_map(&[("experimental_otel_protocol", protocol_str)]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + assert_eq!( + result.unwrap().protocol, + Some(ExperimentalOtelProtocol::HttpProtobuf) + ); + } + } + + #[test] + fn test_from_env_map_protocol_invalid() { + let map = build_env_map(&[("experimental_otel_protocol", "invalid")]); + let result = ExperimentalOtelOptions::from_env_map(&map); + assert!(result.is_err()); + match 
result.unwrap_err() { + Error::InvalidExperimentalOtelConfig { message } => { + assert!(message.contains("Unsupported experimentalObservability.otel protocol")); + assert!(message.contains("invalid")); + } + _ => panic!("Expected InvalidExperimentalOtelConfig"), + } + } + + #[test] + fn test_from_env_map_endpoint() { + let endpoint = "https://example.com/otel"; + let map = build_env_map(&[("experimental_otel_endpoint", endpoint)]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + assert_eq!(result.unwrap().endpoint, Some(endpoint.to_string())); + } + + #[test] + fn test_from_env_map_endpoint_empty_ignored() { + let map = build_env_map(&[("experimental_otel_endpoint", "")]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert_eq!(result, None); + } + + #[test] + fn test_from_env_map_timeout_ms() { + let map = build_env_map(&[("experimental_otel_timeout_ms", "5000")]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + assert_eq!(result.unwrap().timeout_ms, Some(5000)); + } + + #[test] + fn test_from_env_map_timeout_ms_invalid() { + let map = build_env_map(&[("experimental_otel_timeout_ms", "not-a-number")]); + let result = ExperimentalOtelOptions::from_env_map(&map); + assert!(result.is_err()); + match result.unwrap_err() { + Error::InvalidExperimentalOtelConfig { message } => { + assert!(message.contains("TURBO_EXPERIMENTAL_OTEL_TIMEOUT_MS must be a number")); + } + _ => panic!("Expected InvalidExperimentalOtelConfig"), + } + } + + #[test] + fn test_from_env_map_headers_single() { + let map = build_env_map(&[("experimental_otel_headers", "key1=value1")]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + let headers = result.unwrap().headers.unwrap(); + assert_eq!(headers.get("key1"), Some(&"value1".to_string())); + } + + #[test] + fn test_from_env_map_headers_multiple() { + let map = 
build_env_map(&[("experimental_otel_headers", "key1=value1,key2=value2")]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + let headers = result.unwrap().headers.unwrap(); + assert_eq!(headers.get("key1"), Some(&"value1".to_string())); + assert_eq!(headers.get("key2"), Some(&"value2".to_string())); + } + + #[test] + fn test_from_env_map_headers_with_spaces() { + let map = build_env_map(&[( + "experimental_otel_headers", + " key1 = value1 , key2 = value2 ", + )]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + let headers = result.unwrap().headers.unwrap(); + assert_eq!(headers.get("key1"), Some(&"value1".to_string())); + assert_eq!(headers.get("key2"), Some(&"value2".to_string())); + } + + #[test] + fn test_from_env_map_headers_missing_equals() { + let map = build_env_map(&[("experimental_otel_headers", "key1value1")]); + let result = ExperimentalOtelOptions::from_env_map(&map); + assert!(result.is_err()); + match result.unwrap_err() { + Error::InvalidExperimentalOtelConfig { message } => { + assert!(message.contains("key=value format")); + } + _ => panic!("Expected InvalidExperimentalOtelConfig"), + } + } + + #[test] + fn test_from_env_map_headers_empty_key() { + let map = build_env_map(&[("experimental_otel_headers", "=value1")]); + let result = ExperimentalOtelOptions::from_env_map(&map); + assert!(result.is_err()); + match result.unwrap_err() { + Error::InvalidExperimentalOtelConfig { message } => { + assert!(message.contains("keys cannot be empty")); + } + _ => panic!("Expected InvalidExperimentalOtelConfig"), + } + } + + #[test] + fn test_from_env_map_resource_single() { + let map = build_env_map(&[("experimental_otel_resource", "service.name=my-service")]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + let resource = result.unwrap().resource.unwrap(); + assert_eq!( + resource.get("service.name"), + 
Some(&"my-service".to_string()) + ); + } + + #[test] + fn test_from_env_map_resource_multiple() { + let map = build_env_map(&[( + "experimental_otel_resource", + "service.name=my-service,env=production", + )]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + let resource = result.unwrap().resource.unwrap(); + assert_eq!( + resource.get("service.name"), + Some(&"my-service".to_string()) + ); + assert_eq!(resource.get("env"), Some(&"production".to_string())); + } + + #[test] + fn test_from_env_map_metrics_run_summary() { + let map = build_env_map(&[("experimental_otel_metrics_run_summary", "1")]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + let metrics = result.unwrap().metrics.unwrap(); + assert_eq!(metrics.run_summary, Some(true)); + } + + #[test] + fn test_from_env_map_metrics_task_details() { + let map = build_env_map(&[("experimental_otel_metrics_task_details", "1")]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + let metrics = result.unwrap().metrics.unwrap(); + assert_eq!(metrics.task_details, Some(true)); + } + + #[test] + fn test_from_env_map_metrics_both() { + let map = build_env_map(&[ + ("experimental_otel_metrics_run_summary", "1"), + ("experimental_otel_metrics_task_details", "0"), + ]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + let metrics = result.unwrap().metrics.unwrap(); + assert_eq!(metrics.run_summary, Some(true)); + assert_eq!(metrics.task_details, Some(false)); + } + + #[test] + fn test_from_env_map_enabled_with_endpoint() { + let map = build_env_map(&[ + ("experimental_otel_enabled", "1"), + ("experimental_otel_endpoint", "https://example.com/otel"), + ]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + let opts = result.unwrap(); + assert_eq!(opts.enabled, Some(true)); + 
assert_eq!(opts.endpoint, Some("https://example.com/otel".to_string())); + } + + #[test] + fn test_from_env_map_disabled_with_endpoint() { + let map = build_env_map(&[ + ("experimental_otel_enabled", "0"), + ("experimental_otel_endpoint", "https://example.com/otel"), + ]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + let opts = result.unwrap(); + assert_eq!(opts.enabled, Some(false)); + assert_eq!(opts.endpoint, Some("https://example.com/otel".to_string())); + } + + #[test] + fn test_from_env_map_metrics_run_summary_disabled() { + let map = build_env_map(&[("experimental_otel_metrics_run_summary", "0")]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + let metrics = result.unwrap().metrics.unwrap(); + assert_eq!(metrics.run_summary, Some(false)); + } + + #[test] + fn test_from_env_map_metrics_task_details_disabled() { + let map = build_env_map(&[("experimental_otel_metrics_task_details", "0")]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + let metrics = result.unwrap().metrics.unwrap(); + assert_eq!(metrics.task_details, Some(false)); + } + + #[test] + fn test_from_env_map_combined() { + let map = build_env_map(&[ + ("experimental_otel_enabled", "1"), + ("experimental_otel_protocol", "grpc"), + ("experimental_otel_endpoint", "https://example.com/otel"), + ("experimental_otel_timeout_ms", "15000"), + ("experimental_otel_headers", "auth=token123"), + ("experimental_otel_resource", "service.name=test"), + ("experimental_otel_metrics_run_summary", "1"), + ]); + let result = ExperimentalOtelOptions::from_env_map(&map).unwrap(); + assert!(result.is_some()); + let opts = result.unwrap(); + assert_eq!(opts.enabled, Some(true)); + assert_eq!(opts.protocol, Some(ExperimentalOtelProtocol::Grpc)); + assert_eq!(opts.endpoint, Some("https://example.com/otel".to_string())); + assert_eq!(opts.timeout_ms, Some(15000)); + 
assert_eq!( + opts.headers.unwrap().get("auth"), + Some(&"token123".to_string()) + ); + assert_eq!( + opts.resource.unwrap().get("service.name"), + Some(&"test".to_string()) + ); + assert_eq!(opts.metrics.unwrap().run_summary, Some(true)); + } +} diff --git a/crates/turborepo-lib/src/config/mod.rs b/crates/turborepo-lib/src/config/mod.rs index fad0c7ed21dd8..105e20a4b7940 100644 --- a/crates/turborepo-lib/src/config/mod.rs +++ b/crates/turborepo-lib/src/config/mod.rs @@ -1,4 +1,5 @@ mod env; +mod experimental_otel; mod file; mod override_env; mod turbo_json; @@ -13,7 +14,7 @@ use file::{AuthFile, ConfigFile}; use merge::Merge; use miette::{Diagnostic, NamedSource, SourceSpan}; use override_env::OverrideEnvVars; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use struct_iterable::Iterable; use thiserror::Error; use tracing::debug; @@ -32,6 +33,16 @@ use crate::{ pub const CONFIG_FILE: &str = "turbo.json"; pub const CONFIG_FILE_JSONC: &str = "turbo.jsonc"; +pub use experimental_otel::{ + ExperimentalOtelMetricsOptions, ExperimentalOtelOptions, ExperimentalOtelProtocol, +}; + +#[derive(Deserialize, Serialize, Default, Debug, Clone, PartialEq, Eq, Merge)] +#[serde(rename_all = "camelCase")] +pub struct ExperimentalObservabilityOptions { + pub otel: Option, +} + #[derive(Debug, Error, Diagnostic)] #[error("Environment variables should not be prefixed with \"{env_pipeline_delimiter}\"")] #[diagnostic( @@ -252,6 +263,8 @@ pub enum Error { InvalidTuiScrollbackLength(#[source] std::num::ParseIntError), #[error("TURBO_SSO_LOGIN_CALLBACK_PORT: Invalid value. 
Use a number for the callback port.")] InvalidSsoLoginCallbackPort(#[source] std::num::ParseIntError), + #[error("Invalid experimentalOtel configuration: {message}")] + InvalidExperimentalOtelConfig { message: String }, } const DEFAULT_API_URL: &str = "https://vercel.com/api"; @@ -324,6 +337,8 @@ pub struct ConfigurationOptions { pub(crate) sso_login_callback_port: Option, #[serde(skip)] future_flags: Option, + #[serde(rename = "experimentalObservability")] + pub(crate) experimental_observability: Option, } #[derive(Default)] @@ -487,6 +502,10 @@ impl ConfigurationOptions { pub fn future_flags(&self) -> FutureFlags { self.future_flags.unwrap_or_default() } + + pub fn experimental_observability(&self) -> Option<&ExperimentalObservabilityOptions> { + self.experimental_observability.as_ref() + } } // Maps Some("") to None to emulate how Go handles empty strings diff --git a/crates/turborepo-lib/src/config/turbo_json.rs b/crates/turborepo-lib/src/config/turbo_json.rs index 1041c18b807ef..a096ef57da943 100644 --- a/crates/turborepo-lib/src/config/turbo_json.rs +++ b/crates/turborepo-lib/src/config/turbo_json.rs @@ -1,8 +1,16 @@ +use std::{collections::BTreeMap, str::FromStr}; + use camino::Utf8PathBuf; use turbopath::{AbsoluteSystemPath, RelativeUnixPath}; -use super::{ConfigurationOptions, Error, ResolvedConfigurationOptions}; -use crate::turbo_json::{RawRemoteCacheOptions, RawRootTurboJson, RawTurboJson}; +use super::{ + ConfigurationOptions, Error, ExperimentalObservabilityOptions, ExperimentalOtelMetricsOptions, + ExperimentalOtelOptions, ExperimentalOtelProtocol, ResolvedConfigurationOptions, +}; +use crate::turbo_json::{ + RawExperimentalObservability, RawKeyValue, RawObservabilityOtel, RawRemoteCacheOptions, + RawRootTurboJson, RawTurboJson, +}; pub struct TurboJsonReader<'a> { repo_root: &'a AbsoluteSystemPath, @@ -78,7 +86,21 @@ impl<'a> TurboJsonReader<'a> { opts.env_mode = turbo_json.env_mode.map(|mode| *mode.as_inner()); opts.cache_dir = cache_dir; 
opts.concurrency = turbo_json.concurrency.map(|c| c.as_inner().clone()); + + // Only read observability config if futureFlags.experimentalObservability is + // enabled + let future_flags = turbo_json.future_flags.as_ref().map(|f| *f.as_inner()); opts.future_flags = turbo_json.future_flags.map(|f| *f.as_inner()); + + if future_flags + .map(|f| f.experimental_observability) + .unwrap_or(false) + { + if let Some(raw_observability) = turbo_json.experimental_observability { + opts.experimental_observability = + Some(convert_raw_observability(raw_observability)?); + } + } Ok(opts) } } @@ -103,6 +125,58 @@ impl<'a> ResolvedConfigurationOptions for TurboJsonReader<'a> { } } +fn convert_key_values(entries: Vec) -> BTreeMap { + let mut map = BTreeMap::new(); + for entry in entries { + map.insert( + entry.key.into_inner().into(), + entry.value.into_inner().into(), + ); + } + map +} + +fn convert_raw_observability( + raw: RawExperimentalObservability, +) -> Result { + Ok(ExperimentalObservabilityOptions { + otel: raw.otel.map(convert_raw_observability_otel).transpose()?, + }) +} + +fn convert_raw_observability_otel( + raw: RawObservabilityOtel, +) -> Result { + let protocol = if let Some(protocol) = raw.protocol { + let proto_str = protocol.as_inner().as_str(); + Some(ExperimentalOtelProtocol::from_str(proto_str).map_err(|_| { + Error::InvalidExperimentalOtelConfig { + message: format!( + "Unsupported experimentalObservability.otel protocol `{proto_str}`. Use \ + `grpc` or `http/protobuf`." + ), + } + })?) 
+ } else { + None + }; + + let metrics = raw.metrics.map(|metrics| ExperimentalOtelMetricsOptions { + run_summary: metrics.run_summary.map(|flag| *flag.as_inner()), + task_details: metrics.task_details.map(|flag| *flag.as_inner()), + }); + + Ok(ExperimentalOtelOptions { + enabled: raw.enabled.map(|flag| *flag.as_inner()), + protocol, + endpoint: raw.endpoint.map(|endpoint| endpoint.into_inner().into()), + headers: raw.headers.map(convert_key_values), + timeout_ms: raw.timeout_ms.map(|timeout| *timeout.as_inner()), + resource: raw.resource.map(convert_key_values), + metrics, + }) +} + #[cfg(test)] mod test { use serde_json::json; @@ -227,4 +301,81 @@ mod test { let config = TurboJsonReader::turbo_json_to_config_options(turbo_json).unwrap(); assert_eq!(config.allow_no_package_manager(), expected); } + + #[test] + fn test_experimental_observability_otel_with_future_flag_disabled() { + let turbo_json = RawRootTurboJson::parse( + &serde_json::to_string_pretty(&json!({ + "futureFlags": { + "experimentalObservability": false + }, + "experimentalObservability": { + "otel": { + "enabled": true, + "endpoint": "https://example.com/otel" + } + } + })) + .unwrap(), + "turbo.json", + ) + .unwrap() + .into(); + let config = TurboJsonReader::turbo_json_to_config_options(turbo_json).unwrap(); + // Should be None because future flag is disabled + assert_eq!(config.experimental_observability(), None); + } + + #[test] + fn test_experimental_observability_otel_with_future_flag_enabled() { + let endpoint = "https://example.com/otel"; + let turbo_json = RawRootTurboJson::parse( + &serde_json::to_string_pretty(&json!({ + "futureFlags": { + "experimentalObservability": true + }, + "experimentalObservability": { + "otel": { + "enabled": true, + "endpoint": endpoint, + "protocol": "grpc", + "timeoutMs": 5000 + } + } + })) + .unwrap(), + "turbo.json", + ) + .unwrap() + .into(); + let config = TurboJsonReader::turbo_json_to_config_options(turbo_json).unwrap(); + let observability_config = 
config.experimental_observability(); + assert!(observability_config.is_some()); + let otel_config = observability_config.unwrap().otel.as_ref().unwrap(); + assert_eq!(otel_config.enabled, Some(true)); + assert_eq!(otel_config.endpoint.as_ref(), Some(&endpoint.to_string())); + assert_eq!(otel_config.protocol, Some(ExperimentalOtelProtocol::Grpc)); + assert_eq!(otel_config.timeout_ms, Some(5000)); + } + + #[test] + fn test_experimental_observability_without_future_flag() { + let turbo_json = RawRootTurboJson::parse( + &serde_json::to_string_pretty(&json!({ + "experimentalObservability": { + "otel": { + "enabled": true, + "endpoint": "https://example.com/otel" + } + } + })) + .unwrap(), + "turbo.json", + ) + .unwrap() + .into(); + let config = TurboJsonReader::turbo_json_to_config_options(turbo_json).unwrap(); + // Should be None because future flag is not set (defaults to false) + assert_eq!(config.experimental_observability(), None); + } } diff --git a/crates/turborepo-lib/src/lib.rs b/crates/turborepo-lib/src/lib.rs index 65b8ed29b9819..fe337ac3b17e4 100644 --- a/crates/turborepo-lib/src/lib.rs +++ b/crates/turborepo-lib/src/lib.rs @@ -25,6 +25,7 @@ mod boundaries; mod gitignore; mod hash; mod microfrontends; +mod observability; mod opts; mod package_changes_watcher; mod panic_handler; diff --git a/crates/turborepo-lib/src/observability/mod.rs b/crates/turborepo-lib/src/observability/mod.rs new file mode 100644 index 0000000000000..3cb92c205b347 --- /dev/null +++ b/crates/turborepo-lib/src/observability/mod.rs @@ -0,0 +1,89 @@ +//! Observability abstraction layer for Turborepo runs. +//! +//! This module provides a generic interface for recording metrics and telemetry +//! from completed Turborepo runs. The public API consists of: +//! +//! - [`Handle`]: An opaque handle to an observability backend +//! - [`try_init`]: Factory function to initialize an observability backend +//! +//! Currently, only OpenTelemetry is supported as a backend. Additional backends +//! 
can be added by: +//! +//! 1. Creating a new submodule (e.g., `observability::prometheus`) that +//! implements the internal `RunObserver` trait +//! 2. Adding a `try_init_*` function that returns `Option` +//! 3. Updating `try_init` to dispatch to the appropriate backend based on +//! config +//! +//! The abstraction ensures that callers only depend on the generic `Handle` +//! type and are not coupled to any specific observability implementation. + +use std::sync::Arc; + +use crate::{config::ExperimentalObservabilityOptions, run::summary::RunSummary}; + +#[cfg(feature = "otel")] +mod otel; + +#[cfg(not(feature = "otel"))] +mod otel_disabled; + +#[cfg(not(feature = "otel"))] +use otel_disabled as otel; + +/// Trait for observing completed Turborepo runs, allowing different +/// observability backends like OpenTelemetry to be plugged in. +pub(crate) trait RunObserver: Send + Sync { + /// Record metrics for a completed run. + fn record(&self, summary: &RunSummary<'_>); + /// Shutdown the observer, flushing any pending metrics. + /// This is called when the handle is dropped or explicitly shut down. + fn shutdown(&self); +} + +/// A generic handle to an observability backend. +/// +/// This is the only type that the rest of turborepo-lib needs to know about. +/// The concrete backend implementation is hidden behind the `RunObserver` +/// trait. +#[derive(Clone)] +pub struct Handle { + pub(crate) inner: Arc, +} + +impl std::fmt::Debug for Handle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Handle").finish_non_exhaustive() + } +} + +impl Handle { + /// Record metrics for a completed run. + pub fn record(&self, summary: &RunSummary<'_>) { + self.inner.record(summary); + } + + /// Shutdown the observer, flushing any pending metrics. + pub fn shutdown(self) { + // We only have an Arc, so we delegate shutdown to the trait + // and let the concrete implementation handle any shared references. 
+ self.inner.shutdown(); + } + + /// Initialize an observability handle from configuration options. + /// + /// Returns `None` if observability is disabled or misconfigured. + /// + /// Currently, this only supports OpenTelemetry backends configured via + /// `ExperimentalObservabilityOptions` (from + /// `experimentalObservability.otel` in turbo.json or via environment + /// variables/CLI flags). In the future, this may dispatch to different + /// backends based on the configuration provided. + pub(crate) fn try_init(options: &ExperimentalObservabilityOptions) -> Option { + if let Some(otel_options) = options.otel.as_ref() { + otel::try_init_otel(otel_options) + } else { + None + } + } +} diff --git a/crates/turborepo-lib/src/observability/otel.rs b/crates/turborepo-lib/src/observability/otel.rs new file mode 100644 index 0000000000000..4dc34f549f5f2 --- /dev/null +++ b/crates/turborepo-lib/src/observability/otel.rs @@ -0,0 +1,400 @@ +use std::{sync::Arc, time::Duration}; + +use turborepo_otel::{RunMetricsPayload, TaskCacheStatus, TaskMetricsPayload}; + +use super::{Handle, RunObserver}; +use crate::{ + config::{ExperimentalOtelMetricsOptions, ExperimentalOtelOptions, ExperimentalOtelProtocol}, + run::summary::{CacheStatus, RunSummary, TaskSummary}, +}; + +/// OpenTelemetry-based observer implementation. +struct OtelObserver { + handle: turborepo_otel::Handle, +} + +impl RunObserver for OtelObserver { + fn record(&self, summary: &RunSummary<'_>) { + if let Some(payload) = build_payload(summary) { + self.handle.record_run(&payload); + } + } + + fn shutdown(&self) { + // `shutdown` consumes the handle. Clone the cheap, Arc-backed handle first. + self.handle.clone().shutdown(); + } +} + +/// Initialize an OpenTelemetry observability handle from configuration options. +/// Returns `None` if observability is disabled or misconfigured. 
+pub(crate) fn try_init_otel(options: &ExperimentalOtelOptions) -> Option { + let config = config_from_options(options)?; + + match turborepo_otel::Handle::try_new(config) { + Ok(handle) => Some(Handle { + inner: Arc::new(OtelObserver { handle }), + }), + Err(e) => { + tracing::warn!("Failed to initialize OTel exporter: {}", e); + None + } + } +} + +fn config_from_options(options: &ExperimentalOtelOptions) -> Option { + if options.enabled.is_some_and(|enabled| !enabled) { + return None; + } + + let endpoint = options.endpoint.as_ref()?.trim(); + if endpoint.is_empty() { + return None; + } + let endpoint = endpoint.to_string(); + + let protocol = match options.protocol.unwrap_or(ExperimentalOtelProtocol::Grpc) { + ExperimentalOtelProtocol::Grpc => turborepo_otel::Protocol::Grpc, + ExperimentalOtelProtocol::HttpProtobuf => turborepo_otel::Protocol::HttpProtobuf, + }; + + let headers = options.headers.clone().unwrap_or_default(); + let resource_attributes = options.resource.clone().unwrap_or_default(); + let metrics = metrics_config(options.metrics.as_ref()); + let timeout = Duration::from_millis(options.timeout_ms.unwrap_or(10_000)); + + Some(turborepo_otel::Config { + endpoint, + protocol, + headers, + timeout, + resource_attributes, + metrics, + }) +} + +fn metrics_config( + options: Option<&ExperimentalOtelMetricsOptions>, +) -> turborepo_otel::MetricsConfig { + let run_summary = options.and_then(|opts| opts.run_summary).unwrap_or(true); + let task_details = options.and_then(|opts| opts.task_details).unwrap_or(false); + turborepo_otel::MetricsConfig { + run_summary, + task_details, + } +} + +fn build_payload(summary: &RunSummary<'_>) -> Option { + let execution = summary.execution_summary()?; + let duration_ms = (execution.end_time - execution.start_time) as f64; + let attempted_tasks = execution.attempted() as u64; + let failed_tasks = execution.failed() as u64; + let cached_tasks = execution.cached() as u64; + + let tasks = summary + .tasks() + .iter() + 
.map(build_task_payload) + .collect::>(); + + let scm = summary.scm_state(); + + Some(RunMetricsPayload { + run_id: summary.id().to_string(), + turbo_version: summary.turbo_version().to_string(), + duration_ms, + attempted_tasks, + failed_tasks, + cached_tasks, + exit_code: execution.exit_code, + scm_branch: scm.branch().map(|s| s.to_string()), + scm_revision: scm.sha().map(|s| s.to_string()), + tasks, + }) +} + +fn build_task_payload(task: &TaskSummary) -> TaskMetricsPayload { + let duration_ms = task + .shared + .execution + .as_ref() + .map(|exec| (exec.end_time - exec.start_time) as f64); + let exit_code = task + .shared + .execution + .as_ref() + .and_then(|exec| exec.exit_code); + let cache_status = match task.shared.cache.status() { + CacheStatus::Hit => TaskCacheStatus::Hit, + CacheStatus::Miss => TaskCacheStatus::Miss, + }; + let cache_source = task + .shared + .cache + .cache_source_label() + .map(|label| label.to_string()); + let cache_time_saved_ms = match cache_status { + TaskCacheStatus::Hit => { + let saved = task.shared.cache.time_saved(); + (saved > 0).then_some(saved) + } + TaskCacheStatus::Miss => None, + }; + + TaskMetricsPayload { + task_id: task.task_id.to_string(), + task: task.task.clone(), + package: task.package.clone(), + hash: task.shared.hash.clone(), + external_inputs_hash: task.shared.hash_of_external_dependencies.clone(), + command: task.shared.command.clone(), + duration_ms, + cache_status, + cache_source, + cache_time_saved_ms, + exit_code, + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use super::*; + + #[test] + fn test_config_from_options_enabled_false() { + let options = ExperimentalOtelOptions { + enabled: Some(false), + ..Default::default() + }; + let result = config_from_options(&options); + assert!(result.is_none()); + } + + #[test] + fn test_config_from_options_enabled_false_with_endpoint() { + let options = ExperimentalOtelOptions { + enabled: Some(false), + endpoint: 
Some("https://example.com/otel".to_string()), + ..Default::default() + }; + let result = config_from_options(&options); + assert!(result.is_none()); + } + + #[test] + fn test_config_from_options_no_endpoint() { + let options = ExperimentalOtelOptions::default(); + let result = config_from_options(&options); + assert!(result.is_none()); + } + + #[test] + fn test_config_from_options_empty_endpoint() { + let options = ExperimentalOtelOptions { + endpoint: Some(" ".to_string()), + ..Default::default() + }; + let result = config_from_options(&options); + assert!(result.is_none()); + } + + #[test] + fn test_config_from_options_defaults() { + let options = ExperimentalOtelOptions { + endpoint: Some("https://example.com/otel".to_string()), + ..Default::default() + }; + let result = config_from_options(&options); + assert!(result.is_some()); + let config = result.unwrap(); + assert_eq!(config.endpoint, "https://example.com/otel"); + assert_eq!(config.protocol, turborepo_otel::Protocol::Grpc); + assert_eq!(config.timeout.as_millis(), 10_000); + assert!(config.metrics.run_summary); + assert!(!config.metrics.task_details); + } + + #[test] + fn test_config_from_options_enabled_none_with_endpoint() { + let options = ExperimentalOtelOptions { + enabled: None, + endpoint: Some("https://example.com/otel".to_string()), + ..Default::default() + }; + let result = config_from_options(&options); + assert!(result.is_some()); + let config = result.unwrap(); + assert_eq!(config.endpoint, "https://example.com/otel"); + assert_eq!(config.protocol, turborepo_otel::Protocol::Grpc); + assert!(config.metrics.run_summary); + assert!(!config.metrics.task_details); + } + + #[test] + fn test_config_from_options_enabled_true_no_endpoint() { + let options = ExperimentalOtelOptions { + enabled: Some(true), + endpoint: None, + ..Default::default() + }; + let result = config_from_options(&options); + assert!(result.is_none()); + } + + #[test] + fn 
test_config_from_options_enabled_true_whitespace_endpoint() { + let options = ExperimentalOtelOptions { + enabled: Some(true), + endpoint: Some(" ".to_string()), + ..Default::default() + }; + let result = config_from_options(&options); + assert!(result.is_none()); + } + + #[test] + fn test_config_from_options_http_protobuf() { + let options = ExperimentalOtelOptions { + endpoint: Some("https://example.com/otel".to_string()), + protocol: Some(ExperimentalOtelProtocol::HttpProtobuf), + ..Default::default() + }; + let result = config_from_options(&options); + assert!(result.is_some()); + assert_eq!( + result.unwrap().protocol, + turborepo_otel::Protocol::HttpProtobuf + ); + } + + #[test] + fn test_config_from_options_custom_timeout() { + let options = ExperimentalOtelOptions { + endpoint: Some("https://example.com/otel".to_string()), + timeout_ms: Some(15000), + ..Default::default() + }; + let result = config_from_options(&options); + assert!(result.is_some()); + assert_eq!(result.unwrap().timeout.as_millis(), 15_000); + } + + #[test] + fn test_config_from_options_headers() { + let mut headers = BTreeMap::new(); + headers.insert("auth".to_string(), "token123".to_string()); + let options = ExperimentalOtelOptions { + endpoint: Some("https://example.com/otel".to_string()), + headers: Some(headers), + ..Default::default() + }; + let result = config_from_options(&options); + assert!(result.is_some()); + let config = result.unwrap(); + assert_eq!(config.headers.get("auth"), Some(&"token123".to_string())); + } + + #[test] + fn test_config_from_options_resource() { + let mut resource = BTreeMap::new(); + resource.insert("service.name".to_string(), "my-service".to_string()); + resource.insert("env".to_string(), "production".to_string()); + let options = ExperimentalOtelOptions { + endpoint: Some("https://example.com/otel".to_string()), + resource: Some(resource), + ..Default::default() + }; + let result = config_from_options(&options); + assert!(result.is_some()); + let 
config = result.unwrap(); + assert_eq!( + config.resource_attributes.get("service.name"), + Some(&"my-service".to_string()) + ); + assert_eq!( + config.resource_attributes.get("env"), + Some(&"production".to_string()) + ); + } + + #[test] + fn test_metrics_config_defaults() { + let result = metrics_config(None); + assert!(result.run_summary); + assert!(!result.task_details); + } + + #[test] + fn test_metrics_config_run_summary_override() { + let metrics = ExperimentalOtelMetricsOptions { + run_summary: Some(false), + ..Default::default() + }; + let result = metrics_config(Some(&metrics)); + assert!(!result.run_summary); + assert!(!result.task_details); + } + + #[test] + fn test_metrics_config_task_details_override() { + let metrics = ExperimentalOtelMetricsOptions { + task_details: Some(true), + ..Default::default() + }; + let result = metrics_config(Some(&metrics)); + assert!(result.run_summary); + assert!(result.task_details); + } + + #[test] + fn test_config_from_options_metrics_toggles() { + let options = ExperimentalOtelOptions { + endpoint: Some("https://example.com/otel".to_string()), + metrics: Some(ExperimentalOtelMetricsOptions { + run_summary: Some(false), + task_details: Some(true), + }), + ..Default::default() + }; + let result = config_from_options(&options); + assert!(result.is_some()); + let config = result.unwrap(); + assert!(!config.metrics.run_summary); + assert!(config.metrics.task_details); + } + + #[test] + fn test_try_init_otel_swallows_initialization_errors() { + let options = ExperimentalOtelOptions { + enabled: Some(true), + endpoint: Some("invalid://endpoint".to_string()), + ..Default::default() + }; + let result = try_init_otel(&options); + assert!(result.is_none()); + } + + #[test] + fn test_try_init_otel_returns_none_for_disabled() { + let options = ExperimentalOtelOptions { + enabled: Some(false), + endpoint: Some("https://example.com/otel".to_string()), + ..Default::default() + }; + let result = try_init_otel(&options); + 
assert!(result.is_none()); + } + + #[test] + fn test_try_init_otel_returns_none_for_no_endpoint() { + let options = ExperimentalOtelOptions { + enabled: Some(true), + endpoint: None, + ..Default::default() + }; + let result = try_init_otel(&options); + assert!(result.is_none()); + } +} diff --git a/crates/turborepo-lib/src/observability/otel_disabled.rs b/crates/turborepo-lib/src/observability/otel_disabled.rs new file mode 100644 index 0000000000000..32dedf4f35cbe --- /dev/null +++ b/crates/turborepo-lib/src/observability/otel_disabled.rs @@ -0,0 +1,7 @@ +use crate::config::ExperimentalOtelOptions; + +/// Initialize an OpenTelemetry observability handle from configuration options. +/// Returns `None` when OTel is disabled at compile time. +pub(crate) fn try_init_otel(_options: &ExperimentalOtelOptions) -> Option { + None +} diff --git a/crates/turborepo-lib/src/opts.rs b/crates/turborepo-lib/src/opts.rs index 173fc6960d272..711ba5e5fc4b8 100644 --- a/crates/turborepo-lib/src/opts.rs +++ b/crates/turborepo-lib/src/opts.rs @@ -78,6 +78,7 @@ pub struct Opts { pub scope_opts: ScopeOpts, pub tui_opts: TuiOpts, pub future_flags: FutureFlags, + pub experimental_observability: Option, } impl Opts { @@ -182,6 +183,7 @@ impl Opts { let repo_opts = RepoOpts::from(inputs); let tui_opts = TuiOpts::from(inputs); let future_flags = config.future_flags(); + let experimental_observability = config.experimental_observability().cloned(); Ok(Self { repo_opts, @@ -192,6 +194,7 @@ impl Opts { api_client_opts, tui_opts, future_flags, + experimental_observability, }) } } @@ -751,6 +754,7 @@ mod test { runcache_opts, tui_opts, future_flags: Default::default(), + experimental_observability: None, }; let synthesized = opts.synthesize_command(); assert_eq!(synthesized, expected); diff --git a/crates/turborepo-lib/src/run/builder.rs b/crates/turborepo-lib/src/run/builder.rs index 00c87cf6dc2a8..ccbaccfd595c6 100644 --- a/crates/turborepo-lib/src/run/builder.rs +++ 
b/crates/turborepo-lib/src/run/builder.rs @@ -45,6 +45,7 @@ use crate::{ config::resolve_turbo_config_path, engine::{Engine, EngineBuilder}, microfrontends::MicrofrontendsConfigs, + observability, opts::Opts, run::{scope, task_access::TaskAccess, Error, Run, RunCache}, shim::TurboState, @@ -483,6 +484,11 @@ impl RunBuilder { .should_print_prelude_override .unwrap_or_else(|| self.will_execute_tasks()); + let observability_handle = match self.opts.experimental_observability.as_ref() { + Some(opts) => observability::Handle::try_init(opts), + None => None, + }; + Ok(Run { version: self.version, color_config: self.color_config, @@ -506,6 +512,7 @@ impl RunBuilder { daemon, should_print_prelude, micro_frontend_configs, + observability_handle, }) } diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index e5e61ce9123c7..1930809c27ad5 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -78,6 +78,7 @@ pub struct Run { daemon: Option>, should_print_prelude: bool, micro_frontend_configs: Option, + observability_handle: Option, } type UIResult = Result>)>, Error>; @@ -646,6 +647,7 @@ impl Run { self.version, Vendor::get_user(), &self.scm, + self.observability_handle.clone(), ); let mut visitor = Visitor::new( diff --git a/crates/turborepo-lib/src/run/summary/execution.rs b/crates/turborepo-lib/src/run/summary/execution.rs index c8ba1d042c9bf..04dbbda581d50 100644 --- a/crates/turborepo-lib/src/run/summary/execution.rs +++ b/crates/turborepo-lib/src/run/summary/execution.rs @@ -165,6 +165,21 @@ impl<'a> ExecutionSummary<'a> { fn successful(&self) -> usize { self.success + self.cached } + + // Used in observability/otel.rs to populate RunMetricsPayload.attempted_tasks + pub(crate) fn attempted(&self) -> usize { + self.attempted + } + + // Used in observability/otel.rs to populate RunMetricsPayload.failed_tasks + pub(crate) fn failed(&self) -> usize { + self.failed + } + + // Used in 
observability/otel.rs to populate RunMetricsPayload.cached_tasks + pub(crate) fn cached(&self) -> usize { + self.cached + } } /// The final states of all task executions diff --git a/crates/turborepo-lib/src/run/summary/mod.rs b/crates/turborepo-lib/src/run/summary/mod.rs index c6f82ab7547af..7a3a0f09c52e7 100644 --- a/crates/turborepo-lib/src/run/summary/mod.rs +++ b/crates/turborepo-lib/src/run/summary/mod.rs @@ -29,6 +29,8 @@ use turborepo_scm::SCM; use turborepo_task_id::TaskId; use turborepo_ui::{color, cprintln, cwriteln, ColorConfig, BOLD, BOLD_CYAN, GREY}; +/// Re-exported for use in observability/otel.rs +pub(crate) use self::task::{CacheStatus, TaskSummary}; use self::{ execution::TaskState, task::SinglePackageTaskSummary, task_factory::TaskSummaryFactory, }; @@ -40,7 +42,6 @@ use crate::{ run::summary::{ execution::{ExecutionSummary, ExecutionTracker}, scm::SCMState, - task::TaskSummary, }, task_hash::TaskHashTracker, }; @@ -98,6 +99,8 @@ pub struct RunSummary<'a> { should_save: bool, #[serde(skip)] run_type: RunType, + #[serde(skip)] + observability_handle: Option, } /// We use this to track the run, so it's constructed before the run. 
@@ -109,6 +112,7 @@ pub struct RunTracker { execution_tracker: ExecutionTracker, user: Option, synthesized_command: String, + observability_handle: Option, } impl RunTracker { @@ -121,6 +125,7 @@ impl RunTracker { version: &'static str, user: Option, scm: &SCM, + observability_handle: Option, ) -> Self { let scm = SCMState::get(env_at_execution_start, scm, repo_root); @@ -131,6 +136,7 @@ impl RunTracker { execution_tracker: ExecutionTracker::new(), user, synthesized_command, + observability_handle, } } @@ -197,6 +203,7 @@ impl RunTracker { repo_root, should_save, run_type, + observability_handle: self.observability_handle, }) } @@ -311,6 +318,31 @@ impl<'a> From<&'a RunSummary<'a>> for SinglePackageRunSummary<'a> { } impl<'a> RunSummary<'a> { + // Used in observability/otel.rs to populate RunMetricsPayload.run_id + pub(crate) fn id(&self) -> &Ksuid { + &self.id + } + + // Used in observability/otel.rs to populate RunMetricsPayload.turbo_version + pub(crate) fn turbo_version(&self) -> &'static str { + self.turbo_version + } + + // Used in observability/otel.rs to access execution summary for metrics + pub(crate) fn execution_summary(&self) -> Option<&ExecutionSummary<'a>> { + self.execution.as_ref() + } + + // Used in observability/otel.rs to iterate over tasks for TaskMetricsPayload + pub(crate) fn tasks(&self) -> &[TaskSummary] { + &self.tasks + } + + // Used in observability/otel.rs to populate RunMetricsPayload SCM fields + pub(crate) fn scm_state(&self) -> &SCMState { + &self.scm + } + #[tracing::instrument(skip(self, pkg_dep_graph, ui))] async fn finish( mut self, @@ -324,6 +356,11 @@ impl<'a> RunSummary<'a> { return self.close_dry_run(pkg_dep_graph, ui); } + if let Some(handle) = self.observability_handle.take() { + handle.record(&self); + handle.shutdown(); + } + if self.should_save { if let Err(err) = self.save() { warn!("Error writing run summary: {}", err) diff --git a/crates/turborepo-lib/src/run/summary/scm.rs 
b/crates/turborepo-lib/src/run/summary/scm.rs index a546b8c8063c2..781693398eb6a 100644 --- a/crates/turborepo-lib/src/run/summary/scm.rs +++ b/crates/turborepo-lib/src/run/summary/scm.rs @@ -50,4 +50,14 @@ impl SCMState { state } + + // Used in observability/otel.rs to populate RunMetricsPayload.scm_branch + pub(crate) fn branch(&self) -> Option<&str> { + self.branch.as_deref() + } + + // Used in observability/otel.rs to populate RunMetricsPayload.scm_revision + pub(crate) fn sha(&self) -> Option<&str> { + self.sha.as_deref() + } } diff --git a/crates/turborepo-lib/src/run/summary/task.rs b/crates/turborepo-lib/src/run/summary/task.rs index 4ed0d4e688ae6..a8cb2f874730e 100644 --- a/crates/turborepo-lib/src/run/summary/task.rs +++ b/crates/turborepo-lib/src/run/summary/task.rs @@ -29,7 +29,8 @@ pub struct TaskCacheSummary { #[derive(Debug, Serialize, Copy, Clone)] #[serde(rename_all = "UPPERCASE")] -enum CacheStatus { +// Used in observability/otel.rs to map to TaskCacheStatus +pub(crate) enum CacheStatus { Hit, Miss, } @@ -122,6 +123,25 @@ pub struct TaskEnvVarSummary { } impl TaskCacheSummary { + // Used in observability/otel.rs to populate TaskMetricsPayload.cache_status + pub(crate) fn status(&self) -> CacheStatus { + self.status + } + + // Used in observability/otel.rs to populate + // TaskMetricsPayload.cache_time_saved_ms + pub(crate) fn time_saved(&self) -> u64 { + self.time_saved + } + + // Used in observability/otel.rs to populate TaskMetricsPayload.cache_source + pub(crate) fn cache_source_label(&self) -> Option<&'static str> { + self.source.map(|source| match source { + CacheSource::Local => "LOCAL", + CacheSource::Remote => "REMOTE", + }) + } + pub fn cache_miss() -> Self { Self { local: false, diff --git a/crates/turborepo-lib/src/turbo_json/future_flags.rs b/crates/turborepo-lib/src/turbo_json/future_flags.rs index 8d613c9f0455d..cab221e3e6dc4 100644 --- a/crates/turborepo-lib/src/turbo_json/future_flags.rs +++ 
b/crates/turborepo-lib/src/turbo_json/future_flags.rs @@ -42,6 +42,12 @@ pub struct FutureFlags { /// root. All `turbo.json` must still extend from the root `turbo.json` /// first. pub non_root_extends: bool, + /// Enable experimental OpenTelemetry exporter support. + /// + /// When enabled, Turborepo will honor the `experimentalObservability` + /// configuration block (if present) to send run summaries to an + /// observability backend. + pub experimental_observability: bool, } impl FutureFlags { diff --git a/crates/turborepo-lib/src/turbo_json/mod.rs b/crates/turborepo-lib/src/turbo_json/mod.rs index 9e4d781e0c99a..e3718e69e4a94 100644 --- a/crates/turborepo-lib/src/turbo_json/mod.rs +++ b/crates/turborepo-lib/src/turbo_json/mod.rs @@ -33,7 +33,8 @@ pub use future_flags::FutureFlags; pub use loader::{TurboJsonLoader, TurboJsonReader}; pub use processed::ProcessedTaskDefinition; pub use raw::{ - RawPackageTurboJson, RawRemoteCacheOptions, RawRootTurboJson, RawTaskDefinition, RawTurboJson, + RawExperimentalObservability, RawKeyValue, RawObservabilityOtel, RawPackageTurboJson, + RawRemoteCacheOptions, RawRootTurboJson, RawTaskDefinition, RawTurboJson, }; use crate::boundaries::BoundariesConfig; @@ -1022,6 +1023,7 @@ mod tests { &FutureFlags { turbo_extends_keyword: true, non_root_extends: false, + experimental_observability: false, } ); diff --git a/crates/turborepo-lib/src/turbo_json/processed.rs b/crates/turborepo-lib/src/turbo_json/processed.rs index cd9548d245b6b..eae5cea6c8c32 100644 --- a/crates/turborepo-lib/src/turbo_json/processed.rs +++ b/crates/turborepo-lib/src/turbo_json/processed.rs @@ -408,6 +408,7 @@ mod tests { &FutureFlags { turbo_extends_keyword: true, non_root_extends: false, + experimental_observability: false, }, ); @@ -430,6 +431,7 @@ mod tests { &FutureFlags { turbo_extends_keyword: false, non_root_extends: false, + experimental_observability: false, }, ); @@ -450,6 +452,7 @@ mod tests { &FutureFlags { turbo_extends_keyword: true, 
non_root_extends: false, + experimental_observability: false, }, ); @@ -596,6 +599,7 @@ mod tests { &FutureFlags { turbo_extends_keyword: true, non_root_extends: false, + experimental_observability: false, }, ) .unwrap(); @@ -620,6 +624,7 @@ mod tests { &FutureFlags { turbo_extends_keyword: false, non_root_extends: false, + experimental_observability: false, }, ); assert!(result.is_err()); @@ -640,6 +645,7 @@ mod tests { &FutureFlags { turbo_extends_keyword: false, non_root_extends: false, + experimental_observability: false, }, ); assert!(result.is_err()); diff --git a/crates/turborepo-lib/src/turbo_json/raw.rs b/crates/turborepo-lib/src/turbo_json/raw.rs index 38a5eaa281b41..bc4dc5d02bed5 100644 --- a/crates/turborepo-lib/src/turbo_json/raw.rs +++ b/crates/turborepo-lib/src/turbo_json/raw.rs @@ -41,6 +41,46 @@ pub struct RawRemoteCacheOptions { pub upload_timeout: Option>, } +#[derive(Serialize, Default, Debug, Clone, Iterable, Deserializable)] +#[serde(rename_all = "camelCase")] +pub struct RawObservabilityOtel { + pub enabled: Option>, + pub protocol: Option>, + pub endpoint: Option>, + pub headers: Option>, + pub timeout_ms: Option>, + pub resource: Option>, + pub metrics: Option, +} + +#[derive(Serialize, Default, Debug, Clone, Iterable, Deserializable)] +#[serde(rename_all = "camelCase")] +pub struct RawExperimentalObservability { + pub otel: Option, +} + +#[derive(Serialize, Debug, Clone, Iterable, Deserializable)] +pub struct RawKeyValue { + pub key: Spanned, + pub value: Spanned, +} + +#[derive(Serialize, Default, Debug, Clone, Iterable, Deserializable)] +#[serde(rename_all = "camelCase")] +pub struct RawObservabilityOtelMetrics { + pub run_summary: Option>, + pub task_details: Option>, +} + +impl Default for RawKeyValue { + fn default() -> Self { + Self { + key: Spanned::new(UnescapedString::from(String::new())), + value: Spanned::new(UnescapedString::from(String::new())), + } + } +} + // Root turbo.json #[derive(Default, Debug, Clone, Iterable, 
Deserializable)] pub struct RawRootTurboJson { @@ -49,6 +89,8 @@ pub struct RawRootTurboJson { #[deserializable(rename = "$schema")] pub(crate) schema: Option, pub(crate) experimental_spaces: Option, + #[deserializable(rename = "experimentalObservability")] + pub(crate) experimental_observability: Option, // Global root filesystem dependencies pub(crate) global_dependencies: Option>>, @@ -142,6 +184,9 @@ pub struct RawTurboJson { pub concurrency: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub future_flags: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "experimentalObservability")] + pub experimental_observability: Option, #[deserializable(rename = "//")] #[serde(skip)] pub(crate) _comment: Option, @@ -186,6 +231,7 @@ impl From for RawTurboJson { span: root.span, schema: root.schema, experimental_spaces: root.experimental_spaces, + experimental_observability: root.experimental_observability, global_dependencies: root.global_dependencies, global_env: root.global_env, global_pass_through_env: root.global_pass_through_env, diff --git a/crates/turborepo-otel/Cargo.toml b/crates/turborepo-otel/Cargo.toml new file mode 100644 index 0000000000000..3fe17c4f76e63 --- /dev/null +++ b/crates/turborepo-otel/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "turborepo-otel" +version = "0.1.0" +edition = { workspace = true } +license = "MIT" + +[dependencies] +opentelemetry = { version = "0.31", features = ["metrics"] } +opentelemetry-otlp = { version = "0.31", features = [ + "grpc-tonic", + "http-proto", + "metrics", +] } +opentelemetry-semantic-conventions = "0.31" +opentelemetry_sdk = { version = "0.31", features = [ + "metrics", + "rt-tokio", + "experimental_async_runtime", + "experimental_metrics_periodicreader_with_async_runtime", +] } +serde = { workspace = true, optional = true } +serde_json = { workspace = true, optional = true } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tonic = 
"0.14" +tracing = { workspace = true } + +[features] +default = [] +serde = ["dep:serde", "dep:serde_json"] diff --git a/crates/turborepo-otel/examples/local-collector/README.md b/crates/turborepo-otel/examples/local-collector/README.md new file mode 100644 index 0000000000000..d372748dc8342 --- /dev/null +++ b/crates/turborepo-otel/examples/local-collector/README.md @@ -0,0 +1,137 @@ +# Local OTEL collector example + +This example shows how to build the `turbo` binary with the **OpenTelemetry (OTEL)** feature enabled and send metrics to a local collector running via **Docker Compose**. It’s intended as a lightweight, local integration harness for the OTEL exporter. + +## 1. Prerequisites + +- **Docker & docker compose** installed and running +- **Rust toolchain** (matching this repo’s `rust-toolchain.toml`) +- **pnpm** installed (per `CONTRIBUTING.md`) + +All commands below assume the repo root is `turborepo/`. + +## 2. Build `turbo` with the OTEL feature + +From the repo root: + +```bash +pnpm install +cargo build -p turbo --features otel +``` + +This produces an OTEL-enabled `turbo` binary at: `./target/debug/turbo` + +## 3. Start the local collector stack + +From this example directory: + +```bash +cd crates/turborepo-otel/examples/local-collector +docker compose up -d +``` + +This starts: + +- **`otel-collector`** (OTLP receiver + debug + Prometheus exporter) +- **`prometheus`** (scrapes metrics from the collector) +- **`grafana`** (optional visualization) + +Ports: + +- **OTLP gRPC**: `4317` +- **OTLP HTTP**: `4318` +- **Collector metrics / Prometheus exporter**: `8888`, `8889` +- **Prometheus UI**: `9090` +- **Grafana UI**: `3000` + +You can confirm the collector is ready: + +```bash +docker compose logs otel-collector +``` + +You should see a line similar to: + +```text +Everything is ready. Begin running and processing data. +``` + +## 4. 
Configure `turbo` to send metrics to the local collector + +In a new shell, from the repo root, export the OTEL env vars: + +```bash +cd /path/to/turborepo + +export TURBO_EXPERIMENTAL_OTEL_ENABLED=1 +export TURBO_EXPERIMENTAL_OTEL_PROTOCOL=grpc +export TURBO_EXPERIMENTAL_OTEL_ENDPOINT=http://127.0.0.1:4317 +export TURBO_EXPERIMENTAL_OTEL_RESOURCE="service.name=turborepo,env=local" +# Optional (defaults shown) +export TURBO_EXPERIMENTAL_OTEL_METRICS_RUN_SUMMARY=1 +export TURBO_EXPERIMENTAL_OTEL_METRICS_TASK_DETAILS=0 +``` + +These environment variables bypass `turbo.json` and directly configure the OTEL exporter. + +## 5. Run a task and emit metrics + +Use the **locally built** OTEL-enabled binary: + +```bash +./target/debug/turbo run lint --filter=turbo-ignore +``` + +You can replace `lint --filter=turbo-ignore` with any real task in this repo; the important part is that the command finishes so a run summary can be exported. + +## 6. Verify metrics reached the collector + +- **Collector logs (debug exporter)**: + + ```bash + cd crates/turborepo-otel/examples/local-collector + docker compose logs --tail=100 otel-collector + ``` + + You should see entries like: + + ```text + Metrics {"otelcol.component.id": "debug", "otelcol.signal": "metrics", "resource metrics": 1, "metrics": 4, "data points": 4} + Resource attributes: + -> env: Str(local) + -> service.name: Str(turborepo) + Metric #0 + -> Name: turbo.run.duration_ms + Metric #1 + -> Name: turbo.run.tasks.attempted + Metric #2 + -> Name: turbo.run.tasks.failed + Metric #3 + -> Name: turbo.run.tasks.cached + ``` + +- **Prometheus UI** (optional): + + - Open `http://localhost:9090` + - Query for metrics such as: + - `turbo.run.duration_ms` + - `turbo.run.tasks.attempted` + - `turbo.run.tasks.failed` + - `turbo.run.tasks.cached` + +- **Grafana UI** (optional): + + - Open `http://localhost:3000` (default credentials are usually `admin` / `admin`) + - Add a Prometheus data source pointing at `http://prometheus:9090` + 
- Build dashboards using the `turbo.*` metrics + +## 7. Cleanup + +To stop the local collector stack: + +```bash +cd crates/turborepo-otel/examples/local-collector +docker compose down +``` + +The OTEL-enabled `turbo` binary remains available at `./target/debug/turbo`. You can continue using it with the same environment variables to send metrics to this collector or to another OTLP-compatible backend. diff --git a/crates/turborepo-otel/examples/local-collector/docker-compose.yml b/crates/turborepo-otel/examples/local-collector/docker-compose.yml new file mode 100644 index 0000000000000..2c5bdca60836d --- /dev/null +++ b/crates/turborepo-otel/examples/local-collector/docker-compose.yml @@ -0,0 +1,27 @@ +services: + otel-collector: + image: otel/opentelemetry-collector-contrib:0.139.0 + volumes: + - ./otel-collector.yml:/etc/otelcol-contrib/config.yaml + ports: + - 1888:1888 # pprof extension + - 8888:8888 # Prometheus metrics exposed by the Collector + - 8889:8889 # Prometheus exporter metrics + - 13133:13133 # health_check extension + - 4317:4317 # OTLP gRPC receiver + - 4318:4318 # OTLP http receiver + - 55679:55679 # zpages extension + + prometheus: + image: prom/prometheus:latest + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro + ports: + - "9090:9090" + depends_on: [otel-collector] + + grafana: + image: grafana/grafana:latest + ports: + - "3000:3000" + depends_on: [prometheus] diff --git a/crates/turborepo-otel/examples/local-collector/otel-collector.yml b/crates/turborepo-otel/examples/local-collector/otel-collector.yml new file mode 100644 index 0000000000000..b431acd603a57 --- /dev/null +++ b/crates/turborepo-otel/examples/local-collector/otel-collector.yml @@ -0,0 +1,21 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +exporters: + debug: + verbosity: detailed + prometheus: + endpoint: "0.0.0.0:8889" + resource_to_telemetry_conversion: + enabled: true + +service: + pipelines: + metrics: 
+ receivers: [otlp] + exporters: [debug, prometheus] diff --git a/crates/turborepo-otel/examples/local-collector/prometheus.yml b/crates/turborepo-otel/examples/local-collector/prometheus.yml new file mode 100644 index 0000000000000..4cf1264eb933a --- /dev/null +++ b/crates/turborepo-otel/examples/local-collector/prometheus.yml @@ -0,0 +1,7 @@ +global: + scrape_interval: 5s + +scrape_configs: + - job_name: "otel-collector" + static_configs: + - targets: ["otel-collector:8889"] diff --git a/crates/turborepo-otel/src/lib.rs b/crates/turborepo-otel/src/lib.rs new file mode 100644 index 0000000000000..50513343ca182 --- /dev/null +++ b/crates/turborepo-otel/src/lib.rs @@ -0,0 +1,574 @@ +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, + time::Duration, +}; + +use opentelemetry::{ + KeyValue, + metrics::{Counter, Histogram, Meter, MeterProvider as _}, +}; +use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig}; +use opentelemetry_sdk::{ + Resource, + metrics::{SdkMeterProvider, Temporality, periodic_reader_with_async_runtime}, + runtime::Tokio, +}; +use opentelemetry_semantic_conventions::resource::SERVICE_NAME; +use thiserror::Error; +use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue}; +use tracing::warn; + +/// Protocol supported by the OTLP exporter. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Protocol { + Grpc, + HttpProtobuf, +} + +/// Metric toggle configuration. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub struct MetricsConfig { + pub run_summary: bool, + pub task_details: bool, +} + +/// Resolved configuration for the exporter. +#[derive(Debug, Clone)] +pub struct Config { + pub endpoint: String, + pub protocol: Protocol, + pub headers: BTreeMap, + pub timeout: Duration, + pub resource_attributes: BTreeMap, + pub metrics: MetricsConfig, +} + +/// Summary of a Turborepo run encoded for metrics export. 
+#[derive(Debug)] +pub struct RunMetricsPayload { + pub run_id: String, + pub turbo_version: String, + pub duration_ms: f64, + pub attempted_tasks: u64, + pub failed_tasks: u64, + pub cached_tasks: u64, + pub exit_code: i32, + pub scm_branch: Option, + pub scm_revision: Option, + pub tasks: Vec, +} + +/// Per-task metrics details. +#[derive(Debug)] +pub struct TaskMetricsPayload { + pub task_id: String, + pub task: String, + pub package: String, + pub hash: String, + pub external_inputs_hash: String, + pub command: String, + pub duration_ms: Option, + pub cache_status: TaskCacheStatus, + pub cache_source: Option, + pub cache_time_saved_ms: Option, + pub exit_code: Option, +} + +/// Cache status for a task. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TaskCacheStatus { + Hit, + Miss, +} + +impl TaskCacheStatus { + fn as_str(&self) -> &'static str { + match self { + TaskCacheStatus::Hit => "hit", + TaskCacheStatus::Miss => "miss", + } + } +} + +/// Errors that can occur while configuring or using the exporter. +#[derive(Error, Debug)] +pub enum Error { + #[error("experimentalOtel requires an endpoint")] + MissingEndpoint, + #[error("failed to build OTLP exporter: {0}")] + Exporter(opentelemetry_otlp::ExporterBuildError), + #[error("invalid OTLP header `{0}`")] + InvalidHeader(String), +} + +struct Instruments { + run_duration: Histogram, + run_attempted: Counter, + run_failed: Counter, + run_cached: Counter, + task_duration: Histogram, + task_cache: Counter, +} + +struct HandleInner { + provider: SdkMeterProvider, + instruments: Arc, + metrics: MetricsConfig, +} + +/// Handle to the configured exporter. 
+#[derive(Clone)]
+pub struct Handle {
+    inner: Arc<HandleInner>,
+}
+
+impl std::fmt::Debug for Handle {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // Intentionally opaque: only the type name is printed.
+        f.debug_struct("Handle").finish_non_exhaustive()
+    }
+}
+
+impl Handle {
+    /// Builds an OTLP metrics exporter from `config`.
+    ///
+    /// Returns `Error::MissingEndpoint` when the endpoint is empty or only
+    /// whitespace, `Error::InvalidHeader` when a gRPC header cannot be
+    /// converted to metadata, and `Error::Exporter` when the OTLP exporter
+    /// builder fails.
+    pub fn try_new(config: Config) -> Result<Self, Error> {
+        if config.endpoint.trim().is_empty() {
+            return Err(Error::MissingEndpoint);
+        }
+
+        let provider = build_provider(&config)?;
+        let meter = provider.meter("turborepo");
+        let instruments = Arc::new(create_instruments(&meter));
+
+        tracing::debug!(
+            target: "turborepo_otel",
+            "initialized otel exporter: endpoint={} protocol={:?} run_summary={} task_details={}",
+            config.endpoint,
+            config.protocol,
+            config.metrics.run_summary,
+            config.metrics.task_details
+        );
+
+        Ok(Self {
+            inner: Arc::new(HandleInner {
+                provider,
+                instruments,
+                metrics: config.metrics,
+            }),
+        })
+    }
+
+    /// Records metrics for a finished run.
+    ///
+    /// Run-level instruments are updated when `metrics.run_summary` is
+    /// enabled; per-task instruments when `metrics.task_details` is enabled.
+    pub fn record_run(&self, payload: &RunMetricsPayload) {
+        tracing::debug!(
+            target: "turborepo_otel",
+            "record_run payload: run_id={} attempted={} failed={} cached={}",
+            payload.run_id,
+            payload.attempted_tasks,
+            payload.failed_tasks,
+            payload.cached_tasks
+        );
+        if self.inner.metrics.run_summary {
+            self.inner.instruments.record_run_summary(payload);
+        }
+        if self.inner.metrics.task_details {
+            self.inner.instruments.record_task_details(payload);
+        }
+    }
+
+    /// Shuts down the meter provider. Failures are logged as warnings and
+    /// otherwise ignored so exporter problems never fail a run.
+    pub fn shutdown(self) {
+        // `target:` (colon) sets the tracing target; `target = ...` would
+        // instead record a field named `target` under this module's target.
+        tracing::debug!(target: "turborepo_otel", "shutting down otel exporter");
+        // `SdkMeterProvider::shutdown` takes `&self`, so it can be called
+        // whether or not this is the last clone of the handle; there is no
+        // need to unwrap the `Arc` first.
+        if let Err(err) = self.inner.provider.shutdown() {
+            warn!("failed to shutdown otel exporter: {err}");
+        }
+    }
+}
+
+impl Instruments {
+    /// Records the run-level histogram and counters, tagged with the run
+    /// attributes from `build_run_attributes`.
+    fn record_run_summary(&self, payload: &RunMetricsPayload) {
+        tracing::debug!(
+            target: "turborepo_otel",
+            "record_run_summary run_id={} duration_ms={} attempted={}",
+            payload.run_id,
+            payload.duration_ms,
+            payload.attempted_tasks
+        );
+        let attrs = build_run_attributes(payload);
+        self.run_duration.record(payload.duration_ms, &attrs);
+        self.run_attempted.add(payload.attempted_tasks, &attrs);
+        self.run_failed.add(payload.failed_tasks, &attrs);
+        self.run_cached.add(payload.cached_tasks, &attrs);
+    }
+
+    /// Records one cache event per task, plus a duration sample when the task
+    /// reports one. Each task's attributes extend the run attributes.
+    fn record_task_details(&self, payload: &RunMetricsPayload) {
+        tracing::debug!(
+            target: "turborepo_otel",
+            "record_task_details run_id={} task_count={}",
+            payload.run_id,
+            payload.tasks.len()
+        );
+        // NOTE(review): run id, task hash and command are unbounded-cardinality
+        // attributes; some metrics backends limit or bill per unique attribute
+        // set. Confirm this is intended for task-detail metrics.
+        let base_attrs = build_run_attributes(payload);
+        for task in &payload.tasks {
+            let mut attrs = base_attrs.clone();
+            attrs.push(KeyValue::new("turbo.task.id", task.task_id.clone()));
+            attrs.push(KeyValue::new("turbo.task.name", task.task.clone()));
+            attrs.push(KeyValue::new("turbo.task.package", task.package.clone()));
+            attrs.push(KeyValue::new("turbo.task.hash", task.hash.clone()));
+            attrs.push(KeyValue::new(
+                "turbo.task.external_inputs_hash",
+                task.external_inputs_hash.clone(),
+            ));
+            attrs.push(KeyValue::new("turbo.task.command", task.command.clone()));
+            attrs.push(KeyValue::new(
+                "turbo.task.cache_status",
+                task.cache_status.as_str(),
+            ));
+            if let Some(source) = &task.cache_source {
+                attrs.push(KeyValue::new("turbo.task.cache_source", source.clone()));
+            }
+            if let Some(time_saved) = task.cache_time_saved_ms {
+                attrs.push(KeyValue::new(
+                    "turbo.task.cache_time_saved_ms",
+                    time_saved as i64,
+                ));
+            }
+            if let Some(exit_code) = task.exit_code {
+                attrs.push(KeyValue::new("turbo.task.exit_code", exit_code as i64));
+            }
+            if let Some(duration) = task.duration_ms {
+                self.task_duration.record(duration, &attrs);
+            }
+            self.task_cache.add(1, &attrs);
+        }
+    }
+}
+
+/// Builds a meter provider that exports via OTLP (gRPC or HTTP/protobuf)
+/// with cumulative temporality, read by a periodic reader every 15 seconds.
+fn build_provider(config: &Config) -> Result<SdkMeterProvider, Error> {
+    let resource = build_resource(config);
+
+    let temporality = default_temporality();
+    let exporter = match config.protocol {
+        Protocol::Grpc => {
+            let export_config = opentelemetry_otlp::ExportConfig {
+                endpoint: Some(config.endpoint.clone()),
+                protocol: opentelemetry_otlp::Protocol::Grpc,
+                timeout: Some(config.timeout),
+            };
+            let mut builder = opentelemetry_otlp::MetricExporter::builder()
+                .with_tonic()
+                .with_temporality(temporality)
+                .with_export_config(export_config);
+            if !config.headers.is_empty() {
+                // gRPC carries headers as metadata, which has stricter
+                // key/value rules than plain HTTP headers.
+                builder = builder.with_metadata(build_metadata(&config.headers)?);
+            }
+            builder.build().map_err(Error::Exporter)?
+        }
+        Protocol::HttpProtobuf => {
+            let export_config = opentelemetry_otlp::ExportConfig {
+                endpoint: Some(config.endpoint.clone()),
+                protocol: opentelemetry_otlp::Protocol::HttpBinary,
+                timeout: Some(config.timeout),
+            };
+            let mut builder = opentelemetry_otlp::MetricExporter::builder()
+                .with_http()
+                .with_temporality(temporality)
+                .with_export_config(export_config);
+            if !config.headers.is_empty() {
+                let headers: HashMap<_, _> = config.headers.clone().into_iter().collect();
+                builder = builder.with_headers(headers);
+            }
+            builder.build().map_err(Error::Exporter)?
+        }
+    };
+
+    let reader = periodic_reader_with_async_runtime::PeriodicReader::builder(exporter, Tokio)
+        .with_interval(Duration::from_secs(15))
+        .build();
+
+    Ok(SdkMeterProvider::builder()
+        .with_resource(resource)
+        .with_reader(reader)
+        .build())
+}
+
+/// Converts configured headers into gRPC metadata. Returns
+/// `Error::InvalidHeader` naming the offending key when either the key or the
+/// value is not valid metadata.
+fn build_metadata(headers: &BTreeMap<String, String>) -> Result<MetadataMap, Error> {
+    let mut map = MetadataMap::new();
+    for (key, value) in headers {
+        let metadata_key = MetadataKey::from_bytes(key.as_bytes())
+            .map_err(|_| Error::InvalidHeader(key.clone()))?;
+        let metadata_value = MetadataValue::try_from(value.as_str())
+            .map_err(|_| Error::InvalidHeader(key.clone()))?;
+        map.insert(metadata_key, metadata_value);
+    }
+    Ok(map)
+}
+
+/// Builds the OTel resource. `service.name` defaults to `turborepo` unless
+/// overridden in `resource_attributes`; every other attribute is copied as-is.
+fn build_resource(config: &Config) -> Resource {
+    let mut attrs = Vec::with_capacity(config.resource_attributes.len() + 1);
+    let service_name = config
+        .resource_attributes
+        .get("service.name")
+        .cloned()
+        .unwrap_or_else(|| "turborepo".to_string());
+    attrs.push(KeyValue::new(SERVICE_NAME, service_name));
+    for (key, value) in config.resource_attributes.iter() {
+        // Already emitted above; skipping avoids a duplicate key.
+        if key == "service.name" {
+            continue;
+        }
+        attrs.push(KeyValue::new(key.clone(), value.clone()));
+    }
+    Resource::builder_empty().with_attributes(attrs).build()
+}
+
+/// Temporality used for every exporter.
+fn default_temporality() -> Temporality {
+    Temporality::Cumulative
+}
+
+/// Creates the run- and task-level instruments on `meter`.
+fn create_instruments(meter: &Meter) -> Instruments {
+    let run_duration = meter
+        .f64_histogram("turbo.run.duration_ms")
+        .with_description("Turborepo run duration in milliseconds")
+        .build();
+    let run_attempted = meter
+        .u64_counter("turbo.run.tasks.attempted")
+        .with_description("Tasks attempted per run")
+        .build();
+    let run_failed = meter
+        .u64_counter("turbo.run.tasks.failed")
+        .with_description("Tasks failed per run")
+        .build();
+    let run_cached = meter
+        .u64_counter("turbo.run.tasks.cached")
+        .with_description("Tasks served from cache per run")
+        .build();
+    let task_duration = meter
+        .f64_histogram("turbo.task.duration_ms")
+        .with_description("Task execution duration in milliseconds")
+        .build();
+    let task_cache = meter
+        .u64_counter("turbo.task.cache.events")
+        .with_description("Cache hit/miss events")
+        .build();
+
+    Instruments {
+        run_duration,
+        run_attempted,
+        run_failed,
+        run_cached,
+        task_duration,
+        task_cache,
+    }
+}
+
+/// Attributes shared by every run- and task-level measurement of a run.
+fn build_run_attributes(payload: &RunMetricsPayload) -> Vec<KeyValue> {
+    // At most five: id, exit code, version, and the optional branch/revision.
+    let mut attrs = Vec::with_capacity(5);
+    attrs.push(KeyValue::new("turbo.run.id", payload.run_id.clone()));
+    // Exported as an integer, matching `turbo.task.exit_code`.
+    attrs.push(KeyValue::new(
+        "turbo.run.exit_code",
+        i64::from(payload.exit_code),
+    ));
+    attrs.push(KeyValue::new(
+        "turbo.version",
+        payload.turbo_version.clone(),
+    ));
+    if let Some(branch) = &payload.scm_branch {
+        attrs.push(KeyValue::new("turbo.scm.branch", branch.clone()));
+    }
+    if let Some(revision) = &payload.scm_revision {
+        attrs.push(KeyValue::new("turbo.scm.revision", revision.clone()));
+    }
+    attrs
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::BTreeMap;
+
+    use super::*;
+
+    #[test]
+    fn test_handle_try_new_empty_endpoint() {
+        let config = Config {
+            endpoint: "".to_string(),
+            protocol: Protocol::Grpc,
+            headers: BTreeMap::new(),
+            timeout: Duration::from_secs(10),
+            resource_attributes: BTreeMap::new(),
+            metrics: MetricsConfig::default(),
+        };
+        let result = Handle::try_new(config);
+        assert!(result.is_err());
+        match result.unwrap_err() {
+            Error::MissingEndpoint => {}
+            _ => panic!("Expected MissingEndpoint error"),
+        }
+    }
+
+    #[test]
+    fn test_handle_try_new_whitespace_endpoint() {
+        let config = Config {
+            endpoint: "   ".to_string(),
+            protocol: Protocol::Grpc,
+            headers: BTreeMap::new(),
+            timeout: Duration::from_secs(10),
+            resource_attributes: BTreeMap::new(),
+            metrics: MetricsConfig::default(),
+        };
+        let result = Handle::try_new(config);
+        assert!(result.is_err());
+        match result.unwrap_err() {
+            Error::MissingEndpoint => {}
+            _ => panic!("Expected MissingEndpoint error"),
+        }
+    }
+
+    #[test]
+    fn test_build_metadata_valid() {
+        let mut headers = BTreeMap::new();
+        headers.insert("authorization".to_string(), "Bearer token123".to_string());
+        headers.insert("x-custom-header".to_string(), "value".to_string());
+
+        let result = build_metadata(&headers);
+        assert!(result.is_ok());
+        let metadata = result.unwrap();
+        assert_eq!(metadata.len(), 2);
+    }
+
+    #[test]
+    fn test_build_metadata_invalid_key() {
+        let mut headers = BTreeMap::new();
+        headers.insert("\0invalid".to_string(), "value".to_string());
+
+        let result = build_metadata(&headers);
+        assert!(result.is_err());
+        match result.unwrap_err() {
+            Error::InvalidHeader(key) => {
+                assert_eq!(key, "\0invalid");
+            }
+            _ => panic!("Expected InvalidHeader error"),
+        }
+    }
+
+    #[test]
+    fn test_build_metadata_invalid_value() {
+        let mut headers = BTreeMap::new();
+        headers.insert("valid-key".to_string(), "\0invalid-value".to_string());
+
+        let result = build_metadata(&headers);
+        assert!(result.is_err());
+        match result.unwrap_err() {
+            Error::InvalidHeader(key) => {
+                assert_eq!(key, "valid-key");
+            }
+            _ => panic!("Expected InvalidHeader error"),
+        }
+    }
+
+    #[test]
+    fn test_build_resource_default_service_name() {
+        let config = Config {
+            endpoint: "https://example.com".to_string(),
+            protocol: Protocol::Grpc,
+            headers: BTreeMap::new(),
+            timeout: Duration::from_secs(10),
+            resource_attributes: BTreeMap::new(),
+            metrics: MetricsConfig::default(),
+        };
+        let resource = build_resource(&config);
+        let attrs: Vec<_> = resource
+            .iter()
+            .map(|(k, v)| (k.as_str(), v.as_str()))
+            .collect();
+        assert!(
+            attrs
+                .iter()
+                .any(|(k, v)| *k == SERVICE_NAME && *v == "turborepo")
+        );
+    }
+
+    #[test]
+    fn test_build_resource_custom_service_name() {
+        let mut resource_attrs = BTreeMap::new();
+        resource_attrs.insert("service.name".to_string(), "my-service".to_string());
+        let config = Config {
+            endpoint: "https://example.com".to_string(),
+            protocol: Protocol::Grpc,
+            headers: BTreeMap::new(),
+            timeout: Duration::from_secs(10),
+            resource_attributes: resource_attrs,
+            metrics: MetricsConfig::default(),
+        };
+        let resource = build_resource(&config);
+        let attrs: Vec<_> = resource
+            .iter()
+            .map(|(k, v)| (k.as_str(), v.as_str()))
+            .collect();
+        assert!(
+            attrs
+                .iter()
+                .any(|(k, v)| *k == SERVICE_NAME && *v == "my-service")
+        );
+    }
+
+    #[test]
+    fn test_build_resource_additional_attributes() {
+        let mut resource_attrs = BTreeMap::new();
+        resource_attrs.insert("env".to_string(), "production".to_string());
+        resource_attrs.insert("version".to_string(), "1.0.0".to_string());
+        let config = Config {
+            endpoint: "https://example.com".to_string(),
+            protocol: Protocol::Grpc,
+            headers: BTreeMap::new(),
+            timeout: Duration::from_secs(10),
+            resource_attributes: resource_attrs,
+            metrics: MetricsConfig::default(),
+        };
+        let resource = build_resource(&config);
+        let attrs: Vec<_> = resource
+            .iter()
+            .map(|(k, v)| (k.as_str(), v.as_str()))
+            .collect();
+        assert_eq!(attrs.len(), 3);
+        assert!(
+            attrs
+                .iter()
+                .any(|(k, v)| *k == SERVICE_NAME && *v == "turborepo")
+        );
+        assert!(attrs.iter().any(|(k, v)| *k == "env" && *v == "production"));
+        assert!(attrs.iter().any(|(k, v)| *k == "version" && *v == "1.0.0"));
+    }
+
+    #[test]
+    fn test_build_resource_no_duplicate_service_name() {
+        let mut resource_attrs = BTreeMap::new();
+        resource_attrs.insert("service.name".to_string(), "custom".to_string());
+        resource_attrs.insert("env".to_string(), "production".to_string());
+        let config = Config {
+            endpoint: "https://example.com".to_string(),
+            protocol: Protocol::Grpc,
+            headers: BTreeMap::new(),
+            timeout: Duration::from_secs(10),
+            resource_attributes: resource_attrs,
+            metrics: MetricsConfig::default(),
+        };
+        let resource = build_resource(&config);
+        let attrs: Vec<_> = resource
+            .iter()
+            .map(|(k, v)| (k.as_str(), v.as_str()))
+            .collect();
+        let service_name_count = attrs.iter().filter(|(k, _)| *k == SERVICE_NAME).count();
+        assert_eq!(service_name_count, 1);
+        assert!(
+            attrs
+                .iter()
+                .any(|(k, v)| *k == SERVICE_NAME && *v == "custom")
+        );
+    }
+
+    #[test]
+    fn test_build_run_attributes_exit_code_and_optional_scm() {
+        let payload = RunMetricsPayload {
+            run_id: "run-1".to_string(),
+            turbo_version: "2.0.0".to_string(),
+            duration_ms: 12.5,
+            attempted_tasks: 3,
+            failed_tasks: 1,
+            cached_tasks: 1,
+            exit_code: 1,
+            scm_branch: Some("main".to_string()),
+            scm_revision: None,
+            tasks: Vec::new(),
+        };
+        let attrs = build_run_attributes(&payload);
+        assert_eq!(attrs.len(), 4);
+        assert!(attrs.iter().any(|kv| kv.key.as_str() == "turbo.run.exit_code"
+            && kv.value == opentelemetry::Value::I64(1)));
+        assert!(!attrs.iter().any(|kv| kv.key.as_str() == "turbo.scm.revision"));
+    }
+}
diff --git a/crates/turborepo/Cargo.toml b/crates/turborepo/Cargo.toml index c38a3018992ee..26b47fd8fa6c4 100644 --- a/crates/turborepo/Cargo.toml +++ b/crates/turborepo/Cargo.toml @@ -13,6 +13,7 @@ default = ["rustls-tls", "turborepo-lib/daemon-package-discovery"] native-tls = ["turborepo-lib/native-tls"] rustls-tls = ["turborepo-lib/rustls-tls"] pprof = ["turborepo-lib/pprof"] +otel = ["turborepo-lib/otel"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [build-dependencies] diff --git a/docs/site/content/docs/reference/configuration.mdx b/docs/site/content/docs/reference/configuration.mdx index 3fabffde8b6fa..9e0a46a8f27f0 100644 --- a/docs/site/content/docs/reference/configuration.mdx +++ b/docs/site/content/docs/reference/configuration.mdx @@ -705,3 +705,191 @@ Must start with `team_` or it will not be used. The slug of the Remote Cache team. Value will be passed as `slug` in the querystring for all Remote Cache HTTP calls.
+ +## Future flags + +Future flags allow you to opt-in to experimental features before they become the default behavior. + +```jsonc title="./turbo.json" +{ + "futureFlags": { + "experimentalObservability": true + } +} +``` + +### `experimentalObservability` + +Default: `false` + +When enabled, Turborepo will honor the `experimentalObservability.otel` configuration block (if present) to send run summaries to an OpenTelemetry Protocol (OTLP) collector. + + + Invalid or incomplete OTEL configuration (e.g., missing endpoint, invalid + protocol) results in metrics being disabled rather than causing runs to fail. + Turborepo will continue executing tasks normally even if the exporter cannot + be initialized. + + +## Experimental observability + +Experimental + +Configure Turborepo to export metrics to observability backends like Datadog, Prometheus, or other OTLP-compatible collectors. + +### `experimentalObservability` + +The `experimentalObservability` block is only read when `futureFlags.experimentalObservability` is set to `true` in your root `turbo.json`. Environment variables and CLI flags can still enable observability even if the future flag is disabled. + +```jsonc title="./turbo.json" +{ + "futureFlags": { + "experimentalObservability": true + }, + "experimentalObservability": { + "otel": { + "enabled": true, + "protocol": "grpc", + "endpoint": "https://api.datadoghq.com/api/v2/otlp", + "headers": { + "DD-API-KEY": "your-api-key" + }, + "timeoutMs": 10000, + "resource": { + "service.name": "turborepo" + }, + "metrics": { + "runSummary": true, + "taskDetails": false + } + } + } +} +``` + +#### `experimentalObservability.otel.enabled` + +Default: `true` (when endpoint is provided) + +Enable or disable the OpenTelemetry metrics exporter. + +#### `experimentalObservability.otel.protocol` + +Default: `"grpc"` + +The OTLP protocol to use. 
Supported values: + +- `"grpc"` - OTLP over gRPC +- `"http/protobuf"` - OTLP over HTTP with protobuf encoding + +#### `experimentalObservability.otel.endpoint` + +**Required** when using file-based configuration. + +The OTLP collector endpoint URL. For example: + +- Datadog: `"https://api.datadoghq.com/api/v2/otlp"` +- Custom collector: `"http://localhost:4317"` (gRPC) or `"http://localhost:4318"` (HTTP) + +If the endpoint is missing or empty when OTEL is enabled, the exporter will not be initialized and metrics will be disabled. The run will continue normally. + +#### `experimentalObservability.otel.headers` + +Optional HTTP headers to include with export requests. Useful for authentication (e.g., API keys) or custom metadata. + +```jsonc title="./turbo.json" +{ + "experimentalObservability": { + "otel": { + "headers": { + "DD-API-KEY": "your-datadog-api-key", + "X-Custom-Header": "value" + } + } + } +} +``` + +#### `experimentalObservability.otel.timeoutMs` + +Default: `10000` (10 seconds) + +Timeout in milliseconds for export requests to the collector. + +#### `experimentalObservability.otel.resource` + +Optional resource attributes to attach to all exported metrics. These help identify the source of metrics in your observability platform. + +```jsonc title="./turbo.json" +{ + "experimentalObservability": { + "otel": { + "resource": { + "service.name": "turborepo", + "service.namespace": "ci", + "deployment.environment": "production" + } + } + } +} +``` + +#### `experimentalObservability.otel.metrics` + +Control which metric groups are exported. 
+ +##### `metrics.runSummary` + +Default: `true` + +Export run-level metrics: + +- Run duration +- Tasks attempted, failed, and cached +- Exit code +- SCM branch and revision + +##### `metrics.taskDetails` + +Default: `false` + +Export per-task metrics: + +- Task execution duration +- Cache hit/miss status and source +- Cache time saved +- Task identifiers (task ID, package, hash) + + + Exporter failures are logged but do not cause the run to fail. If the + collector is unavailable or misconfigured, Turborepo will continue executing + tasks normally. Invalid or incomplete configuration (e.g., missing endpoint + when enabled) results in the exporter not being initialized, and metrics will + be disabled for that run. + + +### Environment variables + +You can also configure observability via environment variables, which take precedence over `turbo.json` settings: + +- `TURBO_EXPERIMENTAL_OTEL_ENABLED` - Enable/disable exporter (`1` or `0`) +- `TURBO_EXPERIMENTAL_OTEL_PROTOCOL` - Protocol (`grpc` or `http/protobuf`) +- `TURBO_EXPERIMENTAL_OTEL_ENDPOINT` - Collector endpoint URL +- `TURBO_EXPERIMENTAL_OTEL_TIMEOUT_MS` - Timeout in milliseconds +- `TURBO_EXPERIMENTAL_OTEL_HEADERS` - Comma-separated key=value pairs (e.g., `"DD-API-KEY=key,Header=value"`) +- `TURBO_EXPERIMENTAL_OTEL_RESOURCE` - Comma-separated key=value pairs for resource attributes +- `TURBO_EXPERIMENTAL_OTEL_METRICS_RUN_SUMMARY` - Enable run summary metrics (`1` or `0`) +- `TURBO_EXPERIMENTAL_OTEL_METRICS_TASK_DETAILS` - Enable task details metrics (`1` or `0`) + +### CLI flags + +You can override observability settings via CLI flags: + +- `--experimental-otel-enabled` - Enable/disable exporter +- `--experimental-otel-protocol` - Protocol (`grpc` or `http-protobuf`; note the CLI uses a hyphen rather than a slash) +- `--experimental-otel-endpoint` - Collector endpoint URL +- `--experimental-otel-timeout-ms` - Timeout in milliseconds +- `--experimental-otel-header KEY=VALUE` - Add HTTP header (can be repeated) +- `--experimental-otel-resource KEY=VALUE`
- Add resource attribute (can be repeated) +- `--experimental-otel-metrics-run-summary` - Enable run summary metrics +- `--experimental-otel-metrics-task-details` - Enable task details metrics diff --git a/docs/site/dictionary.txt b/docs/site/dictionary.txt index b3c4d20fbcab6..6db9fb60d287c 100644 --- a/docs/site/dictionary.txt +++ b/docs/site/dictionary.txt @@ -62,6 +62,7 @@ zero-config DevTools TSX backend +backends pre-configured Vue SvelteKit @@ -278,3 +279,19 @@ TailwindCSS TailwindCSS's frontend frontends +grpc +gRPC +observability +opentelemetry +OpenTelemetry +otlp +OTLP +otlp-compatible +OTLP-compatible +protobuf +scm +SCM +datadog +Datadog +otel +OTEL diff --git a/turborepo-tests/integration/tests/other/experimental-otel.t b/turborepo-tests/integration/tests/other/experimental-otel.t new file mode 100644 index 0000000000000..55c240df5f327 --- /dev/null +++ b/turborepo-tests/integration/tests/other/experimental-otel.t @@ -0,0 +1,104 @@ +# Smoke tests for experimental OTEL configuration. +# These tests verify that enabling/disabling OTEL via environment variables and CLI flags +# does not break normal turbo run behavior. Exporter correctness is primarily covered +# by Rust unit tests in crates/turborepo-lib/src/config/experimental_otel.rs and +# crates/turborepo-lib/src/observability/otel.rs. + +Setup + $ . 
${TESTDIR}/../../../helpers/setup_integration_test.sh + +Smoke test: OTEL enabled via environment variables does not break turbo run + $ export TURBO_EXPERIMENTAL_OTEL_ENABLED=1 + $ export TURBO_EXPERIMENTAL_OTEL_ENDPOINT=http://localhost:4318 + $ ${TURBO} run build --filter=my-app + \xe2\x80\xa2 Packages in scope: my-app (esc) + \xe2\x80\xa2 Running build in 1 packages (esc) + \xe2\x80\xa2 Remote caching disabled (esc) + my-app:build: cache miss, executing .* (re) + my-app:build: + my-app:build: > build + my-app:build: > echo building + my-app:build: + my-app:build: building + + Tasks: 1 successful, 1 total + Cached: 0 cached, 1 total + Time:\s*[\.0-9]+m?s (re) + + WARNING no output files found for task my-app#build. Please check your `outputs` key in `turbo.json` + + +Smoke test: OTEL enabled via CLI flags does not break turbo run + $ unset TURBO_EXPERIMENTAL_OTEL_ENABLED + $ unset TURBO_EXPERIMENTAL_OTEL_ENDPOINT + $ ${TURBO} run build --filter=my-app --experimental-otel-enabled --experimental-otel-endpoint=http://localhost:4318 + \xe2\x80\xa2 Packages in scope: my-app (esc) + \xe2\x80\xa2 Running build in 1 packages (esc) + \xe2\x80\xa2 Remote caching disabled (esc) + my-app:build: cache hit, replaying logs .* (re) + my-app:build: + my-app:build: > build + my-app:build: > echo building + my-app:build: + my-app:build: building + + Tasks: 1 successful, 1 total + Cached: 1 cached, 1 total + Time:\s*[\.0-9]+m?s\s*>>> FULL TURBO (re) + + +Smoke test: http/protobuf protocol flag is accepted without error + $ ${TURBO} run build --filter=my-app --experimental-otel-enabled --experimental-otel-endpoint=http://localhost:4318 --experimental-otel-protocol=http-protobuf + \xe2\x80\xa2 Packages in scope: my-app (esc) + \xe2\x80\xa2 Running build in 1 packages (esc) + \xe2\x80\xa2 Remote caching disabled (esc) + my-app:build: cache hit, replaying logs .* (re) + my-app:build: + my-app:build: > build + my-app:build: > echo building + my-app:build: + my-app:build: building + + 
Tasks: 1 successful, 1 total + Cached: 1 cached, 1 total + Time:\s*[\.0-9]+m?s\s*>>> FULL TURBO (re) + + +Smoke test: OTEL disabled via environment variable does not break turbo run + $ export TURBO_EXPERIMENTAL_OTEL_ENABLED=0 + $ export TURBO_EXPERIMENTAL_OTEL_ENDPOINT=http://localhost:4318 + $ ${TURBO} run build --filter=my-app + \xe2\x80\xa2 Packages in scope: my-app (esc) + \xe2\x80\xa2 Running build in 1 packages (esc) + \xe2\x80\xa2 Remote caching disabled (esc) + my-app:build: cache hit, replaying logs .* (re) + my-app:build: + my-app:build: > build + my-app:build: > echo building + my-app:build: + my-app:build: building + + Tasks: 1 successful, 1 total + Cached: 1 cached, 1 total + Time:\s*[\.0-9]+m?s\s*>>> FULL TURBO (re) + + +Smoke test: enabled via env without endpoint is a no-op (exporter not configured) + $ export TURBO_EXPERIMENTAL_OTEL_ENABLED=1 + $ unset TURBO_EXPERIMENTAL_OTEL_ENDPOINT + $ ${TURBO} run build --filter=my-app + \xe2\x80\xa2 Packages in scope: my-app (esc) + \xe2\x80\xa2 Running build in 1 packages (esc) + \xe2\x80\xa2 Remote caching disabled (esc) + my-app:build: cache hit, replaying logs .* (re) + my-app:build: + my-app:build: > build + my-app:build: > echo building + my-app:build: + my-app:build: building + + Tasks: 1 successful, 1 total + Cached: 1 cached, 1 total + Time:\s*[\.0-9]+m?s\s*>>> FULL TURBO (re) + + diff --git a/turborepo-tests/integration/tests/other/no-args.t b/turborepo-tests/integration/tests/other/no-args.t index cd6f13b67afac..c5480f3151b60 100644 --- a/turborepo-tests/integration/tests/other/no-args.t +++ b/turborepo-tests/integration/tests/other/no-args.t @@ -59,6 +59,22 @@ Make sure exit code is 2 when no args are passed Specify a file to save a pprof trace --verbosity Verbosity level. 
Useful when debugging Turborepo or creating logs for issue reports + --experimental-otel-enabled [] + [possible values: true, false] + --experimental-otel-protocol + [possible values: grpc, http-protobuf] + --experimental-otel-endpoint + + --experimental-otel-timeout-ms + + --experimental-otel-header + + --experimental-otel-resource + + --experimental-otel-metrics-run-summary [] + [possible values: true, false] + --experimental-otel-metrics-task-details [] + [possible values: true, false] --dangerously-disable-package-manager-check Allow for missing `packageManager` in `package.json` --root-turbo-json diff --git a/turborepo-tests/integration/tests/other/turbo-help.t b/turborepo-tests/integration/tests/other/turbo-help.t index 71df343af280d..ea1bfc5ca35f8 100644 --- a/turborepo-tests/integration/tests/other/turbo-help.t +++ b/turborepo-tests/integration/tests/other/turbo-help.t @@ -59,6 +59,22 @@ Test help flag Specify a file to save a pprof trace --verbosity Verbosity level. Useful when debugging Turborepo or creating logs for issue reports + --experimental-otel-enabled [] + [possible values: true, false] + --experimental-otel-protocol + [possible values: grpc, http-protobuf] + --experimental-otel-endpoint + + --experimental-otel-timeout-ms + + --experimental-otel-header + + --experimental-otel-resource + + --experimental-otel-metrics-run-summary [] + [possible values: true, false] + --experimental-otel-metrics-task-details [] + [possible values: true, false] --dangerously-disable-package-manager-check Allow for missing `packageManager` in `package.json` --root-turbo-json @@ -205,6 +221,30 @@ Test help flag --verbosity Verbosity level. 
Useful when debugging Turborepo or creating logs for issue reports + --experimental-otel-enabled [] + [possible values: true, false] + + --experimental-otel-protocol + [possible values: grpc, http-protobuf] + + --experimental-otel-endpoint + + + --experimental-otel-timeout-ms + + + --experimental-otel-header + + + --experimental-otel-resource + + + --experimental-otel-metrics-run-summary [] + [possible values: true, false] + + --experimental-otel-metrics-task-details [] + [possible values: true, false] + --dangerously-disable-package-manager-check Allow for missing `packageManager` in `package.json`. @@ -324,6 +364,12 @@ Test help flag [possible values: auto, none, task] + + + + + + Test help flag for link command $ ${TURBO} link -h Link your local directory to a Vercel organization and enable remote caching @@ -371,6 +417,22 @@ Test help flag for link command Specify a file to save a pprof trace --verbosity Verbosity level. Useful when debugging Turborepo or creating logs for issue reports + --experimental-otel-enabled [] + [possible values: true, false] + --experimental-otel-protocol + [possible values: grpc, http-protobuf] + --experimental-otel-endpoint + + --experimental-otel-timeout-ms + + --experimental-otel-header + + --experimental-otel-resource + + --experimental-otel-metrics-run-summary [] + [possible values: true, false] + --experimental-otel-metrics-task-details [] + [possible values: true, false] --dangerously-disable-package-manager-check Allow for missing `packageManager` in `package.json` --root-turbo-json @@ -419,6 +481,22 @@ Test help flag for unlink command Specify a file to save a pprof trace --verbosity Verbosity level. 
Useful when debugging Turborepo or creating logs for issue reports + --experimental-otel-enabled [] + [possible values: true, false] + --experimental-otel-protocol + [possible values: grpc, http-protobuf] + --experimental-otel-endpoint + + --experimental-otel-timeout-ms + + --experimental-otel-header + + --experimental-otel-resource + + --experimental-otel-metrics-run-summary [] + [possible values: true, false] + --experimental-otel-metrics-task-details [] + [possible values: true, false] --dangerously-disable-package-manager-check Allow for missing `packageManager` in `package.json` --root-turbo-json @@ -471,6 +549,22 @@ Test help flag for login command Specify a file to save a pprof trace --verbosity Verbosity level. Useful when debugging Turborepo or creating logs for issue reports + --experimental-otel-enabled [] + [possible values: true, false] + --experimental-otel-protocol + [possible values: grpc, http-protobuf] + --experimental-otel-endpoint + + --experimental-otel-timeout-ms + + --experimental-otel-header + + --experimental-otel-resource + + --experimental-otel-metrics-run-summary [] + [possible values: true, false] + --experimental-otel-metrics-task-details [] + [possible values: true, false] --dangerously-disable-package-manager-check Allow for missing `packageManager` in `package.json` --root-turbo-json @@ -519,6 +613,22 @@ Test help flag for logout command Specify a file to save a pprof trace --verbosity Verbosity level. 
Useful when debugging Turborepo or creating logs for issue reports + --experimental-otel-enabled [] + [possible values: true, false] + --experimental-otel-protocol + [possible values: grpc, http-protobuf] + --experimental-otel-endpoint + + --experimental-otel-timeout-ms + + --experimental-otel-header + + --experimental-otel-resource + + --experimental-otel-metrics-run-summary [] + [possible values: true, false] + --experimental-otel-metrics-task-details [] + [possible values: true, false] --dangerously-disable-package-manager-check Allow for missing `packageManager` in `package.json` --root-turbo-json