+
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions agent/agentevent_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions agent/agentstate_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

177 changes: 124 additions & 53 deletions agent/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,52 @@
"github.com/ultravioletrs/cocos/agent/algorithm/python"
"github.com/ultravioletrs/cocos/agent/algorithm/wasm"
"github.com/ultravioletrs/cocos/agent/events"
"github.com/ultravioletrs/cocos/agent/statemachine"
"github.com/ultravioletrs/cocos/internal"
"golang.org/x/crypto/sha3"
)

// Compile-time assertion that *agentService satisfies the Service interface.
var _ Service = (*agentService)(nil)

// AgentState identifies a state of the agent's computation state machine.
// The transitions between states and the action run for each state are
// registered in New.
//
//go:generate stringer -type=AgentState
type AgentState int

// States of the agent state machine, numbered from zero in declaration order.
const (
// Idle is the initial state passed to statemachine.NewStateMachine in New.
Idle AgentState = iota
// ReceivingManifest is entered from Idle on the Start event.
ReceivingManifest
// ReceivingAlgorithm is entered from ReceivingManifest on ManifestReceived.
ReceivingAlgorithm
// ReceivingData is entered from ReceivingAlgorithm on AlgorithmReceived, but
// only when the computation declares at least one dataset.
ReceivingData
// Running is the state whose action is runComputation.
Running
// ConsumingResults is entered from Running on RunComplete.
ConsumingResults
// Complete is entered from ConsumingResults on ResultsConsumed.
Complete
// Failed is entered from Running on RunFailed.
Failed
)

// AgentEvent identifies an event that moves the agent state machine from one
// AgentState to another, as laid out in the transition table built by New.
//
//go:generate stringer -type=AgentEvent
type AgentEvent int

// Events understood by the agent state machine.
const (
// Start moves Idle to ReceivingManifest; New sends it after launching the machine.
Start AgentEvent = iota
// ManifestReceived moves ReceivingManifest to ReceivingAlgorithm; New sends
// it through a defer, so it fires as New returns.
ManifestReceived
// AlgorithmReceived moves ReceivingAlgorithm to ReceivingData, or directly to
// Running when the computation has no datasets.
AlgorithmReceived
// DataReceived moves ReceivingData to Running.
DataReceived
// RunComplete moves Running to ConsumingResults; runComputation sends it
// when runError is nil.
RunComplete
// ResultsConsumed moves ConsumingResults to Complete.
ResultsConsumed
// RunFailed moves Running to Failed; runComputation sends it when runError
// is non-nil.
RunFailed
)

// Status is the status label attached to events published by the agent; its
// String form is what publishEvent hands to the events service.
//
// NOTE(review): this enum has no failure member. Failure events in this file
// pass Failed.String(), where Failed is the AgentState constant — confirm that
// is the intended label.
//
//go:generate stringer -type=Status
type Status uint8

// Status values, numbered from zero in declaration order.
const (
// IdleState is published when the state machine is in Idle.
IdleState Status = iota
// InProgress is published for ReceivingManifest, ReceivingAlgorithm and
// ReceivingData, and by runComputation while the computation is running.
InProgress
// Ready is published when the state machine enters ConsumingResults.
Ready
// Completed is published when the state machine enters Complete and by
// runComputation once the results have been zipped.
Completed
// Terminated has no use visible in this section of the file.
Terminated
// Warning has no use visible in this section of the file.
Warning
)

const (
// ReportDataSize is the size of the report data expected by the attestation service.
ReportDataSize = 64
Expand Down Expand Up @@ -71,40 +111,69 @@
}

// agentService is the Service implementation constructed by New. It holds the
// computation being processed, the state machine that sequences it, and the
// outcome (result or runError) of the run.
//
// NOTE(review): the field list below appears twice because this listing
// interleaves the removed and the added side of a diff; the second run (with
// statemachine.StateMachine, logger and resultsConsumed) is the added side.
type agentService struct {
computation Computation // Holds the current computation request details.
algorithm algorithm.Algorithm // Algorithm implementation (binary, python, wasm or docker) selected for the computation; executed via Run.
result []byte // Zipped contents of the results directory, set by runComputation on success.
sm *StateMachine // Manages the state transitions of the agent service.
runError error // Stores any error encountered during the computation run.
eventSvc events.Service // Service for publishing events related to computation.
quoteProvider client.QuoteProvider // Provider for generating attestation quotes.
computation Computation // Holds the current computation request details.
algorithm algorithm.Algorithm // Algorithm implementation (binary, python, wasm or docker) selected for the computation; executed via Run.
result []byte // Zipped contents of the results directory, set by runComputation on success.
sm statemachine.StateMachine // Manages the state transitions of the agent service.
runError error // Stores any error encountered during the computation run.
eventSvc events.Service // Service for publishing events related to computation.
quoteProvider client.QuoteProvider // Provider for generating attestation quotes.
logger *slog.Logger // Logger for the agent service.
resultsConsumed bool // Set when Result sends ResultsConsumed, so that event is sent at most once.
}

