8000 Avoid eagerly consuming child iterator · kecmu/java-driver@fc6b74a · GitHub
[go: up one dir, main page]

Skip to content 8000

Commit fc6b74a

Browse files
author
Alexandre Dutra
committed
Avoid eagerly consuming child iterator
1 parent df467c4 commit fc6b74a

File tree

1 file changed

+36
-15
lines changed

1 file changed

+36
-15
lines changed

driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.datastax.driver.core.policies;
1717

1818
import com.datastax.driver.core.*;
19+
import com.google.common.collect.AbstractIterator;
1920

2021
import java.nio.ByteBuffer;
2122
import java.util.*;
@@ -116,27 +117,47 @@ public Iterator<Host> newQueryPlan(final String loggedKeyspace, final Statement
116117
if (keyspace == null)
117118
keyspace = loggedKeyspace;
118119

119-
if (partitionKey == null || keyspace == null)
120-
return childPolicy.newQueryPlan(keyspace, statement);
121-
122-
Set<Host> replicas = clusterMetadata.getReplicas(Metadata.quote(keyspace), partitionKey);
120+
final Iterator<Host> childIterator = childPolicy.newQueryPlan(keyspace, statement);
123121

124-
Iterator<Host> childIterator = childPolicy.newQueryPlan(loggedKeyspace, statement);
122+
if (partitionKey == null || keyspace == null)
123+
return childIterator;
125124

125+
final Set<Host> replicas = clusterMetadata.getReplicas(Metadata.quote(keyspace), partitionKey);
126126
if (replicas.isEmpty())
127127
return childIterator;
128128

129-
List<Host> queryPlan = new ArrayList<Host>();
130-
int i = 0;
131-
while (childIterator.hasNext()) {
132-
Host host = childIterator.next();
133-
if (host.isUp() && childPolicy.distance(host) == HostDistance.LOCAL && replicas.contains(host))
134-
queryPlan.add(i++, host);
135-
else
136-
queryPlan.add(host);
137-
}
138-
return queryPlan.iterator();
129+
return new AbstractIterator<Host>() {
130+
131+
private Iterator<Host> nonReplicaIter;
132+
private List<Host> nonReplicas = new ArrayList<Host>();
133+
134+
@Override
135+
protected Host computeNext() {
136+
137+
while (childIterator.hasNext()) {
138+
139+
Host host = childIterator.next();
140+
141+
if (host.isUp() && replicas.contains(host) && childPolicy.distance(host) == HostDistance.LOCAL) {
142+
// UP replicas should be prioritized, retaining order from childPolicy
143+
return host;
144+
} else {
145+
// save for later
146+
nonReplicas.add(host);
147+
}
148+
149+
}
150+
151+
// This should 7904 only engage if all local replicas are DOWN
152+
if (nonReplicaIter == null)
153+
nonReplicaIter = nonReplicas.iterator();
154+
155+
if (nonReplicaIter.hasNext())
156+
return nonReplicaIter.next();
139157

158+
return endOfData();
159+
}
160+
};
140161
}
141162

142163
@Override

0 commit comments

Comments
 (0)
0