+
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions manager/agentEventsLogs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package manager

import (
"net"
"testing"
"time"

mglog "github.com/absmach/magistrala/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/ultravioletrs/cocos/manager/qemu"
"github.com/ultravioletrs/cocos/manager/vm"
"github.com/ultravioletrs/cocos/pkg/manager"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

type MockConn struct {
mock.Mock
}

func (m *MockConn) Read(b []byte) (n int, err error) {
args := m.Called(b)
return args.Int(0), args.Error(1)
}

func (m *MockConn) Write(b []byte) (n int, err error) {
args := m.Called(b)
return args.Int(0), args.Error(1)
}

func (m *MockConn) Close() error {
args := m.Called()
return args.Error(0)
}

func (m *MockConn) LocalAddr() net.Addr {
args := m.Called()
return args.Get(0).(net.Addr)
}

func (m *MockConn) RemoteAddr() net.Addr {
args := m.Called()
return args.Get(0).(net.Addr)
}

func (m *MockConn) SetDeadline(t time.Time) error {
args := m.Called(t)
return args.Error(0)
}

func (m *MockConn) SetReadDeadline(t time.Time) error {
args := m.Called(t)
return args.Error(0)
}

func (m *MockConn) SetWriteDeadline(t time.Time) error {
args := m.Called(t)
return args.Error(0)
}

type MockAddr struct {
mock.Mock
}

func (m *MockAddr) Network() string {
args := m.Called()
return args.String(0)
}

func (m *MockAddr) String() string {
args := m.Called()
return args.String(0)
}

func TestComputationIDFromAddress(t *testing.T) {
ms := &managerService{
vms: map[string]vm.VM{
"comp1": qemu.NewVM(qemu.Config{VSockConfig: qemu.VSockConfig{GuestCID: 3}}, make(chan *manager.ClientStreamMessage), "comp1"),
"comp2": qemu.NewVM(qemu.Config{VSockConfig: qemu.VSockConfig{GuestCID: 5}}, make(chan *manager.ClientStreamMessage), "comp2"),
},
}

tests := []struct {
name string
address string
want string
wantErr bool
}{
{"Valid address", "vm(3)", "comp1", false},
{"Invalid address", "invalid", "", true},
{"Non-existent CID", "vm(10)", "", true},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ms.computationIDFromAddress(tt.address)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.want, got)
}
})
}
}

func TestHandleConnection(t *testing.T) {
ms := &managerService{
vms: map[string]vm.VM{
"comp1": qemu.NewVM(qemu.Config{VSockConfig: qemu.VSockConfig{GuestCID: 3}}, make(chan *manager.ClientStreamMessage), "comp1"),
},
eventsChan: make(chan *manager.ClientStreamMessage, 1),
logger: mglog.NewMock(),
}

mockConn := new(MockConn)
mockAddr := new(MockAddr)
mockConn.On("RemoteAddr").Return(mockAddr)
mockConn.On("Close").Return(nil)
mockAddr.On("String").Return("vm(3)")

msg := &manager.ClientStreamMessage{
Message: &manager.ClientStreamMessage_AgentEvent{
AgentEvent: &manager.AgentEvent{
EventType: manager.VmRunning.String(),
ComputationId: "comp1",
Status: manager.VmRunning.String(),
Timestamp: timestamppb.Now(),
Originator: "agent",
},
},
}
msgBytes, _ := proto.Marshal(msg)

mockConn.On("Read", mock.Anything).Return(len(msgBytes), nil).Run(func(args mock.Arguments) {
copy(args.Get(0).([]byte), msgBytes)
}).Once()

mockConn.On("Read", mock.Anything).Return(0, net.ErrClosed)

go ms.handleConnection(mockConn)

receivedMsg := <-ms.eventsChan
assert.Equal(t, msg.GetAgentEvent().EventType, receivedMsg.GetAgentEvent().EventType)
assert.Equal(t, msg.GetAgentEvent().ComputationId, receivedMsg.GetAgentEvent().ComputationId)

mockConn.AssertExpectations(t)
}

