8000 Spanner: Create new instance if existing Spanner is closed by olavloite · Pull Request #5200 · googleapis/google-cloud-java · GitHub
[go: up one dir, main page]

Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -494,24 +494,40 @@ static String getServiceAccountProjectId(String credentialsPath) {
*/
@SuppressWarnings("unchecked")
public ServiceT getService() {
if (service == null) {
if (shouldRefreshService(service)) {
service = serviceFactory.create((OptionsT) this);
}
return service;
}

/**
* @param cachedService The currently cached service object
* @return true if the currently cached service object should be refreshed.
*/
protected boolean shouldRefreshService(ServiceT cachedService) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this and shouldRefreshRpc be static?

It looks like we're not using anything from the instance but we're allowing for that possibility in the future. If so, should we instead provide a protected getter for the cached service/rpc?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to let these two methods be overridable. The base Service and ServiceRpc classes are not closeable and therefore only does a cachedService == null check.

The Spanner class (a subclass of Service) implements AutoCloseable and SpannerOptions therefore implements a custom check in the shouldRefreshService() method that calls Spanner#isClosed() instead of the cachedService == null check.

return cachedService == null;
}

/**
* Returns a Service RPC object for the current service. For instance, when using Google Cloud
* Storage, it returns a StorageRpc object.
*/
@SuppressWarnings("unchecked")
public ServiceRpc getRpc() {
if (rpc == null) {
if (shouldRefreshRpc(rpc)) {
rpc = serviceRpcFactory.create((OptionsT) this);
}
return rpc;
}

/**
* @param cachedRpc The currently cached service object
* @return true if the currently cached service object should be refreshed.
*/
protected boolean shouldRefreshRpc(ServiceRpc cachedRpc) {
return cachedRpc == null;
}

/**
* Returns the project ID. Return value can be null (for services that don't require a project
* ID).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,7 @@ public interface Spanner extends Service<SpannerOptions>, AutoCloseable {
*/
@Override
void close();

/** @return <code>true</code> if this {@link Spanner} object is closed. */
boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ public void close() {
}
}

@Override
public boolean isClosed() {
return spannerIsClosed;
}

/**
* Checks that the current context is still valid, throwing a CANCELLED or DEADLINE_EXCEEDED error
* if not.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,26 @@ protected SpannerRpc getSpannerRpcV1() {
return (SpannerRpc) getRpc();
}

/**
* @return <code>true</code> if the cached Spanner service instance is <code>null</code> or
* closed. This will cause the method {@link #getService()} to create a new {@link SpannerRpc}
* instance when one is requested.
*/
@Override
protected boolean shouldRefreshService(Spanner cachedService) {
return cachedService == null || cachedService.isClosed();
}

/**
* @return <code>true</code> if the cached {@link ServiceRpc} instance is <code>null</code> or
* closed. This will cause the method {@link #getRpc()} to create a new {@link Spanner}
* instance when one is requested.
*/
@Override
protected boolean shouldRefreshRpc(ServiceRpc cachedRpc) {
return cachedRpc == null || ((SpannerRpc) cachedRpc).isClosed();
}

@SuppressWarnings("unchecked")
@Override
public Builder toBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ private synchronized void shutdown() {
private static final int DEFAULT_PERIOD_SECONDS = 10;

private final ManagedInstantiatingExecutorProvider executorProvider;
private boolean rpcIsClosed;
private final SpannerStub spannerStub;
private final InstanceAdminStub instanceAdminStub;
private final DatabaseAdminStub databaseAdminStub;
Expand Down Expand Up @@ -600,13 +601,19 @@ private GrpcCallContext newCallContext(@Nullable Map<Option, ?> options, String

@Override
public void shutdown() {
this.rpcIsClosed = true;
this.spannerStub.close();
this.instanceAdminStub.close();
this.databaseAdminStub.close();
this.spannerWatchdog.shutdown();
this.executorProvider.shutdown();
}

@Override
public boolean isClosed() {
return rpcIsClosed;
}

/**
* A {@code ResponseObserver} that exposes the {@code StreamController} and delegates callbacks to
* the {@link ResultStreamConsumer}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,6 @@ PartitionResponse partitionRead(PartitionReadRequest request, @Nullable Map<Opti
throws SpannerException;

public void shutdown();

boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;

import com.google.api.core.NanoClock;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceRpc;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import java.util.HashMap;
Expand Down Expand Up @@ -122,4 +126,42 @@ public void getDbclientAfterCloseThrows() {
assertThat(e.getMessage()).contains("Cloud Spanner client has been closed");
}
}

@Test
public void testSpannerClosed() throws InterruptedException {
SpannerOptions options = createSpannerOptions();
Spanner spanner1 = options.getService();
Spanner spanner2 = options.getService();
ServiceRpc rpc1 = options.getRpc();
ServiceRpc rpc2 = options.getRpc();
// The SpannerOptions object should return the same instance.
assertThat(spanner1 == spanner2, is(true));
assertThat(rpc1 == rpc2, is(true));
spanner1.close();
// A new instance should be returned as the Spanner instance has been closed.
Spanner spanner3 = options.getService();
assertThat(spanner1 == spanner3, is(false));
// A new instance should be returned as the Spanner instance has been closed.
ServiceRpc rpc3 = options.getRpc();
assertThat(rpc1 == rpc3, is(false));
// Creating a copy of the SpannerOptions should result in new instances.
options = options.toBuilder().build();
Spanner spanner4 = options.getService();
ServiceRpc rpc4 = options.getRpc();
assertThat(spanner4 == spanner3, is(false));
assertThat(rpc4 == rpc3, is(false));
Spanner spanner5 = options.getService();
ServiceRpc rpc5 = options.getRpc();
assertThat(spanner4 == spanner5, is(true));
assertThat(rpc4 == rpc5, is(true));
spanner3.close();
spanner4.close();
}

private SpannerOptions createSpannerOptions() {
return SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
.setCredentials(NoCredentials.getInstance())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.NoCredentials;
import com.google.cloud.TransportOptions;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings;
Expand Down Expand Up @@ -353,4 +356,30 @@ public void testNullSessionLabels() {
thrown.expect(NullPointerException.class);
SpannerOptions.newBuilder().setSessionLabels(null);
}

@Test
public void testDoNotCacheClosedSpannerInstance() {
SpannerOptions options =
SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
.setCredentials(NoCredentials.getInstance())
.build();
// Getting a service twice should give the same instance.
Spanner service1 = options.getService();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for completeness and coverage, can you add in a few assertions like:
assertThat(spanner1.isClosed()).isFalse();

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added assertions.

Spanner service2 = options.getService();
assertThat(service1 == service2, is(true));
assertThat(service1.isClosed()).isFalse();
// Closing a service instance should cause the SpannerOptions to create a new service.
service1.close();
Spanner service3 = options.getService();
assertThat(service3 == service1, is(false));
assertThat(service1.isClosed()).isTrue();
assertThat(service3.isClosed()).isFalse();
;
// Getting another service from the SpannerOptions should return the new cached instance.
Spanner service4 = options.getService();
assertThat(service3 == service4, is(true));
assertThat(service3.isClosed()).isFalse();
service3.close();
}
}
0