10000 Adding tests for publisher package. · packplusplus/libbeat@0801177 · GitHub
[go: up one dir, main page]

Skip to content
This repository was archived by the owner on Sep 14, 2023. It is now read-only.

Commit 0801177

Browse files
committed
Adding tests for publisher package.
1 parent e5043b5 commit 0801177

13 files changed

+743
-16
lines changed

outputs/signal.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ import (
55
)
66

77
// Signaler signals the completion of potentially asynchronous output operation.
8-
// Completed is send by output plugin when all events have been send. On failure or
9-
// only subset of data being published Failed will be send.
8+
// Completed is called by the output plugin when all events have been sent. On
9+
// failure or if only a subset of the data is published then Failed will be
10+
// invoked.
1011
type Signaler interface {
1112
Completed()
1213

publisher/async_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package publisher
2+
3+
import (
4+
"testing"
5+
6+
"github.com/elastic/libbeat/common"
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestAsyncPublishEvent(t *testing.T) {
11+
// Init
12+
testPub := newTestPublisherNoBulk(CompletedResponse)
13+
event := testEvent()
14+
15+
// D7AE Execute. Async PublishEvent always immediately returns true.
16+
assert.True(t, testPub.pub.asyncPublisher.client().PublishEvent(event))
17+
18+
// Validate
19+
msgs, err := testPub.outputMsgHandler.waitForMessages(1)
20+
if err != nil {
21+
t.Fatal(err)
22+
}
23+
assert.Equal(t, event, msgs[0].event)
24+
}
25+
26+
func TestAsyncPublishEvents(t *testing.T) {
27+
// Init
28+
testPub := newTestPublisherNoBulk(CompletedResponse)
29+
events := []common.MapStr{testEvent(), testEvent()}
30+
31+
// Execute. Async PublishEvent always immediately returns true.
32+
assert.True(t, testPub.pub.asyncPublisher.client().PublishEvents(events))
33+
34+
// Validate
35+
msgs, err := testPub.outputMsgHandler.waitForMessages(1)
36+
if err != nil {
37+
t.Fatal(err)
38+
}
39+
assert.Equal(t, events[0], msgs[0].events[0])
40+
assert.Equal(t, events[1], msgs[0].events[1])
41+
}
42+
43+
func TestBulkAsyncPublishEvent(t *testing.T) {
44+
// Init
45+
testPub := newTestPublisherWithBulk(CompletedResponse)
46+
event := testEvent()
47+
48+
// Execute. Async PublishEvent always immediately returns true.
49+
assert.True(t, testPub.pub.asyncPublisher.client().PublishEvent(event))
50+
51+
// validate
52+
msgs, err := testPub.outputMsgHandler.waitForMessages(1)
53+
if err != nil {
54+
t.Fatal(err)
55+
}
56+
// Bulk outputer always sends bulk messages (even if only one event is
57+
// present)
58+
assert.Equal(t, event, msgs[0].events[0])
59+
}
60+
61+
func TestBulkAsyncPublishEvents(t *testing.T) {
62+
// init
63+
testPub := newTestPublisherWithBulk(CompletedResponse)
64+
events := []common.MapStr{testEvent(), testEvent()}
65+
66+
// Async PublishEvent always immediately returns true.
67+
assert.True(t, testPub.pub.asyncPublisher.client().PublishEvents(events))
68+
69+
msgs, err := testPub.outputMsgHandler.waitForMessages(1)
70+
if err != nil {
71+
t.Fatal(err)
72+
}
73+
assert.Equal(t, events[0], msgs[0].events[0])
74+
assert.Equal(t, events[1], msgs[0].events[1])
75+
}

publisher/bulk.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package publisher
22

33
import (
4-
"fmt"
54
"time"
65

76
"github.com/elastic/libbeat/common"
8-
"github.com/elastic/libbeat/logp"
97
"github.com/elastic/libbeat/outputs"
108
)
119

@@ -53,8 +51,6 @@ func (b *bulkWorker) run() {
5351
case <-b.ws.done:
5452
return
5553
case m := <-b.queue:
56-
fmt.Printf("received message: %v", m)
57-
5854
if m.event != nil { // single event
5955
b.onEvent(m.signal, m.event)
6056
} else { // batch of events
@@ -66,7 +62,6 @@ func (b *bulkWorker) run() {
6662
b.publish()
6763
}
6864
case <-b.flushTicker.C:
69-
logp.Debug("publish", "flush tick")
7065
b.publish()
7166
}
7267
}

publisher/bulk_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package publisher
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/elastic/libbeat/common"
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
const (
12+
flushInterval time.Duration = 10 * time.Millisecond
13+
maxBatchSize = 10
14+
queueSize = 4 * maxBatchSize
15+
)
16+
17+
// Send a single event to the bulkWorker and verify that the event
18+
// is sent after the flush timeout occurs.
19+
func TestBulkWorkerSendSingle(t *testing.T) {
20+
mh := &testMessageHandler{
21+
response: CompletedResponse,
22+
msgs: make(chan message, queueSize),
23+
}
24+
ws := newWorkerSignal()
25+
defer ws.stop()
26+
bw := newBulkWorker(ws, queueSize, mh, flushInterval, maxBatchSize)
27+
28+
s := newTestSignaler()
29+
m := message{event: testEvent(), signal: s}
30+
bw.send(m)
31+
msgs, err := mh.waitForMessages(1)
32+
if err != nil {
33+
t.Fatal(err)
34+
}
35+
assert.True(t, s.wait())
36+
assert.Equal(t, m.event, msgs[0].events[0])
37+
}
38+
39+
// Send a batch of events to the bulkWorker and verify that a single
40+
// message is distributed (not triggered by flush timeout).
41+
func TestBulkWorkerSendBatch(t *testing.T) {
42+
// Setup
43+
mh := &testMessageHandler{
44+
response: CompletedResponse,
45+
msgs: make(chan message, queueSize),
46+
}
47+
ws := newWorkerSignal()
48+
defer ws.stop()
49+
bw := newBulkWorker(ws, queueSize, mh, time.Duration(time.Hour), maxBatchSize)
50+
51+
events := make([]common.MapStr, maxBatchSize)
52+
for i := range events {
53+
events[i] = testEvent()
54+
}
55+
s := newTestSignaler()
56+
m := message{events: events, signal: s}
57+
bw.send(m)
58+
59+
// Validate
60+
outMsgs, err := mh.waitForMessages(1)
61+
if err != nil {
62+
t.Fatal(err)
63+
}
64+
assert.True(t, s.wait())
65+
assert.Len(t, outMsgs[0].events, maxBatchSize)
66+
assert.Equal(t, m.events[0], outMsgs[0].events[0])
67+
}
68+
69+
// Send more events than the configured maximum batch size and then validate
70+
// that the events are split across two messages.
71+
func TestBulkWorkerSendBatchGreaterThanMaxBatchSize(t *testing.T) {
72+
// Setup
73+
mh := &testMessageHandler{
74+
response: CompletedResponse,
75+
msgs: make(chan message),
76+
}
77+
ws := newWorkerSignal()
78+
defer ws.stop()
79+
bw := newBulkWorker(ws, queueSize, mh, flushInterval, maxBatchSize)
80+
81+
// Send
82+
events := make([]common.MapStr, maxBatchSize+1)
83+
for i := range events {
84+
events[i] = testEvent()
85+
}
86+
s := newTestSignaler()
87+
m := message{events: events, signal: s}
88+
bw.send(m)
89+
90+
// Read first message and verify no Completed or Failed signal has
91+
// been received in the sent message.
92+
outMsgs, err := mh.waitForMessages(1)
93+
if err != nil {
94+
t.Fatal(err)
95+
}
96+
assert.False(t, s.isDone())
97+
assert.Len(t, outMsgs[0].events, maxBatchSize)
98+
assert.Equal(t, m.events[0:maxBatchSize], outMsgs[0].events[0:maxBatchSize])
99+
100+
// Read the next message and verify the sent message received the
101+
// Completed signal.
102+
outMsgs, err = mh.waitForMessages(1)
103+
if err != nil {
104+
t.Fatal(err)
105+
}
106+
assert.True(t, s.wait())
107+
assert.Len(t, outMsgs[0].events, 1)
108+
assert.Equal(t, m.events[maxBatchSize], outMsgs[0].events[0])
109+
}

publisher/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,14 @@ func (c *client) getClient(opts []ClientOption) eventPublisher {
5959
}
6060

6161
// PublishEvent will publish the event on the channel. Options will be ignored.
62+
// Always returns true.
6263
func (c ChanClient) PublishEvent(event common.MapStr, opts ...ClientOption) bool {
6364
c.Channel <- event
6465
return true
6566
}
6667

6768
// PublishEvents publishes all event on the configured channel. Options will be ignored.
69+
// Always returns true.
6870
func (c ChanClient) PublishEvents(events []common.MapStr, opts ...ClientOption) bool {
6971
for _, event := range events {
7072
c.Channel <- event

publisher/client_test.go

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,60 @@
11
package publisher
22

3-
import "github.com/elastic/libbeat/common"
3+
import (
4+
"reflect"
5+
"testing"
46

5-
type nilClient struct{}
7+
"github.com/elastic/libbeat/common"
8+
"github.com/stretchr/testify/assert"
9+
)
610

7-
// NilClient will ignore all events being published
8-
var NilClient Client = nilClient{}
11+
// Test that the correct client type is returned based on the given
12+
// ClientOptions.
13+
func TestGetClient(t *testing.T) {
14+
c := &client{
15+
publisher: &PublisherType{
16+
asyncPublisher: &asyncPublisher{},
17+
syncPublisher: &syncPublisher{},
18+
},
19+
}
20+
asyncClient := c.publisher.asyncPublisher.client()
21+
syncClient := c.publisher.syncPublisher.client()
922

10-
func (c nilClient) PublishEvent(event common.MapStr, opts ...ClientOption) bool {
11-
return true
23+
var testCases = []struct {
24+
in []ClientOption
25+
out eventPublisher
26+
}{
27+
// Add new client options here:
28+
{[]ClientOption{}, asyncClient},
29+
{[]ClientOption{Confirm}, syncClient},
30+
}
31+
32+
for _, test := range testCases {
33+
expected := reflect.ValueOf(test.out)
34+
actual := reflect.ValueOf(c.getClient(test.in))
35+
assert.Equal(t, expected.Pointer(), actual.Pointer())
36+
}
1237
}
1338

14-
func (c nilClient) PublishEvents(events []common.MapStr, opts ...ClientOption) bool {
15-
return true
39+
// Test that ChanClient writes an event to its Channel.
40+
func TestChanClientPublishEvent(t *testing.T) {
41+
cc := &ChanClient{
42+
Channel: make(chan common.MapStr, 1),
43+
}
44+
45+
e1 := testEvent()
46+
cc.PublishEvent(e1)
47+
assert.Equal(t, e1, <-cc.Channel)
48+
}
49+
50+
// Test that ChanClient write events to its Channel.
51+
func TestChanClientPublishEvents(t *testing.T) {
52+
cc := &ChanClient{
53+
Channel: make(chan common.MapStr, 2),
54+
}
55+
56+
e1, e2 := testEvent(), testEvent()
57+
cc.PublishEvents([]common.MapStr{e1, e2})
58+
assert.Equal(t, e1, <-cc.Channel)
59+
assert.Equal(t, e2, <-cc.Channel)
1660
}

0 commit comments

Comments
 (0)
0