@@ -22,16 +22,52 @@ package agency
22
22
23
23
import (
24
24
"context"
25
+ "fmt"
25
26
"sync"
27
+ "time"
26
28
27
29
"github.com/arangodb/go-driver/agency"
28
30
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
31
+ "github.com/arangodb/kube-arangodb/pkg/util/errors"
29
32
)
30
33
34
// health stores, per agency ID, the commit index reported by that agency.
type health map[string]uint64

// IsHealthy reports whether every recorded agency is on the same, non-zero
// commit index.
//
// It returns false when:
//   - no agency has been recorded,
//   - at least two agencies report different commit indices,
//   - the shared commit index is 0.
func (h health) IsHealthy() bool {
	var (
		reference uint64
		seen      bool
	)

	for _, current := range h {
		if !seen {
			// The first visited entry becomes the value all others must match.
			reference, seen = current, true
			continue
		}

		if current != reference {
			return false
		}
	}

	// An empty map leaves reference at zero, which also counts as unhealthy.
	return reference != 0
}
56
+
57
// Health is implemented by objects that can tell whether the environment
// they describe is healthy.
type Health interface {
	// IsHealthy returns true when the environment is considered healthy.
	IsHealthy() bool
}
62
+
31
63
type Cache interface {
32
- Reload (ctx context.Context , client agency.Agency ) (uint64 , error )
64
+ Reload (ctx context.Context , clients [] agency.Agency ) (uint64 , error )
33
65
Data () (State , bool )
34
66
CommitIndex () uint64
67
+ // GetLeaderID returns a leader ID.
68
+ GetLeaderID () string
69
+ // Health returns true when healthy object is available.
70
+ Health () (Health , bool )
35
71
}
36
72
37
73
func NewCache (mode * api.DeploymentMode ) Cache {
@@ -57,7 +93,17 @@ func (c cacheSingle) CommitIndex() uint64 {
57
93
return 0
58
94
}
59
95
60
- func (c cacheSingle ) Reload (ctx context.Context , client agency.Agency ) (uint64 , error ) {
96
+ // GetLeaderID returns always empty string for a single cache.
97
+ func (c cacheSingle ) GetLeaderID () string {
98
+ return ""
99
+ }
100
+
101
+ // Health returns always false for single cache.
102
+ func (c cacheSingle ) Health () (Health , bool ) {
103
+ return nil , false
104
+ }
105
+
106
+ func (c cacheSingle ) Reload (_ context.Context , _ []agency.Agency ) (uint64 , error ) {
61
107
return 0 , nil
62
108
}
63
109
@@ -66,48 +112,169 @@ func (c cacheSingle) Data() (State, bool) {
66
112
}
67
113
68
114
type cache struct {
69
- lock sync.Mutex
115
+ lock sync.RWMutex
70
116
71
117
valid bool
72
118
73
119
commitIndex uint64
74
120
75
121
data State
122
+
123
+ health Health
124
+
125
+ leaderID string
76
126
}
77
127
78
128
func (c * cache ) CommitIndex () uint64 {
129
+ c .lock .RLock ()
130
+ defer c .lock .RUnlock ()
131
+
79
132
return c .commitIndex
80
133
}
81
134
82
135
func (c * cache ) Data () (State , bool ) {
83
- c .lock .Lock ()
84
- defer c .lock .Unlock ()
136
+ c .lock .RLock ()
137
+ defer c .lock .RUnlock ()
85
138
86
139
return c .data , c .valid
87
140
}
88
141
89
- func (c * cache ) Reload (ctx context.Context , client agency.Agency ) (uint64 , error ) {
142
+ // GetLeaderID returns a leader ID or empty string if a leader is not known.
143
+ func (c * cache ) GetLeaderID () string {
144
+ c .lock .RLock ()
145
+ defer c .lock .RUnlock ()
146
+
147
+ return c .leaderID
148
+ }
149
+
150
+ // Health returns always false for single cache.
151
+ func (c * cache ) Health () (Health , bool ) {
152
+ c .lock .RLock ()
153
+ defer c .lock .RUnlock ()
154
+
155
+ if c .health != nil {
156
+ return c .health , true
157
+ }
158
+
159
+ return nil , false
160
+ }
161
+
162
+ func (c * cache ) Reload (ctx context.Context , clients []agency.Agency ) (uint64 , error ) {
90
163
c .lock .Lock ()
91
164
defer c .lock .Unlock ()
92
165
93
- cfg , err := getAgencyConfig (ctx , client )
166
+ leaderCli , leaderConfig , health , err := getLeader (ctx , clients )
94
167
if err != nil {
168
+ // Invalidate a leader ID and agency state.
169
+ // In the next iteration leaderID will be sat because `valid` will be false.
170
+ c .leaderID = ""
95
171
c .valid = false
172
+
96
173
return 0 , err
97
174
}
98
175
99
- if cfg .CommitIndex == c .commitIndex && c .valid {
176
+ c .health = health
177
+ if leaderConfig .CommitIndex == c .commitIndex && c .valid {
100
178
// We are on same index, nothing to do
101
- return cfg .CommitIndex , err
179
+ return leaderConfig .CommitIndex , nil
102
180
}
103
181
104
- if data , err := loadState (ctx , client ); err != nil {
182
+ // A leader should be known even if an agency state is invalid.
183
+ c .leaderID = leaderConfig .LeaderId
184
+ if data , err := loadState (ctx , leaderCli ); err != nil {
105
185
c .valid = false
106
- return cfg .CommitIndex , err
186
+ return leaderConfig .CommitIndex , err
107
187
} else {
108
188
c .data = data
109
189
c .valid = true
110
- c .commitIndex = cfg .CommitIndex
111
- return cfg .CommitIndex , nil
190
+ c .commitIndex = leaderConfig .CommitIndex
191
+ return leaderConfig .CommitIndex , nil
112
192
}
113
193
}
194
+
195
+ // getLeader returns config and client to a leader agency, and health to check if agencies are on the same page.
196
+ // If there is no quorum for the leader then error is returned.
197
+ func getLeader (ctx context.Context , clients []agency.Agency ) (agency.Agency , * agencyConfig , Health , error ) {
198
+ var mutex sync.Mutex
199
+ var anyError error
200
+ var wg sync.WaitGroup
201
+
202
+ cliLen := len (clients )
203
+ if cliLen == 0 {
204
+ return nil , nil , nil , errors .New ("empty list of agencies' clients" )
205
+ }
206
+ configs := make ([]* agencyConfig , cliLen )
207
+ leaders := make (map [string ]int )
208
+
209
+ h := make (health )
210
+ // Fetch all configs from agencies.
211
+ wg .Add (cliLen )
212
+ for i , cli := range clients {
213
+ go func (iLocal int , cliLocal agency.Agency ) {
214
+ defer wg .Done ()
215
+
216
+ ctxLocal , cancel := context .WithTimeout (ctx , time .Second )
217
+ defer cancel ()
218
+ config , err := getAgencyConfig (ctxLocal , cliLocal )
219
+
220
+ mutex .Lock ()
221
+ defer mutex .Unlock ()
222
+
223
+ if err != nil {
224
+ anyError = err
225
+ return
226
+ } else if config == nil || config .LeaderId == "" {
227
+ anyError = fmt .Errorf ("leader unknown for the agent %v" , cliLocal .Connection ().Endpoints ())
228
+ return
229
+ }
230
+
231
+ // Write config on the same index where client is (It will be helpful later).
232
+ configs [iLocal ] = config
233
+ // Count leaders.
234
+ leaders [config .LeaderId ]++
235
+ h [config .Configuration .ID ] = config .CommitIndex
236
+ }(i , cli )
237
+ }
238
+ wg .Wait ()
239
+
240
+ if anyError != nil {
241
+ return nil , nil , nil , wrapError (anyError , "not all agencies are responsive" )
242
+ }
243
+
244
+ if len (leaders ) == 0 {
245
+ return nil , nil , nil , wrapError (anyError , "failed to get config from agencies" )
246
<
F438
code class="diff-text syntax-highlighted-line addition">+ }
247
+
248
+ // Find the leader ID which has the most votes from all agencies.
249
+ maxVotes := 0
250
+ var leaderID string
251
+ for id , votes := range leaders {
252
+ if votes > maxVotes {
253
+ maxVotes = votes
254
+ leaderID = id
255
+ }
256
+ }
257
+
258
+ // Check if a leader has quorum from all possible agencies.
259
+ if maxVotes <= cliLen / 2 {
260
+ message := fmt .Sprintf ("no quorum for leader %s, votes %d of %d" , leaderID , maxVotes , cliLen )
261
+ return nil , nil , nil , wrapError (anyError , message )
262
+ }
263
+
264
+ // From here on, a leader with quorum is known.
265
+ for i , config := range configs {
266
+ if config != nil && config .Configuration .ID == leaderID {
267
+ return clients [i ], config , h , nil
268
+ }
269
+ }
270
+
271
+ return nil , nil , nil , wrapError (anyError , "the leader is not responsive" )
272
+ }
273
+
274
+ func wrapError (err error , message string ) error {
275
+ if err != nil {
276
+ return errors .WithMessage (err , message )
277
+ }
278
+
279
+ return errors .New (message )
280
+ }
0 commit comments