@@ -57,33 +57,65 @@ type ConnectionMode interface {
57
57
PublishEvents (trans outputs.Signaler , events []common.MapStr ) error
58
58
}
59
59
60
+ // singleConnectionMode sends all output over a single connection. If the connection is
61
+ // not available, the output plugin blocks until the connection is either available
62
+ // again or the connection mode is closed by Close.
60
63
type singleConnectionMode struct {
61
- conn ProtocolClient
62
- timeout time.Duration
63
- waitRetry time.Duration
64
- closed bool
64
+ conn ProtocolClient
65
+
66
+ closed bool // mode closed flag to break publisher loop
67
+
68
+ timeout time.Duration // connection timeout
69
+ waitRetry time.Duration // wait time until reconnect
65
70
}
66
71
67
- // Connect to at most one host by random and swap to another host (by random) if
68
- // active host becomes unavailable
72
+ // failOverConnectionMode connects to at most one host, chosen at random, and swaps to
73
+ // another host (also at random) if the currently active host becomes unavailable. If no
74
+ // connection is available, the mode blocks until a new connection can be established.
69
75
type failOverConnectionMode struct {
70
- conns []ProtocolClient
71
- timeout time.Duration
72
- active int
73
- waitRetry time.Duration
74
- closed bool
76
+ conns []ProtocolClient
77
+ active int // id of active connection
78
+
79
+ closed bool // mode closed flag to break publisher loop
80
+
81
+ timeout time.Duration // connection timeout
82
+ waitRetry time.Duration // wait time until trying a new connection
75
83
}
76
84
85
+ // loadBalancerMode balances the sending of events between multiple connections.
86
+ //
87
+ // The balancing algorithm is mostly pull-based, with multiple workers trying to pull
88
+ // some amount of work from a shared queue. Workers will try to get a new work item
89
+ // only if they have a working/active connection. Workers without active connection
90
+ // do not participate until a connection has been re-established.
91
+ // Due to the pull-based nature, the algorithm will load-balance events at random,
92
+ // with workers having lower latencies/turn-around times potentially getting more
93
+ // work items than other workers with higher latencies. Thus the algorithm
94
+ // dynamically adapts to the resource availability of the servers events are forwarded to.
95
+ //
96
+ // Workers not participating in the load-balancing will continuously try to reconnect
97
+ // to their configured endpoints. Once a new connection has been established,
98
+ // these workers will participate in load-balancing again.
99
+ //
100
+ // If a connection becomes unavailable, the events are rescheduled for another
101
+ // connection to pick up. Rescheduling events is limited to a maximum number of
102
+ // send attempts. If events have not been sent after the maximum number of allowed
103
+ // attempts has been exceeded, they will be dropped.
104
+ //
105
+ // Distributing events to workers is subject to timeout. If no worker is available to
106
+ // pick up a message for sending, the message will be dropped internally.
77
107
type loadBalancerMode struct {
78
- timeout time.Duration
79
- waitRetry time.Duration
80
- maxAttempts int
81
- closed bool
82
-
83
- wg sync.WaitGroup
84
- work chan eventsMessage
85
- retries chan eventsMessage
86
- done chan struct {}
108
+ timeout time.Duration // send/retry timeout. Every timeout is a failed send attempt
109
+ waitRetry time.Duration // duration to wait during re-connection attempts
110
+
111
+ maxAttempts int // maximum number of configured send attempts
112
+
113
+ // waitGroup + signaling channel for handling shutdown
114
+ wg sync.WaitGroup
115
+ done chan struct {}
116
+
117
+ work chan eventsMessage // work channel with new events to be published
118
+ retries chan eventsMessage // work channel for failed send attempts being forwarded to other workers
87
119
}
88
120
89
121
type eventsMessage struct {
@@ -273,7 +305,6 @@ func newLoadBalancerMode(
273
305
timeout : timeout ,
274
306
waitRetry : waitRetry ,
275
307
maxAttempts : maxAttempts ,
276
- closed : false ,
277
308
278
309
work : make (chan eventsMessage ),
279
310
retries : make (chan eventsMessage ),
@@ -314,8 +345,8 @@ func (m *loadBalancerMode) start(clients []ProtocolClient) {
314
345
select {
315
346
case <- m .done :
316
347
return
317
- case msg = <- m .retries :
318
- case msg = <- m .work :
348
+ case msg = <- m .retries : // receive message from another failed worker
349
+ case msg = <- m .work : // receive message from publisher
319
350
}
320
351
m .onMessage (client , msg )
321
352
}
@@ -354,9 +385,10 @@ func (m *loadBalancerMode) onFail(msg eventsMessage) {
354
385
}
355
386
356
387
select {
357
- case m .retries <- msg :
388
+ case m .retries <- msg : // forward message to another worker
358
389
return
359
390
case <- time .After (m .timeout ):
391
+ // another failed send
360
392
}
361
393
}
362
394
}
@@ -380,6 +412,8 @@ func (m *loadBalancerMode) PublishEvents(
380
412
select {
381
413
case m .work <- msg :
382
414
case <- time .After (m .timeout ):
415
+ // failed send attempt if no worker is available to pick up message
416
+ // within configured time limit.
383
417
m .onFail (msg )
384
418
}
385
419
return nil
0 commit comments