Bug fix 3.3/agency update endpoints (#6663) · mnemosdev/arangodb@fd800f1 · GitHub
[go: up one dir, main page]

Skip to content

Commit fd800f1

Browse files
kvahed authored and neunhoef committed
Bug fix 3.3/agency update endpoints (arangodb#6663)
* server building
* tests
1 parent 2b59eca commit fd800f1

27 files changed

+771
-267
lines changed

CHANGELOG

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ v3.3.17 (XXXX-XX-XX)
1414

1515
* fix some TLS errors that occurred when combining HTTPS/TLS transport with the
1616
VelocyStream protocol (VST)
17-
17+
1818
That combination could have led to spurious errors such as "TLS padding error"
1919
or "Tag mismatch" and connections being closed
2020

21+
* agency endpoint updates now go through RAFT
22+
2123

2224
v3.3.16 (2018-09-19)
2325
--------------------

arangod/Agency/AgencyComm.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ AgencyCommManager::createNewConnection() {
732732
std::string const& spec = _endpoints.front();
733733
std::unique_ptr<Endpoint> endpoint(Endpoint::clientFactory(spec));
734734
if (endpoint.get() == nullptr) {
735-
LOG_TOPIC(ERR, arangodb::Logger::FIXME) << "invalid value for "
735+
LOG_TOPIC(ERR, arangodb::Logger::AGENCYCOMM) << "invalid value for "
736736
<< "--server.endpoint ('" << spec << "')";
737737
THROW_ARANGO_EXCEPTION(TRI_ERROR_BAD_PARAMETER);
738738
}

arangod/Agency/AgencyCommon.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ extern std::string const NO_LEADER;
4242

4343
enum role_t { FOLLOWER, CANDIDATE, LEADER };
4444

45+
enum apply_ret_t {APPLIED, PRECONDITION_FAILED, FORBIDDEN, UNKNOWN_ERROR};
46+
4547
typedef std::chrono::duration<long, std::ratio<1, 1000>> duration_t;
4648
typedef uint64_t index_t;
4749
typedef uint64_t term_t;
@@ -63,11 +65,11 @@ struct read_ret_t {
6365
struct write_ret_t {
6466
bool accepted; // Query accepted (i.e. we are leader)
6567
std::string redirect; // If not accepted redirect id
66-
std::vector<bool> applied;
68+
std::vector<apply_ret_t> applied;
6769
std::vector<index_t> indices; // Indices of log entries (if any) to wait for
6870
write_ret_t() : accepted(false), redirect("") {}
6971
write_ret_t(bool a, std::string const& id) : accepted(a), redirect(id) {}
70-
write_ret_t(bool a, std::string const& id, std::vector<bool> const& app,
72+
write_ret_t(bool a, std::string const& id, std::vector<apply_ret_t> const& app,
7173
std::vector<index_t> const& idx)
7274
: accepted(a), redirect(id), applied(app), indices(idx) {}
7375
};

arangod/Agency/AgencyStrings.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
////////////////////////////////////////////////////////////////////////////////
2+
/// DISCLAIMER
3+
///
4+
/// Copyright 2014-2018 ArangoDB GmbH, Cologne, Germany
5+
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
6+
///
7+
/// Licensed under the Apache License, Version 2.0 (the "License");
8+
/// you may not use this file except in compliance with the License.
9+
/// You may obtain a copy of the License at
10+
///
11+
/// http://www.apache.org/licenses/LICENSE-2.0
12+
///
13+
/// Unless required by applicable law or agreed to in writing, software
14+
/// distributed under the License is distributed on an "AS IS" BASIS,
15+
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
/// See the License for the specific language governing permissions and
17+
/// limitations under the License.
18+
///
19+
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
20+
///
21+
/// @author Kaveh Vahedipour
22+
////////////////////////////////////////////////////////////////////////////////
23+
24+
#include <string>
25+
26+
namespace arangodb {
27+
namespace consensus {
28+
29+
constexpr char const* DATABASES = "Databases";
30+
constexpr char const* COLLECTIONS = "Collections";
31+
constexpr char const* RECONFIGURE = ".agency";
32+
constexpr char const* VERSION = "Version";
33+
34+
constexpr char const* CURRENT = "Current";
35+
constexpr char const* CURRENT_VERSION = "Current/Version";
36+
constexpr char const* CURRENT_COLLECTIONS = "Current/Collections/";
37+
constexpr char const* CURRENT_DATABASES = "Current/Databases/";
38+
39+
constexpr char const* PLAN = "Plan";
40+
constexpr char const* PLAN_VERSION = "Plan/Version";
41+
constexpr char const* PLAN_COLLECTIONS = "Plan/Collections/";
42+
constexpr char const* PLAN_DATABASES = "Plan/Databases/";
43+
44+
}}

arangod/Agency/Agent.cpp

Lines changed: 140 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,9 +1120,9 @@ write_ret_t Agent::inquire(query_t const& query) {
11201120

11211121

11221122
/// Write new entries to replicated state and store
1123-
write_ret_t Agent::write(query_t const& query, bool discardStartup) {
1123+
write_ret_t Agent::write(query_t const& query, WriteMode const& wmode) {
11241124

1125-
std::vector<bool> applied;
1125+
std::vector<apply_ret_t> applied;
11261126
std::vector<index_t> indices;
11271127
auto multihost = size()>1;
11281128

@@ -1135,7 +1135,7 @@ write_ret_t Agent::write(query_t const& query, bool discardStartup) {
11351135
return write_ret_t(false, leader);
11361136
}
11371137

1138-
if (!discardStartup) {
1138+
if (!wmode.discardStartup()) {
11391139
CONDITION_LOCKER(guard, _waitForCV);
11401140
while (getPrepareLeadership() != 0) {
11411141
_waitForCV.wait(100);
@@ -1179,7 +1179,7 @@ write_ret_t Agent::write(query_t const& query, bool discardStartup) {
11791179
_tiLock.assertNotLockedByCurrentThread();
11801180
MUTEX_LOCKER(ioLocker, _ioLock);
11811181

1182-
applied = _spearhead.applyTransactions(chunk);
1182+
applied = _spearhead.applyTransactions(chunk, wmode);
11831183
auto tmp = _state.logLeaderMulti(chunk, applied, term());
11841184
indices.insert(indices.end(), tmp.begin(), tmp.end());
11851185

@@ -1340,19 +1340,23 @@ void Agent::persistConfiguration(term_t t) {
13401340
{ VPackArrayBuilder trxs(agency.get());
13411341
{ VPackArrayBuilder trx(agency.get());
13421342
{ VPackObjectBuilder oper(agency.get());
1343-
agency->add(VPackValue(".agency"));
1343+
agency->add(VPackValue(RECONFIGURE));
13441344
{ VPackObjectBuilder a(agency.get());
1345-
agency->add("term", VPackValue(t));
1346-
agency->add("id", VPackValue(id()));
1347-
agency->add("active", _config.activeToBuilder()->slice());
1348-
agency->add("pool", _config.poolToBuilder()->slice());
1349-
agency->add("size", VPackValue(size()));
1350-
agency->add("timeoutMult", VPackValue(_config.timeoutMult()));
1351-
}}}}
1345+
agency->add("op", VPackValue("set"));
1346+
agency->add(VPackValue("new"));
1347+
{ VPackObjectBuilder aa(agency.get());
1348+
agency->add("term", VPackValue(t));
1349+
agency->add(idStr, VPackValue(id()));
1350+
agency->add(activeStr, _config.activeToBuilder()->slice());
1351+
agency->add(poolStr, _config.poolToBuilder()->slice());
1352+
agency->add("size", VPackValue(size()));
1353+
agency->add(timeoutMultStr, VPackValue(_config.timeoutMult()));
1354+
}}}}}
13521355

13531356
// In case we've lost leadership, no harm will arise as the failed write
13541357
// prevents bogus agency configuration to be replicated among agents. ***
1355-
write(agency, true);
1358+
write(agency, WriteMode(true,true));
1359+
13561360
}
13571361

13581362

@@ -1519,6 +1523,11 @@ void Agent::updatePeerEndpoint(query_t const& message) {
15191523

15201524
}
15211525

1526+
1527+
bool Agent::addGossipPeer(std::string const& endpoint) {
1528+
return _config.addGossipPeer(endpoint);
1529+
}
1530+
15221531
void Agent::updatePeerEndpoint(std::string const& id, std::string const& ep) {
15231532
if (_config.updateEndpoint(id, ep)) {
15241533
if (!challengeLeadership()) {
@@ -1715,8 +1724,7 @@ bool Agent::booting() { return (!_config.poolComplete()); }
17151724
/// If I know more immediately contact peer with my list.
17161725
query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
17171726

1718-
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Incoming gossip: "
1719-
<< in->slice().toJson();
1727+
LOG_TOPIC(DEBUG, Logger::AGENCY) << "Incoming gossip: " << in->slice().toJson();
17201728

17211729
VPackSlice slice = in->slice();
17221730
if (!slice.isObject()) {
@@ -1726,12 +1734,37 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
17261734
slice.typeName());
17271735
}
17281736

1737+
if (slice.hasKey(StaticStrings::Error)) {
1738+
if (slice.get(StaticStrings::Code).getNumber<int>() == 403) {
1739+
LOG_TOPIC(FATAL, Logger::AGENCY)
1740+
<< "Gossip peer does not have us in their pool " << slice.toJson();
1741+
FATAL_ERROR_EXIT(); /// We don't belong here
1742+
} else {
1743+
LOG_TOPIC(DEBUG, Logger::AGENCY)
1744+
<< "Received gossip error. We'll retry " << slice.toJson();
1745+
}
1746+
query_t out = std::make_shared<Builder>();
1747+
return out;
1748+
}
1749+
17291750
if (!slice.hasKey("id") || !slice.get("id").isString()) {
17301751
THROW_ARANGO_EXCEPTION_MESSAGE(
17311752
20002, "Gossip message must contain string parameter 'id'");
17321753
}
17331754
std::string id = slice.get("id").copyString();
17341755

1756+
// If pool is complete and id not in our pool reject under all circumstances
1757+
if (_config.poolComplete() && !_config.findInPool(id)) {
1758+
query_t ret = std::make_shared<VPackBuilder>();
1759+
{ VPackObjectBuilder o(ret.get());
1760+
ret->add(StaticStrings::Code, VPackValue(403));
1761+
ret->add(StaticStrings::Error, VPackValue(true));
1762+
ret->add(StaticStrings::ErrorMessage,
1763+
VPackValue("This agents is not member of this pool"));
1764+
ret->add(StaticStrings::ErrorNum, VPackValue(403)); }
1765+
return ret;
1766+
}
1767+
17351768
if (!slice.hasKey("endpoint") || !slice.get("endpoint").isString()) {
17361769
THROW_ARANGO_EXCEPTION_MESSAGE(
17371770
20003, "Gossip message must contain string parameter 'endpoint'");
@@ -1742,15 +1775,6 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
17421775
_inception->reportVersionForEp(endpoint, version);
17431776
}
17441777

1745-
// If pool complete but knabe is not member => reject at all times
1746-
if (_config.poolComplete()) {
1747-
auto pool = _config.pool();
1748-
if (pool.find(id) == pool.end()) {
1749-
THROW_ARANGO_EXCEPTION_MESSAGE(
1750-
20003, "Gossip message from new peer while my pool is complete.");
1751-
}
1752-
}
1753-
17541778
LOG_TOPIC(TRACE, Logger::AGENCY)
17551779
<< "Gossip " << ((isCallback) ? "callback" : "call") << " from "
17561780
<< endpoint;
@@ -1771,51 +1795,109 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
17711795
}
17721796

17731797
query_t out = std::make_shared<Builder>();
1774-
1775-
{
1776-
VPackObjectBuilder b(out.get());
1777-
1778-
std::vector<std::string> gossipPeers = _config.gossipPeers();
1779-
if (!gossipPeers.empty()) {
1780-
try {
1781-
_config.eraseFromGossipPeers(endpoint);
1782-
} catch (std::exception const& e) {
1783-
LOG_TOPIC(ERR, Logger::AGENCY)
1784-
<< __FILE__ << ":" << __LINE__ << " " << e.what();
1785-
}
1798+
1799+
VPackObjectBuilder b(out.get());
1800+
1801+
std::unordered_set<std::string> gossipPeers = _config.gossipPeers();
1802+
if (!gossipPeers.empty() && !isCallback) {
1803+
try {
1804+
_config.eraseGossipPeer(endpoint);
1805+
} catch (std::exception const& e) {
1806+
LOG_TOPIC(ERR, Logger::AGENCY)
1807+
<< __FILE__ << ":" << __LINE__ << " " << e.what();
17861808
}
1787-
1788-
/// disagreement over pool membership: fatal!
1809+
}
1810+
1811+
std::string err;
1812+
1813+
/// Pool incomplete or the other guy is in my pool: I'll gossip.
1814+
if (!_config.poolComplete() || _config.matchPeer(id, endpoint)) {
1815+
17891816
if (!_config.upsertPool(pslice, id)) {
17901817
LOG_TOPIC(FATAL, Logger::AGENCY) << "Discrepancy in agent pool!";
1791-
FATAL_ERROR_EXIT();
1818+
FATAL_ERROR_EXIT(); /// disagreement over pool membership are fatal!
17921819
}
1793-
1794-
if (!isCallback) { // no gain in callback to a callback.
1795-
auto pool = _config.pool();
1796-
auto active = _config.active();
1797-
1798-
// Wrapped in envelope in RestAgencyPrivHandler
1799-
out->add(VPackValue("pool"));
1800-
{
1801-
VPackObjectBuilder bb(out.get());
1802-
for (auto const& i : pool) {
1803-
out->add(i.first, VPackValue(i.second));
1820+
auto pool = _config.pool();
1821+
1822+
// Wrapped in envelope in RestAgencyPrivHandler
1823+
out->add(VPackValue("pool"));
1824+
{ VPackObjectBuilder bb(out.get());
1825+
for (auto const& i : pool) {
1826+
out->add(i.first, VPackValue(i.second));
1827+
}}
1828+
1829+
} else { // Pool complete & id's endpoint not matching.
1830+
1831+
// Not leader: redirect / 503
1832+
if (challengeLeadership()) {
1833+
out->add("redirect", VPackValue(true));
1834+
out->add("id", VPackValue(leaderID()));
1835+
} else { // leader magic
1836+
auto tmp = _config;
1837+
tmp.upsertPool(pslice, id);
1838+
auto query = std::make_shared<VPackBuilder>();
1839+
{ VPackArrayBuilder trs(query.get());
1840+
{ VPackArrayBuilder tr(query.get());
1841+
{ VPackObjectBuilder o(query.get());
1842+
query->add(VPackValue(RECONFIGURE));
1843+
{ VPackObjectBuilder o(query.get());
1844+
query->add("op", VPackValue("set"));
1845+
query->add(VPackValue("new"));
1846+
{ VPackObjectBuilder c(query.get());
1847+
tmp.toBuilder(*query); }}}}}
1848+
1849+
LOG_TOPIC(DEBUG, Logger::AGENCY)
1850+
<< "persisting new agency configuration via RAFT: " << query->toJson();
1851+
1852+
// Do write
1853+
write_ret_t ret;
1854+
try {
1855+
ret = write(query, WriteMode(false,true));
1856+
arangodb::consensus::index_t max_index = 0;
1857+
if (ret.indices.size() > 0) {
1858+
max_index =
1859+
*std::max_element(ret.indices.begin(), ret.indices.end());
1860+
}
1861+
if (max_index > 0) { // We have a RAFT index. Wait for the RAFT commit.
1862+
auto result = waitFor(max_index);
1863+
if (result != Agent::raft_commit_t::OK) {
1864+
err = "failed to retrieve RAFT index for updated agency endpoints";
1865+
} else {
1866+
auto pool = _config.pool();
1867+
out->add(VPackValue("pool"));
1868+
{ VPackObjectBuilder bb(out.get());
1869+
for (auto const& i : pool) {
1870+
out->add(i.first, VPackValue(i.second));
1871+
}}
1872+
}
1873+
} else {
1874+
err = "failed to retrieve RAFT index for updated agency endpoints";
18041875
}
1876+
} catch (std::exception const& e) {
1877+
err = std::string("failed to write new agency to RAFT") + e.what();
1878+
LOG_TOPIC(ERR, Logger::AGENCY) << err;
18051879
}
1880+
18061881
}
1882+
1883+
if (!err.empty()) {
1884+
out->add(StaticStrings::Code, VPackValue(500));
1885+
out->add(StaticStrings::Error, VPackValue(true));
1886+
out->add(StaticStrings::ErrorMessage, VPackValue(err));
1887+
out->add(StaticStrings::ErrorNum, VPackValue(500));
1888+
}
18071889
}
18081890

18091891
if (!isCallback) {
1810-
LOG_TOPIC(TRACE, Logger::AGENCY) << "Answering with gossip "
1811-
<< out->slice().toJson();
1892+
LOG_TOPIC(TRACE, Logger::AGENCY)
1893+
<< "Answering with gossip " << out->slice().toJson();
18121894
}
1813-
1895+
18141896
// let gossip loop know that it has new data
18151897
if ( _inception != nullptr && isCallback) {
18161898
_inception->signalConditionVar();
18171899
}
1818-
1900+
18191901
return out;
18201902
}
18211903

@@ -1922,4 +2004,8 @@ Inception const* Agent::inception() const {
19222004
return _inception.get();
19232005
}
19242006

2007+
void Agent::updateConfiguration(Slice const& slice) {
2008+
_config.updateConfiguration(slice);
2009+
}
2010+
19252011
}} // namespace

0 commit comments

Comments
 (0)
0