var _ Service = (*agentService)(nil)

// New instantiates the agent service implementation.
//
// It creates a state machine starting in Idle, builds the transition table,
// registers one action per state, launches the machine in a goroutine, and
// sends Start. ManifestReceived is sent through a defer, so it fires as New
// returns. When cmp.Datasets is empty the ReceivingData state is skipped and
// AlgorithmReceived leads directly to Running.
//
// An error returned by the state machine's Start is only logged.
// NOTE(review): presumably Start returns once ctx is cancelled — confirm in
// the statemachine package.
//
// NOTE(review): this span interleaves removed and added diff lines (for
// example the StateFunctions assignments versus the SetAction calls); the
// description above follows the added, statemachine-package version.
func New(ctx context.Context, logger *slog.Logger, eventSvc events.Service, cmp Computation, quoteProvider client.QuoteProvider) Service {
sm := statemachine.NewStateMachine(Idle)
svc := &agentService{
sm: NewStateMachine(logger, cmp),
sm: sm,
eventSvc: eventSvc,
quoteProvider: quoteProvider,
logger: logger,
computation: cmp,
}

svc.sm.StateFunctions[Idle] = svc.publishEvent(IdleState.String(), json.RawMessage{})
svc.sm.StateFunctions[ReceivingManifest] = svc.publishEvent(InProgress.String(), json.RawMessage{})
svc.sm.StateFunctions[ReceivingAlgorithm] = svc.publishEvent(InProgress.String(), json.RawMessage{})
svc.sm.StateFunctions[ReceivingData] = svc.publishEvent(InProgress.String(), json.RawMessage{})
svc.sm.StateFunctions[ConsumingResults] = svc.publishEvent(Ready.String(), json.RawMessage{})
svc.sm.StateFunctions[Complete] = svc.publishEvent(Completed.String(), json.RawMessage{})
svc.sm.StateFunctions[Running] = svc.runComputation
svc.sm.StateFunctions[Failed] = svc.publishEvent(Failed.String(), json.RawMessage{})
// Transitions common to every computation.
transitions := []statemachine.Transition{
{From: Idle, Event: Start, To: ReceivingManifest},
{From: ReceivingManifest, Event: ManifestReceived, To: ReceivingAlgorithm},
}

// With no datasets declared there is nothing to receive after the algorithm,
// so the machine goes straight to Running.
if len(cmp.Datasets) == 0 {
transitions = append(transitions, statemachine.Transition{From: ReceivingAlgorithm, Event: AlgorithmReceived, To: Running})

Check warning on line 144 in agent/service.go

View check run for this annotation

Codecov / codecov/patch

agent/service.go#L144

Added line #L144 was not covered by tests
} else {
transitions = append(transitions, statemachine.Transition{From: ReceivingAlgorithm, Event: AlgorithmReceived, To: ReceivingData})
transitions = append(transitions, statemachine.Transition{From: ReceivingData, Event: DataReceived, To: Running})
}

go svc.sm.Start(ctx)
svc.sm.SendEvent(start)
transitions = append(transitions, []statemachine.Transition{
{From: Running, Event: RunComplete, To: ConsumingResults},
{From: Running, Event: RunFailed, To: Failed},
{From: ConsumingResults, Event: ResultsConsumed, To: Complete},
}...)

svc.computation = cmp
for _, t := range transitions {
sm.AddTransition(t)
}

// One action per state: every state publishes a status event except Running,
// which executes the computation.
sm.SetAction(Idle, svc.publishEvent(IdleState.String()))
sm.SetAction(ReceivingManifest, svc.publishEvent(InProgress.String()))
sm.SetAction(ReceivingAlgorithm, svc.publishEvent(InProgress.String()))
sm.SetAction(ReceivingData, svc.publishEvent(InProgress.String()))
sm.SetAction(Running, svc.runComputation)
sm.SetAction(ConsumingResults, svc.publishEvent(Ready.String()))
sm.SetAction(Complete, svc.publishEvent(Completed.String()))
sm.SetAction(Failed, svc.publishEvent(Failed.String()))

// Run the state machine in the background; a Start error is logged, not returned.
go func() {
if err := sm.Start(ctx); err != nil {
logger.Error(err.Error())
}
}()
sm.SendEvent(Start)
// Deferred so that ManifestReceived is sent as New returns.
defer sm.SendEvent(ManifestReceived)

svc.sm.SendEvent(manifestReceived)
return svc
}

