+
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,8 +827,13 @@ func (api *Api) Integration(w http.ResponseWriter, r *http.Request, u *db.User)
config.Database = cfg.Database
config.TlsEnable = cfg.TlsEnable
config.TlsSkipVerify = cfg.TlsSkipVerify

if ci, err = clickhouse.GetClusterInfo(r.Context(), config); err != nil {
cInfo, err := api.collector.GetClickhouseClusterInfo(project)
if err != nil {
klog.Errorln(err)
http.Error(w, "", http.StatusInternalServerError)
return
}
if ci, err = clickhouse.GetClusterInfo(r.Context(), config, cInfo); err != nil {
klog.Errorln(err)
}
}
Expand Down Expand Up @@ -1667,11 +1672,11 @@ func (api *Api) GetClickhouseClient(project *db.Project) (*clickhouse.Client, er
config.Database = cfg.Database
config.TlsEnable = cfg.TlsEnable
config.TlsSkipVerify = cfg.TlsSkipVerify
distributed, err := api.collector.IsClickhouseDistributed(project)
clusterInfo, err := api.collector.GetClickhouseClusterInfo(project)
if err != nil {
return nil, err
}
return clickhouse.NewClient(config, distributed)
return clickhouse.NewClient(config, clusterInfo)
}

func GetApplicationId(r *http.Request) (model.ApplicationId, error) {
Expand Down
3 changes: 2 additions & 1 deletion api/forms/forms.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strings"

"github.com/coroot/coroot/clickhouse"
"github.com/coroot/coroot/collector"
"github.com/coroot/coroot/db"
"github.com/coroot/coroot/model"
"github.com/coroot/coroot/notifications"
Expand Down Expand Up @@ -422,7 +423,7 @@ func (f *IntegrationFormClickhouse) Test(ctx context.Context, project *db.Projec
config.Database = f.Database
config.TlsEnable = f.TlsEnable
config.TlsSkipVerify = f.TlsSkipVerify
client, err := clickhouse.NewClient(config, false)
client, err := clickhouse.NewClient(config, collector.ClickHouseInfo{})
if err != nil {
return err
}
Expand Down
66 changes: 44 additions & 22 deletions clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ func NewClientConfig(address, user, password string) ClientConfig {
}

type Client struct {
config ClientConfig
conn clickhouse.Conn
useDistributedTables bool
config ClientConfig
conn clickhouse.Conn
chInfo collector.ClickHouseInfo
}

func NewClient(config ClientConfig, distributed bool) (*Client, error) {
func NewClient(config ClientConfig, chInfo collector.ClickHouseInfo) (*Client, error) {
opts := &clickhouse.Options{
Addr: []string{config.Address},
Auth: clickhouse.Auth{
Expand Down Expand Up @@ -81,20 +81,20 @@ func NewClient(config ClientConfig, distributed bool) (*Client, error) {
if err != nil {
return nil, err
}
return &Client{config: config, conn: conn, useDistributedTables: distributed}, nil
return &Client{config: config, conn: conn, chInfo: chInfo}, nil
}

// Ping forwards to the Ping method of the wrapped connection and hands its
// error (nil on success) back to the caller unchanged.
func (c *Client) Ping(ctx context.Context) error {
	pingErr := c.conn.Ping(ctx)
	return pingErr
}

func (c *Client) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
query = collector.ReplaceTables(query, c.useDistributedTables)
query = collector.ReplaceTables(query, c.chInfo.UseDistributed())
return c.conn.Query(ctx, query, args...)
}

func (c *Client) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
query = collector.ReplaceTables(query, c.useDistributedTables)
query = collector.ReplaceTables(query, c.chInfo.UseDistributed())
return c.conn.QueryRow(ctx, query, args...)
}

Expand All @@ -106,20 +106,23 @@ func (c *Client) Close() error {
}

func (c *Client) getClusterTopology(ctx context.Context) ([]ClusterNode, error) {
clusterQuery := `
var clusterName string
if c.chInfo.Cloud {
clusterName = "default"
} else {
clusterQuery := `
SELECT DISTINCT replaceRegexpOne(engine_full, '^Distributed\\(''([^'']+)''.*', '\\1') as cluster_name
FROM system.tables
WHERE engine = 'Distributed'
AND database = currentDatabase()
LIMIT 1`

var clusterName string
row := c.conn.QueryRow(ctx, clusterQuery)
if err := row.Scan(&clusterName); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return []ClusterNode{}, nil
row := c.conn.QueryRow(ctx, clusterQuery)
if err := row.Scan(&clusterName); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return []ClusterNode{}, nil
}
return nil, err
}
return nil, err
}

topologyQuery := `
Expand Down Expand Up @@ -255,6 +258,15 @@ func (c *Client) GetDiskInfo(ctx context.Context) ([]DiskInfo, error) {
return disks, nil
}

// IsCloud reads the 'cloud_mode' entry from system.settings and reports it
// as a boolean.
//
// When the settings table has no such row, the result is (false, nil).
// Any other query or scan error is returned together with whatever value was
// scanned so far.
func (c *Client) IsCloud(ctx context.Context) (bool, error) {
	const cloudModeQuery = "SELECT toBool(value) FROM system.settings WHERE name = 'cloud_mode';"

	var enabled bool
	row := c.conn.QueryRow(ctx, cloudModeQuery)
	switch scanErr := row.Scan(&enabled); {
	case errors.Is(scanErr, sql.ErrNoRows):
		// The setting is not present on this server.
		return false, nil
	default:
		return enabled, scanErr
	}
}

func parseTTLToSeconds(ttlExpr string) uint64 {
ttlExpr = strings.TrimSpace(ttlExpr)

Expand Down Expand Up @@ -349,8 +361,8 @@ type ClusterInfo struct {
ServerDisks []ServerDiskInfo `json:"server_disks,omitempty"`
}

func GetClusterInfo(ctx context.Context, cfg ClientConfig) (*ClusterInfo, error) {
ch, err := NewClient(cfg, false)
func GetClusterInfo(ctx context.Context, cfg ClientConfig, info collector.ClickHouseInfo) (*ClusterInfo, error) {
ch, err := NewClient(cfg, info)
if err != nil {
return nil, err
}
Expand All @@ -360,11 +372,11 @@ func GetClusterInfo(ctx context.Context, cfg ClientConfig) (*ClusterInfo, error)
klog.Errorln("failed to get ClickHouse cluster topology:", err)
return ci, nil
}
if ci.TableSizes, err = getClusterTableSizes(ctx, cfg, ci.Topology); err != nil {
if ci.TableSizes, err = getClusterTableSizes(ctx, cfg, ci.Topology, ch, info); err != nil {
klog.Errorln("failed to get ClickHouse table sizes:", err)
return ci, nil
}
if ci.ServerDisks, err = getClusterServerDisks(ctx, cfg, ci.Topology); err != nil {
if ci.ServerDisks, err = getClusterServerDisks(ctx, cfg, ci.Topology, info); err != nil {
klog.Errorln("failed to get ClickHouse server disks:", err)
return ci, nil
}
Expand Down Expand Up @@ -398,7 +410,7 @@ func executeOnAllServers(ctx context.Context, config ClientConfig, topology []Cl
clientConfig := config
clientConfig.Address = addr

client, err := NewClient(clientConfig, false)
client, err := NewClient(clientConfig, collector.ClickHouseInfo{})
if err != nil {
resultsChan <- serverExecResult{addr: addr, err: err}
return
Expand Down Expand Up @@ -432,7 +444,14 @@ func executeOnAllServers(ctx context.Context, config ClientConfig, topology []Cl
}
}

func getClusterTableSizes(ctx context.Context, config ClientConfig, topology []ClusterNode) ([]TableInfo, error) {
func getClusterTableSizes(ctx context.Context, config ClientConfig, topology []ClusterNode, ch *Client, info collector.ClickHouseInfo) ([]TableInfo, error) {
if info.Cloud {
allTables, err := ch.GetTableSizes(ctx)
if err != nil {
return nil, err
}
return aggregateTableStats(allTables), nil
}
results, err := executeOnAllServers(ctx, config, topology, func(client *Client) (interface{}, error) {
return client.GetTableSizes(ctx)
})
Expand All @@ -453,7 +472,10 @@ func getClusterTableSizes(ctx context.Context, config ClientConfig, topology []C
return aggregateTableStats(allTables), nil
}

func getClusterServerDisks(ctx context.Context, config ClientConfig, topology []ClusterNode) ([]ServerDiskInfo, error) {
func getClusterServerDisks(ctx context.Context, config ClientConfig, topology []ClusterNode, info collector.ClickHouseInfo) ([]ServerDiskInfo, error) {
if info.Cloud {
return nil, nil
}
results, err := executeOnAllServers(ctx, config, topology, func(client *Client) (interface{}, error) {
return client.GetDiskInfo(ctx)
})
Expand Down
14 changes: 12 additions & 2 deletions clickhouse/space_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"strconv"

"github.com/coroot/coroot/collector"
"github.com/coroot/coroot/config"
"github.com/coroot/coroot/db"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -51,7 +52,7 @@ func (sm *SpaceManager) runCleanupOnReplica(ctx context.Context, replicaAddr str
config.TlsEnable = sm.client.config.TlsEnable
config.TlsSkipVerify = sm.client.config.TlsSkipVerify

client, err := NewClient(config, false)
client, err := NewClient(config, collector.ClickHouseInfo{})
if err != nil {
return fmt.Errorf("failed to create client for replica %s: %w", replicaAddr, err)
}
Expand Down Expand Up @@ -217,12 +218,21 @@ func runSpaceManagerOnCluster(ctx context.Context, managerCfg config.ClickHouseS
config.TlsEnable = cfg.TlsEnable
config.TlsSkipVerify = cfg.TlsSkipVerify

client, err := NewClient(config, false)
client, err := NewClient(config, collector.ClickHouseInfo{})
if err != nil {
return fmt.Errorf("failed to create client: %w", err)
}
defer client.Close()

cloud, err := client.IsCloud(ctx)
if err != nil {
return err
}
if cloud {
klog.Infoln("storage manager is disabled for ClickHouse Cloud")
return nil
}

spaceManager := &SpaceManager{
client: client,
cfg: managerCfg,
Expand Down
43 changes: 31 additions & 12 deletions collector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"fmt"
"strconv"
"strings"
"time"

Expand All @@ -12,11 +13,13 @@ import (
chproto "github.com/ClickHouse/ch-go/proto"
"github.com/coroot/coroot/db"
"golang.org/x/exp/maps"
"k8s.io/klog"
)

// ClickhouseClient bundles a ClickHouse connection pool with the facts the
// collector discovered about the server it is connected to.
type ClickhouseClient struct {
// pool is the connection pool used to run queries.
pool *chpool.Pool
// cluster is the name of the selected ClickHouse cluster; it stays empty
// when no cluster was detected.
cluster string
// cloud is set when the server was identified as a ClickHouse cloud instance.
cloud bool
}

func NewClickhouseClient(ctx context.Context, cfg *db.IntegrationClickhouse) (*ClickhouseClient, error) {
Expand All @@ -40,11 +43,9 @@ func NewClickhouseClient(ctx context.Context, cfg *db.IntegrationClickhouse) (*C
return nil, err
}
c := &ClickhouseClient{pool: pool}
cluster, err := c.getCluster(ctx)
if err != nil {
if err = c.info(ctx, opts.Address); err != nil {
return nil, err
}
c.cluster = cluster
return c, nil
}

Expand Down Expand Up @@ -72,14 +73,31 @@ func (c *ClickhouseClient) Close() {
}
}

func (c *ClickhouseClient) getCluster(ctx context.Context) (string, error) {
func (c *ClickhouseClient) info(ctx context.Context, address string) error {
var exists chproto.ColUInt8
q := ch.Query{Body: "EXISTS system.zookeeper", Result: chproto.Results{{Name: "result", Data: &exists}}}
if err := c.pool.Do(ctx, q); err != nil {
return "", err
return err
}
if exists.Row(0) != 1 {
return "", nil
return nil
}

var modeStr chproto.ColStr
q = ch.Query{
Body: "SELECT value FROM system.settings WHERE name = 'cloud_mode_engine'",
Result: chproto.Results{{Name: "value", Data: &modeStr}},
}
if err := c.pool.Do(ctx, q); err != nil {
return err
}
if modeStr.Rows() > 0 {
mode, _ := strconv.ParseUint(modeStr.Row(0), 10, 64)
if mode >= 2 {
klog.Infoln(address, "is a ClickHouse cloud instance")
c.cloud = true
return nil
}
}
var clusterCol chproto.ColStr
clusters := map[string]bool{}
Expand All @@ -96,17 +114,18 @@ func (c *ClickhouseClient) getCluster(ctx context.Context) (string, error) {
},
}
if err := c.pool.Do(ctx, q); err != nil {
return "", err
return err
}
switch {
case len(clusters) == 0:
return "", nil
case len(clusters) == 1:
return maps.Keys(clusters)[0], nil
c.cluster = maps.Keys(clusters)[0]
case clusters["coroot"]:
return "coroot", nil
c.cluster = "coroot"
case clusters["default"]:
return "default", nil
c.cluster = "default"
default:
return fmt.Errorf(`multiple ClickHouse clusters found, but neither "coroot" nor "default" cluster found`)
}
return "", fmt.Errorf(`multiple ClickHouse clusters found, but neither "coroot" nor "default" cluster found`)
return nil
}
15 changes: 12 additions & 3 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ var (
ErrClickhouseNotConfigured = errors.New("clickhouse integration is not configured")
)

// ClickHouseInfo describes the ClickHouse deployment a project talks to.
type ClickHouseInfo struct {
	// Name is the cluster name; an empty string means no cluster is in use.
	Name string
	// Cloud marks the deployment as a ClickHouse cloud instance.
	Cloud bool
}

// UseDistributed reports whether queries should target distributed tables.
// That is the case only for a named cluster that is not a cloud instance.
func (ci ClickHouseInfo) UseDistributed() bool {
	if ci.Cloud {
		return false
	}
	hasCluster := ci.Name != ""
	return hasCluster
}

type Config struct {
TracesTTL timeseries.Duration
LogsTTL timeseries.Duration
Expand Down Expand Up @@ -274,10 +283,10 @@ func (c *Collector) getProfilesBatch(project *db.Project) *ProfilesBatch {
return b
}

func (c *Collector) IsClickhouseDistributed(project *db.Project) (bool, error) {
func (c *Collector) GetClickhouseClusterInfo(project *db.Project) (ClickHouseInfo, error) {
client, err := c.getClickhouseClient(project)
if err != nil {
return false, err
return ClickHouseInfo{}, err
}
return client.cluster != "", nil
return ClickHouseInfo{Name: client.cluster, Cloud: client.cloud}, nil
}
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载