10000 [Feature] Add agency leader discovery (#984) · sharekey/kube-arangodb@b4d44a9 · GitHub
[go: up one dir, main page]

Skip to content

Commit b4d44a9

Browse files
authored
[Feature] Add agency leader discovery (arangodb#984)
1 parent 635ed17 commit b4d44a9

File tree

8 files changed

+230
-56
lines changed

8 files changed

+230
-56
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
- (Feature) Allow raw json value for license token-v2
99
- (Update) Replace `beta.kubernetes.io/arch` to `kubernetes.io/arch` in Operator Chart
1010
- (Feature) Add operator shutdown handler for graceful termination
11+
- (Feature) Add agency leader discovery
1112
- (Feature) Add `ACSDeploymentSynced` condition type and fix comparison of `SecretHashes` method
1213

1314
## [1.2.12](https://github.com/arangodb/kube-arangodb/tree/1.2.12) (2022-05-10)

pkg/deployment/agency/cache.go

Lines changed: 180 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,52 @@ package agency
2222

2323
import (
2424
"context"
25+
"fmt"
2526
"sync"
27+
"time"
2628

2729
"github.com/arangodb/go-driver/agency"
2830
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
31+
"github.com/arangodb/kube-arangodb/pkg/util/errors"
2932
)
3033

34+
type health map[string]uint64
35+
36+
// IsHealthy returns true if all agencies have the same commit index.
37+
// Returns false when:
38+
// - agencies' list is empty.
39+
// - agencies have different commit indices.
40+
// - agencies have commit indices == 0.
41+
func (h health) IsHealthy() bool {
42+
var globalCommitIndex uint64
43+
first := true
44+
45+
for _, commitIndex := range h {
46+
if first {
47+
globalCommitIndex = commitIndex
48+
first = false
49+
} else if commitIndex != globalCommitIndex {
50+
return false
51+
}
52+
}
53+
54+
return globalCommitIndex != 0
55+
}
56+
57+
// Health describes interface to check healthy of the environment.
58+
type Health interface {
59+
// IsHealthy return true when environment is considered as healthy.
60+
IsHealthy() bool
61+
}
62+
3163
type Cache interface {
32-
Reload(ctx context.Context, client agency.Agency) (uint64, error)
64+
Reload(ctx context.Context, clients []agency.Agency) (uint64, error)
3365
Data() (State, bool)
3466
CommitIndex() uint64
67+
// GetLeaderID returns a leader ID.
68+
GetLeaderID() string
69+
// Health returns true when healthy object is available.
70+
Health() (Health, bool)
3571
}
3672

3773
func NewCache(mode *api.DeploymentMode) Cache {
@@ -57,7 +93,17 @@ func (c cacheSingle) CommitIndex() uint64 {
5793
return 0
5894
}
5995

60-
func (c cacheSingle) Reload(ctx context.Context, client agency.Agency) (uint64, error) {
96+
// GetLeaderID returns always empty string for a single cache.
97+
func (c cacheSingle) GetLeaderID() string {
98+
return ""
99+
}
100+
101+
// Health returns always false for single cache.
102+
func (c cacheSingle) Health() (Health, bool) {
103+
return nil, false
104+
}
105+
106+
func (c cacheSingle) Reload(_ context.Context, _ []agency.Agency) (uint64, error) {
61107
return 0, nil
62108
}
63109

@@ -66,48 +112,169 @@ func (c cacheSingle) Data() (State, bool) {
66112
}
67113

68114
type cache struct {
69-
lock sync.Mutex
115+
lock sync.RWMutex
70116

71117
valid bool
72118

73119
commitIndex uint64
74120

75121
data State
122+
123+
health Health
124+
125+
leaderID string
76126
}
77127

78128
func (c *cache) CommitIndex() uint64 {
129+
c.lock.RLock()
130+
defer c.lock.RUnlock()
131+
79132
return c.commitIndex
80133
}
81134

82135
func (c *cache) Data() (State, bool) {
83-
c.lock.Lock()
84-
defer c.lock.Unlock()
136+
c.lock.RLock()
137+
defer c.lock.RUnlock()
85138

86139
return c.data, c.valid
87140
}
88141

89-
func (c *cache) Reload(ctx context.Context, client agency.Agency) (uint64, error) {
142+
// GetLeaderID returns a leader ID or empty string if a leader is not known.
143+
func (c *cache) GetLeaderID() string {
144+
c.lock.RLock()
145+
defer c.lock.RUnlock()
146+
147+
return c.leaderID
148+
}
149+
150+
// Health returns always false for single cache.
151+
func (c *cache) Health() (Health, bool) {
152+
c.lock.RLock()
153+
defer c.lock.RUnlock()
154+
155+
if c.health != nil {
156+
return c.health, true
157+
}
158+
159+
return nil, false
160+
}
161+
162+
func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, error) {
90163
c.lock.Lock()
91164
defer c.lock.Unlock()
92165

93-
cfg, err := getAgencyConfig(ctx, client)
166+
leaderCli, leaderConfig, health, err := getLeader(ctx, clients)
94167
if err != nil {
168+
// Invalidate a leader ID and agency state.
169+
// In the next iteration leaderID will be sat because `valid` will be false.
170+
c.leaderID = ""
95171
c.valid = false
172+
96173
return 0, err
97174
}
98175

99-
if cfg.CommitIndex == c.commitIndex && c.valid {
176+
c.health = health
177+
if leaderConfig.CommitIndex == c.commitIndex && c.valid {
100178
// We are on same index, nothing to do
101-
return cfg.CommitIndex, err
179+
return leaderConfig.CommitIndex, nil
102180
}
103181

104-
if data, err := loadState(ctx, client); err != nil {
182+
// A leader should be known even if an agency state is invalid.
183+
c.leaderID = leaderConfig.LeaderId
184+
if data, err := loadState(ctx, leaderCli); err != nil {
105185
c.valid = false
106-
return cfg.CommitIndex, err
186+
return leaderConfig.CommitIndex, err
107187
} else {
108188
c.data = data
109189
c.valid = true
110-
c.commitIndex = cfg.CommitIndex
111-
return cfg.CommitIndex, nil
190+
c.commitIndex = leaderConfig.CommitIndex
191+
return leaderConfig.CommitIndex, nil
112192
}
113193
}
194+
195+
// getLeader returns config and client to a leader agency, and health to check if agencies are on the same page.
196+
// If there is no quorum for the leader then error is returned.
197+
func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *agencyConfig, Health, error) {
198+
var mutex sync.Mutex
199+
var anyError error
200+
var wg sync.WaitGroup
201+
202+
cliLen := len(clients)
203+
if cliLen == 0 {
204+
return nil, nil, nil, errors.New("empty list of agencies' clients")
205+
}
206+
configs := make([]*agencyConfig, cliLen)
207+
leaders := make(map[string]int)
208+
209+
h := make(health)
210+
// Fetch all configs from agencies.
211+
wg.Add(cliLen)
212+
for i, cli := range clients {
213+
go func(iLocal int, cliLocal agency.Agency) {
214+
defer wg.Done()
215+
216+
ctxLocal, cancel := context.WithTimeout(ctx, time.Second)
217+
defer cancel()
218+
config, err := getAgencyConfig(ctxLocal, cliLocal)
219+
220+
mutex.Lock()
221+
defer mutex.Unlock()
222+
223+
if err != nil {
224+
anyError = err
225+
return
226+
} else if config == nil || config.LeaderId == "" {
227+
anyError = fmt.Errorf("leader unknown for the agent %v", cliLocal.Connection().Endpoints())
228+
return
229+
}
230+
231+
// Write config on the same index where client is (It will be helpful later).
232+
configs[iLocal] = config
233+
// Count leaders.
234+
leaders[config.LeaderId]++
235+
h[config.Configuration.ID] = config.CommitIndex
236+
}(i, cli)
237+
}
238+
wg.Wait()
239+
240+
if anyError != nil {
241+
return nil, nil, nil, wrapError(anyError, "not all agencies are responsive")
242+
}
243+
244+
if len(leaders) == 0 {
245+
return nil, nil, nil, wrapError(anyError, "failed to get config from agencies")
246< F438 code class="diff-text syntax-highlighted-line addition">+
}
247+
248+
// Find the leader ID which has the most votes from all agencies.
249+
maxVotes := 0
250+
var leaderID string
251+
for id, votes := range leaders {
252+
if votes > maxVotes {
253+
maxVotes = votes
254+
leaderID = id
255+
}
256+
}
257+
258+
// Check if a leader has quorum from all possible agencies.
259+
if maxVotes <= cliLen/2 {
260+
message := fmt.Sprintf("no quorum for leader %s, votes %d of %d", leaderID, maxVotes, cliLen)
261+
return nil, nil, nil, wrapError(anyError, message)
262+
}
263+
264+
// From here on, a leader with quorum is known.
265+
for i, config := range configs {
266+
if config != nil && config.Configuration.ID == leaderID {
267+
return clients[i], config, h, nil
268+
}
269+
}
270+
271+
return nil, nil, nil, wrapError(anyError, "the leader is not responsive")
272+
}
273+
274+
func wrapError(err error, message string) error {
275+
if err != nil {
276+
return errors.WithMessage(err, message)
277+
}
278+
279+
return errors.New(message)
280+
}

pkg/deployment/client/client_cache.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
3232
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
3333
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
34+
"github.com/arangodb/kube-arangodb/pkg/handlers/utils"
3435
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
3536
"github.com/arangodb/kube-arangodb/pkg/util/errors"
3637
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
@@ -46,7 +47,7 @@ type Cache interface {
4647
Get(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error)
4748
GetDatabase(ctx context.Context) (driver.Client, error)
4849
GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWrap) (driver.Client, error)
49-
GetAgency(ctx context.Context) (agency.Agency, error)
50+
GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error)
5051
}
5152

5253
type CacheGen interface {
@@ -167,13 +168,19 @@ func (cc *cache) GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWra
167168
}
168169

169170
// GetAgency returns a cached client for the agency
170-
func (cc *cache) GetAgency(ctx context.Context) (agency.Agency, error) {
171+
func (cc *cache) GetAgency(_ context.Context, agencyIDs ...string) (agency.Agency, error) {
171172
cc.mutex.Lock()
172173
defer cc.mutex.Unlock()
173174

174175
// Not found, create a new client
175176
var dnsNames []string
176177
for _, m := range cc.in.GetStatusSnapshot().Members.Agents {
178+
if len(agencyIDs) > 0 {
179+
if !utils.StringList(agencyIDs).Has(m.ID) {
180+
continue
181+
}
182+
}
183+
177184
endpoint, err := cc.in.GenerateMemberEndpoint(api.ServerGroupAgents, m)
178185
if err != nil {
179186
return nil, err

pkg/deployment/context_impl.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ import (
3030

3131
"github.com/arangodb/kube-arangodb/pkg/util/globals"
3232

33-
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
3433
"k8s.io/apimachinery/pkg/types"
3534

35+
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
36+
3637
"github.com/arangodb/kube-arangodb/pkg/deployment/reconcile"
3738

3839
"github.com/arangodb/kube-arangodb/pkg/util/errors"
@@ -51,6 +52,10 @@ import (
5152

5253
apiErrors "k8s.io/apimachinery/pkg/api/errors"
5354

55+
"github.com/rs/zerolog/log"
56+
core "k8s.io/api/core/v1"
57+
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
58+
5459
"github.com/arangodb/arangosync-client/client"
5560
"github.com/arangodb/arangosync-client/tasks"
5661
driver "github.com/arangodb/go-driver"
@@ -70,9 +75,6 @@ import (
7075
serviceaccountv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/serviceaccount/v1"
7176
servicemonitorv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/servicemonitor/v1"
7277
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
73-
"github.com/rs/zerolog/log"
74-
core "k8s.io/api/core/v1"
75-
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
7678
)
7779

7880
var _ resources.Context = &Deployment{}
@@ -234,9 +236,9 @@ func (d *Deployment) GetAgencyClientsWithPredicate(ctx context.Context, predicat
234236
return result, nil
235237
}
236238

237-
// GetAgency returns a connection to the entire agency.
238-
func (d *Deployment) GetAgency(ctx context.Context) (agency.Agency, error) {
239-
return d.clientCache.GetAgency(ctx)
239+
// GetAgency returns a connection to the agency.
240+
func (d *Deployment) GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) {
241+
return d.clientCache.GetAgency(ctx, agencyIDs...)
240242
}
241243

242244
func (d *Deployment) getConnConfig() (http.ConnectionConfig, error) {

0 commit comments

Comments
 (0)
0