libnetwork/networkdb: prioritize local broadcasts · moby/moby@ce52ebc · GitHub
[go: up one dir, main page]

Skip to content

Commit ce52ebc

Browse files
committed
libnetwork/networkdb: prioritize local broadcasts
A network node is responsible for both broadcasting table events for entries it owns and for rebroadcasting table events from other nodes it has received. Table events to be broadcast are added to a single queue per network, including events for rebroadcasting. As the memberlist TransmitLimitedQueue is (to a first approximation) LIFO, a flood of events from other nodes could delay the broadcasting of locally-generated events indefinitely. Prioritize broadcasting local events by splitting up the queues and only pulling from the rebroadcast queue if there is free space in the gossip packet after draining the local-broadcast queue. Signed-off-by: Cory Snider <csnider@mirantis.com>
1 parent 6fd0e80 commit ce52ebc

File tree

4 files changed

+32
-18
lines changed

4 files changed

+32
-18
lines changed

libnetwork/networkdb/broadcast.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,21 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
170170
})
171171
return nil
172172
}
173+
174+
// getBroadasts gathers broadcast messages from the given queues, visiting
// them in argument order so that earlier queues are drained first.
//
// overhead is passed through to each queue's GetBroadcasts call and is also
// charged once per returned message against the byte budget. limit is the
// total byte budget shared by all queues; each queue is asked only for what
// remains after the queues before it. nil queues are skipped.
//
// It returns the messages from all visited queues concatenated in visit
// order, or a nil slice if no queue yielded anything.
func getBroadasts(overhead, limit int, queues ...*memberlist.TransmitLimitedQueue) [][]byte {
175+
var msgs [][]byte
176+
for _, q := range queues {
177+
// Tolerate queues that have not been set up.
if q == nil {
178+
continue
179+
}
180+
b := q.GetBroadcasts(overhead, limit)
181+
for _, m := range b {
182+
// Deduct the message size plus its per-message overhead from the budget.
limit -= overhead + len(m)
183+
}
184+
msgs = append(msgs, b...)
185+
// Budget exhausted: do not consult the remaining (later) queues.
if limit <= 0 {
186+
break
187+
}
188+
}
189+
return msgs
190+
}

libnetwork/networkdb/cluster.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -461,21 +461,20 @@ func (nDB *NetworkDB) gossip() {
461461
continue
462462
}
463463

464-
broadcastQ := network.tableBroadcasts
465-
466-
if broadcastQ == nil {
464+
if network.tableBroadcasts == nil {
467465
log.G(context.TODO()).Errorf("Invalid broadcastQ encountered while gossiping for network %s", nid)
468466
continue
469467
}
470468

471-
msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
469+
msgs := getBroadasts(compoundOverhead, bytesAvail, network.tableBroadcasts, network.tableRebroadcasts)
472470
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
473471
network.qMessagesSent.Add(int64(len(msgs)))
474472
if printStats {
475473
msent := network.qMessagesSent.Swap(0)
476-
log.G(context.TODO()).Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
474+
log.G(context.TODO()).Infof("NetworkDB stats %v(%v) - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d+%d netMsg/s:%d",
477475
nDB.config.Hostname, nDB.config.NodeID,
478-
nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber.Load(), broadcastQ.NumQueued(),
476+
nid, network.leaving, network.tableBroadcasts.NumNodes(), network.entriesNumber.Load(),
477+
network.tableBroadcasts.NumQueued(), network.tableRebroadcasts.NumQueued(),
479478
msent/int64((nDB.config.StatsPrintPeriod/time.Second)))
480479
}
481480

libnetwork/networkdb/delegate.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -291,16 +291,16 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
291291
nDB.RUnlock()
292292

293293
// if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present
294-
if !ok || n.leaving || n.tableBroadcasts == nil {
294+
if !ok || n.leaving || n.tableRebroadcasts == nil {
295295
return
296296
}
297297

298298
// if the queue is over the threshold, avoid distributing information coming from TCP sync
299-
if isBulkSync && n.tableBroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
299+
if isBulkSync && n.tableRebroadcasts.NumQueued() > maxQueueLenBroadcastOnSync {
300300
return
301301
}
302302

303-
n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
303+
n.tableRebroadcasts.QueueBroadcast(&tableEventMessage{
304304
msg: buf,
305305
id: tEvent.NetworkID,
306306
tname: tEvent.TableName,
@@ -423,14 +423,7 @@ func (d *delegate) NotifyMsg(buf []byte) {
423423
}
424424

425425
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
426-
msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
427-
for _, m := range msgs {
428-
limit -= overhead + len(m)
429-
}
430-
if limit > 0 {
431-
msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
432-
}
433-
return msgs
426+
return getBroadasts(overhead, limit, d.nDB.networkBroadcasts, d.nDB.nodeBroadcasts)
434427
}
435428

436429
func (d *delegate) LocalState(join bool) []byte {

libnetwork/networkdb/networkdb.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ type network struct {
146146

147147
// The broadcast queue for table event gossip. This is only
148148
// initialized for this node's network attachment entries.
149-
tableBroadcasts *memberlist.TransmitLimitedQueue
149+
tableBroadcasts, tableRebroadcasts *memberlist.TransmitLimitedQueue
150150

151151
// Number of gossip messages sent related to this network during the last stats collection period
152152
qMessagesSent atomic.Int64
@@ -634,6 +634,10 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
634634
},
635635
RetransmitMult: 4,
636636
}
637+
nodeNetworks[nid].tableRebroadcasts = &memberlist.TransmitLimitedQueue{
638+
NumNodes: nodeNetworks[nid].tableBroadcasts.NumNodes,
639+
RetransmitMult: nodeNetworks[nid].tableBroadcasts.RetransmitMult,
640+
}
637641
nDB.addNetworkNode(nid, nDB.config.NodeID)
638642
networkNodes := nDB.networkNodes[nid]
639643
n = nodeNetworks[nid]

0 commit comments

Comments
 (0)
0