 #define CLUSTERING_GENERIC_MULTI_THROTTLING_SERVER_HPP_
 
 #include <algorithm>
+#include <map>
 
 #include "arch/timing.hpp"
+#include "containers/priority_queue.hpp"
 #include "clustering/generic/multi_throttling_metadata.hpp"
 #include "clustering/generic/registrar.hpp"
 #include "rpc/mailbox/typed.hpp"
@@ -23,7 +25,7 @@ class multi_throttling_server_t :
             int capacity) :
         mailbox_manager(mm),
         user_data(ud),
-        total_tickets(capacity), free_tickets(capacity),
+        goal_capacity(capacity), total_tickets(capacity), free_tickets(capacity),
         reallocate_timer(reallocate_interval_ms, this),
         registrar(mailbox_manager, this)
         { }
@@ -35,6 +37,7 @@ class multi_throttling_server_t :
 
 private:
     static const int reallocate_interval_ms = 1000;
+    static const int fair_fraction_denom = 5;
 
     class client_t :
         public intrusive_list_node_t<client_t>,
@@ -66,11 +69,13 @@ class multi_throttling_server_t :
                 server_business_card_t(request_mailbox->get_address(),
                                        relinquish_tickets_mailbox->get_address()));
             parent->clients.push_back(this);
+            parent->adjust_total_tickets();
             parent->recompute_allocations();
         }
 
         ~client_t() {
             parent->clients.remove(this);
+            parent->adjust_total_tickets();
             parent->recompute_allocations();
             request_mailbox.reset();
             relinquish_tickets_mailbox.reset();
@@ -182,11 +187,11 @@ class multi_throttling_server_t :
         /* We divide the total number of tickets into two pools. The first pool
         is distributed evenly among all the clients. The second pool is
         distributed in proportion to the clients' QPS. */
-        static const double fair_fraction = 0.1;
-        int fair_tickets = static_cast<int>(total_tickets * fair_fraction);
+        int fair_tickets = std::max(static_cast<int>(clients.size()),
+                                    total_tickets / fair_fraction_denom);
         int qps_tickets = total_tickets - fair_tickets;
         int total_qps = 0;
-        for (client_t *c = clients.head(); c; c = clients.next(c)) {
+        for (client_t *c = clients.head(); c != NULL; c = clients.next(c)) {
             total_qps += c->estimate_qps();
         }
         if (clients.size() == 0) {
@@ -197,7 +202,7 @@ class multi_throttling_server_t :
             tickets will be distributed, but that's OK. */
             total_qps = 1;
         }
-        for (client_t *c = clients.head(); c; c = clients.next(c)) {
+        for (client_t *c = clients.head(); c != NULL; c = clients.next(c)) {
             /* This math isn't exact, but it's OK if the target tickets of all
             the clients don't add up to `total_tickets`. */
             c->set_target_tickets(fair_tickets / clients.size() +
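
For a concrete sense of the split (illustrative numbers, not from the commit): with goal_capacity = 500 and 3 connected clients, adjust_total_tickets() below yields total_tickets = 500 + 3 * 5 = 515, so fair_tickets = max(3, 515 / 5) = 103 and qps_tickets = 412. A client contributing 30 of a total 100 QPS then gets a target of 103 / 3 + 412 * 30 / 100 = 34 + 123 = 157 tickets; because of the integer division the targets need not sum exactly to total_tickets, as the comment above acknowledges.
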
@@ -206,69 +211,117 @@ class multi_throttling_server_t :
         redistribute_tickets();
     }
 
+    void adjust_total_tickets() {
+        /* If new clients connect, we adapt the total_tickets number rather than
+        just leaving it at goal_capacity.
+        This serves two purposes:
+        1. It makes sure that when a new client connects, we always have some
+        free tickets available to give to that client (note that clients here mean
+        cluster nodes, not application clients).
+        Otherwise new clients would have to wait until we send a relinquish_tickets
+        message to one of the existing clients, then wait until that other client
+        returns some of its tickets to us, which we could only then pass on to the
+        newly connected client. The result would be a delay until a new client
+        could actually process any query, which we would like to avoid.
+        2. If we have more clients than total_tickets / fair_fraction_denom, we would
+        end up assigning 0 tickets to some clients. Those clients could never
+        process any query. */
+
+        /* So fair_tickets in recompute_allocations() is at least 1 per client. */
+        int per_client_capacity = fair_fraction_denom;
+        int new_total_tickets = goal_capacity + clients.size() * per_client_capacity;
+        /* Note: This can temporarily make free_tickets negative */
+        int diff = new_total_tickets - total_tickets;
+        free_tickets += diff;
+        total_tickets = new_total_tickets;
+    }
+
     void return_tickets(int tickets) {
         free_tickets += tickets;
-        guarantee(free_tickets <= total_tickets);
         redistribute_tickets();
     }
 
     void redistribute_tickets() {
-        static const int chunk_size = 100;
-        static const int min_reasonable_tickets = 10;
-        client_t *neediest;
-        int gift_size;
-
-        /* First, look for a client with a critically low number of tickets.
-        They get priority in tickets. This prevents starvation. */
-        while (free_tickets > 0) {
-            gift_size = -1;
-            neediest = NULL;
-            for (client_t *c = clients.head(); c; c = clients.next(c)) {
-                if (c->get_current_tickets() < min_reasonable_tickets && c->get_current_tickets() < c->get_target_tickets()) {
-                    if (!neediest || c->get_current_tickets() < neediest->get_current_tickets()) {
-                        neediest = c;
-                        gift_size = std::min(c->get_target_tickets() - c->get_current_tickets(), free_tickets);
-                    }
+        if (free_tickets <= 0 || clients.empty()) {
+            return;
+        }
+
+        const int min_chunk_size = ceil_divide(100, static_cast<int>(clients.size()));
+        const int min_reasonable_tickets = 10;
+
+        {
+            /* We cannot risk a client disconnecting while we are in here. That would
+            invalidate the pointers in tickets_to_give. */
+            ASSERT_NO_CORO_WAITING;
+            std::map<client_t *, int> tickets_to_give;
+
+            /* First, look for clients with a critically low number of tickets.
+            They get priority in tickets. This prevents starvation. */
+            std::vector<client_t *> critical_clients;
+            critical_clients.reserve(clients.size());
+            for (client_t *c = clients.head(); c != NULL; c = clients.next(c)) {
+                if (c->get_current_tickets() < min_reasonable_tickets
+                    && c->get_current_tickets() < c->get_target_tickets()) {
+                    critical_clients.push_back(c);
                 }
             }
-
-            if (!neediest) {
-                break;
+            /* Distribute the available tickets among critical clients, up to a
+            gift size of `min_reasonable_tickets`. As a consequence of the
+            `ceil_divide()` in here, we still set gift_size to 1 even if we don't
+            have enough free tickets to give at least 1 to every critical client.
+            That way we will at least give something to the first couple
+            of clients. */
+            if (!critical_clients.empty()) {
+                int gift_size_for_critical_clients = std::min(min_reasonable_tickets,
+                    ceil_divide(free_tickets, critical_clients.size()));
+                for (auto itr = critical_clients.begin(); itr != critical_clients.end(); ++itr) {
+                    int tickets_client_actually_wants = std::max(0,
+                        (*itr)->get_target_tickets() - (*itr)->get_current_tickets());
+                    int gift_size = std::min(free_tickets,
+                        std::min(tickets_client_actually_wants, gift_size_for_critical_clients));
+                    free_tickets -= gift_size;
+                    tickets_to_give[*itr] += gift_size;
+                }
             }
-            guarantee(gift_size >= 0);
-            free_tickets -= gift_size;
-            neediest->give_tickets(gift_size);
-        }
 
-        /* Next, look for clients with a large difference between their target
-        number of tickets and their current number of tickets. But if the
-        difference is less than `chunk_size`, don't send any tickets at all
-        to avoid flooding the network with many small ticket updates. */
-        while (free_tickets > chunk_size) {
-            gift_size = -1;
-            neediest = NULL;
-            for (client_t *c = clients.head(); c; c = clients.next(c)) {
-                int need_size = c->get_target_tickets() - c->get_current_tickets();
-                if (need_size > chunk_size && (!neediest || need_size > neediest->get_target_tickets() - neediest->get_current_tickets())) {
-                    neediest = c;
-                    gift_size = chunk_size;
+            /* Next, look for clients with a large difference between their target
+            number of tickets and their current number of tickets. But if the
+            difference is less than `min_chunk_size`, don't send any tickets at all
+            to avoid flooding the network with many small ticket updates. */
+            priority_queue_t<std::pair<int, client_t *> > needy_clients;
+            for (client_t *c = clients.head(); c != NULL; c = clients.next(c)) {
+                int need_size = c->get_target_tickets()
+                    - c->get_current_tickets()
+                    - tickets_to_give[c];
+                if (need_size >= min_chunk_size) {
+                    needy_clients.push(std::pair<int, client_t *>(need_size, c));
+                }
+            }
+            while (free_tickets >= min_chunk_size && !needy_clients.empty()) {
+                std::pair<int, client_t *> neediest = needy_clients.pop();
+                free_tickets -= min_chunk_size;
+                tickets_to_give[neediest.second] += min_chunk_size;
+                neediest.first -= min_chunk_size;
+                if (neediest.first >= min_chunk_size) {
+                    /* Re-insert the client so it gets more tickets later */
+                    needy_clients.push(neediest);
                 }
             }
 
-            if (!neediest) {
-                break;
+            /* Now actually send the tickets to the clients */
+            for (auto itr = tickets_to_give.begin(); itr != tickets_to_give.end(); ++itr) {
+                if (itr->second > 0) {
+                    itr->first->give_tickets(itr->second);
+                }
             }
-            guarantee(gift_size >= 0);
-            free_tickets -= gift_size;
-            neediest->give_tickets(gift_size);
         }
     }
 
     mailbox_manager_t *const mailbox_manager;
     user_data_type user_data;
 
     intrusive_list_t<client_t> clients;
-    int total_tickets, free_tickets;
+    int goal_capacity, total_tickets, free_tickets;
 
     repeating_timer_t reallocate_timer;
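The new redistribute_tickets() batches all gifts into tickets_to_give and only sends them at the end, handing out chunks of min_chunk_size to whichever client's remaining deficit is currently largest. The following standalone sketch simulates that two-phase scheme; it is illustrative, not RethinkDB code: ceil_divide() is assumed to be integer division rounding up, std::priority_queue stands in for the repo's priority_queue_t (whose pop() returns the removed element), Client is a hypothetical stand-in for client_t, and the final mailbox send is replaced by a direct increment.

// Standalone sketch of the two-phase ticket redistribution above.
#include <algorithm>
#include <cstdio>
#include <map>
#include <queue>
#include <utility>
#include <vector>

// Assumed behavior of RethinkDB's helper: integer division, rounding up.
static int ceil_divide(int x, int y) { return (x + y - 1) / y; }

struct Client {  // hypothetical stand-in for client_t
    int current_tickets;
    int target_tickets;
};

void redistribute(std::vector<Client *> &clients, int &free_tickets) {
    if (free_tickets <= 0 || clients.empty()) return;

    const int min_chunk_size = ceil_divide(100, static_cast<int>(clients.size()));
    const int min_reasonable_tickets = 10;
    std::map<Client *, int> tickets_to_give;

    /* Phase 1: top up clients that are critically low, so nobody starves. */
    std::vector<Client *> critical;
    for (Client *c : clients) {
        if (c->current_tickets < min_reasonable_tickets &&
            c->current_tickets < c->target_tickets) {
            critical.push_back(c);
        }
    }
    if (!critical.empty()) {
        int gift = std::min(min_reasonable_tickets,
                            ceil_divide(free_tickets, static_cast<int>(critical.size())));
        for (Client *c : critical) {
            int wants = std::max(0, c->target_tickets - c->current_tickets);
            int g = std::min(free_tickets, std::min(wants, gift));
            free_tickets -= g;
            tickets_to_give[c] += g;
        }
    }

    /* Phase 2: hand out the rest in chunks, largest remaining deficit first. */
    std::priority_queue<std::pair<int, Client *> > needy;
    for (Client *c : clients) {
        int need = c->target_tickets - c->current_tickets - tickets_to_give[c];
        if (need >= min_chunk_size) needy.push(std::make_pair(need, c));
    }
    while (free_tickets >= min_chunk_size && !needy.empty()) {
        std::pair<int, Client *> top = needy.top();
        needy.pop();
        free_tickets -= min_chunk_size;
        tickets_to_give[top.second] += min_chunk_size;
        top.first -= min_chunk_size;
        if (top.first >= min_chunk_size) needy.push(top);  /* still needy: requeue */
    }

    /* Phase 3: apply the batched gifts (the real code sends mailbox messages). */
    for (auto &kv : tickets_to_give) kv.first->current_tickets += kv.second;
}

int main() {
    Client a = {2, 80}, b = {40, 200}, c = {150, 160};
    std::vector<Client *> clients = {&a, &b, &c};
    int free_tickets = 300;
    redistribute(clients, free_tickets);
    std::printf("a=%d b=%d c=%d free=%d\n",
                a.current_tickets, b.current_tickets, c.current_tickets, free_tickets);
    return 0;
}

Running it prints a=80 b=176 c=150 free=86: the starved client a is topped up first, the chunked loop then favors b's large deficit, and c, which is within min_chunk_size of its target, receives nothing, mirroring the flooding-avoidance comment in the diff.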