Expand Down Expand Up @@ -153,7 +222,7 @@

switch algoType {
case string(algorithm.AlgoTypeBin):
as.algorithm = binary.NewAlgorithm(as.sm.logger, as.eventSvc, f.Name(), args)
as.algorithm = binary.NewAlgorithm(as.logger, as.eventSvc, f.Name(), args)
case string(algorithm.AlgoTypePython):
var requirementsFile string
if len(algo.Requirements) > 0 {
Expand All @@ -171,19 +240,19 @@
requirementsFile = fr.Name()
}
runtime := python.PythonRunTimeFromContext(ctx)
as.algorithm = python.NewAlgorithm(as.sm.logger, as.eventSvc, runtime, requirementsFile, f.Name(), args)
as.algorithm = python.NewAlgorithm(as.logger, as.eventSvc, runtime, requirementsFile, f.Name(), args)
case string(algorithm.AlgoTypeWasm):
as.algorithm = wasm.NewAlgorithm(as.sm.logger, as.eventSvc, f.Name(), args)
as.algorithm = wasm.NewAlgorithm(as.logger, as.eventSvc, f.Name(), args)
case string(algorithm.AlgoTypeDocker):
as.algorithm = docker.NewAlgorithm(as.sm.logger, as.eventSvc, f.Name())
as.algorithm = docker.NewAlgorithm(as.logger, as.eventSvc, f.Name())
}

if err := os.Mkdir(algorithm.DatasetsDir, 0o755); err != nil {
return fmt.Errorf("error creating datasets directory: %v", err)
}

if as.algorithm != nil {
as.sm.SendEvent(algorithmReceived)
as.sm.SendEvent(AlgorithmReceived)
}

return nil
Expand Down Expand Up @@ -236,27 +305,30 @@
}

if len(as.computation.Datasets) == 0 {
defer as.sm.SendEvent(dataReceived)
defer as.sm.SendEvent(DataReceived)
}

return nil
}