func TestReportBrokenConnection(t *testing.T) {
ms := &managerService{
eventsChan: make(chan *manager.ClientStreamMessage, 1),
}

ms.reportBrokenConnection("comp1")

select {
case msg := <-ms.eventsChan:
assert.Equal(t, "comp1", msg.GetAgentEvent().ComputationId)
assert.Equal(t, manager.Disconnected.String(), msg.GetAgentEvent().Status)
assert.Equal(t, "manager", msg.GetAgentEvent().Originator)
default:
t.Error("Expected message in eventsChan, but none received")
}
}
181 changes: 181 additions & 0 deletions manager/api/grpc/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright (c) Ultraviolet
// SPDX-License-Identifier: Apache-2.0
package grpc

import (
"context"
"testing"
"time"

mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/magistrala/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/ultravioletrs/cocos/manager/mocks"
pkgmanager "github.com/ultravioletrs/cocos/pkg/manager"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)

type mockStream struct {
mock.Mock
grpc.ClientStream
}

func (m *mockStream) Recv() (*pkgmanager.ServerStreamMessage, error) {
args := m.Called()
return args.Get(0).(*pkgmanager.ServerStreamMessage), args.Error(1)
}

func (m *mockStream) Send(msg *pkgmanager.ClientStreamMessage) error {
args := m.Called(msg)
return args.Error(0)
}

func TestManagerClient_Process(t *testing.T) {
mockStream := new(mockStream)
mockSvc := new(mocks.Service)
messageQueue := make(chan *pkgmanager.ClientStreamMessage, 10)
logger := mglog.NewMock()

client := NewClient(mockStream, mockSvc, messageQueue, logger)

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

mockStream.On("Recv").Return(&pkgmanager.ServerStreamMessage{Message: &pkgmanager.ServerStreamMessage_StopComputation{StopComputation: &pkgmanager.StopComputation{}}}, nil).Maybe()
mockStream.On("Send", mock.Anything).Return(nil).Maybe()

mockSvc.On("Stop", mock.Anything, mock.Anything).Return(nil).Maybe()

err := client.Process(ctx, cancel)

assert.Error(t, err)
assert.Contains(t, err.Error(), "context deadline exceeded")
}

func TestManagerClient_handleRunReqChunks(t *testing.T) {
mockStream := new(mockStream)
mockSvc := new(mocks.Service)
messageQueue := make(chan *pkgmanager.ClientStreamMessage, 10)
logger := mglog.NewMock()

client := NewClient(mockStream, mockSvc, messageQueue, logger)

runReq := &pkgmanager.ComputationRunReq{
Id: "test-id",
}
runReqBytes, _ := proto.Marshal(runReq)

chunk1 := &pkgmanager.ServerStreamMessage_RunReqChunks{
RunReqChunks: &pkgmanager.RunReqChunks{
Id: "chunk-1",
Data: runReqBytes[:len(runReqBytes)/2],
IsLast: false,
},
}
chunk2 := &pkgmanager.ServerStreamMessage_RunReqChunks{
RunReqChunks: &pkgmanager.RunReqChunks{
Id: "chunk-1",
Data: runReqBytes[len(runReqBytes)/2:],
IsLast: true,
},
}

mockSvc.On("Run", mock.Anything, mock.AnythingOfType("*manager.ComputationRunReq")).Return("8080", nil)

err := client.handleRunReqChunks(context.Background(), chunk1)
assert.NoError(t, err)

err = client.handleRunReqChunks(context.Background(), chunk2)
assert.NoError(t, err)

// Wait for the goroutine to finish
time.Sleep(50 * time.Millisecond)

mockSvc.AssertExpectations(t)
assert.Len(t, messageQueue, 1)

msg := <-messageQueue
runRes, ok := msg.Message.(*pkgmanager.ClientStreamMessage_RunRes)
assert.True(t, ok)
assert.Equal(t, "8080", runRes.RunRes.AgentPort)
assert.Equal(t, "test-id", runRes.RunRes.ComputationId)
}

