Merge branch 'daniel_1820_fixes' into next · jchjava/rethinkdb@54bab7f · GitHub

Commit 54bab7f
Author: Daniel Mewes (committed)
Merge branch 'daniel_1820_fixes' into next
2 parents 712de26 + 4323032 · commit 54bab7f

File tree: 17 files changed, +528 -346 lines

src/buffer_cache/mirrored/config.hpp

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@ struct mirrored_cache_config_t {
         max_size = 8 * MEGABYTE; // This should be overwritten
         // at a place where more information about the system and use of the cache is available.
         flush_timer_ms = DEFAULT_FLUSH_TIMER_MS;
-        max_dirty_size = DEFAULT_UNSAVED_DATA_LIMIT;
+        max_dirty_size = max_size / 2; // This should be overwritten
         flush_dirty_size = 0;
         max_concurrent_flushes = DEFAULT_MAX_CONCURRENT_FLUSHES;
         io_priority_reads = CACHE_READS_IO_PRIORITY;
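The change replaces a fixed dirty-data limit with one derived from the cache size. As a rough, hypothetical illustration only (the struct name and values below are made up, not code from this commit), the new default keeps max_dirty_size at half of whatever max_size ends up being configured:

    #include <cassert>
    #include <cstdint>

    static const uint64_t MEGABYTE = 1024 * 1024;

    // Hypothetical stand-in for the config struct; names and values are illustrative only.
    struct cache_config_sketch_t {
        uint64_t max_size;
        uint64_t max_dirty_size;
        cache_config_sketch_t()
            : max_size(8 * MEGABYTE),          // placeholder default, meant to be overwritten
              max_dirty_size(max_size / 2) { } // now derived from max_size instead of a constant
    };

    int main() {
        cache_config_sketch_t cfg;
        cfg.max_size = 1024 * MEGABYTE;        // e.g. a 1 GiB cache configured at startup
        cfg.max_dirty_size = cfg.max_size / 2; // callers are expected to rescale this too
        assert(cfg.max_dirty_size == 512 * MEGABYTE);
        return 0;
    }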

src/clustering/generic/multi_throttling_client.cc

Lines changed: 31 additions & 7 deletions
@@ -23,6 +23,7 @@ multi_throttling_client_t<request_type, inner_client_business_card_type>::ticket
         break;
     case state_acquired_ticket:
         parent->free_tickets++;
+        parent->pump_free_tickets();
         break;
     case state_used_ticket:
         break;
@@ -39,6 +40,7 @@ multi_throttling_client_t<request_type, inner_client_business_card_type>::multi_
         signal_t *interruptor) :
     mailbox_manager(mm),
     free_tickets(0),
+    to_relinquish(0),
     give_tickets_mailbox(mailbox_manager,
         boost::bind(&multi_throttling_client_t::on_give_tickets, this, _1)),
     reclaim_tickets_mailbox(mailbox_manager,
@@ -110,23 +112,45 @@ boost::optional<boost::optional<registrar_business_card_t<typename multi_throttl
 
 template <class request_type, class inner_client_business_card_type>
 void multi_throttling_client_t<request_type, inner_client_business_card_type>::on_give_tickets(int count) {
-    while (count > 0 && !ticket_queue.empty()) {
+    free_tickets += count;
+    pump_free_tickets();
+}
+
+template <class request_type, class inner_client_business_card_type>
+void multi_throttling_client_t<request_type, inner_client_business_card_type>::pump_free_tickets() {
+    // Hand out tickets to the waiter
+    while (free_tickets > 0 && !ticket_queue.empty()) {
         ticket_acq_t *lucky_winner = ticket_queue.head();
         ticket_queue.remove(lucky_winner);
         lucky_winner->state = ticket_acq_t::state_acquired_ticket;
+        free_tickets--;
         lucky_winner->pulse();
-        count--;
     }
-    free_tickets += count;
+    // If we didn't need all tickets, see if we are still supposed to return some
+    // of them to the server.
+    try_to_relinquish_tickets();
 }
 
 template <class request_type, class inner_client_business_card_type>
 void multi_throttling_client_t<request_type, inner_client_business_card_type>::on_reclaim_tickets(int count) {
-    int to_relinquish = std::min(count, free_tickets);
-    if (to_relinquish > 0) {
-        free_tickets -= to_relinquish;
+    /* We must try our best to relinquish as many tickets as the server asked us
+    to. Otherwise the target tickets can drift increasingly far away
+    from the actual tickets we have.
+    To do this, we keep track of how many more tickets we are supposed to
+    relinquish and then return them to the server as soon as we have something left
+    to return. */
+    to_relinquish += count;
+    try_to_relinquish_tickets();
+}
+
+template <class request_type, class inner_client_business_card_type>
+void multi_throttling_client_t<request_type, inner_client_business_card_type>::try_to_relinquish_tickets() {
+    int can_relinquish = std::min(to_relinquish, free_tickets);
+    if (can_relinquish > 0) {
+        to_relinquish -= can_relinquish;
+        free_tickets -= can_relinquish;
         coro_t::spawn_sometime(boost::bind(&multi_throttling_client_t<request_type, inner_client_business_card_type>::relinquish_tickets_blocking, this,
-            to_relinquish,
+            can_relinquish,
             auto_drainer_t::lock_t(&drainer)));
     }
 }
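To see the effect of the new to_relinquish bookkeeping in isolation, here is a minimal standalone sketch (not RethinkDB code; the waiter queue, mailboxes, and coroutines are stripped away, and the struct and main() are illustrative). It only demonstrates the accounting: a reclaim request is remembered in full and paid back as soon as free tickets become available.

    #include <algorithm>
    #include <cassert>

    struct throttling_client_sketch_t {
        int free_tickets = 0;
        int to_relinquish = 0;

        void on_give_tickets(int count) {
            free_tickets += count;
            try_to_relinquish_tickets();
        }
        void on_reclaim_tickets(int count) {
            // Remember the full amount, even if we cannot return it yet.
            to_relinquish += count;
            try_to_relinquish_tickets();
        }
        void try_to_relinquish_tickets() {
            int can_relinquish = std::min(to_relinquish, free_tickets);
            to_relinquish -= can_relinquish;
            free_tickets -= can_relinquish;
            // A real client would now send `can_relinquish` tickets back to the server.
        }
    };

    int main() {
        throttling_client_sketch_t c;
        c.on_reclaim_tickets(5);   // server asks for 5 back, but none are free yet
        assert(c.to_relinquish == 5 && c.free_tickets == 0);
        c.on_give_tickets(8);      // 5 of these immediately pay off the outstanding debt
        assert(c.to_relinquish == 0 && c.free_tickets == 3);
        return 0;
    }

The old code could only return tickets it happened to have free at the moment of the reclaim request; the new counter prevents the server's target and the client's actual ticket count from drifting apart.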

src/clustering/generic/multi_throttling_client.hpp

Lines changed: 7 additions & 0 deletions
@@ -53,15 +53,22 @@ class multi_throttling_client_t {
 
     void on_give_tickets(int count);
 
+    void pump_free_tickets();
+
     void on_reclaim_tickets(int count);
 
+    /* Relinquishes as many tickets as possible considering `free_tickets`, up to a
+    maximum of `to_relinquish`. */
+    void try_to_relinquish_tickets();
+
     void relinquish_tickets_blocking(int count, auto_drainer_t::lock_t keepalive);
 
     mailbox_manager_t *const mailbox_manager;
 
     promise_t<server_business_card_t> intro_promise;
 
     int free_tickets;
+    int to_relinquish;
     intrusive_list_t<ticket_acq_t> ticket_queue;
 
     auto_drainer_t drainer;

src/clustering/generic/multi_throttling_server.hpp

Lines changed: 100 additions & 47 deletions
@@ -3,8 +3,10 @@
 #define CLUSTERING_GENERIC_MULTI_THROTTLING_SERVER_HPP_
 
 #include <algorithm>
+#include <map>
 
 #include "arch/timing.hpp"
+#include "containers/priority_queue.hpp"
 #include "clustering/generic/multi_throttling_metadata.hpp"
 #include "clustering/generic/registrar.hpp"
 #include "rpc/mailbox/typed.hpp"
@@ -23,7 +25,7 @@ class multi_throttling_server_t :
             int capacity) :
         mailbox_manager(mm),
         user_data(ud),
-        total_tickets(capacity), free_tickets(capacity),
+        goal_capacity(capacity), total_tickets(capacity), free_tickets(capacity),
         reallocate_timer(reallocate_interval_ms, this),
         registrar(mailbox_manager, this)
         { }
@@ -35,6 +37,7 @@ class multi_throttling_server_t :
 
 private:
     static const int reallocate_interval_ms = 1000;
+    static const int fair_fraction_denom = 5;
 
     class client_t :
         public intrusive_list_node_t<client_t>,
@@ -66,11 +69,13 @@ class multi_throttling_server_t :
                 server_business_card_t(request_mailbox->get_address(),
                                        relinquish_tickets_mailbox->get_address()));
             parent->clients.push_back(this);
+            parent->adjust_total_tickets();
             parent->recompute_allocations();
         }
 
         ~client_t() {
             parent->clients.remove(this);
+            parent->adjust_total_tickets();
             parent->recompute_allocations();
             request_mailbox.reset();
             relinquish_tickets_mailbox.reset();
@@ -182,11 +187,11 @@ class multi_throttling_server_t :
         /* We divide the total number of tickets into two pools. The first pool
         is distributed evenly among all the clients. The second pool is
         distributed in proportion to the clients' QPS. */
-        static const double fair_fraction = 0.1;
-        int fair_tickets = static_cast<int>(total_tickets * fair_fraction);
+        int fair_tickets = std::max(static_cast<int>(clients.size()),
+                                    total_tickets / fair_fraction_denom);
         int qps_tickets = total_tickets - fair_tickets;
         int total_qps = 0;
-        for (client_t *c = clients.head(); c; c = clients.next(c)) {
+        for (client_t *c = clients.head(); c != NULL; c = clients.next(c)) {
             total_qps += c->estimate_qps();
         }
         if (clients.size() == 0) {
@@ -197,7 +202,7 @@ class multi_throttling_server_t :
            tickets will be distributed, but that's OK. */
            total_qps = 1;
         }
-        for (client_t *c = clients.head(); c; c = clients.next(c)) {
+        for (client_t *c = clients.head(); c != NULL; c = clients.next(c)) {
             /* This math isn't exact, but it's OK if the target tickets of all
             the clients don't add up to `total_tickets`. */
             c->set_target_tickets(fair_tickets / clients.size() +
@@ -206,69 +211,117 @@ class multi_throttling_server_t :
         redistribute_tickets();
     }
 
+    void adjust_total_tickets() {
+        /* If new clients connect, we adapt the total_tickets number, rather than
+        just leaving it at goal_capacity.
+        This serves two purposes:
+        1. It makes sure that when a new client connects, we always have some
+        free tickets available to give to that client (note that clients here mean
+        cluster nodes, not application clients).
+        Otherwise new clients would have to wait until we send a relinquish_tickets
+        message to one of the existing clients, then wait until that other client
+        returns some of its tickets to us, which we could only then pass on to the
+        newly connected client. The result would be a delay until a new client
+        could actually process any query, which we would like to avoid.
+        2. If we have more clients than total_tickets/fair_fraction_denom, we would
+        end up assigning 0 tickets to some clients. Those clients could never
+        process any query. */
+
+        /* So fair_tickets in recompute_allocations() is at least 1 per client. */
+        int per_client_capacity = fair_fraction_denom;
+        int new_total_tickets = goal_capacity + clients.size() * per_client_capacity;
+        /* Note: This can temporarily make free_tickets negative */
+        int diff = new_total_tickets - total_tickets;
+        free_tickets += diff;
+        total_tickets = new_total_tickets;
+    }
+
     void return_tickets(int tickets) {
         free_tickets += tickets;
-        guarantee(free_tickets <= total_tickets);
         redistribute_tickets();
     }
 
     void redistribute_tickets() {
-        static const int chunk_size = 100;
-        static const int min_reasonable_tickets = 10;
-        client_t *neediest;
-        int gift_size;
-
-        /* First, look for a client with a critically low number of tickets.
-        They get priority in tickets. This prevents starvation. */
-        while (free_tickets > 0) {
-            gift_size = -1;
-            neediest = NULL;
-            for (client_t *c = clients.head(); c; c = clients.next(c)) {
-                if (c->get_current_tickets() < min_reasonable_tickets && c->get_current_tickets() < c->get_target_tickets()) {
-                    if (!neediest || c->get_current_tickets() < neediest->get_current_tickets()) {
-                        neediest = c;
-                        gift_size = std::min(c->get_target_tickets() - c->get_current_tickets(), free_tickets);
-                    }
+        if (free_tickets <= 0 || clients.empty()) {
+            return;
+        }
+
+        const int min_chunk_size = ceil_divide(100, static_cast<int>(clients.size()));
+        const int min_reasonable_tickets = 10;
+
+        {
+            /* We cannot risk a client disconnecting while we are in here. That would
+            invalidate the pointers in tickets_to_give. */
+            ASSERT_NO_CORO_WAITING;
+            std::map<client_t *, int> tickets_to_give;
+
+            /* First, look for clients with a critically low number of tickets.
+            They get priority in tickets. This prevents starvation. */
+            std::vector<client_t *> critical_clients;
+            critical_clients.reserve(clients.size());
+            for (client_t *c = clients.head(); c != NULL; c = clients.next(c)) {
+                if (c->get_current_tickets() < min_reasonable_tickets
+                    && c->get_current_tickets() < c->get_target_tickets()) {
+                    critical_clients.push_back(c);
                 }
             }
-
-            if (!neediest) {
-                break;
+            /* Distribute the available tickets among critical clients, up to a
+            gift size of `min_reasonable_tickets`. As a consequence of the
+            `ceil_divide()` in here we still set gift_size to 1 even if we don't
+            have enough free tickets to give at least 1 to every critical client.
+            That way we will at least give something to the first couple
+            of clients. */
+            if (!critical_clients.empty()) {
+                int gift_size_for_critical_clients = std::min(min_reasonable_tickets,
+                    ceil_divide(free_tickets, critical_clients.size()));
+                for (auto itr = critical_clients.begin(); itr != critical_clients.end(); ++itr) {
+                    int tickets_client_actually_wants = std::max(0,
+                        (*itr)->get_target_tickets() - (*itr)->get_current_tickets());
+                    int gift_size = std::min(free_tickets,
+                        std::min(tickets_client_actually_wants, gift_size_for_critical_clients));
+                    free_tickets -= gift_size;
+                    tickets_to_give[*itr] += gift_size;
+                }
             }
-            guarantee(gift_size >= 0);
-            free_tickets -= gift_size;
-            neediest->give_tickets(gift_size);
-        }
 
-        /* Next, look for clients with a large difference between their target
-        number of tickets and their current number of tickets. But if the
-        difference is less than `chunk_size`, don't send any tickets at all
-        to avoid flooding the network with many small ticket updates. */
-        while (free_tickets > chunk_size) {
-            gift_size = -1;
-            neediest = NULL;
-            for (client_t *c = clients.head(); c; c = clients.next(c)) {
-                int need_size = c->get_target_tickets() - c->get_current_tickets();
-                if (need_size > chunk_size && (!neediest || need_size > neediest->get_target_tickets() - neediest->get_current_tickets())) {
-                    neediest = c;
-                    gift_size = chunk_size;
+            /* Next, look for clients with a large difference between their target
+            number of tickets and their current number of tickets. But if the
+            difference is less than `min_chunk_size`, don't send any tickets at all
+            to avoid flooding the network with many small ticket updates. */
+            priority_queue_t<std::pair<int, client_t *> > needy_clients;
+            for (client_t *c = clients.head(); c != NULL; c = clients.next(c)) {
+                int need_size = c->get_target_tickets()
+                    - c->get_current_tickets()
+                    - tickets_to_give[c];
+                if (need_size >= min_chunk_size) {
+                    needy_clients.push(std::pair<int, client_t *>(need_size, c));
+                }
+            }
+            while (free_tickets >= min_chunk_size && !needy_clients.empty()) {
+                std::pair<int, client_t *> neediest = needy_clients.pop();
+                free_tickets -= min_chunk_size;
+                tickets_to_give[neediest.second] += min_chunk_size;
+                neediest.first -= min_chunk_size;
+                if (neediest.first >= min_chunk_size) {
+                    /* Re-insert the client so it gets more tickets later */
+                    needy_clients.push(neediest);
                 }
             }
 
-            if (!neediest) {
-                break;
+            /* Now actually send the tickets to the clients */
+            for (auto itr = tickets_to_give.begin(); itr != tickets_to_give.end(); ++itr) {
+                if (itr->second > 0) {
+                    itr->first->give_tickets(itr->second);
+                }
             }
-            guarantee(gift_size >= 0);
-            free_tickets -= gift_size;
-            neediest->give_tickets(gift_size);
         }
     }
 
     mailbox_manager_t *const mailbox_manager;
     user_data_type user_data;
 
     intrusive_list_t<client_t> clients;
-    int total_tickets, free_tickets;
+    int goal_capacity, total_tickets, free_tickets;
 
     repeating_timer_t reallocate_timer;
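The redistribution rewrite replaces the repeated full scans of the client list (one per gift) with a single pass that batches gifts per client in a map and serves the neediest client first from a priority queue, in chunks of min_chunk_size. The following standalone sketch is not the RethinkDB implementation: priority_queue_t, ceil_divide(), the intrusive client list, and give_tickets() are replaced by stand-ins, and the critical-client pass is omitted. It only illustrates the chunked, neediest-first part of the strategy.

    #include <algorithm>
    #include <cstdio>
    #include <map>
    #include <queue>
    #include <utility>
    #include <vector>

    struct client_sketch_t {
        int current_tickets;
        int target_tickets;
    };

    // Rounds up; stands in for what a helper like ceil_divide() is assumed to do.
    static int ceil_div(int x, int y) { return (x + y - 1) / y; }

    static void redistribute(std::vector<client_sketch_t> &clients, int &free_tickets) {
        if (free_tickets <= 0 || clients.empty()) return;
        const int min_chunk_size = ceil_div(100, static_cast<int>(clients.size()));
        std::map<client_sketch_t *, int> tickets_to_give;

        // The neediest client (largest target-minus-current gap) sits on top of the heap.
        std::priority_queue<std::pair<int, client_sketch_t *> > needy;
        for (auto &c : clients) {
            int need = c.target_tickets - c.current_tickets;
            if (need >= min_chunk_size) needy.push(std::make_pair(need, &c));
        }
        while (free_tickets >= min_chunk_size && !needy.empty()) {
            std::pair<int, client_sketch_t *> neediest = needy.top();
            needy.pop();
            free_tickets -= min_chunk_size;
            tickets_to_give[neediest.second] += min_chunk_size;
            neediest.first -= min_chunk_size;
            if (neediest.first >= min_chunk_size) needy.push(neediest);  // still needy, requeue
        }
        // One batched update per client, standing in for give_tickets().
        for (auto &entry : tickets_to_give) {
            entry.first->current_tickets += entry.second;
        }
    }

    int main() {
        std::vector<client_sketch_t> clients = {{10, 300}, {50, 120}, {200, 210}};
        int free_tickets = 250;
        redistribute(clients, free_tickets);
        for (auto &c : clients) std::printf("%d/%d\n", c.current_tickets, c.target_tickets);
        return 0;
    }

The related adjust_total_tickets() change grows total_tickets to goal_capacity + fair_fraction_denom * clients.size() (with fair_fraction_denom = 5), so a newly connected peer can be handed free tickets immediately instead of waiting for another peer to relinquish some, and the fair share per client never rounds down to zero.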

src/clustering/immediate_consistency/branch/broadcaster.cc

Lines changed: 1 addition & 9 deletions
@@ -17,10 +17,6 @@
 #include "rpc/semilattice/view/field.hpp"
 #include "rpc/semilattice/view/member.hpp"
 
-template <class protocol_t>
-const int broadcaster_t<protocol_t>::MAX_OUTSTANDING_WRITES =
-    listener_t<protocol_t>::MAX_OUTSTANDING_WRITES_FROM_BROADCASTER;
-
 template <class protocol_t>
 broadcaster_t<protocol_t>::write_callback_t::write_callback_t() : write(NULL) { }
 
@@ -44,7 +40,6 @@ broadcaster_t<protocol_t>::broadcaster_t(mailbox_manager_t *mm,
     mailbox_manager(mm),
     branch_id(generate_uuid()),
     branch_history_manager(bhm),
-    enforce_max_outstanding_writes(MAX_OUTSTANDING_WRITES),
     registrar(mailbox_manager, this)
 
 {
@@ -136,14 +131,12 @@ template <class protocol_t>
 class broadcaster_t<protocol_t>::incomplete_write_t : public home_thread_mixin_debug_only_t {
 public:
     incomplete_write_t(broadcaster_t *p, const typename protocol_t::write_t &w, transition_timestamp_t ts, write_callback_t *cb) :
-        write(w), timestamp(ts), callback(cb), sem_acq(&p->enforce_max_outstanding_writes), parent(p), incomplete_count(0) { }
+        write(w), timestamp(ts), callback(cb), parent(p), incomplete_count(0) { }
 
     const typename protocol_t::write_t write;
     const transition_timestamp_t timestamp;
     write_callback_t *callback;
 
-    semaphore_assertion_t::acq_t sem_acq;
-
 private:
     friend class incomplete_write_ref_t;
 
@@ -565,7 +558,6 @@ void broadcaster_t<protocol_t>::end_write(boost::shared_ptr<incomplete_write_t>
         guarantee(newest_complete_timestamp == removed_write->timestamp.timestamp_before());
         newest_complete_timestamp = removed_write->timestamp.timestamp_after();
     }
-    write->sem_acq.reset();
     if (write->callback) {
         guarantee(write->callback->write == write.get());
         write->callback->write = NULL;

src/clustering/immediate_consistency/branch/broadcaster.hpp

Lines changed: 0 additions & 6 deletions
@@ -39,11 +39,6 @@ class broadcaster_t : public home_thread_mixin_debug_only_t {
     class incomplete_write_t;
 
 public:
-    /* If the number of calls to `spawn_write()` minus the number of writes that
-    have completed is equal to `MAX_OUTSTANDING_WRITES`, it's illegal to call
-    `spawn_write()` again. */
-    static const int MAX_OUTSTANDING_WRITES;
-
     class write_callback_t {
     public:
         write_callback_t();
@@ -176,7 +171,6 @@ class broadcaster_t : public home_thread_mixin_debug_only_t {
     std::list<boost::shared_ptr<incomplete_write_t> > incomplete_writes;
     state_timestamp_t current_timestamp, newest_complete_timestamp;
     order_checkpoint_t order_checkpoint;
-    semaphore_assertion_t enforce_max_outstanding_writes;
 
     std::map<dispatchee_t *, auto_drainer_t::lock_t> dispatchees;
     intrusive_list_t<dispatchee_t> readable_dispatchees;