func (as *agentService) Result(ctx context.Context) ([]byte, error) {
if as.sm.GetState() != ConsumingResults && as.sm.GetState() != Failed {
currentState := as.sm.GetState()
if currentState != ConsumingResults && currentState != Complete && currentState != Failed {
return []byte{}, ErrResultsNotReady
}
if len(as.computation.ResultConsumers) == 0 {
return []byte{}, ErrAllResultsConsumed
}

index, ok := IndexFromContext(ctx)
if !ok {
return []byte{}, ErrUndeclaredConsumer
}
as.computation.ResultConsumers = slices.Delete(as.computation.ResultConsumers, index, index+1)

if len(as.computation.ResultConsumers) == 0 && as.sm.GetState() == ConsumingResults {
defer as.sm.SendEvent(resultsConsumed)
if index < 0 || index >= len(as.computation.ResultConsumers) {
return []byte{}, ErrUndeclaredConsumer

Check warning on line 326 in agent/service.go

View check run for this annotation

Codecov / codecov/patch

agent/service.go#L326

Added line #L326 was not covered by tests
}

if !as.resultsConsumed && currentState == ConsumingResults {
as.resultsConsumed = true
defer as.sm.SendEvent(ResultsConsumed)
}

return as.result, as.runError
Expand All @@ -271,59 +343,58 @@
return rawQuote, nil
}

// runComputation is the action registered for the Running state. It publishes
// an InProgress event, creates algorithm.ResultsDir, runs as.algorithm, zips
// the results directory into memory and stores the archive in as.result. Any
// failure is recorded in as.runError, logged at Warn level and published as a
// Failed event.
//
// Two deferred functions run on return, in reverse order of registration:
// first the results and datasets directories are removed, then RunFailed or
// RunComplete is sent depending on whether as.runError is set. If creating the
// results directory fails, the function returns before the cleanup defer is
// registered, so only the event is sent.
//
// The state argument is forwarded to the actions returned by publishEvent.
//
// NOTE(review): this span interleaves removed and added diff lines (calls via
// as.sm.logger and the zero-argument publishEvent closure belong to the
// removed side); the description above follows the added version.
func (as *agentService) runComputation() {
as.publishEvent(InProgress.String(), json.RawMessage{})()
as.sm.logger.Debug("computation run started")
func (as *agentService) runComputation(state statemachine.State) {
as.publishEvent(InProgress.String())(state)
as.logger.Debug("computation run started")
// Registered first, so it runs last: report the outcome to the state machine.
defer func() {
if as.runError != nil {
as.sm.SendEvent(runFailed)
as.sm.SendEvent(RunFailed)
} else {
as.sm.SendEvent(runComplete)
as.sm.SendEvent(RunComplete)

Check warning on line 353 in agent/service.go

View check run for this annotation

Codecov / codecov/patch

agent/service.go#L353

Added line #L353 was not covered by tests
}
}()

if err := os.Mkdir(algorithm.ResultsDir, 0o755); err != nil {
as.runError = fmt.Errorf("error creating results directory: %s", err.Error())
as.sm.logger.Warn(as.runError.Error())
as.publishEvent(Failed.String(), json.RawMessage{})()
as.logger.Warn(as.runError.Error())
as.publishEvent(Failed.String())(state)

Check warning on line 360 in agent/service.go

View check run for this annotation

Codecov / codecov/patch

agent/service.go#L359-L360

Added lines #L359 - L360 were not covered by tests
return
}

// Best-effort cleanup: removal errors are logged and do not affect runError.
defer func() {
if err := os.RemoveAll(algorithm.ResultsDir); err != nil {
as.sm.logger.Warn(fmt.Sprintf("error removing results directory and its contents: %s", err.Error()))
as.logger.Warn(fmt.Sprintf("error removing results directory and its contents: %s", err.Error()))

Check warning on line 366 in agent/service.go

View check run for this annotation

Codecov / codecov/patch

agent/service.go#L366

Added line #L366 was not covered by tests
}
if err := os.RemoveAll(algorithm.DatasetsDir); err != nil {
as.sm.logger.Warn(fmt.Sprintf("error removing datasets directory and its contents: %s", err.Error()))
as.logger.Warn(fmt.Sprintf("error removing datasets directory and its contents: %s", err.Error()))

Check warning on line 369 in agent/service.go

View check run for this annotation

Codecov / codecov/patch

agent/service.go#L369

Added line #L369 was not covered by tests
}
}()

as.publishEvent(InProgress.String(), json.RawMessage{})()
as.publishEvent(InProgress.String())(state)
if err := as.algorithm.Run(); err != nil {
as.runError = err
as.sm.logger.Warn(fmt.Sprintf("failed to run computation: %s", err.Error()))
as.publishEvent(Failed.String(), json.RawMessage{})()
as.logger.Warn(fmt.Sprintf("failed to run computation: %s", err.Error()))
as.publishEvent(Failed.String())(state)
return
}

// Archive the results before the deferred cleanup deletes the directory.
results, err := internal.ZipDirectoryToMemory(algorithm.ResultsDir)
if err != nil {
as.runError = err
as.sm.logger.Warn(fmt.Sprintf("failed to zip results: %s", err.Error()))
as.publishEvent(Failed.String(), json.RawMessage{})()
as.logger.Warn(fmt.Sprintf("failed to zip results: %s", err.Error()))
as.publishEvent(Failed.String())(state)

Check warning on line 385 in agent/service.go

View check run for this annotation

Codecov / codecov/patch

agent/service.go#L384-L385

Added lines #L384 - L385 were not covered by tests
return
}

as.publishEvent(Completed.String(), json.RawMessage{})()
as.publishEvent(Completed.String())(state)

Check warning on line 389 in agent/service.go

View check run for this annotation

Codecov / codecov/patch

agent/service.go#L389

Added line #L389 was not covered by tests

as.result = results
}

// publishEvent returns a state machine action that publishes an event through
// as.eventSvc. When invoked, the action sends the String form of the state it
// receives, the given status, and an empty json.RawMessage as details. An
// error from SendEvent is logged at Warn level and otherwise ignored.
//
// NOTE(review): this span interleaves removed and added diff lines; the first
// signature (taking details and reading as.sm.GetState()) is the removed side,
// and the description above follows the added version.
func (as *agentService) publishEvent(status string, details json.RawMessage) func() {
return func() {
st := as.sm.GetState().String()
if err := as.eventSvc.SendEvent(st, status, details); err != nil {
as.sm.logger.Warn(err.Error())
func (as *agentService) publishEvent(status string) statemachine.Action {
return func(state statemachine.State) {
if err := as.eventSvc.SendEvent(state.String(), status, json.RawMessage{}); err != nil {
as.logger.Warn(err.Error())

Check warning on line 397 in agent/service.go

View check run for this annotation

Codecov / codecov/patch

agent/service.go#L397

Added line #L397 was not covered by tests
}
}
}
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载