+
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conduit/data/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
yaml "gopkg.in/yaml.v3"
)

// DefaultConfigBaseName is the default conduit configuration filename without the extension.
Expand Down
2 changes: 1 addition & 1 deletion conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
yaml "gopkg.in/yaml.v3"

sdk "github.com/algorand/go-algorand-sdk/v2/types"

Expand Down
23 changes: 12 additions & 11 deletions conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"path"
"path/filepath"
"regexp"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -914,32 +915,32 @@ func TestMetrics(t *testing.T) {
if strings.HasSuffix(*stat.Name, metrics.BlockImportTimeName) {
found++
// 1 hour in seconds
assert.Contains(t, stat.String(), "sample_count:1 sample_sum:3600")
assert.Regexp(t, regexp.MustCompile("sample_count:1 +sample_sum:3600"), stat.String())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@winder
For mysterious reasons, this tests started failing on me and I had to be more lenient on the assertions in order to pass.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would guess it's related to upgrading the prometheus versions.

Am I reading the regex correctly, that there is additional whitespace and some control character changes (quotes instead of angles?) but the data is otherwise the same?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. We now have { instead of < and unpredictable white space.

}
if strings.HasSuffix(*stat.Name, metrics.ImportedRoundGaugeName) {
found++
assert.Contains(t, stat.String(), "value:1234")
}
if strings.HasSuffix(*stat.Name, metrics.ImportedTxnsPerBlockName) {
found++
assert.Contains(t, stat.String(), "sample_count:1 sample_sum:14")
assert.Regexp(t, regexp.MustCompile("sample_count:1 +sample_sum:14"), stat.String())
}
if strings.HasSuffix(*stat.Name, metrics.ImportedTxnsName) {
found++
str := stat.String()
// the 6 single txns
assert.Contains(t, str, `label:<name:"txn_type" value:"acfg" > gauge:<value:1 >`)
assert.Contains(t, str, `label:<name:"txn_type" value:"afrz" > gauge:<value:1 >`)
assert.Contains(t, str, `label:<name:"txn_type" value:"axfer" > gauge:<value:1 >`)
assert.Contains(t, str, `label:<name:"txn_type" value:"keyreg" > gauge:<value:1 >`)
assert.Contains(t, str, `label:<name:"txn_type" value:"pay" > gauge:<value:1 >`)
assert.Contains(t, str, `label:<name:"txn_type" value:"stpf" > gauge:<value:1 >`)
assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"acfg". +gauge:.value:1.`), str)
assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"afrz". +gauge:.value:1.`), str)
assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"axfer". +gauge:.value:1.`), str)
assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"keyreg". +gauge:.value:1.`), str)
assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"pay". +gauge:.value:1.`), str)
assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"stpf". +gauge:.value:1.`), str)

// 2 app call txns
assert.Contains(t, str, `label:<name:"txn_type" value:"appl" > gauge:<value:2 >`)
// // 2 app call txns
assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"appl". +gauge:.value:2.`), str)

// 1 app had 6 inner txns
assert.Contains(t, str, `label:<name:"txn_type" value:"inner" > gauge:<value:6 >`)
assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"inner". +gauge:.value:6.`), str)
}
}
assert.Equal(t, 4, found)
Expand Down
2 changes: 1 addition & 1 deletion conduit/plugins/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package plugins

import "gopkg.in/yaml.v3"
import yaml "gopkg.in/yaml.v3"

