+
Skip to content

pipelineImpl: de-pointerize plugin interface fields #113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 33 additions & 36 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ type pipelineImpl struct {

initProvider *data.InitProvider

importer *importers.Importer
processors []*processors.Processor
exporter *exporters.Exporter
importer importers.Importer
processors []processors.Processor
exporter exporters.Exporter
completeCallback []conduit.OnCompleteFunc

pipelineMetadata state
Expand All @@ -72,30 +72,30 @@ func (p *pipelineImpl) setError(err error) {
}

func (p *pipelineImpl) registerLifecycleCallbacks() {
if v, ok := (*p.importer).(conduit.Completed); ok {
if v, ok := p.importer.(conduit.Completed); ok {
p.completeCallback = append(p.completeCallback, v.OnComplete)
}
for _, processor := range p.processors {
if v, ok := (*processor).(conduit.Completed); ok {
if v, ok := processor.(conduit.Completed); ok {
p.completeCallback = append(p.completeCallback, v.OnComplete)
}
}
if v, ok := (*p.exporter).(conduit.Completed); ok {
if v, ok := p.exporter.(conduit.Completed); ok {
p.completeCallback = append(p.completeCallback, v.OnComplete)
}
}

func (p *pipelineImpl) registerPluginMetricsCallbacks() {
var collectors []prometheus.Collector
if v, ok := (*p.importer).(conduit.PluginMetrics); ok {
if v, ok := p.importer.(conduit.PluginMetrics); ok {
collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...)
}
for _, processor := range p.processors {
if v, ok := (*processor).(conduit.PluginMetrics); ok {
if v, ok := processor.(conduit.PluginMetrics); ok {
collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...)
}
}
if v, ok := (*p.exporter).(conduit.PluginMetrics); ok {
if v, ok := p.exporter.(conduit.PluginMetrics); ok {
collectors = append(collectors, v.ProvideMetrics(p.cfg.Metrics.Prefix)...)
}
for _, c := range collectors {
Expand Down Expand Up @@ -143,23 +143,23 @@ func (p *pipelineImpl) pluginRoundOverride() (uint64, error) {
}
var parts []overridePart

if v, ok := (*p.importer).(conduit.RoundRequestor); ok {
if v, ok := p.importer.(conduit.RoundRequestor); ok {
parts = append(parts, overridePart{
RoundRequest: v.RoundRequest,
cfg: p.cfg.Importer,
t: plugins.Importer,
})
}
for idx, processor := range p.processors {
if v, ok := (*processor).(conduit.RoundRequestor); ok {
if v, ok := processor.(conduit.RoundRequestor); ok {
parts = append(parts, overridePart{
RoundRequest: v.RoundRequest,
cfg: p.cfg.Processors[idx],
t: plugins.Processor,
})
}
}
if v, ok := (*p.exporter).(conduit.RoundRequestor); ok {
if v, ok := p.exporter.(conduit.RoundRequestor); ok {
parts = append(parts, overridePart{
RoundRequest: v.RoundRequest,
cfg: p.cfg.Exporter,
Expand Down Expand Up @@ -306,11 +306,11 @@ func (p *pipelineImpl) Init() error {
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not make %s config: %w", p.cfg.Importer.Name, err)
}
err = (*p.importer).Init(p.ctx, *p.initProvider, pluginConfig, importerLogger)
err = p.importer.Init(p.ctx, *p.initProvider, pluginConfig, importerLogger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize importer (%s): %w", p.cfg.Importer.Name, err)
}
genesis, err := (*p.importer).GetGenesis()
genesis, err := p.importer.GetGenesis()
if err != nil {
return fmt.Errorf("Pipeline.GetGenesis(): could not obtain Genesis from the importer (%s): %w", p.cfg.Importer.Name, err)
}
Expand Down Expand Up @@ -339,7 +339,7 @@ func (p *pipelineImpl) Init() error {
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair, err)
}
err = (*processor).Init(p.ctx, *p.initProvider, config, logger)
err = processor.Init(p.ctx, *p.initProvider, config, logger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair.Name, err)
}
Expand All @@ -352,7 +352,7 @@ func (p *pipelineImpl) Init() error {
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", p.cfg.Exporter.Name, err)
}
err = (*p.exporter).Init(p.ctx, *p.initProvider, config, logger)
err = p.exporter.Init(p.ctx, *p.initProvider, config, logger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize Exporter (%s): %w", p.cfg.Exporter.Name, err)
}
Expand Down Expand Up @@ -388,20 +388,20 @@ func (p *pipelineImpl) Stop() {
}
}

if err := (*p.importer).Close(); err != nil {
if err := p.importer.Close(); err != nil {
// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", (*p.importer).Metadata().Name, err)
p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", p.importer.Metadata().Name, err)
}

for _, processor := range p.processors {
if err := (*processor).Close(); err != nil {
if err := processor.Close(); err != nil {
// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", (*processor).Metadata().Name, err)
p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", processor.Metadata().Name, err)
}
}

if err := (*p.exporter).Close(); err != nil {
p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", (*p.exporter).Metadata().Name, err)
if err := p.exporter.Close(); err != nil {
p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", p.exporter.Metadata().Name, err)
}
}

Expand Down Expand Up @@ -460,7 +460,7 @@ func (p *pipelineImpl) Start() {
p.logger.Infof("Pipeline round: %v", p.pipelineMetadata.NextRound)
// fetch block
importStart := time.Now()
blkData, err := (*p.importer).GetBlock(p.pipelineMetadata.NextRound)
blkData, err := p.importer.GetBlock(p.pipelineMetadata.NextRound)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
Expand All @@ -477,18 +477,18 @@ func (p *pipelineImpl) Start() {
start := time.Now()
for _, proc := range p.processors {
processorStart := time.Now()
blkData, err = (*proc).Process(blkData)
blkData, err = proc.Process(blkData)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
retry++
goto pipelineRun
}
metrics.ProcessorTimeSeconds.WithLabelValues((*proc).Metadata().Name).Observe(time.Since(processorStart).Seconds())
metrics.ProcessorTimeSeconds.WithLabelValues(proc.Metadata().Name).Observe(time.Since(processorStart).Seconds())
}
// run through exporter
exporterStart := time.Now()
err = (*p.exporter).Receive(blkData)
err = p.exporter.Receive(blkData)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
Expand Down Expand Up @@ -563,47 +563,44 @@ func MakePipeline(ctx context.Context, cfg *data.Config, logger *log.Logger) (Pi
logger: logger,
initProvider: nil,
importer: nil,
processors: []*processors.Processor{},
processors: []processors.Processor{},
exporter: nil,
}

importerName := cfg.Importer.Name

importerBuilder, err := importers.ImporterBuilderByName(importerName)
importerConstructor, err := importers.ImporterConstructorByName(importerName)
if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build importer '%s': %w", importerName, err)
}

importer := importerBuilder.New()
pipeline.importer = &importer
pipeline.importer = importerConstructor.New()
logger.Infof("Found Importer: %s", importerName)

// ---

for _, processorConfig := range cfg.Processors {
processorName := processorConfig.Name

processorBuilder, err := processors.ProcessorBuilderByName(processorName)
processorConstructor, err := processors.ProcessorConstructorByName(processorName)
if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build processor '%s': %w", processorName, err)
}

processor := processorBuilder.New()
pipeline.processors = append(pipeline.processors, &processor)
pipeline.processors = append(pipeline.processors, processorConstructor.New())
logger.Infof("Found Processor: %s", processorName)
}

// ---

exporterName := cfg.Exporter.Name

exporterBuilder, err := exporters.ExporterBuilderByName(exporterName)
exporterConstructor, err := exporters.ExporterConstructorByName(exporterName)
if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build exporter '%s': %w", exporterName, err)
}

exporter := exporterBuilder.New()
pipeline.exporter = &exporter
pipeline.exporter = exporterConstructor.New()
logger.Infof("Found Exporter: %s", exporterName)

return pipeline, nil
Expand Down
32 changes: 16 additions & 16 deletions conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,9 @@ func mockPipeline(t *testing.T, dataDir string) (*pipelineImpl, *test.Hook, *moc
},
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor},
exporter: pExporter,
pipelineMetadata: state{
GenesisHash: "",
Network: "",
Expand Down Expand Up @@ -271,9 +271,9 @@ func TestPipelineRun(t *testing.T) {
cf: cf,
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor},
exporter: pExporter,
completeCallback: []conduit.OnCompleteFunc{cbComplete.OnComplete},
pipelineMetadata: state{
NextRound: 0,
Expand Down Expand Up @@ -371,9 +371,9 @@ func TestPipelineErrors(t *testing.T) {
},
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor},
exporter: pExporter,
completeCallback: []conduit.OnCompleteFunc{cbComplete.OnComplete},
pipelineMetadata: state{},
}
Expand Down Expand Up @@ -440,9 +440,9 @@ func Test_pipelineImpl_registerLifecycleCallbacks(t *testing.T) {
cfg: &data.Config{},
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor, &pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor, pProcessor},
exporter: pExporter,
}

// Each plugin implements the Completed interface, so there should be 4
Expand Down Expand Up @@ -485,7 +485,7 @@ func TestGenesisHash(t *testing.T) {

// mock a different genesis hash
var pImporter importers.Importer = &mockImporter{genesis: sdk.Genesis{Network: "dev"}}
pImpl.importer = &pImporter
pImpl.importer = pImporter
err = pImpl.Init()
assert.Contains(t, err.Error(), "genesis hash in metadata does not match")
}
Expand Down Expand Up @@ -797,9 +797,9 @@ func TestPipelineRetryVariables(t *testing.T) {
},
logger: l,
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
importer: pImporter,
processors: []processors.Processor{pProcessor},
exporter: pExporter,
pipelineMetadata: state{
GenesisHash: "",
Network: "",
Expand Down
4 changes: 2 additions & 2 deletions conduit/plugins/exporters/exporter_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func Register(name string, constructor ExporterConstructor) {
Exporters[name] = constructor
}

// ExporterBuilderByName returns a Processor constructor for the name provided
func ExporterBuilderByName(name string) (ExporterConstructor, error) {
// ExporterConstructorByName returns an Exporter constructor for the name provided
func ExporterConstructorByName(name string) (ExporterConstructor, error) {
constructor, ok := Exporters[name]
if !ok {
return nil, fmt.Errorf("no Exporter Constructor for %s", name)
Expand Down
4 changes: 2 additions & 2 deletions conduit/plugins/exporters/exporter_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ func TestExporterByNameSuccess(t *testing.T) {
me := mockExporter{}
Register("foobar", &mockExporterConstructor{&me})

expC, err := ExporterBuilderByName("foobar")
expC, err := ExporterConstructorByName("foobar")
assert.NoError(t, err)
exp := expC.New()
assert.Implements(t, (*Exporter)(nil), exp)
}

func TestExporterByNameNotFound(t *testing.T) {
_, err := ExporterBuilderByName("barfoo")
_, err := ExporterConstructorByName("barfoo")
expectedErr := "no Exporter Constructor for barfoo"
assert.EqualError(t, err, expectedErr)
}
Expand Down
2 changes: 1 addition & 1 deletion conduit/plugins/exporters/noop/noop_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var ne = nc.New()
func TestExporterBuilderByName(t *testing.T) {
// init() has already registered the noop exporter
assert.Contains(t, exporters.Exporters, metadata.Name)
neBuilder, err := exporters.ExporterBuilderByName(metadata.Name)
neBuilder, err := exporters.ExporterConstructorByName(metadata.Name)
assert.NoError(t, err)
ne := neBuilder.New()
assert.Implements(t, (*exporters.Exporter)(nil), ne)
Expand Down
12 changes: 6 additions & 6 deletions conduit/plugins/importers/importer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"fmt"
)

// Constructor must be implemented by each Importer.
// ImporterConstructor must be implemented by each Importer.
// It provides a basic no-arg constructor for instances of an ImporterImpl.
type Constructor interface {
type ImporterConstructor interface {
// New should return an instantiation of an Importer.
// Configuration values should be passed and can be processed during `Init()`.
New() Importer
Expand All @@ -21,20 +21,20 @@ func (f ImporterConstructorFunc) New() Importer {
}

// Importers are the constructors to build importer plugins.
var Importers = make(map[string]Constructor)
var Importers = make(map[string]ImporterConstructor)

// Register is used to register ImporterConstructor implementations. This mechanism allows
// for loose coupling between the configuration and the implementation. It is extremely similar to the way sql.DB
// drivers are configured and used.
func Register(name string, constructor Constructor) {
func Register(name string, constructor ImporterConstructor) {
if _, ok := Importers[name]; ok {
panic(fmt.Errorf("importer %s already registered", name))
}
Importers[name] = constructor
}

// ImporterBuilderByName returns a Importer constructor for the name provided
func ImporterBuilderByName(name string) (Constructor, error) {
// ImporterConstructorByName returns an Importer constructor for the name provided
func ImporterConstructorByName(name string) (ImporterConstructor, error) {
constructor, ok := Importers[name]
if !ok {
return nil, fmt.Errorf("no Importer Constructor for %s", name)
Expand Down
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载