add some doc to load balancer mode · packplusplus/libbeat@aa35a8d · GitHub
This repository was archived by the owner on Sep 14, 2023. It is now read-only.

Commit aa35a8d

author: urso
committed: add some doc to load balancer mode
1 parent e82858d commit aa35a8d

File tree

1 file changed: +58 -24 lines changed


outputs/lumberjack/modes.go

Lines changed: 58 additions & 24 deletions
@@ -57,33 +57,65 @@ type ConnectionMode interface {
     PublishEvents(trans outputs.Signaler, events []common.MapStr) error
 }
 
+// singleConnectionMode sends all output on one single connection. If the connection
+// is not available, the output plugin blocks until the connection is either available
+// again or the connection mode is closed by Close.
 type singleConnectionMode struct {
-    conn      ProtocolClient
-    timeout   time.Duration
-    waitRetry time.Duration
-    closed    bool
+    conn ProtocolClient
+
+    closed bool // mode closed flag to break publisher loop
+
+    timeout   time.Duration // connection timeout
+    waitRetry time.Duration // wait time until reconnect
 }
 
-// Connect to at most one host by random and swap to another host (by random) if
-// active host becomes unavailable
+// failOverConnectionMode connects to at most one host, chosen at random, and swaps
+// to another random host if the currently active host becomes unavailable. If no
+// connection is available, the mode blocks until a new connection can be established.
 type failOverConnectionMode struct {
-    conns     []ProtocolClient
-    timeout   time.Duration
-    active    int
-    waitRetry time.Duration
-    closed    bool
+    conns  []ProtocolClient
+    active int // id of active connection
+
+    closed bool // mode closed flag to break publisher loop
+
+    timeout   time.Duration // connection timeout
+    waitRetry time.Duration // wait time until trying a new connection
 }
 
+// loadBalancerMode balances the sending of events between multiple connections.
+//
+// The balancing algorithm is mostly pull-based, with multiple workers trying to pull
+// some amount of work from a shared queue. Workers will only try to get a new work
+// item if they have a working/active connection. Workers without an active
+// connection do not participate until a connection has been re-established.
+// Due to the pull-based nature of the algorithm, events are load-balanced somewhat
+// randomly, with workers having lower latencies/turnaround times potentially getting
+// more work items than workers with higher latencies. Thus the algorithm dynamically
+// adapts to the resource availability of the servers events are forwarded to.
+//
+// Workers not participating in the load-balancing will continuously try to reconnect
+// to their configured endpoints. Once a new connection has been established,
+// these workers will participate in load-balancing again.
+//
+// If a connection becomes unavailable, the events are rescheduled for another
+// connection to pick up. Rescheduling events is limited to a maximum number of
+// send attempts. If events have not been sent after the maximum number of allowed
+// attempts has been exceeded, they will be dropped.
+//
+// Distributing events to workers is subject to a timeout. If no worker is available
+// to pick up a message for sending, the message will be dropped internally.
 type loadBalancerMode struct {
-    timeout     time.Duration
-    waitRetry   time.Duration
-    maxAttempts int
-    closed      bool
-
-    wg      sync.WaitGroup
-    work    chan eventsMessage
-    retries chan eventsMessage
-    done    chan struct{}
+    timeout   time.Duration // send/retry timeout. Every timeout is a failed send attempt
+    waitRetry time.Duration // duration to wait during re-connection attempts
+
+    maxAttempts int // maximum number of configured send attempts
+
+    // waitGroup + signaling channel for handling shutdown
+    wg   sync.WaitGroup
+    done chan struct{}
+
+    work    chan eventsMessage // work channel with new events to be published
+    retries chan eventsMessage // work channel for failed send attempts being forwarded to other workers
 }
 
 type eventsMessage struct {
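To make the pull-based scheme concrete, here is a minimal, self-contained Go sketch, not libbeat's actual implementation: the names worker, work, and the simulated latencies are illustrative only. All workers receive from one shared channel, so a worker only takes a new event when it is free, and slower workers naturally end up with fewer items, which is the dynamic adaptation the new doc comment describes.

package main

import (
    "fmt"
    "sync"
    "time"
)

// worker pulls events from the shared work channel in a loop. Because every
// worker receives from the same channel, an event goes to whichever worker is
// currently idle; slower workers therefore pull fewer events overall.
func worker(id int, work <-chan string, done <-chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case <-done:
            return
        case msg := <-work:
            // Simulate per-worker send latency: worker 3 is slower than
            // worker 1 and will end up handling fewer events.
            time.Sleep(time.Duration(id*10) * time.Millisecond)
            fmt.Printf("worker %d sent %s\n", id, msg)
        }
    }
}

func main() {
    work := make(chan string)
    done := make(chan struct{})
    var wg sync.WaitGroup

    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go worker(i, work, done, &wg)
    }

    for i := 0; i < 9; i++ {
        work <- fmt.Sprintf("event-%d", i)
    }
    close(done)
    wg.Wait()
}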
@@ -273,7 +305,6 @@ func newLoadBalancerMode(
         timeout:     timeout,
         waitRetry:   waitRetry,
         maxAttempts: maxAttempts,
-        closed:      false,
 
         work:    make(chan eventsMessage),
         retries: make(chan eventsMessage),
@@ -314,8 +345,8 @@ func (m *loadBalancerMode) start(clients []ProtocolClient) {
             select {
             case <-m.done:
                 return
-            case msg = <-m.retries:
-            case msg = <-m.work:
+            case msg = <-m.retries: // receive message from other failed worker
+            case msg = <-m.work: // receive message from publisher
             }
             m.onMessage(client, msg)
         }
@@ -354,9 +385,10 @@ func (m *loadBalancerMode) onFail(msg eventsMessage) {
         }
 
         select {
-        case m.retries <- msg:
+        case m.retries <- msg: // forward message to another worker
             return
         case <-time.After(m.timeout):
+            // another failed send
         }
     }
 }
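The retry loop above pairs with a per-message attempt budget: each timed-out hand-off counts as one more failed send attempt, and the message is dropped once the budget is exhausted. A minimal sketch of that accounting follows; the message type and its attemptsLeft field are assumptions for illustration, since the real eventsMessage fields are not shown in this diff.

package main

import (
    "fmt"
    "time"
)

// message stands in for libbeat's eventsMessage; attemptsLeft is an assumed
// field used only for this illustration.
type message struct {
    payload      string
    attemptsLeft int
}

// reschedule tries to hand a failed message to another worker via the retries
// channel. Every timeout while waiting counts as one more failed send attempt;
// when no attempts remain, the message is dropped.
func reschedule(msg message, retries chan<- message, timeout time.Duration) {
    for msg.attemptsLeft > 0 {
        select {
        case retries <- msg: // another worker picked the message up
            return
        case <-time.After(timeout):
            msg.attemptsLeft-- // another failed send
        }
    }
    fmt.Printf("dropping %s after exhausting send attempts\n", msg.payload)
}

func main() {
    retries := make(chan message) // nobody reads: every hand-off times out
    reschedule(message{payload: "event-42", attemptsLeft: 3}, retries, 50*time.Millisecond)
}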
@@ -380,6 +412,8 @@ func (m *loadBalancerMode) PublishEvents(
     select {
     case m.work <- msg:
     case <-time.After(m.timeout):
+        // failed send attempt if no worker is available to pick up the
+        // message within the configured time limit.
         m.onFail(msg)
     }
     return nil
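Taken together, the two timeouts share one budget: per the struct comment, every timeout is a failed send attempt, whether it occurs in this PublishEvents hand-off or while onFail reschedules the message. A message that no worker ever picks up is therefore dropped after the configured maximum number of attempts instead of blocking the publisher indefinitely.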

0 commit comments

Comments
 (0)
0