// PluginConfig is a generic string which can be deserialized by each individual Plugin
type PluginConfig struct {
Expand Down
21 changes: 12 additions & 9 deletions conduit/plugins/exporters/postgresql/postgresql_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ func (exp *postgresqlExporter) Metadata() plugins.Metadata {
}

// createIndexerDB common code for creating the IndexerDb instance.
func createIndexerDB(logger *logrus.Logger, readonly bool, cfg plugins.PluginConfig) (idb.IndexerDb, chan struct{}, error) {
func createIndexerDB(logger *logrus.Logger, readonly bool, cfg plugins.PluginConfig) (idb.IndexerDb, chan struct{}, ExporterConfig, error) {
var eCfg ExporterConfig
if err := cfg.UnmarshalConfig(&eCfg); err != nil {
return nil, nil, fmt.Errorf("connect failure in unmarshalConfig: %v", err)
return nil, nil, eCfg, fmt.Errorf("connect failure in unmarshalConfig: %v", err)
}
logger.Debugf("createIndexerDB: eCfg.Delete=%+v", eCfg.Delete)

// Inject a dummy db for unit testing
dbName := "postgres"
Expand All @@ -73,14 +74,14 @@ func createIndexerDB(logger *logrus.Logger, readonly bool, cfg plugins.PluginCon
// connecting to a local instance that's running.
// this behavior can be reproduced in TestConnectDbFailure.
if !eCfg.Test && eCfg.ConnectionString == "" {
return nil, nil, fmt.Errorf("connection string is empty for %s", dbName)
return nil, nil, eCfg, fmt.Errorf("connection string is empty for %s", dbName)
}
db, ready, err := idb.IndexerDbByName(dbName, eCfg.ConnectionString, opts, logger)
if err != nil {
return nil, nil, fmt.Errorf("connect failure constructing db, %s: %v", dbName, err)
return nil, nil, eCfg, fmt.Errorf("connect failure constructing db, %s: %v", dbName, err)
}

return db, ready, nil
return db, ready, eCfg, nil
}

// RoundRequest connects to the database, queries the round, and closes the
Expand All @@ -90,7 +91,7 @@ func (exp *postgresqlExporter) RoundRequest(cfg plugins.PluginConfig) (uint64, e
nullLogger := logrus.New()
nullLogger.Out = io.Discard // no logging

db, _, err := createIndexerDB(nullLogger, true, cfg)
db, _, _, err := createIndexerDB(nullLogger, true, cfg)
if err != nil {
// Assume the error is related to an uninitialized DB.
// If it is something more serious, the failure will be detected during Init.
Expand All @@ -112,10 +113,11 @@ func (exp *postgresqlExporter) Init(ctx context.Context, initProvider data.InitP
exp.ctx, exp.cf = context.WithCancel(ctx)
exp.logger = logger

db, ready, err := createIndexerDB(exp.logger, false, cfg)
db, ready, exporterConfig, err := createIndexerDB(exp.logger, false, cfg)
if err != nil {
return fmt.Errorf("db create error: %v", err)
}
exp.cfg = exporterConfig
<-ready

exp.db = db
Expand All @@ -132,8 +134,9 @@ func (exp *postgresqlExporter) Init(ctx context.Context, initProvider data.InitP
}
exp.round = uint64(initProvider.NextDBRound())

// if data pruning is enabled
if !exp.cfg.Test && exp.cfg.Delete.Rounds > 0 {
dataPruningEnabled := !exp.cfg.Test && exp.cfg.Delete.Rounds > 0
exp.logger.Debugf("postgresql exporter Init(): data pruning enabled: %t; exp.cfg.Delete: %+v", dataPruningEnabled, exp.cfg.Delete)
if dataPruningEnabled {
exp.dm = util.MakeDataManager(exp.ctx, &exp.cfg.Delete, exp.db, logger)
exp.wg.Add(1)
go exp.dm.DeleteLoop(&exp.wg, &exp.round)
Expand Down
2 changes: 2 additions & 0 deletions conduit/plugins/exporters/postgresql/util/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func MakeDataManager(ctx context.Context, cfg *PruneConfigurations, db idb.Index
func (p *postgresql) DeleteLoop(wg *sync.WaitGroup, nextRound *uint64) {

defer wg.Done()

p.logger.Debugf("DeleteLoop(): starting delete loop")
// If the interval is disabled
if p.config.Interval == disabled {
// A helpful warning to say that despite a number of rounds being above 0
Expand Down
2 changes: 1 addition & 1 deletion conduit/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/opensearch-project/opensearch-go/v2"
opensearch "github.com/opensearch-project/opensearch-go/v2"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"

"github.com/algorand/conduit/version"
Expand Down
2 changes: 1 addition & 1 deletion conduit/telemetry/telemetryCommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package telemetry
import (
"time"

"github.com/opensearch-project/opensearch-go/v2"
opensearch "github.com/opensearch-project/opensearch-go/v2"
)

// Config represents the configuration of Telemetry logging
Expand Down
1 change: 1 addition & 0 deletions e2e_tests/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ requires-python = ">=3.8"
dependencies = [
"boto3==1.24.71",
"msgpack==1.0.4",
"psycopg2==2.9.6",
"py-algorand-sdk==1.17.0",
"pytest==6.2.5",
"PyYAML==6.0",
Expand Down
46 changes: 46 additions & 0 deletions e2e_tests/src/e2e_common/indexer_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import psycopg2


class IndexerDB:
def __init__(self, host, port, user, password, dbname):
self.host = host
self.port = port
self.user = user
self.password = password
self.dbname = dbname

@classmethod
def from_connection_string(cls, connection_string):
init_args = {
keyval.split("=")[0]: keyval.split("=")[1]
for keyval in connection_string.split()
}
return cls(
host=init_args["host"],
port=init_args["port"],
user=init_args["user"],
password=init_args["password"],
dbname=init_args["dbname"],
)

def select_one(self, query) -> tuple:
with psycopg2.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
dbname=self.dbname,
) as connection:
with connection.cursor() as cursor:
cursor.execute(query)
return cursor.fetchone() # type: ignore

def get_txn_min_max_round(self):
min_round, max_round = self.select_one("SELECT min(round), max(round) FROM txn")
return min_round, max_round

def get_table_row_count(self, table_name):
return self.select_one(f"SELECT count(*) FROM {table_name}")[0]

def get_block_header_final_round(self):
return self.select_one("SELECT max(round) FROM block_header")[0]
31 changes: 20 additions & 11 deletions e2e_tests/src/e2e_conduit/e2econduit.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
#!/usr/bin/env python3
#

import argparse
import logging
import os
import sys

from e2e_common.util import find_binary
import e2e_conduit.fixtures.importers as importers
import e2e_conduit.fixtures.processors as processors
import e2e_conduit.fixtures.exporters as exporters
from e2e_conduit.runner import ConduitE2ETestRunner
from e2e_conduit.scenarios import scenarios
from e2e_conduit.scenarios.follower_indexer_scenario import follower_indexer_scenario
from e2e_conduit.scenarios.filter_scenario import app_filter_indexer_scenario, pay_filter_indexer_scenario
from e2e_conduit.scenarios.follower_indexer_scenario import (
FollowerIndexerScenario,
FollowerIndexerScenarioWithDeleteTask,
)
from e2e_conduit.scenarios.filter_scenario import (
app_filter_indexer_scenario,
pay_filter_indexer_scenario,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -43,21 +44,29 @@ def main():
else:
logging.basicConfig(level=logging.INFO)
sourcenet = args.source_net
source_is_tar = False
if not sourcenet:
e2edata = os.getenv("E2EDATA")
sourcenet = e2edata and os.path.join(e2edata, "net")
importer_source = sourcenet if sourcenet else args.s3_source_net
if importer_source:
scenarios.append(follower_indexer_scenario(importer_source))
scenarios.append(app_filter_indexer_scenario(importer_source))
scenarios.append(pay_filter_indexer_scenario(importer_source))
scenarios.extend(
[
FollowerIndexerScenario(importer_source),
FollowerIndexerScenarioWithDeleteTask(importer_source),
app_filter_indexer_scenario(importer_source),
pay_filter_indexer_scenario(importer_source),
]
)

runner = ConduitE2ETestRunner(args.conduit_bin, keep_temps=args.keep_temps)

success = True
for scenario in scenarios:
runner.setup_scenario(scenario)
if scenario.exporter.name == "postgresql":
print(
f"postgresql exporter with connect info: {scenario.exporter.config_input['connection-string']}"
)
if runner.run_scenario(scenario) != 0:
success = False
return 0 if success else 1
Expand Down
14 changes: 12 additions & 2 deletions e2e_tests/src/e2e_conduit/fixtures/exporters/postgresql.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from dataclasses import dataclass, asdict
import logging
import random
import string
Expand All @@ -9,16 +10,24 @@

logger = logging.getLogger(__name__)

CONFIG_DELETE_TASK = "delete-task"

@dataclass
class DeleteTask:
interval: int
rounds: int


class PostgresqlExporter(PluginFixture):
def __init__(self, max_conn=0):
def __init__(self, max_conn=0, delete_interval=0, delete_rounds=0):
self.user = "algorand"
self.password = "algorand"
self.db_name = "e2e_db"
# Should we have a random port here so that we can run multiple of these in parallel?
self.port = "45432"
self.container_name = ""
self.max_conn = max_conn
self.delete_task = DeleteTask(delete_interval, delete_rounds)
super().__init__()

@property
Expand All @@ -27,7 +36,7 @@ def name(self):

def setup(self, _):
self.container_name = "".join(
random.choice(string.ascii_lowercase) for i in range(10)
random.choice(string.ascii_lowercase) for _ in range(10)
)
self.port = f"{random.randint(1150, 65535)}"
try:
Expand Down Expand Up @@ -60,4 +69,5 @@ def resolve_config_input(self):
self.config_input = {
"connection-string": f"host=localhost port={self.port} user={self.user} password={self.password} dbname={self.db_name} sslmode=disable",
"max-conn": self.max_conn,
CONFIG_DELETE_TASK: asdict(self.delete_task),
}
18 changes: 7 additions & 11 deletions e2e_tests/src/e2e_conduit/fixtures/importers/follower_algod.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import atexit
import glob
import json
import logging
import os
import shutil
import tempfile

import boto3
from botocore.config import Config
from botocore import UNSIGNED

from e2e_common.util import hassuffix, xrun, firstFromS3Prefix, countblocks, atexitrun
from e2e_conduit.fixtures.importers.importer_plugin import ImporterPlugin

Expand Down Expand Up @@ -67,8 +63,8 @@ def setup(self, accumulated_config):
if "/" in tarname:
cmhash_tarnme = tarname.split("/")
cmhash = cmhash_tarnme[0]
tarname =cmhash_tarnme[1]
prefix+="/"+cmhash
tarname = cmhash_tarnme[1]
prefix += "/" + cmhash
tarpath = os.path.join(conduit_dir, tarname)
else:
tarpath = os.path.join(conduit_dir, tarname)
Expand All @@ -90,14 +86,14 @@ def setup(self, accumulated_config):
self.last = countblocks(blockfiles[0])
# Reset the secondary node, and enable follow mode.
# This is what conduit will connect to for data access.
for root, dirs, files in os.walk(os.path.join(tempnet, 'Node', 'tbd-v1')):
for f in files:
if ".sqlite" in f:
os.remove(os.path.join(root, f))
for root, _, files in os.walk(os.path.join(tempnet, "Node", "tbd-v1")):
for f in files:
if ".sqlite" in f:
os.remove(os.path.join(root, f))
cf = {}
with open(os.path.join(tempnet, "Node", "config.json"), "r") as config_file:
cf = json.load(config_file)
cf['EnableFollowMode'] = True
cf["EnableFollowMode"] = True
with open(os.path.join(tempnet, "Node", "config.json"), "w") as config_file:
config_file.write(json.dumps(cf))
try:
Expand Down
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载