@@ -1120,9 +1120,9 @@ write_ret_t Agent::inquire(query_t const& query) {
1120
1120
1121
1121
1122
1122
// / Write new entries to replicated state and store
1123
- write_ret_t Agent::write (query_t const & query, bool discardStartup ) {
1123
+ write_ret_t Agent::write (query_t const & query, WriteMode const & wmode ) {
1124
1124
1125
- std::vector<bool > applied;
1125
+ std::vector<apply_ret_t > applied;
1126
1126
std::vector<index_t > indices;
1127
1127
auto multihost = size ()>1 ;
1128
1128
@@ -1135,7 +1135,7 @@ write_ret_t Agent::write(query_t const& query, bool discardStartup) {
1135
1135
return write_ret_t (false , leader);
1136
1136
}
1137
1137
1138
- if (!discardStartup) {
1138
+ if (!wmode. discardStartup () ) {
1139
1139
CONDITION_LOCKER (guard, _waitForCV);
1140
1140
while (getPrepareLeadership () != 0 ) {
1141
1141
_waitForCV.wait (100 );
@@ -1179,7 +1179,7 @@ write_ret_t Agent::write(query_t const& query, bool discardStartup) {
1179
1179
_tiLock.assertNotLockedByCurrentThread ();
1180
1180
MUTEX_LOCKER (ioLocker, _ioLock);
1181
1181
1182
- applied = _spearhead.applyTransactions (chunk);
1182
+ applied = _spearhead.applyTransactions (chunk, wmode );
1183
1183
auto tmp = _state.logLeaderMulti (chunk, applied, term ());
1184
1184
indices.insert (indices.end (), tmp.begin (), tmp.end ());
1185
1185
@@ -1340,19 +1340,23 @@ void Agent::persistConfiguration(term_t t) {
1340
1340
{ VPackArrayBuilder trxs (agency.get ());
1341
1341
{ VPackArrayBuilder trx (agency.get ());
1342
1342
{ VPackObjectBuilder oper (agency.get ());
1343
- agency->add (VPackValue (" .agency " ));
1343
+ agency->add (VPackValue (RECONFIGURE ));
1344
1344
{ VPackObjectBuilder a (agency.get ());
1345
- agency->add (" term" , VPackValue (t));
1346
- agency->add (" id" , VPackValue (id ()));
1347
- agency->add (" active" , _config.activeToBuilder ()->slice ());
1348
- agency->add (" pool" , _config.poolToBuilder ()->slice ());
1349
- agency->add (" size" , VPackValue (size ()));
1350
- agency->add (" timeoutMult" , VPackValue (_config.timeoutMult ()));
1351
- }}}}
1345
+ agency->add (" op" , VPackValue (" set" ));
1346
+ agency->add (VPackValue (" new" ));
1347
+ { VPackObjectBuilder aa (agency.get ());
1348
+ agency->add (" term" , VPackValue (t));
1349
+ agency->add (idStr, VPackValue (id ()));
1350
+ agency->add (activeStr, _config.activeToBuilder ()->slice ());
1351
+ agency->add (poolStr, _config.poolToBuilder ()->slice ());
1352
+ agency->add (" size" , VPackValue (size ()));
1353
+ agency->add (timeoutMultStr, VPackValue (_config.timeoutMult ()));
1354
+ }}}}}
1352
1355
1353
1356
// In case we've lost leadership, no harm will arise as the failed write
1354
1357
// prevents bogus agency configuration to be replicated among agents. ***
1355
- write (agency, true );
1358
+ write (agency, WriteMode (true ,true ));
1359
+
1356
1360
}
1357
1361
1358
1362
@@ -1519,6 +1523,11 @@ void Agent::updatePeerEndpoint(query_t const& message) {
1519
1523
1520
1524
}
1521
1525
1526
+
1527
+ bool Agent::addGossipPeer (std::string const & endpoint) {
1528
+ return _config.addGossipPeer (endpoint);
1529
+ }
1530
+
1522
1531
void Agent::updatePeerEndpoint (std::string const & id, std::string const & ep) {
1523
1532
if (_config.updateEndpoint (id, ep)) {
1524
1533
if (!challengeLeadership ()) {
@@ -1715,8 +1724,7 @@ bool Agent::booting() { return (!_config.poolComplete()); }
1715
1724
// / If I know more immediately contact peer with my list.
1716
1725
query_t Agent::gossip (query_t const & in, bool isCallback, size_t version) {
1717
1726
1718
- LOG_TOPIC (DEBUG, Logger::AGENCY) << " Incoming gossip: "
1719
- << in->slice ().toJson ();
1727
+ LOG_TOPIC (DEBUG, Logger::AGENCY) << " Incoming gossip: " << in->slice ().toJson ();
1720
1728
1721
1729
VPackSlice slice = in->slice ();
1722
1730
if (!slice.isObject ()) {
@@ -1726,12 +1734,37 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
1726
1734
slice.typeName ());
1727
1735
}
1728
1736
1737
+ if (slice.hasKey (StaticStrings::Error)) {
1738
+ if (slice.get (StaticStrings::Code).getNumber <int >() == 403 ) {
1739
+ LOG_TOPIC (FATAL, Logger::AGENCY)
1740
+ << " Gossip peer does not have us in their pool " << slice.toJson ();
1741
+ FATAL_ERROR_EXIT (); // / We don't belong here
1742
+ } else {
1743
+ LOG_TOPIC (DEBUG, Logger::AGENCY)
1744
+ << " Received gossip error. We'll retry " << slice.toJson ();
1745
+ }
1746
+ query_t out = std::make_shared<Builder>();
1747
+ return out;
1748
+ }
1749
+
1729
1750
if (!slice.hasKey (" id" ) || !slice.get (" id" ).isString ()) {
1730
1751
THROW_ARANGO_EXCEPTION_MESSAGE (
1731
1752
20002 , " Gossip message must contain string parameter 'id'" );
1732
1753
}
1733
1754
std::string id = slice.get (" id" ).copyString ();
1734
1755
1756
+ // If pool is complete and id not in our pool reject under all circumstances
1757
+ if (_config.poolComplete () && !_config.findInPool (id)) {
1758
+ query_t ret = std::make_shared<VPackBuilder>();
1759
+ { VPackObjectBuilder o (ret.get ());
1760
+ ret->add (StaticStrings::Code, VPackValue (403 ));
1761
+ ret->add (StaticStrings::Error, VPackValue (true ));
1762
+ ret->add (StaticStrings::ErrorMessage,
1763
+ VPackValue (" This agents is not member of this pool" ));
1764
+ ret->add (StaticStrings::ErrorNum, VPackValue (403 )); }
1765
+ return ret;
1766
+ }
1767
+
1735
1768
if (!slice.hasKey (" endpoint" ) || !slice.get (" endpoint" ).isString ()) {
1736
1769
THROW_ARANGO_EXCEPTION_MESSAGE (
1737
1770
20003 , " Gossip message must contain string parameter 'endpoint'" );
@@ -1742,15 +1775,6 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
1742
1775
_inception->reportVersionForEp (endpoint, version);
1743
1776
}
1744
1777
1745
- // If pool complete but knabe is not member => reject at all times
1746
- if (_config.poolComplete ()) {
1747
- auto pool = _config.pool ();
1748
- if (pool.find (id) == pool.end ()) {
1749
- THROW_ARANGO_EXCEPTION_MESSAGE (
1750
- 20003 , " Gossip message from new peer while my pool is complete." );
1751
- }
1752
- }
1753
-
1754
1778
LOG_TOPIC (TRACE, Logger::AGENCY)
1755
1779
<< " Gossip " << ((isCallback) ? " callback" : " call" ) << " from "
1756
1780
<< endpoint;
@@ -1771,51 +1795,109 @@ query_t Agent::gossip(query_t const& in, bool isCallback, size_t version) {
1771
1795
}
1772
1796
1773
1797
query_t out = std::make_shared<Builder>();
1774
-
1775
- {
1776
- VPackObjectBuilder b (out.get ());
1777
-
1778
- std::vector<std::string> gossipPeers = _config.gossipPeers ();
1779
- if (!gossipPeers.empty ()) {
1780
- try {
1781
- _config.eraseFromGossipPeers (endpoint);
1782
- } catch (std::exception const & e) {
1783
- LOG_TOPIC (ERR, Logger::AGENCY)
1784
- << __FILE__ << " :" << __LINE__ << " " << e.what ();
1785
- }
1798
+
1799
+ VPackObjectBuilder b (out.get ());
1800
+
1801
+ std::unordered_set<std::string> gossipPeers = _config.gossipPeers ();
1802
+ if (!gossipPeers.empty () && !isCallback) {
1803
+ try {
1804
+ _config.eraseGossipPeer (endpoint);
1805
+ } catch (std::exception const & e) {
1806
+ LOG_TOPIC (ERR, Logger::AGENCY)
10000
code>
1807
+ << __FILE__ << " :" << __LINE__ << " " << e.what ();
1786
1808
}
1787
-
1788
- // / disagreement over pool membership: fatal!
1809
+ }
1810
+
1811
+ std::string err;
1812
+
1813
+ // / Pool incomplete or the other guy is in my pool: I'll gossip.
1814
+ if (!_config.poolComplete () || _config.matchPeer (id, endpoint)) {
1815
+
1789
1816
if (!_config.upsertPool (pslice, id)) {
1790
1817
LOG_TOPIC (FATAL, Logger::AGENCY) << " Discrepancy in agent pool!" ;
1791
- FATAL_ERROR_EXIT ();
1818
+ FATAL_ERROR_EXIT (); // / disagreement over pool membership are fatal!
1792
1819
}
1793
-
1794
- if (!isCallback) { // no gain in callback to a callback.
1795
- auto pool = _config.pool ();
1796
- auto active = _config.active ();
1797
-
1798
- // Wrapped in envelope in RestAgencyPrivHandler
1799
- out->add (VPackValue (" pool" ));
1800
- {
1801
- VPackObjectBuilder bb (out.get ());
1802
- for (auto const & i : pool) {
1803
- out->add (i.first , VPackValue (i.second ));
1820
+ auto pool = _config.pool ();
1821
+
1822
+ // Wrapped in envelope in RestAgencyPrivHandler
1823
+ out->add (VPackValue (" pool" ));
1824
+ { VPackObjectBuilder bb (out.get ());
1825
+ for (auto const & i : pool) {
1826
+ out->add (i.first , VPackValue (i.second ));
1827
+ }}
1828
+
1829
+ } else { // Pool complete & id's endpoint not matching.
1830
+
1831
+ // Not leader: redirect / 503
1832
+ if (challengeLeadership ()) {
1833
+ out->add (" redirect" , VPackValue (true ));
1834
+ out->add (" id" , VPackValue (leaderID ()));
1835
+ } else { // leader magic
1836
+ auto tmp = _config;
1837
+ tmp.upsertPool (pslice, id);
1838
+ auto query = std::make_shared<VPackBuilder>();
1839
+ { VPackArrayBuilder trs (query.get ());
1840
+ { VPackArrayBuilder tr (query.get ());
1841
+ { VPackObjectBuilder o (query.get ());
1842
+ query->add (VPackValue (RECONFIGURE));
1843
+ { VPackObjectBuilder o (query.get ());
1844
+ query->add (" op" , VPackValue (" set" ));
1845
+ query->add (VPackValue (" new" ));
1846
+ { VPackObjectBuilder c (query.get ());
1847
+ tmp.toBuilder (*query); }}}}}
1848
+
1849
+ LOG_TOPIC (DEBUG, Logger::AGENCY)
1850
+ << " persisting new agency configuration via RAFT: " << query->toJson ();
1851
+
1852
+ // Do write
1853
+ write_ret_t ret;
1854
+ try {
1855
+ ret = write (query, WriteMode (false ,true ));
1856
+ arangodb::consensus::index_t max_index = 0 ;
1857
+ if (ret.indices .size () > 0 ) {
1858
+ max_index =
1859
+ *std::max_element (ret.indices .begin (), ret.indices .end ());
1860
+ }
1861
+ if (max_index > 0 ) { // We have a RAFT index. Wait for the RAFT commit.
1862
+ auto result = waitFor (max_index);
1863
+ if (result != Agent::raft_commit_t ::OK) {
1864
+ err = " failed to retrieve RAFT index for updated agency endpoints" ;
1865
+ } else {
1866
+ auto pool = _config.pool ();
1867
F438
+ out->add (VPackValue (" pool" ));
1868
+ { VPackObjectBuilder bb (out.get ());
1869
+ for (auto const & i : pool) {
1870
+ out->add (i.first , VPackValue (i.second ));
1871
+ }}
1872
+ }
1873
+ } else {
1874
+ err = " failed to retrieve RAFT index for updated agency endpoints" ;
1804
1875
}
1876
+ } catch (std::exception const & e) {
1877
+ err = std::string (" failed to write new agency to RAFT" ) + e.what ();
1878
+ LOG_TOPIC (ERR, Logger::AGENCY) << err;
1805
1879
}
1880
+
1806
1881
}
1882
+
1883
+ if (!err.empty ()) {
1884
+ out->add (StaticStrings::Code, VPackValue (500 ));
1885
+ out->add (StaticStrings::Error, VPackValue (true ));
1886
+ out->add (StaticStrings::ErrorMessage, VPackValue (err));
1887
+ out->add (StaticStrings::ErrorNum, VPackValue (500 ));
1888
+ }
1807
1889
}
1808
1890
1809
1891
if (!isCallback) {
1810
- LOG_TOPIC (TRACE, Logger::AGENCY) << " Answering with gossip "
1811
- << out->slice ().toJson ();
1892
+ LOG_TOPIC (TRACE, Logger::AGENCY)
1893
+ << " Answering with gossip " << out->slice ().toJson ();
1812
1894
}
1813
-
1895
+
1814
1896
// let gossip loop know that it has new data
1815
1897
if ( _inception != nullptr && isCallback) {
1816
1898
_inception->signalConditionVar ();
1817
1899
}
1818
-
1900
+
1819
1901
return out;
1820
1902
}
1821
1903
@@ -1922,4 +2004,8 @@ Inception const* Agent::inception() const {
1922
2004
return _inception.get ();
1923
2005
}
1924
2006
2007
+ void Agent::updateConfiguration (Slice const & slice) {
2008
+ _config.updateConfiguration (slice);
2009
+ }
2010
+
1925
2011
}} // namespace
0 commit comments