+
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions agent/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ var (
ErrHashMismatch = errors.New("malformed data, hash does not match manifest")
// ErrFileNameMismatch provided dataset filename does not match filename in manifest.
ErrFileNameMismatch = errors.New("malformed data, filename does not match manifest")
// ErrAllResultsConsumed indicates all results have been consumed.
ErrAllResultsConsumed = errors.New("all results have been consumed by declared consumers")
)

// Service specifies an API that must be fullfiled by the domain service
Expand All @@ -64,7 +66,7 @@ type Service interface {
}

type agentService struct {
computation Computation // Holds the current computation request details.
computation Computation // Holds the current computation manifest.
algorithm algorithm.Algorithm // Filepath to the algorithm received for the computation.
result []byte // Stores the result of the computation.
sm *StateMachine // Manages the state transitions of the agent service.
Expand All @@ -90,6 +92,7 @@ func New(ctx context.Context, logger *slog.Logger, eventSvc events.Service, cmp
svc.sm.StateFunctions[resultsReady] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[complete] = svc.publishEvent("in-progress", json.RawMessage{})
svc.sm.StateFunctions[running] = svc.runComputation
svc.sm.StateFunctions[failed] = svc.publishEvent("failed", json.RawMessage{})

svc.computation = cmp

Expand Down Expand Up @@ -214,22 +217,22 @@ func (as *agentService) Data(ctx context.Context, dataset Dataset) error {
}

func (as *agentService) Result(ctx context.Context) ([]byte, error) {
if as.sm.GetState() != resultsReady {
if as.sm.GetState() != resultsReady && as.sm.GetState() != failed {
return []byte{}, ErrResultsNotReady
}
if len(as.computation.ResultConsumers) == 0 {
return []byte{}, ErrAllManifestItemsReceived
return []byte{}, ErrAllResultsConsumed
}
index, ok := IndexFromContext(ctx)
if !ok {
return []byte{}, ErrUndeclaredConsumer
}
as.computation.ResultConsumers = slices.Delete(as.computation.ResultConsumers, index, index+1)

if len(as.computation.ResultConsumers) == 0 {
if len(as.computation.ResultConsumers) == 0 && as.sm.GetState() == resultsReady {
as.sm.SendEvent(resultsConsumed)
}
// Return the result file or an error

return as.result, as.runError
}

Expand All @@ -249,7 +252,13 @@ func (as *agentService) Attestation(ctx context.Context, reportData [ReportDataS
func (as *agentService) runComputation() {
as.publishEvent("starting", json.RawMessage{})()
as.sm.logger.Debug("computation run started")
defer as.sm.SendEvent(runComplete)
defer func() {
if as.runError != nil {
as.sm.SendEvent(runFailed)
} else {
as.sm.SendEvent(runComplete)
}
}()

if err := os.Mkdir(algorithm.ResultsDir, 0o755); err != nil {
as.runError = fmt.Errorf("error creating results directory: %s", err.Error())
Expand Down
3 changes: 3 additions & 0 deletions agent/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
running
resultsReady
complete
failed
)

type event uint8
Expand All @@ -31,6 +32,7 @@ const (
dataReceived
runComplete
resultsConsumed
runFailed
)

// StateMachine represents the state machine.
Expand Down Expand Up @@ -74,6 +76,7 @@ func NewStateMachine(logger *slog.Logger, cmp Computation) *StateMachine {

sm.Transitions[running] = make(map[event]state)
sm.Transitions[running][runComplete] = resultsReady
sm.Transitions[running][runFailed] = failed

sm.Transitions[resultsReady] = make(map[event]state)
sm.Transitions[resultsReady][resultsConsumed] = complete
Expand Down
5 changes: 3 additions & 2 deletions agent/state_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载