@@ -901,14 +901,19 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
901
901
std::vector<ServerID> DBServers = getCurrentDBServers ();
902
902
903
903
int dbServerResult = -1 ;
904
+
904
905
std::function<bool (VPackSlice const & result)> dbServerChanged =
905
906
[&](VPackSlice const & result) {
906
907
size_t numDbServers;
907
908
{
908
909
MUTEX_LOCKER (guard, dbServersMutex);
909
910
numDbServers = DBServers.size ();
910
911
}
911
- if (result.isObject () && result.length () == numDbServers) {
912
+ if (result.isObject () && result.length () >= numDbServers) {
913
+ // We use >= here since the number of DBServers could have increased
914
+ // during the creation of the database and we might not yet have
915
+ // the latest list. Thus there could be more reports than there are
916
+ // servers known to us.
912
917
VPackObjectIterator dbs (result);
913
918
914
919
std::string tmpMsg = " " ;
@@ -934,16 +939,24 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
934
939
}
935
940
}
936
941
if (tmpHaveError) {
942
+ MUTEX_LOCKER (guard, dbServersMutex);
937
943
errorMsg = " Error in creation of database:" + tmpMsg;
938
944
dbServerResult = TRI_ERROR_CLUSTER_COULD_NOT_CREATE_DATABASE;
939
945
return true ;
940
946
}
941
947
loadCurrent (); // update our cache
942
- dbServerResult = setErrormsg (TRI_ERROR_NO_ERROR, errorMsg);
948
+ {
949
+ MUTEX_LOCKER (guard, dbServersMutex);
950
+ dbServerResult = setErrormsg (TRI_ERROR_NO_ERROR, errorMsg);
951
+ }
943
952
}
944
953
return true ;
945
954
};
946
955
956
+ // ATTENTION: The following callback calls the above closure in a
957
+ // different thread. Nevertheless, the closure accesses some of our
958
+ // local variables. Therefore we have to protect all accesses to them
959
+ // by the above mutex.
947
960
auto agencyCallback = std::make_shared<AgencyCallback>(
948
961
ac, " Current/Databases/" + name, dbServerChanged, true , false );
949
962
_agencyCallbackRegistry->registerCallback (agencyCallback);
@@ -974,8 +987,11 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
974
987
int count = 0 ; // this counts, when we have to reload the DBServers
975
988
while (TRI_microtime () <= endTime) {
976
989
agencyCallback->executeByCallbackOrTimeout (getReloadServerListTimeout () / interval);
977
- if (dbServerResult >= 0 ) {
978
- break ;
990
+ {
991
+ MUTEX_LOCKER (guard, dbServersMutex);
992
+ if (dbServerResult >= 0 ) {
993
+ break ;
994
+ }
979
995
}
980
996
981
997
if (++count >= static_cast <int >(getReloadServerListTimeout () / interval)) {
@@ -991,8 +1007,11 @@ int ClusterInfo::createDatabaseCoordinator(std::string const& name,
991
1007
count = 0 ;
992
1008
}
993
1009
}
994
- if (dbServerResult >= 0 ) {
995
- return dbServerResult;
1010
+ {
1011
+ MUTEX_LOCKER (guard, dbServersMutex);
1012
+ if (dbServerResult >= 0 ) {
1013
+ return dbServerResult;
1014
+ }
996
1015
}
997
1016
return setErrormsg (TRI_ERROR_CLUSTER_TIMEOUT, errorMsg);
998
1017
}
0 commit comments