+
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions p2p/MockRemotePeer_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 10 additions & 3 deletions p2p/blkreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,21 @@ func (br *BlocksChunkReceiver) ReceiveResp(msg Message, msgBody proto.Message) (
br.peer.consumeRequest(br.requestID)
return
}
// remote peer response failure
body := msgBody.(*types.GetBlockResponse)
if body.Status != types.ResultStatus_OK || len(body.Blocks) == 0 {
respBody, ok := msgBody.(types.ResponseMessage)
if !ok || respBody.GetStatus() != types.ResultStatus_OK {
br.actor.TellRequest(message.SyncerSvc, &message.GetBlockChunksRsp{ToWhom:br.peer.ID(), Err:message.RemotePeerFailError})
br.finished = true
br.peer.consumeRequest(br.requestID)
return
}
// remote peer response failure
body, ok := msgBody.(*types.GetBlockResponse)
if !ok || len(body.Blocks) == 0 {
br.actor.TellRequest(message.SyncerSvc, &message.GetBlockChunksRsp{ToWhom:br.peer.ID(), Err:message.MissingHashError})
br.finished = true
br.peer.consumeRequest(br.requestID)
return
}

// add to Got
for _, block := range body.Blocks {
Expand Down
1 change: 1 addition & 0 deletions p2p/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/libp2p/go-libp2p-peer"
)

// HSHandlerFactory creates HSHandler instances.
type HSHandlerFactory interface {
// CreateHSHandler builds an HSHandler from the given peer manager, actor service, logger and peer ID.
// NOTE(review): outbound presumably marks a connection initiated by the local node — confirm against implementations.
CreateHSHandler(outbound bool, pm PeerManager, actor ActorService, log *log.Logger, pid peer.ID) HSHandler
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/mockPeerManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (_m *MockPeerManager) AddNewPeer(_a0 PeerMeta) {
}

// RemovePeer provides a mock function with given fields: _a0
func (_m *MockPeerManager) RemovePeer(_a0 peer.ID) {
func (_m *MockPeerManager) RemovePeer(_a0 RemotePeer) {
_m.Called(_a0)
}

Expand Down
108 changes: 70 additions & 38 deletions p2p/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net"
"strconv"
"sync"
"sync/atomic"
"time"

inet "github.com/libp2p/go-libp2p-net"
Expand All @@ -33,7 +34,8 @@ type PeerManager interface {
SelfNodeID() peer.ID

AddNewPeer(peer PeerMeta)
RemovePeer(peerID peer.ID)
// RemovePeer removes the peer from the peer list. The peer disposes of its related resources and stops itself, and then calls RemovePeer on the peer manager.
RemovePeer(peer RemotePeer)
// NotifyPeerHandshake is called after remote peer is completed handshake and ready to receive or send
NotifyPeerHandshake(peerID peer.ID)
NotifyPeerAddressReceived([]PeerMeta)
Expand All @@ -58,18 +60,20 @@ type peerManager struct {
rm ReconnectManager
mm metric.MetricsManager

// designatedPeers and hiddenPeerSet are set once at construction time and are not changed afterwards
designatedPeers map[peer.ID]PeerMeta
hiddenPeerSet map[peer.ID]bool

manageNumber uint32
remotePeers map[peer.ID]*remotePeerImpl
peerPool map[peer.ID]PeerMeta
conf *cfg.P2PConfig
logger *log.Logger
mutex *sync.Mutex
// peerCache is copy-on-write style
peerCache []RemotePeer

addPeerChannel chan PeerMeta
removePeerChannel chan peer.ID
fillPoolChannel chan []PeerMeta
finishChannel chan struct{}
eventListeners []PeerEventListener
Expand Down Expand Up @@ -111,7 +115,6 @@ func NewPeerManager(handlerFactory HandlerFactory, hsFactory HSHandlerFactory, i
peerCache: make([]RemotePeer, 0, p2pConf.NPMaxPeers),

addPeerChannel: make(chan PeerMeta, 2),
removePeerChannel: make(chan peer.ID),
fillPoolChannel: make(chan []PeerMeta, 2),
eventListeners: make([]PeerEventListener, 0, 4),
finishChannel: make(chan struct{}),
Expand Down Expand Up @@ -204,12 +207,6 @@ MANLOOP:
pm.rm.CancelJob(meta.ID)
}
}
case id := <-pm.removePeerChannel:
if pm.removePeer(id) {
if meta, found := pm.designatedPeers[id]; found {
pm.rm.AddJob(meta)
}
}
case <-initialTimer.C:
initialTimer.Stop()
pm.checkAndCollectPeerListFromAll()
Expand All @@ -220,16 +217,37 @@ MANLOOP:
pm.tryFillPool(&peerMetas)
case <-pm.finishChannel:
addrTicker.Stop()
pm.nt.RemoveStreamHandler(aergoP2PSub)
pm.rm.Stop()
// TODO need to keep loop till all remote peer objects are removed, otherwise panic or channel deadlock can come.
break MANLOOP
}
}
// guarantee that no new peer connection will be made
pm.rm.Stop()
pm.nt.RemoveStreamHandler(aergoP2PSub)
pm.logger.Info().Msg("Finishing peerManager")

// cleanup peers
for peerID := range pm.remotePeers {
pm.removePeer(peerID)
go func() {
// closing all peer connections
for _, peer := range pm.peerCache {
peer.stop()
}
}()
timer := time.NewTimer(time.Second*30)
finishPoll := time.NewTicker(time.Second)
CLEANUPLOOP:
for {
select {
case <-finishPoll.C:
pm.mutex.Lock()
if len(pm.remotePeers) == 0 {
pm.mutex.Unlock()
pm.logger.Debug().Msg("All peers were finished peerManager")
break CLEANUPLOOP
}
pm.mutex.Unlock()
case <-timer.C:
pm.logger.Warn().Int("remained",len(pm.peerCache)).Msg("peermanager stop timeout. some peers were not finished.")
break CLEANUPLOOP
}
}
}

Expand Down Expand Up @@ -261,7 +279,7 @@ func (pm *peerManager) addOutboundPeer(meta PeerMeta) bool {
}

// tryAddPeer will do check connecting peer and add. it will return peer meta information received from
// remote peer setup some
// remote peer. Stream s will be owned by remotePeer if the peer is added successfully.
func (pm *peerManager) tryAddPeer(outbound bool, meta PeerMeta, s inet.Stream) (PeerMeta, bool) {
var peerID = meta.ID
rd := metric.NewReader(s)
Expand All @@ -284,7 +302,7 @@ func (pm *peerManager) tryAddPeer(outbound bool, meta PeerMeta, s inet.Stream) (
_, receivedMeta.Designated = pm.designatedPeers[peerID]

// adding peer to peer list
newPeer, err := pm.registerPeer(peerID, receivedMeta, rw)
newPeer, err := pm.registerPeer(peerID, receivedMeta, s, rw)
if err != nil {
pm.sendGoAway(rw, err.Error())
return meta, false
Expand All @@ -303,7 +321,7 @@ func (pm *peerManager) tryAddPeer(outbound bool, meta PeerMeta, s inet.Stream) (
return receivedMeta, true
}

func (pm *peerManager) registerPeer(peerID peer.ID, receivedMeta PeerMeta, rw MsgReadWriter) (*remotePeerImpl, error) {
func (pm *peerManager) registerPeer(peerID peer.ID, receivedMeta PeerMeta, s inet.Stream, rw MsgReadWriter) (*remotePeerImpl, error) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
preExistPeer, ok := pm.remotePeers[peerID]
Expand All @@ -315,19 +333,19 @@ func (pm *peerManager) registerPeer(peerID peer.ID, receivedMeta PeerMeta, rw Ms
return nil, fmt.Errorf("Already handshake peer %s ", peerID.Pretty())
} else {
pm.logger.Info().Str("local_peer_id", pm.SelfNodeID().Pretty()).Str(LogPeerID, peerID.Pretty()).Bool("outbound", receivedMeta.Outbound).Msg("Keep connection and close earlier handshake connection.")
// TODO send goaway message to the pre-existing peer
// disconnect lower valued connection
pm.deletePeer(receivedMeta.ID)
// stopping lower valued connection
preExistPeer.stop()
}
}

outboundPeer := newRemotePeer(receivedMeta, pm, pm.actorService, pm.logger, pm.mf, pm.signer, rw)
outboundPeer := newRemotePeer(receivedMeta, pm, pm.actorService, pm.logger, pm.mf, pm.signer, s, rw)
outboundPeer.manageNum = pm.GetNextManageNum()
// insert Handlers
pm.handlerFactory.insertHandlers(outboundPeer)

go outboundPeer.runPeer()
pm.insertPeer(peerID, outboundPeer)
pm.logger.Info().Bool("outbound", receivedMeta.Outbound).Str(LogPeerID, peerID.Pretty()).Str("addr", net.ParseIP(receivedMeta.IPAddress).String()+":"+strconv.Itoa(int(receivedMeta.Port))).Msg("peer is added to peerService")
pm.logger.Info().Bool("outbound", receivedMeta.Outbound).Str(LogPeerID, peerID.Pretty()).Uint32("manage_num",outboundPeer.ManageNumber()).Str("addr", net.ParseIP(receivedMeta.IPAddress).String()+":"+strconv.Itoa(int(receivedMeta.Port))).Msg("peer is added to peerService")

return outboundPeer, nil
}
Expand All @@ -342,6 +360,9 @@ func (pm *peerManager) doPostHandshake(peerID peer.ID, remoteStatus *types.Statu
// TODO add tx handling
}

// GetNextManageNum atomically increments the manager's manageNumber counter
// and returns the incremented value.
func (pm *peerManager) GetNextManageNum() uint32 {
	next := atomic.AddUint32(&pm.manageNumber, 1)
	return next
}
func (pm *peerManager) sendGoAway(rw MsgReadWriter, msg string) {
goMsg := &types.GoAwayNotice{Message: msg}
// TODO code smell. non safe casting.
Expand All @@ -355,8 +376,8 @@ func (pm *peerManager) AddNewPeer(peer PeerMeta) {
pm.addPeerChannel <- peer
}

func (pm *peerManager) RemovePeer(peerID peer.ID) {
pm.removePeerChannel <- peerID
func (pm *peerManager) RemovePeer(peer RemotePeer) {
pm.removePeer(peer)
}

func (pm *peerManager) NotifyPeerHandshake(peerID peer.ID) {
Expand All @@ -367,25 +388,34 @@ func (pm *peerManager) NotifyPeerAddressReceived(metas []PeerMeta) {
pm.fillPoolChannel <- metas
}

// removePeer remove and disconnect managed remote peer connection
// removePeer unregister managed remote peer connection
// It returns true if the peer exists and is managed by the peer manager
func (pm *peerManager) removePeer(peerID peer.ID) bool {
func (pm *peerManager) removePeer(peer RemotePeer) bool {
peerID := peer.ID()
pm.mutex.Lock()
target, ok := pm.remotePeers[peerID]
if !ok {
pm.mutex.Unlock()
return false
}
if target.manageNum != peer.ManageNumber() {
pm.logger.Debug().Uint32("remove_num", peer.ManageNumber()).Uint32("exist_num", target.ManageNumber()).Str(LogPeerID, target.ID().Pretty()).Msg("remove peer is requested but already removed and other instance is on")
pm.mutex.Unlock()
return false
}
if target.State() == types.RUNNING {
pm.logger.Warn().Str(LogPeerID, target.ID().Pretty()).Msg("remove peer is requested but peer is still running")
}
pm.deletePeer(peerID)
// No internal module accesses this peer anymore, but remote messages can still be received.
target.stop()
pm.logger.Info().Uint32("manage_num",peer.ManageNumber()).Str(LogPeerID, target.ID().Pretty()).Msg("removed peer in peermanager")
pm.mutex.Unlock()
for _, listener := range pm.eventListeners {
listener.OnRemovePeer(peerID)
}

// also disconnect connection
pm.nt.ClosePeerConnection(peerID)
if meta, found := pm.designatedPeers[peer.ID()]; found {
pm.rm.AddJob(meta)
}
return true
}

Expand All @@ -411,8 +441,9 @@ func (pm *peerManager) checkAndCollectPeerListFromAll() {
pm.actorService.SendRequest(message.P2PSvc, &message.MapQueryMsg{Count: MaxAddrListSizePolaris})
}

for _, remotePeer := range pm.remotePeers {
pm.actorService.SendRequest(message.P2PSvc, &message.GetAddressesMsg{ToWhom: remotePeer.meta.ID, Size: MaxAddrListSizePeer, Offset: 0})
// checking the exact peer set is not strictly needed here, so use the cache instead
for _, remotePeer := range pm.peerCache {
pm.actorService.SendRequest(message.P2PSvc, &message.GetAddressesMsg{ToWhom: remotePeer.ID(), Size: MaxAddrListSizePeer, Offset: 0})
}
}

Expand Down Expand Up @@ -503,12 +534,13 @@ func (pm *peerManager) GetPeerAddresses() ([]*types.PeerAddress, []bool, []*type
hiddens := make([]bool, 0, len(pm.remotePeers))
blks := make([]*types.NewBlockNotice, 0, len(pm.remotePeers))
states := make([]types.PeerState, 0, len(pm.remotePeers))
for _, aPeer := range pm.remotePeers {
addr := aPeer.meta.ToPeerAddress()
hiddens = append(hiddens, aPeer.meta.Hidden)
for _, aPeer := range pm.peerCache {
meta := aPeer.Meta()
addr := meta.ToPeerAddress()
hiddens = append(hiddens, meta.Hidden)
peers = append(peers, &addr)
blks = append(blks, aPeer.lastNotice)
states = append(states, aPeer.state)
blks = append(blks, aPeer.LastNotice())
states = append(states, aPeer.State())
}
return peers, hiddens, blks, states
}
Expand Down
16 changes: 2 additions & 14 deletions p2p/peermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func FailTestGetPeers(t *testing.T) {
for i := 0; i < iterSize; i++ {
peerID := peer.ID(strconv.Itoa(i))
peerMeta := PeerMeta{ID: peerID}
target.remotePeers[peerID] = newRemotePeer(peerMeta, target, mockActorServ, logger, nil, nil, nil)
target.remotePeers[peerID] = newRemotePeer(peerMeta, target, mockActorServ, logger, nil, nil, nil, nil)
if i == (iterSize >> 2) {
wg.Done()
}
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestPeerManager_GetPeers(t *testing.T) {
for i := 0; i < iterSize; i++ {
peerID := peer.ID(strconv.Itoa(i))
peerMeta := PeerMeta{ID: peerID}
target.insertPeer(peerID, newRemotePeer(peerMeta, target, mockActorServ, logger, nil, nil,nil))
target.insertPeer(peerID, newRemotePeer(peerMeta, target, mockActorServ, logger, nil, nil, nil, nil))
if i == (iterSize >> 2) {
wg.Done()
}
Expand Down Expand Up @@ -176,15 +176,3 @@ func TestPeerManager_init(t *testing.T) {
}
}

// TestPeerManager_addOutboundPeer is a table-driven placeholder: the case
// table is still empty, so no subtests are executed yet.
func TestPeerManager_addOutboundPeer(t *testing.T) {
	type scenario struct {
		name string
	}
	// TODO: test cases
	var scenarios []scenario
	for i := range scenarios {
		sc := scenarios[i]
		t.Run(sc.name, func(t *testing.T) {
			// no assertions yet
		})
	}
}
8 changes: 4 additions & 4 deletions p2p/protobufHelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Test_pbRequestOrder_SendTo(t *testing.T) {

mockRW := new(MockMsgReadWriter)
mockRW.On("WriteMsg", mock.Anything).Return(tt.writeErr)
peer := newRemotePeer(sampleMeta, mockPeerManager, mockActorServ, logger, factory, &dummySigner{}, mockRW)
peer := newRemotePeer(sampleMeta, mockPeerManager, mockActorServ, logger, factory, &dummySigner{}, nil, mockRW)

pr := factory.newMsgRequestOrder(true, PingRequest, &types.Ping{})
prevCacheSize := len(peer.requests)
Expand Down Expand Up @@ -76,7 +76,7 @@ func Test_pbMessageOrder_SendTo(t *testing.T) {

mockRW := new(MockMsgReadWriter)
mockRW.On("WriteMsg", mock.Anything).Return(tt.writeErr)
peer := newRemotePeer(sampleMeta, mockPeerManager, mockActorServ, logger, factory, &dummySigner{}, mockRW)
peer := newRemotePeer(sampleMeta, mockPeerManager, mockActorServ, logger, factory, &dummySigner{}, nil, mockRW)

pr := factory.newMsgResponseOrder(NewMsgID(), PingResponse, &types.Pong{})
msgID := pr.GetMsgID()
Expand Down Expand Up @@ -118,7 +118,7 @@ func Test_pbBlkNoticeOrder_SendTo(t *testing.T) {
mockPeerManager := new(MockPeerManager)
mockRW := new(MockMsgReadWriter)
mockRW.On("WriteMsg", mock.Anything).Return(tt.writeErr)
peer := newRemotePeer(sampleMeta, mockPeerManager, mockActorServ, logger, factory, &dummySigner{}, mockRW)
peer := newRemotePeer(sampleMeta, mockPeerManager, mockActorServ, logger, factory, &dummySigner{}, nil, mockRW)

target := factory.newMsgBlkBroadcastOrder(&types.NewBlockNotice{BlockHash: dummyBlockHash})
msgID := sampleMsgID
Expand Down Expand Up @@ -172,7 +172,7 @@ func Test_pbTxNoticeOrder_SendTo(t *testing.T) {

mockRW := new(MockMsgReadWriter)
mockRW.On("WriteMsg", mock.Anything).Return(tt.writeErr)
peer := newRemotePeer(sampleMeta, mockPeerManager, mockActorServ, logger, factory, &dummySigner{}, mockRW)
peer := newRemotePeer(sampleMeta, mockPeerManager, mockActorServ, logger, factory, &dummySigner{}, nil, mockRW)

pr := factory.newMsgTxBroadcastOrder(&types.NewTransactionsNotice{TxHashes: sampleHashes})
msgID := pr.GetMsgID()
Expand Down
Loading
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载