+
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ const (
)

type config struct {
LogLevel string `env:"MANAGER_LOG_LEVEL" envDefault:"info"`
JaegerURL url.URL `env:"COCOS_JAEGER_URL" envDefault:"http://localhost:4318"`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
InstanceID string `env:"MANAGER_INSTANCE_ID" envDefault:""`
LogLevel string `env:"MANAGER_LOG_LEVEL" envDefault:"info"`
JaegerURL url.URL `env:"COCOS_JAEGER_URL" envDefault:"http://localhost:4318"`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
InstanceID string `env:"MANAGER_INSTANCE_ID" envDefault:""`
BackendMeasurementBinary string `env:"BACKEND_MEASUREMENT_BINARY" envDefault:"../../build"`
}

func main() {
Expand Down Expand Up @@ -103,7 +104,7 @@ func main() {
}

eventsChan := make(chan *pkgmanager.ClientStreamMessage)
svc := newService(logger, tracer, qemuCfg, eventsChan)
svc := newService(logger, tracer, qemuCfg, eventsChan, cfg.BackendMeasurementBinary)

mc := managerapi.NewClient(pc, svc, eventsChan)

Expand All @@ -120,8 +121,8 @@ func main() {
}
}

func newService(logger *slog.Logger, tracer trace.Tracer, qemuCfg qemu.Config, eventsChan chan *pkgmanager.ClientStreamMessage) manager.Service {
svc := manager.New(qemuCfg, logger, eventsChan, qemu.NewVM)
func newService(logger *slog.Logger, tracer trace.Tracer, qemuCfg qemu.Config, eventsChan chan *pkgmanager.ClientStreamMessage, backendMeasurementPath string) manager.Service {
svc := manager.New(qemuCfg, backendMeasurementPath, logger, eventsChan, qemu.NewVM)
go svc.RetrieveAgentEventsLogs()
svc = api.LoggingMiddleware(svc, logger)
counter, latency := prometheus.MakeMetrics(svcName, "api")
Expand Down
12 changes: 12 additions & 0 deletions manager/api/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ func (client ManagerClient) Process(ctx context.Context, cancel context.CancelFu
if err := client.svc.Stop(ctx, mes.StopComputation.ComputationId); err != nil {
return err
}
case *pkgmanager.ServerStreamMessage_BackendInfoReq:
res, err := client.svc.FetchBackendInfo()
if err != nil {
return err
}
info := &pkgmanager.ClientStreamMessage_BackendInfo{BackendInfo: &pkgmanager.BackendInfo{
Info: res,
Id: mes.BackendInfoReq.Id,
}}
if err := client.stream.Send(&pkgmanager.ClientStreamMessage{Message: info}); err != nil {
return err
}
}
}
})
Expand Down
9 changes: 9 additions & 0 deletions manager/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,12 @@ func (lm *loggingMiddleware) Stop(ctx context.Context, computationID string) (er
// RetrieveAgentEventsLogs forwards the call to the wrapped service unchanged;
// this middleware emits no log entry of its own for it.
func (lm *loggingMiddleware) RetrieveAgentEventsLogs() {
lm.svc.RetrieveAgentEventsLogs()
}

// FetchBackendInfo delegates to the wrapped service and logs how long the call
// took. A failed call is logged at warn level together with its error; a
// successful call is logged at info level.
//
// The results are named only so the deferred closure can observe the final
// error value; callers see the same ([]byte, error) signature.
func (lm *loggingMiddleware) FetchBackendInfo() (info []byte, err error) {
	defer func(begin time.Time) {
		message := fmt.Sprintf("Method FetchBackendInfo took %s to complete", time.Since(begin))
		if err != nil {
			// Surface the failure instead of reporting it as a plain success line.
			lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
			return
		}
		lm.logger.Info(message)
	}(time.Now())

	return lm.svc.FetchBackendInfo()
}
9 changes: 9 additions & 0 deletions manager/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,12 @@ func (ms *metricsMiddleware) Stop(ctx context.Context, computationID string) err
// RetrieveAgentEventsLogs forwards the call to the wrapped service unchanged;
// no counter or latency metric is recorded for it here.
func (ms *metricsMiddleware) RetrieveAgentEventsLogs() {
ms.svc.RetrieveAgentEventsLogs()
}

// FetchBackendInfo forwards to the wrapped service and, once the call returns,
// increments the request counter and observes the elapsed time in seconds,
// both under the method label "FetchBackendInfo".
func (ms *metricsMiddleware) FetchBackendInfo() ([]byte, error) {
	start := time.Now()
	defer func() {
		ms.counter.With("method", "FetchBackendInfo").Add(1)
		ms.latency.With("method", "FetchBackendInfo").Observe(time.Since(start).Seconds())
	}()

	return ms.svc.FetchBackendInfo()
}
11 changes: 11 additions & 0 deletions manager/manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ message RunResponse{
string computation_id = 2;
}

// BackendInfo carries backend information from the manager back to the server
// in response to a BackendInfoReq.
message BackendInfo{
// Opaque payload; the manager fills it with the bytes returned by its
// FetchBackendInfo service call.
bytes info = 1;
// Identifier copied from the BackendInfoReq that triggered this reply.
string id = 2;
}

message AgentEvent {
string event_type = 1;
google.protobuf.Timestamp timestamp = 2;
Expand All @@ -47,6 +52,7 @@ message ClientStreamMessage {
AgentLog agent_log = 1;
AgentEvent agent_event = 2;
RunResponse run_res = 3;
BackendInfo backendInfo = 4;
}
}

Expand All @@ -55,6 +61,7 @@ message ServerStreamMessage {
ComputationRunReq runReq = 1;
Terminate terminateReq = 2;
StopComputation stopComputation = 3;
BackendInfoReq backendInfoReq = 4;
}
}

