From 1f3fcd43bceb3779e49e74abb7fe85fe13b60d5a Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Fri, 30 Jun 2023 09:48:16 -0500 Subject: [PATCH 1/8] pipelineImpl: de-pointerize plugin interface fields --- conduit/pipeline/pipeline.go | 179 +++++++++++++++++++++++++----- conduit/pipeline/pipeline_test.go | 32 +++--- 2 files changed, 165 insertions(+), 46 deletions(-) diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 95f930fb..3b232762 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -51,9 +51,9 @@ type pipelineImpl struct { initProvider *data.InitProvider - importer *importers.Importer - processors []*processors.Processor - exporter *exporters.Exporter + importer importers.Importer + processors []processors.Processor + exporter exporters.Exporter completeCallback []conduit.OnCompleteFunc pipelineMetadata state @@ -72,30 +72,30 @@ func (p *pipelineImpl) setError(err error) { } func (p *pipelineImpl) registerLifecycleCallbacks() { - if v, ok := (*p.importer).(conduit.Completed); ok { + if v, ok := (p.importer).(conduit.Completed); ok { p.completeCallback = append(p.completeCallback, v.OnComplete) } for _, processor := range p.processors { - if v, ok := (*processor).(conduit.Completed); ok { + if v, ok := (processor).(conduit.Completed); ok { p.completeCallback = append(p.completeCallback, v.OnComplete) } } - if v, ok := (*p.exporter).(conduit.Completed); ok { + if v, ok := (p.exporter).(conduit.Completed); ok { p.completeCallback = append(p.completeCallback, v.OnComplete) } } func (p *pipelineImpl) registerPluginMetricsCallbacks() { var collectors []prometheus.Collector - if v, ok := (*p.importer).(conduit.PluginMetrics); ok { + if v, ok := (p.importer).(conduit.PluginMetrics); ok { collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...) 
} for _, processor := range p.processors { - if v, ok := (*processor).(conduit.PluginMetrics); ok { + if v, ok := (processor).(conduit.PluginMetrics); ok { collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...) } } - if v, ok := (*p.exporter).(conduit.PluginMetrics); ok { + if v, ok := (p.exporter).(conduit.PluginMetrics); ok { collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...) } for _, c := range collectors { @@ -143,7 +143,7 @@ func (p *pipelineImpl) pluginRoundOverride() (uint64, error) { } var parts []overridePart - if v, ok := (*p.importer).(conduit.RoundRequestor); ok { + if v, ok := (p.importer).(conduit.RoundRequestor); ok { parts = append(parts, overridePart{ RoundRequest: v.RoundRequest, cfg: p.cfg.Importer, @@ -151,7 +151,7 @@ func (p *pipelineImpl) pluginRoundOverride() (uint64, error) { }) } for idx, processor := range p.processors { - if v, ok := (*processor).(conduit.RoundRequestor); ok { + if v, ok := (processor).(conduit.RoundRequestor); ok { parts = append(parts, overridePart{ RoundRequest: v.RoundRequest, cfg: p.cfg.Processors[idx], @@ -159,7 +159,7 @@ func (p *pipelineImpl) pluginRoundOverride() (uint64, error) { }) } } - if v, ok := (*p.exporter).(conduit.RoundRequestor); ok { + if v, ok := (p.exporter).(conduit.RoundRequestor); ok { parts = append(parts, overridePart{ RoundRequest: v.RoundRequest, cfg: p.cfg.Exporter, @@ -306,11 +306,11 @@ func (p *pipelineImpl) Init() error { if err != nil { return fmt.Errorf("Pipeline.Init(): could not make %s config: %w", p.cfg.Importer.Name, err) } - err = (*p.importer).Init(p.ctx, *p.initProvider, pluginConfig, importerLogger) + err = (p.importer).Init(p.ctx, *p.initProvider, pluginConfig, importerLogger) if err != nil { return fmt.Errorf("Pipeline.Init(): could not initialize importer (%s): %w", p.cfg.Importer.Name, err) } - genesis, err := (*p.importer).GetGenesis() + genesis, err := (p.importer).GetGenesis() if err != nil { return 
fmt.Errorf("Pipeline.GetGenesis(): could not obtain Genesis from the importer (%s): %w", p.cfg.Importer.Name, err) } @@ -339,7 +339,7 @@ func (p *pipelineImpl) Init() error { if err != nil { return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair, err) } - err = (*processor).Init(p.ctx, *p.initProvider, config, logger) + err = (processor).Init(p.ctx, *p.initProvider, config, logger) if err != nil { return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair.Name, err) } @@ -352,7 +352,7 @@ func (p *pipelineImpl) Init() error { if err != nil { return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", p.cfg.Exporter.Name, err) } - err = (*p.exporter).Init(p.ctx, *p.initProvider, config, logger) + err = (p.exporter).Init(p.ctx, *p.initProvider, config, logger) if err != nil { return fmt.Errorf("Pipeline.Init(): could not initialize Exporter (%s): %w", p.cfg.Exporter.Name, err) } @@ -388,20 +388,20 @@ func (p *pipelineImpl) Stop() { } } - if err := (*p.importer).Close(); err != nil { + if err := (p.importer).Close(); err != nil { // Log and continue on closing the rest of the pipeline - p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", (*p.importer).Metadata().Name, err) + p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", (p.importer).Metadata().Name, err) } for _, processor := range p.processors { - if err := (*processor).Close(); err != nil { + if err := (processor).Close(); err != nil { // Log and continue on closing the rest of the pipeline - p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", (*processor).Metadata().Name, err) + p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", (processor).Metadata().Name, err) } } - if err := (*p.exporter).Close(); err != nil { - p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", (*p.exporter).Metadata().Name, err) + if err := (p.exporter).Close(); err != nil { + 
p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", (p.exporter).Metadata().Name, err) } } @@ -431,8 +431,129 @@ func addMetrics(block data.BlockData, importTime time.Duration) { metrics.ImportedTxnsPerBlock.Observe(float64(len(block.Payset)) + float64(innerTxn)) } -// Start pushes block data through the pipeline +// Start is what Start was before the beginning of this branch func (p *pipelineImpl) Start() { + p.wg.Add(1) + retry := uint64(0) + go func() { + defer p.wg.Done() + // We need to add a separate recover function here since it launches its own go-routine + defer HandlePanic(p.logger) + for { + pipelineRun: + metrics.PipelineRetryCount.Observe(float64(retry)) + if retry > p.cfg.RetryCount && p.cfg.RetryCount != 0 { + p.logger.Errorf("Pipeline has exceeded maximum retry count (%d) - stopping...", p.cfg.RetryCount) + return + } + + if retry > 0 { + p.logger.Infof("Retry number %d resuming after a %s retry delay.", retry, p.cfg.RetryDelay) + time.Sleep(p.cfg.RetryDelay) + } + + select { + case <-p.ctx.Done(): + return + default: + { + p.logger.Infof("Pipeline round: %v", p.pipelineMetadata.NextRound) + // fetch block + importStart := time.Now() + blkData, err := (p.importer).GetBlock(p.pipelineMetadata.NextRound) + if err != nil { + p.logger.Errorf("%v", err) + p.setError(err) + retry++ + goto pipelineRun + } + metrics.ImporterTimeSeconds.Observe(time.Since(importStart).Seconds()) + + // TODO: Verify that the block was built with a known protocol version. + + // Start time currently measures operations after block fetching is complete. 
+ // This is for backwards compatibility w/ Indexer's metrics + // run through processors + start := time.Now() + for _, proc := range p.processors { + processorStart := time.Now() + blkData, err = (proc).Process(blkData) + if err != nil { + p.logger.Errorf("%v", err) + p.setError(err) + retry++ + goto pipelineRun + } + metrics.ProcessorTimeSeconds.WithLabelValues((proc).Metadata().Name).Observe(time.Since(processorStart).Seconds()) + } + // run through exporter + exporterStart := time.Now() + err = (p.exporter).Receive(blkData) + if err != nil { + p.logger.Errorf("%v", err) + p.setError(err) + retry++ + goto pipelineRun + } + p.logger.Infof("round r=%d (%d txn) exported in %s", p.pipelineMetadata.NextRound, len(blkData.Payset), time.Since(start)) + + // Increment Round, update metadata + p.pipelineMetadata.NextRound++ + err = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir) + if err != nil { + p.logger.Errorf("%v", err) + } + + // Callback Processors + for _, cb := range p.completeCallback { + err = cb(blkData) + if err != nil { + p.logger.Errorf("%v", err) + p.setError(err) + retry++ + goto pipelineRun + } + } + metrics.ExporterTimeSeconds.Observe(time.Since(exporterStart).Seconds()) + // Ignore round 0 (which is empty). 
+ if p.pipelineMetadata.NextRound > 1 { + addMetrics(blkData, time.Since(start)) + } + p.setError(nil) + retry = 0 + } + } + + } + }() +} + +// OriginalStart pushes block data through the pipeline +/* +func (p *pipelineImpl) FutureStart() { + fetchBlocks := make(chan *data.BlockData, 10) + processedBlocks := make(chan *data.BlockData, 10) + + // Fetch Blocks + go func() { + defer close(fetchBlocks) + for { + select { + case <-p.ctx.Done(): + return + default: + blkData, err := (*p.importer).GetBlock(p.pipelineMetadata.NextRound) + if err != nil { + p.logger.Errorf("%v", err) + p.setError(err) + // Retry logic + continue + } + fetchBlocks <- &blkData + } + } + }() + p.wg.Add(1) retry := uint64(0) go func() { @@ -527,6 +648,7 @@ func (p *pipelineImpl) Start() { } }() } +*/ func (p *pipelineImpl) Wait() { p.wg.Wait() @@ -563,7 +685,7 @@ func MakePipeline(ctx context.Context, cfg *data.Config, logger *log.Logger) (Pi logger: logger, initProvider: nil, importer: nil, - processors: []*processors.Processor{}, + processors: []processors.Processor{}, exporter: nil, } @@ -574,8 +696,7 @@ func MakePipeline(ctx context.Context, cfg *data.Config, logger *log.Logger) (Pi return nil, fmt.Errorf("MakePipeline(): could not build importer '%s': %w", importerName, err) } - importer := importerBuilder.New() - pipeline.importer = &importer + pipeline.importer = importerBuilder.New() logger.Infof("Found Importer: %s", importerName) // --- @@ -588,8 +709,7 @@ func MakePipeline(ctx context.Context, cfg *data.Config, logger *log.Logger) (Pi return nil, fmt.Errorf("MakePipeline(): could not build processor '%s': %w", processorName, err) } - processor := processorBuilder.New() - pipeline.processors = append(pipeline.processors, &processor) + pipeline.processors = append(pipeline.processors, processorBuilder.New()) logger.Infof("Found Processor: %s", processorName) } @@ -602,8 +722,7 @@ func MakePipeline(ctx context.Context, cfg *data.Config, logger *log.Logger) (Pi return nil, 
fmt.Errorf("MakePipeline(): could not build exporter '%s': %w", exporterName, err) } - exporter := exporterBuilder.New() - pipeline.exporter = &exporter + pipeline.exporter = exporterBuilder.New() logger.Infof("Found Exporter: %s", exporterName) return pipeline, nil diff --git a/conduit/pipeline/pipeline_test.go b/conduit/pipeline/pipeline_test.go index 32c610a4..7303687f 100644 --- a/conduit/pipeline/pipeline_test.go +++ b/conduit/pipeline/pipeline_test.go @@ -233,9 +233,9 @@ func mockPipeline(t *testing.T, dataDir string) (*pipelineImpl, *test.Hook, *moc }, logger: l, initProvider: nil, - importer: &pImporter, - processors: []*processors.Processor{&pProcessor}, - exporter: &pExporter, + importer: pImporter, + processors: []processors.Processor{pProcessor}, + exporter: pExporter, pipelineMetadata: state{ GenesisHash: "", Network: "", @@ -271,9 +271,9 @@ func TestPipelineRun(t *testing.T) { cf: cf, logger: l, initProvider: nil, - importer: &pImporter, - processors: []*processors.Processor{&pProcessor}, - exporter: &pExporter, + importer: pImporter, + processors: []processors.Processor{pProcessor}, + exporter: pExporter, completeCallback: []conduit.OnCompleteFunc{cbComplete.OnComplete}, pipelineMetadata: state{ NextRound: 0, @@ -371,9 +371,9 @@ func TestPipelineErrors(t *testing.T) { }, logger: l, initProvider: nil, - importer: &pImporter, - processors: []*processors.Processor{&pProcessor}, - exporter: &pExporter, + importer: pImporter, + processors: []processors.Processor{pProcessor}, + exporter: pExporter, completeCallback: []conduit.OnCompleteFunc{cbComplete.OnComplete}, pipelineMetadata: state{}, } @@ -440,9 +440,9 @@ func Test_pipelineImpl_registerLifecycleCallbacks(t *testing.T) { cfg: &data.Config{}, logger: l, initProvider: nil, - importer: &pImporter, - processors: []*processors.Processor{&pProcessor, &pProcessor}, - exporter: &pExporter, + importer: pImporter, + processors: []processors.Processor{pProcessor, pProcessor}, + exporter: pExporter, } // Each 
plugin implements the Completed interface, so there should be 4 @@ -485,7 +485,7 @@ func TestGenesisHash(t *testing.T) { // mock a different genesis hash var pImporter importers.Importer = &mockImporter{genesis: sdk.Genesis{Network: "dev"}} - pImpl.importer = &pImporter + pImpl.importer = pImporter err = pImpl.Init() assert.Contains(t, err.Error(), "genesis hash in metadata does not match") } @@ -797,9 +797,9 @@ func TestPipelineRetryVariables(t *testing.T) { }, logger: l, initProvider: nil, - importer: &pImporter, - processors: []*processors.Processor{&pProcessor}, - exporter: &pExporter, + importer: pImporter, + processors: []processors.Processor{pProcessor}, + exporter: pExporter, pipelineMetadata: state{ GenesisHash: "", Network: "", From c2d0e98129735d9874b6d6e1c920923c588386c1 Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Fri, 30 Jun 2023 10:52:12 -0500 Subject: [PATCH 2/8] Update conduit/pipeline/pipeline.go --- conduit/pipeline/pipeline.go | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 3b232762..01021093 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -528,32 +528,6 @@ func (p *pipelineImpl) Start() { }() } -// OriginalStart pushes block data through the pipeline -/* -func (p *pipelineImpl) FutureStart() { - fetchBlocks := make(chan *data.BlockData, 10) - processedBlocks := make(chan *data.BlockData, 10) - - // Fetch Blocks - go func() { - defer close(fetchBlocks) - for { - select { - case <-p.ctx.Done(): - return - default: - blkData, err := (*p.importer).GetBlock(p.pipelineMetadata.NextRound) - if err != nil { - p.logger.Errorf("%v", err) - p.setError(err) - // Retry logic - continue - } - fetchBlocks <- &blkData - } - } - }() - p.wg.Add(1) retry := uint64(0) go func() { From 1d2ac681909bebbdb1a639733497566d2153d1de Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Fri, 30 Jun 2023 10:54:32 -0500 Subject: [PATCH 3/8] 
Update conduit/pipeline/pipeline.go --- conduit/pipeline/pipeline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 01021093..573ac7fa 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -431,7 +431,7 @@ func addMetrics(block data.BlockData, importTime time.Duration) { metrics.ImportedTxnsPerBlock.Observe(float64(len(block.Payset)) + float64(innerTxn)) } -// Start is what Start was before the beginning of this branch +// Start pushes block data through the pipeline func (p *pipelineImpl) Start() { p.wg.Add(1) retry := uint64(0) From 77716df7edd1685be7352165c08fa62d098a7ce5 Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Fri, 30 Jun 2023 10:58:28 -0500 Subject: [PATCH 4/8] dedupe --- conduit/pipeline/pipeline.go | 96 ------------------------------------ 1 file changed, 96 deletions(-) diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 573ac7fa..5c82a49f 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -528,102 +528,6 @@ func (p *pipelineImpl) Start() { }() } - p.wg.Add(1) - retry := uint64(0) - go func() { - defer p.wg.Done() - // We need to add a separate recover function here since it launches its own go-routine - defer HandlePanic(p.logger) - for { - pipelineRun: - metrics.PipelineRetryCount.Observe(float64(retry)) - if retry > p.cfg.RetryCount && p.cfg.RetryCount != 0 { - p.logger.Errorf("Pipeline has exceeded maximum retry count (%d) - stopping...", p.cfg.RetryCount) - return - } - - if retry > 0 { - p.logger.Infof("Retry number %d resuming after a %s retry delay.", retry, p.cfg.RetryDelay) - time.Sleep(p.cfg.RetryDelay) - } - - select { - case <-p.ctx.Done(): - return - default: - { - p.logger.Infof("Pipeline round: %v", p.pipelineMetadata.NextRound) - // fetch block - importStart := time.Now() - blkData, err := (*p.importer).GetBlock(p.pipelineMetadata.NextRound) - if err != nil { - 
p.logger.Errorf("%v", err) - p.setError(err) - retry++ - goto pipelineRun - } - metrics.ImporterTimeSeconds.Observe(time.Since(importStart).Seconds()) - - // TODO: Verify that the block was built with a known protocol version. - - // Start time currently measures operations after block fetching is complete. - // This is for backwards compatibility w/ Indexer's metrics - // run through processors - start := time.Now() - for _, proc := range p.processors { - processorStart := time.Now() - blkData, err = (*proc).Process(blkData) - if err != nil { - p.logger.Errorf("%v", err) - p.setError(err) - retry++ - goto pipelineRun - } - metrics.ProcessorTimeSeconds.WithLabelValues((*proc).Metadata().Name).Observe(time.Since(processorStart).Seconds()) - } - // run through exporter - exporterStart := time.Now() - err = (*p.exporter).Receive(blkData) - if err != nil { - p.logger.Errorf("%v", err) - p.setError(err) - retry++ - goto pipelineRun - } - p.logger.Infof("round r=%d (%d txn) exported in %s", p.pipelineMetadata.NextRound, len(blkData.Payset), time.Since(start)) - - // Increment Round, update metadata - p.pipelineMetadata.NextRound++ - err = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir) - if err != nil { - p.logger.Errorf("%v", err) - } - - // Callback Processors - for _, cb := range p.completeCallback { - err = cb(blkData) - if err != nil { - p.logger.Errorf("%v", err) - p.setError(err) - retry++ - goto pipelineRun - } - } - metrics.ExporterTimeSeconds.Observe(time.Since(exporterStart).Seconds()) - // Ignore round 0 (which is empty). 
- if p.pipelineMetadata.NextRound > 1 { - addMetrics(blkData, time.Since(start)) - } - p.setError(nil) - retry = 0 - } - } - - } - }() -} -*/ - func (p *pipelineImpl) Wait() { p.wg.Wait() } From f507ba58379a85661a19bdc51d6a2eb57ae40a78 Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Fri, 30 Jun 2023 11:02:10 -0500 Subject: [PATCH 5/8] per CR: don't need parens anymore pre-cast --- conduit/pipeline/pipeline.go | 42 ++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 5c82a49f..200f6007 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -72,30 +72,30 @@ func (p *pipelineImpl) setError(err error) { } func (p *pipelineImpl) registerLifecycleCallbacks() { - if v, ok := (p.importer).(conduit.Completed); ok { + if v, ok := p.importer.(conduit.Completed); ok { p.completeCallback = append(p.completeCallback, v.OnComplete) } for _, processor := range p.processors { - if v, ok := (processor).(conduit.Completed); ok { + if v, ok := processor.(conduit.Completed); ok { p.completeCallback = append(p.completeCallback, v.OnComplete) } } - if v, ok := (p.exporter).(conduit.Completed); ok { + if v, ok := p.exporter.(conduit.Completed); ok { p.completeCallback = append(p.completeCallback, v.OnComplete) } } func (p *pipelineImpl) registerPluginMetricsCallbacks() { var collectors []prometheus.Collector - if v, ok := (p.importer).(conduit.PluginMetrics); ok { + if v, ok := p.importer.(conduit.PluginMetrics); ok { collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...) } for _, processor := range p.processors { - if v, ok := (processor).(conduit.PluginMetrics); ok { + if v, ok := processor.(conduit.PluginMetrics); ok { collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...) 
} } - if v, ok := (p.exporter).(conduit.PluginMetrics); ok { + if v, ok := p.exporter.(conduit.PluginMetrics); ok { collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...) } for _, c := range collectors { @@ -143,7 +143,7 @@ func (p *pipelineImpl) pluginRoundOverride() (uint64, error) { } var parts []overridePart - if v, ok := (p.importer).(conduit.RoundRequestor); ok { + if v, ok := p.importer.(conduit.RoundRequestor); ok { parts = append(parts, overridePart{ RoundRequest: v.RoundRequest, cfg: p.cfg.Importer, @@ -151,7 +151,7 @@ func (p *pipelineImpl) pluginRoundOverride() (uint64, error) { }) } for idx, processor := range p.processors { - if v, ok := (processor).(conduit.RoundRequestor); ok { + if v, ok := processor.(conduit.RoundRequestor); ok { parts = append(parts, overridePart{ RoundRequest: v.RoundRequest, cfg: p.cfg.Processors[idx], @@ -159,7 +159,7 @@ func (p *pipelineImpl) pluginRoundOverride() (uint64, error) { }) } } - if v, ok := (p.exporter).(conduit.RoundRequestor); ok { + if v, ok := p.exporter.(conduit.RoundRequestor); ok { parts = append(parts, overridePart{ RoundRequest: v.RoundRequest, cfg: p.cfg.Exporter, @@ -306,11 +306,11 @@ func (p *pipelineImpl) Init() error { if err != nil { return fmt.Errorf("Pipeline.Init(): could not make %s config: %w", p.cfg.Importer.Name, err) } - err = (p.importer).Init(p.ctx, *p.initProvider, pluginConfig, importerLogger) + err = p.importer.Init(p.ctx, *p.initProvider, pluginConfig, importerLogger) if err != nil { return fmt.Errorf("Pipeline.Init(): could not initialize importer (%s): %w", p.cfg.Importer.Name, err) } - genesis, err := (p.importer).GetGenesis() + genesis, err := p.importer.GetGenesis() if err != nil { return fmt.Errorf("Pipeline.GetGenesis(): could not obtain Genesis from the importer (%s): %w", p.cfg.Importer.Name, err) } @@ -339,7 +339,7 @@ func (p *pipelineImpl) Init() error { if err != nil { return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair, 
err) } - err = (processor).Init(p.ctx, *p.initProvider, config, logger) + err = processor.Init(p.ctx, *p.initProvider, config, logger) if err != nil { return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair.Name, err) } @@ -352,7 +352,7 @@ func (p *pipelineImpl) Init() error { if err != nil { return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", p.cfg.Exporter.Name, err) } - err = (p.exporter).Init(p.ctx, *p.initProvider, config, logger) + err = p.exporter.Init(p.ctx, *p.initProvider, config, logger) if err != nil { return fmt.Errorf("Pipeline.Init(): could not initialize Exporter (%s): %w", p.cfg.Exporter.Name, err) } @@ -388,20 +388,20 @@ func (p *pipelineImpl) Stop() { } } - if err := (p.importer).Close(); err != nil { + if err := p.importer.Close(); err != nil { // Log and continue on closing the rest of the pipeline - p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", (p.importer).Metadata().Name, err) + p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", p.importer.Metadata().Name, err) } for _, processor := range p.processors { - if err := (processor).Close(); err != nil { + if err := processor.Close(); err != nil { // Log and continue on closing the rest of the pipeline - p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", (processor).Metadata().Name, err) + p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", processor.Metadata().Name, err) } } - if err := (p.exporter).Close(); err != nil { - p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", (p.exporter).Metadata().Name, err) + if err := p.exporter.Close(); err != nil { + p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", p.exporter.Metadata().Name, err) } } @@ -460,7 +460,7 @@ func (p *pipelineImpl) Start() { p.logger.Infof("Pipeline round: %v", p.pipelineMetadata.NextRound) // fetch block importStart := time.Now() - blkData, err := 
(p.importer).GetBlock(p.pipelineMetadata.NextRound) + blkData, err := p.importer.GetBlock(p.pipelineMetadata.NextRound) if err != nil { p.logger.Errorf("%v", err) p.setError(err) @@ -488,7 +488,7 @@ func (p *pipelineImpl) Start() { } // run through exporter exporterStart := time.Now() - err = (p.exporter).Receive(blkData) + err = p.exporter.Receive(blkData) if err != nil { p.logger.Errorf("%v", err) p.setError(err) From f7e0bab9e076de22157dcbd7e2f646d52fc203d5 Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Fri, 30 Jun 2023 14:45:11 -0500 Subject: [PATCH 6/8] Update conduit/pipeline/pipeline.go Co-authored-by: Will Winder --- conduit/pipeline/pipeline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 200f6007..a1981796 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -477,7 +477,7 @@ func (p *pipelineImpl) Start() { start := time.Now() for _, proc := range p.processors { processorStart := time.Now() - blkData, err = (proc).Process(blkData) + blkData, err = proc.Process(blkData) if err != nil { p.logger.Errorf("%v", err) p.setError(err) From 56432955b7535070e4cad683348c4f770ec275d6 Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Fri, 30 Jun 2023 14:45:32 -0500 Subject: [PATCH 7/8] Update conduit/pipeline/pipeline.go Co-authored-by: Will Winder --- conduit/pipeline/pipeline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index a1981796..96e66b47 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -484,7 +484,7 @@ func (p *pipelineImpl) Start() { retry++ goto pipelineRun } - metrics.ProcessorTimeSeconds.WithLabelValues((proc).Metadata().Name).Observe(time.Since(processorStart).Seconds()) + metrics.ProcessorTimeSeconds.WithLabelValues(proc.Metadata().Name).Observe(time.Since(processorStart).Seconds()) } // run through exporter 
exporterStart := time.Now() From d1f9615b7a9a93dbc7799dc8c9974d40ba79762c Mon Sep 17 00:00:00 2001 From: Zeph Grunschlag Date: Fri, 30 Jun 2023 14:59:31 -0500 Subject: [PATCH 8/8] Builer --> Constructor (on Constructor is now ImporterConstructor) --- conduit/pipeline/pipeline.go | 12 ++++++------ conduit/plugins/exporters/exporter_factory.go | 4 ++-- .../plugins/exporters/exporter_factory_test.go | 4 ++-- .../exporters/noop/noop_exporter_test.go | 2 +- conduit/plugins/importers/importer_factory.go | 12 ++++++------ .../filterprocessor/filter_processor_test.go | 18 +++++++++--------- .../plugins/processors/processor_factory.go | 4 ++-- .../processors/processor_factory_test.go | 4 ++-- 8 files changed, 30 insertions(+), 30 deletions(-) diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 96e66b47..254396c1 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -569,12 +569,12 @@ func MakePipeline(ctx context.Context, cfg *data.Config, logger *log.Logger) (Pi importerName := cfg.Importer.Name - importerBuilder, err := importers.ImporterBuilderByName(importerName) + importerConstructor, err := importers.ImporterConstructorByName(importerName) if err != nil { return nil, fmt.Errorf("MakePipeline(): could not build importer '%s': %w", importerName, err) } - pipeline.importer = importerBuilder.New() + pipeline.importer = importerConstructor.New() logger.Infof("Found Importer: %s", importerName) // --- @@ -582,12 +582,12 @@ func MakePipeline(ctx context.Context, cfg *data.Config, logger *log.Logger) (Pi for _, processorConfig := range cfg.Processors { processorName := processorConfig.Name - processorBuilder, err := processors.ProcessorBuilderByName(processorName) + processorConstructor, err := processors.ProcessorConstructorByName(processorName) if err != nil { return nil, fmt.Errorf("MakePipeline(): could not build processor '%s': %w", processorName, err) } - pipeline.processors = append(pipeline.processors, 
processorBuilder.New()) + pipeline.processors = append(pipeline.processors, processorConstructor.New()) logger.Infof("Found Processor: %s", processorName) } @@ -595,12 +595,12 @@ func MakePipeline(ctx context.Context, cfg *data.Config, logger *log.Logger) (Pi exporterName := cfg.Exporter.Name - exporterBuilder, err := exporters.ExporterBuilderByName(exporterName) + exporterConstructor, err := exporters.ExporterConstructorByName(exporterName) if err != nil { return nil, fmt.Errorf("MakePipeline(): could not build exporter '%s': %w", exporterName, err) } - pipeline.exporter = exporterBuilder.New() + pipeline.exporter = exporterConstructor.New() logger.Infof("Found Exporter: %s", exporterName) return pipeline, nil diff --git a/conduit/plugins/exporters/exporter_factory.go b/conduit/plugins/exporters/exporter_factory.go index d8e9a6e2..7e8164b1 100644 --- a/conduit/plugins/exporters/exporter_factory.go +++ b/conduit/plugins/exporters/exporter_factory.go @@ -34,8 +34,8 @@ func Register(name string, constructor ExporterConstructor) { Exporters[name] = constructor } -// ExporterBuilderByName returns a Processor constructor for the name provided -func ExporterBuilderByName(name string) (ExporterConstructor, error) { +// ExporterConstructorByName returns a Processor constructor for the name provided +func ExporterConstructorByName(name string) (ExporterConstructor, error) { constructor, ok := Exporters[name] if !ok { return nil, fmt.Errorf("no Exporter Constructor for %s", name) diff --git a/conduit/plugins/exporters/exporter_factory_test.go b/conduit/plugins/exporters/exporter_factory_test.go index 7e5c9e9b..a894670a 100644 --- a/conduit/plugins/exporters/exporter_factory_test.go +++ b/conduit/plugins/exporters/exporter_factory_test.go @@ -37,14 +37,14 @@ func TestExporterByNameSuccess(t *testing.T) { me := mockExporter{} Register("foobar", &mockExporterConstructor{&me}) - expC, err := ExporterBuilderByName("foobar") + expC, err := ExporterConstructorByName("foobar") 
assert.NoError(t, err) exp := expC.New() assert.Implements(t, (*Exporter)(nil), exp) } func TestExporterByNameNotFound(t *testing.T) { - _, err := ExporterBuilderByName("barfoo") + _, err := ExporterConstructorByName("barfoo") expectedErr := "no Exporter Constructor for barfoo" assert.EqualError(t, err, expectedErr) } diff --git a/conduit/plugins/exporters/noop/noop_exporter_test.go b/conduit/plugins/exporters/noop/noop_exporter_test.go index e213b234..96b5071e 100644 --- a/conduit/plugins/exporters/noop/noop_exporter_test.go +++ b/conduit/plugins/exporters/noop/noop_exporter_test.go @@ -21,7 +21,7 @@ var ne = nc.New() func TestExporterBuilderByName(t *testing.T) { // init() has already registered the noop exporter assert.Contains(t, exporters.Exporters, metadata.Name) - neBuilder, err := exporters.ExporterBuilderByName(metadata.Name) + neBuilder, err := exporters.ExporterConstructorByName(metadata.Name) assert.NoError(t, err) ne := neBuilder.New() assert.Implements(t, (*exporters.Exporter)(nil), ne) diff --git a/conduit/plugins/importers/importer_factory.go b/conduit/plugins/importers/importer_factory.go index 51c18c92..6acf5ba2 100644 --- a/conduit/plugins/importers/importer_factory.go +++ b/conduit/plugins/importers/importer_factory.go @@ -4,9 +4,9 @@ import ( "fmt" ) -// Constructor must be implemented by each Importer. +// ImporterConstructor must be implemented by each Importer. // It provides a basic no-arg constructor for instances of an ImporterImpl. -type Constructor interface { +type ImporterConstructor interface { // New should return an instantiation of a Importer. // Configuration values should be passed and can be processed during `Init()`. New() Importer @@ -21,20 +21,20 @@ func (f ImporterConstructorFunc) New() Importer { } // Importers are the constructors to build importer plugins. -var Importers = make(map[string]Constructor) +var Importers = make(map[string]ImporterConstructor) // Register is used to register Constructor implementations. 
This mechanism allows // for loose coupling between the configuration and the implementation. It is extremely similar to the way sql.DB // drivers are configured and used. -func Register(name string, constructor Constructor) { +func Register(name string, constructor ImporterConstructor) { if _, ok := Importers[name]; ok { panic(fmt.Errorf("importer %s already registered", name)) } Importers[name] = constructor } -// ImporterBuilderByName returns a Importer constructor for the name provided -func ImporterBuilderByName(name string) (Constructor, error) { +// ImporterConstructorByName returns an Importer constructor for the name provided +func ImporterConstructorByName(name string) (ImporterConstructor, error) { constructor, ok := Importers[name] if !ok { return nil, fmt.Errorf("no Importer Constructor for %s", name) diff --git a/conduit/plugins/processors/filterprocessor/filter_processor_test.go b/conduit/plugins/processors/filterprocessor/filter_processor_test.go index 15b7eb56..832c0cf8 100644 --- a/conduit/plugins/processors/filterprocessor/filter_processor_test.go +++ b/conduit/plugins/processors/filterprocessor/filter_processor_test.go @@ -54,7 +54,7 @@ filters: expression: "` + sampleAddr2.String() + `" ` - fpBuilder, err := processors.ProcessorBuilderByName(PluginName) + fpBuilder, err := processors.ProcessorConstructorByName(PluginName) assert.NoError(t, err) fp := fpBuilder.New() @@ -213,7 +213,7 @@ filters: for _, test := range tests { t.Run(test.name, func(t *testing.T) { - fpBuilder, err := processors.ProcessorBuilderByName(PluginName) + fpBuilder, err := processors.ProcessorConstructorByName(PluginName) assert.NoError(t, err) fp := fpBuilder.New() @@ -341,7 +341,7 @@ filters: for _, test := range tests { t.Run(test.name, func(t *testing.T) { - fpBuilder, err := processors.ProcessorBuilderByName(PluginName) + fpBuilder, err := processors.ProcessorConstructorByName(PluginName) assert.NoError(t, err) fp := fpBuilder.New() @@ -501,7 +501,7 @@ filters: for _, 
test := range tests { t.Run(test.name, func(t *testing.T) { - fpBuilder, err := processors.ProcessorBuilderByName(PluginName) + fpBuilder, err := processors.ProcessorConstructorByName(PluginName) assert.NoError(t, err) fp := fpBuilder.New() @@ -660,7 +660,7 @@ filters: for _, test := range tests { t.Run(test.name, func(t *testing.T) { - fpBuilder, err := processors.ProcessorBuilderByName(PluginName) + fpBuilder, err := processors.ProcessorConstructorByName(PluginName) assert.NoError(t, err) fp := fpBuilder.New() @@ -762,7 +762,7 @@ filters: for _, test := range tests { t.Run(test.name, func(t *testing.T) { - fpBuilder, err := processors.ProcessorBuilderByName(PluginName) + fpBuilder, err := processors.ProcessorConstructorByName(PluginName) assert.NoError(t, err) fp := fpBuilder.New() @@ -804,7 +804,7 @@ filters: expression: "` + sampleAddr2.String() + `" ` - fpBuilder, err := processors.ProcessorBuilderByName(PluginName) + fpBuilder, err := processors.ProcessorConstructorByName(PluginName) assert.NoError(t, err) fp := fpBuilder.New() @@ -903,7 +903,7 @@ filters: expression: "` + sampleAddr2.String() + `" ` - fpBuilder, err := processors.ProcessorBuilderByName(PluginName) + fpBuilder, err := processors.ProcessorConstructorByName(PluginName) assert.NoError(t, err) fp := fpBuilder.New() @@ -976,7 +976,7 @@ filters: expression: "` + sampleAddr2.String() + `" ` - fpBuilder, err := processors.ProcessorBuilderByName(PluginName) + fpBuilder, err := processors.ProcessorConstructorByName(PluginName) assert.NoError(t, err) fp := fpBuilder.New() diff --git a/conduit/plugins/processors/processor_factory.go b/conduit/plugins/processors/processor_factory.go index d8118a19..dea4290f 100644 --- a/conduit/plugins/processors/processor_factory.go +++ b/conduit/plugins/processors/processor_factory.go @@ -33,8 +33,8 @@ func Register(name string, constructor ProcessorConstructor) { Processors[name] = constructor } -// ProcessorBuilderByName returns a Processor constructor for the name 
provided -func ProcessorBuilderByName(name string) (ProcessorConstructor, error) { +// ProcessorConstructorByName returns a Processor constructor for the name provided +func ProcessorConstructorByName(name string) (ProcessorConstructor, error) { constructor, ok := Processors[name] if !ok { return nil, fmt.Errorf("no Processor Constructor for %s", name) diff --git a/conduit/plugins/processors/processor_factory_test.go b/conduit/plugins/processors/processor_factory_test.go index 3c0ace92..7ad34b65 100644 --- a/conduit/plugins/processors/processor_factory_test.go +++ b/conduit/plugins/processors/processor_factory_test.go @@ -42,14 +42,14 @@ func TestProcessorBuilderByNameSuccess(t *testing.T) { me := mockProcessor{} Register("foobar", &mockProcessorConstructor{&me}) - expBuilder, err := ProcessorBuilderByName("foobar") + expBuilder, err := ProcessorConstructorByName("foobar") assert.NoError(t, err) exp := expBuilder.New() assert.Implements(t, (*Processor)(nil), exp) } func TestProcessorBuilderByNameNotFound(t *testing.T) { - _, err := ProcessorBuilderByName("barfoo") + _, err := ProcessorConstructorByName("barfoo") expectedErr := "no Processor Constructor for barfoo" assert.EqualError(t, err, expectedErr) }