func TestManagerClient_handleTerminateReq(t *testing.T) {
client := ManagerClient{}

terminateReq := &pkgmanager.ServerStreamMessage_TerminateReq{
TerminateReq: &pkgmanager.Terminate{
Message: "Test termination",
},
}

err := client.handleTerminateReq(terminateReq)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Test termination")
assert.True(t, errors.Contains(err, errTerminationFromServer))
}

func TestManagerClient_handleStopComputation(t *testing.T) {
mockStream := new(mockStream)
mockSvc := new(mocks.Service)
messageQueue := make(chan *pkgmanager.ClientStreamMessage, 10)
logger := mglog.NewMock()

client := NewClient(mockStream, mockSvc, messageQueue, logger)

stopReq := &pkgmanager.ServerStreamMessage_StopComputation{
StopComputation: &pkgmanager.StopComputation{
ComputationId: "test-comp-id",
},
}

mockSvc.On("Stop", mock.Anything, "test-comp-id").Return(nil)

client.handleStopComputation(context.Background(), stopReq)

// Wait for the goroutine to finish
time.Sleep(50 * time.Millisecond)

mockSvc.AssertExpectations(t)
assert.Len(t, messageQueue, 1)

msg := <-messageQueue
stopRes, ok := msg.Message.(*pkgmanager.ClientStreamMessage_StopComputationRes)
assert.True(t, ok)
assert.Equal(t, "test-comp-id", stopRes.StopComputationRes.ComputationId)
assert.Empty(t, stopRes.StopComputationRes.Message)
}

func TestManagerClient_handleBackendInfoReq(t *testing.T) {
mockStream := new(mockStream)
mockSvc := new(mocks.Service)
messageQueue := make(chan *pkgmanager.ClientStreamMessage, 10)
logger := mglog.NewMock()

client := NewClient(mockStream, mockSvc, messageQueue, logger)

infoReq := &pkgmanager.ServerStreamMessage_BackendInfoReq{
BackendInfoReq: &pkgmanager.BackendInfoReq{
Id: "test-info-id",
},
}

mockSvc.On("FetchBackendInfo").Return([]byte("test-backend-info"), nil)

client.handleBackendInfoReq(infoReq)

// Wait for the goroutine to finish
time.Sleep(50 * time.Millisecond)

mockSvc.AssertExpectations(t)
assert.Len(t, messageQueue, 1)

msg := <-messageQueue
infoRes, ok := msg.Message.(*pkgmanager.ClientStreamMessage_BackendInfo)
assert.True(t, ok)
assert.Equal(t, "test-info-id", infoRes.BackendInfo.Id)
assert.Equal(t, []byte("test-backend-info"), infoRes.BackendInfo.Info)
}
27 changes: 18 additions & 9 deletions manager/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,31 @@

eg.Go(func() error {
for {
req, err := stream.Recv()
if err != nil {
return err
select {
case <-ctx.Done():
return ctx.Err()
default:
req, err := stream.Recv()
if err != nil {
return err

Check warning on line 65 in manager/api/grpc/server.go

View check run for this annotation

Codecov / codecov/patch

manager/api/grpc/server.go#L65

Added line #L65 was not covered by tests
}
s.incoming <- req
}

s.incoming <- req
}
})

eg.Go(func() error {
sendMessage := func(msg *manager.ServerStreamMessage) error {
switch m := msg.Message.(type) {
case *manager.ServerStreamMessage_RunReq:
return s.sendRunReqInChunks(stream, m.RunReq)
select {
case <-ctx.Done():
return ctx.Err()

Check warning on line 76 in manager/api/grpc/server.go

View check run for this annotation

Codecov / codecov/patch

manager/api/grpc/server.go#L75-L76

Added lines #L75 - L76 were not covered by tests
default:
return stream.Send(msg)
switch m := msg.Message.(type) {
case *manager.ServerStreamMessage_RunReq:
return s.sendRunReqInChunks(stream, m.RunReq)
default:
return stream.Send(msg)

Check warning on line 82 in manager/api/grpc/server.go

View check run for this annotation

Codecov / codecov/patch

manager/api/grpc/server.go#L81-L82

Added lines #L81 - L82 were not covered by tests
}
}
}

Expand Down
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载