Expand All @@ -68,6 +75,10 @@ message ComputationRunReq {
AgentConfig agent_config = 7;
}

// BackendInfoReq is sent by the server (as a ServerStreamMessage variant) to
// ask the manager for its backend information.
message BackendInfoReq {
// Request identifier; the manager echoes it back in BackendInfo.id.
string id = 1;
}

// ResultConsumer identifies a consumer of computation results.
message ResultConsumer {
// NOTE(review): presumably the consumer's public key material — the encoding
// is not visible from this file; confirm against the producer of this message.
bytes userKey = 1;
}
Expand Down
4 changes: 2 additions & 2 deletions manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (
)

func TestNew(t *testing.T) {
qemuCfg := qemu.Config{}
cfg := qemu.Config{}
logger := slog.Default()
eventsChan := make(chan *manager.ClientStreamMessage)
vmf := new(mocks.Provider)

service := New(qemuCfg, logger, eventsChan, vmf.Execute)
service := New(cfg, "", logger, eventsChan, vmf.Execute)

assert.NotNil(t, service)
assert.IsType(t, &managerService{}, service)
Expand Down
48 changes: 35 additions & 13 deletions manager/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"fmt"
"log/slog"
"net"
"os"
"os/exec"
"strconv"

"github.com/absmach/magistrala/pkg/errors"
Expand Down Expand Up @@ -53,28 +55,32 @@ type Service interface {
Stop(ctx context.Context, computationID string) error
// RetrieveAgentEventsLogs Retrieve and forward agent logs and events via vsock.
RetrieveAgentEventsLogs()
// FetchBackendInfo measures and fetches the backend information.
FetchBackendInfo() ([]byte, error)
}

type managerService struct {
qemuCfg qemu.Config
logger *slog.Logger
agents map[int]string // agent map of vsock cid to computationID.
eventsChan chan *manager.ClientStreamMessage
vms map[string]vm.VM
vmFactory vm.Provider
qemuCfg qemu.Config
backendMeasurementBinaryPath string
logger *slog.Logger
agents map[int]string // agent map of vsock cid to computationID.
eventsChan chan *manager.ClientStreamMessage
vms map[string]vm.VM
vmFactory vm.Provider
}

var _ Service = (*managerService)(nil)

// New instantiates the manager service implementation.
func New(qemuCfg qemu.Config, logger *slog.Logger, eventsChan chan *manager.ClientStreamMessage, vmFactory vm.Provider) Service {
func New(cfg qemu.Config, backendMeasurementBinPath string, logger *slog.Logger, eventsChan chan *manager.ClientStreamMessage, vmFactory vm.Provider) Service {
ms := &managerService{
qemuCfg: qemuCfg,
logger: logger,
agents: make(map[int]string),
vms: make(map[string]vm.VM),
eventsChan: eventsChan,
vmFactory: vmFactory,
qemuCfg: cfg,
logger: logger,
agents: make(map[int]string),
vms: make(map[string]vm.VM),
eventsChan: eventsChan,
vmFactory: vmFactory,
backendMeasurementBinaryPath: backendMeasurementBinPath,
}
return ms
}
Expand Down Expand Up @@ -160,6 +166,22 @@ func (ms *managerService) Stop(ctx context.Context, computationID string) error
return nil
}

// FetchBackendInfo runs the backend_info measurement binary found under
// ms.backendMeasurementBinaryPath (via sudo, with "--policy 1966081") and
// returns the contents of the ./backend_info.json file, which the binary is
// expected to have written into the current working directory.
//
// Errors are wrapped with the step that failed. When the binary exits with an
// error, its combined stdout/stderr is included in the message so the cause is
// not lost.
func (ms *managerService) FetchBackendInfo() ([]byte, error) {
	binary := fmt.Sprintf("%s/backend_info", ms.backendMeasurementBinaryPath)
	cmd := exec.Command("sudo", binary, "--policy", "1966081")

	// The output is only needed for diagnostics; the actual result is read
	// from the JSON file below.
	out, err := cmd.CombinedOutput()
	if err != nil {
		return nil, fmt.Errorf("running %s: %w (output: %q)", binary, err, out)
	}

	info, err := os.ReadFile("./backend_info.json")
	if err != nil {
		return nil, fmt.Errorf("reading backend info file: %w", err)
	}

	return info, nil
}

func getFreePort() (int, error) {
listener, err := net.Listen("tcp", "")
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions manager/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,10 @@ func (tm *tracingMiddleware) Stop(ctx context.Context, computationID string) err
// RetrieveAgentEventsLogs forwards the call to the wrapped service unchanged;
// no span is started for it.
func (tm *tracingMiddleware) RetrieveAgentEventsLogs() {
tm.svc.RetrieveAgentEventsLogs()
}

// FetchBackendInfo wraps the underlying service call in a span named
// "fetch_backend_info". The method receives no context, so the span is
// started from context.Background().
func (tm *tracingMiddleware) FetchBackendInfo() ([]byte, error) {
	_, sp := tm.tracer.Start(context.Background(), "fetch_backend_info")
	defer sp.End()

	info, err := tm.svc.FetchBackendInfo()
	return info, err
}
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载