From df27b967205cc8ba3c1d2c275dc26653eeee708f Mon Sep 17 00:00:00 2001 From: Kenneth Jung Date: Tue, 2 Jul 2019 21:54:32 -0400 Subject: [PATCH 1/2] Update resumption strategy to use format-independent row count. This change modifies the ReadRowsResumptionStrategy helper class in the BigQuery storage client to use the new format-independent row count value in the ReadRowsResponse message in order to track stream position. It also modifies various test files to use the new row count value. --- .../readrows/ReadRowsResumptionStrategy.java | 2 +- .../it/ITBigQueryStorageLongRunningTest.java | 20 +++---- .../v1beta1/it/ITBigQueryStorageTest.java | 55 ++++++------------- .../stub/readrows/ReadRowsRetryTest.java | 5 +- 4 files changed, 27 insertions(+), 55 deletions(-) diff --git a/google-cloud-clients/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-clients/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsResumptionStrategy.java index 9684b8d2fc79..a6533133798f 100644 --- a/google-cloud-clients/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsResumptionStrategy.java +++ b/google-cloud-clients/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsResumptionStrategy.java @@ -42,7 +42,7 @@ public StreamResumptionStrategy createNew() { @Override public ReadRowsResponse processResponse(ReadRowsResponse response) { - rowsProcessed += response.getAvroRows().getRowCount(); + rowsProcessed += response.getRowCount(); return response; } diff --git a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageLongRunningTest.java b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageLongRunningTest.java index 5f6df6ab361c..fea5e0733bd1 100644 --- a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageLongRunningTest.java +++ b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageLongRunningTest.java @@ -121,12 +121,12 @@ public Long call() throws Exception { ExecutorService executor = Executors.newFixedThreadPool(tasks.size()); List> results = executor.invokeAll(tasks); - long avroRowCount = 0; + long rowCount = 0; for (Future result : results) { - avroRowCount += result.get(); + rowCount += result.get(); } - assertEquals(313_797_035, avroRowCount); + assertEquals(313_797_035, rowCount); } private long readAllRowsFromStream(Stream stream) { @@ -135,19 +135,13 @@ private long readAllRowsFromStream(Stream stream) { ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadPosition(readPosition).build(); - long avroRowCount = 0; + long rowCount = 0; ServerStream serverStream = client.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : serverStream) { - assertTrue( - String.format( - "Response is missing 'avro_rows'. Read %d rows so far from stream '%s'. ReadRows response:%n%s", - avroRowCount, stream.getName(), response.toString()), - response.hasAvroRows()); - avroRowCount += response.getAvroRows().getRowCount(); + rowCount += response.getRowCount(); } - LOG.info( - String.format("Read total of %d rows from stream '%s'.", avroRowCount, stream.getName())); - return avroRowCount; + LOG.info(String.format("Read total of %d rows from stream '%s'.", rowCount, stream.getName())); + return rowCount; } } diff --git a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java index dfb186beb617..aec24a4abc40 100644 --- a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java +++ b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java @@ -149,18 +149,13 @@ public void testSimpleRead() { ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadPosition(readPosition).build(); - long avroRowCount = 0; + long rowCount = 0; ServerStream stream = client.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { - assertTrue( - String.format( - "Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s", - avroRowCount, response.toString()), - response.hasAvroRows()); - avroRowCount += response.getAvroRows().getRowCount(); + rowCount += response.getRowCount(); } - assertEquals(164_656, avroRowCount); + assertEquals(164_656, rowCount); } @Test @@ -187,12 +182,12 @@ public void testSimpleReadAndResume() { // We have to read some number of rows in order to be able to resume. More details: // https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#google.cloud.bigquery.storage.v1beta1.ReadRowsRequest - long avroRowCount = ReadStreamToOffset(session.getStreams(0), /* rowOffset = */ 34_846); + long rowCount = ReadStreamToOffset(session.getStreams(0), /* rowOffset = */ 34_846); StreamPosition readPosition = StreamPosition.newBuilder() .setStream(session.getStreams(0)) - .setOffset(avroRowCount) + .setOffset(rowCount) .build(); ReadRowsRequest readRowsRequest = @@ -201,17 +196,12 @@ public void testSimpleReadAndResume() { ServerStream stream = client.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { - assertTrue( - String.format( - "Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s", - avroRowCount, response.toString()), - response.hasAvroRows()); - avroRowCount += response.getAvroRows().getRowCount(); + rowCount += response.getRowCount(); } // Verifies that the number of rows skipped and read equals to the total number of rows in the // table. - assertEquals(164_656, avroRowCount); + assertEquals(164_656, rowCount); } @Test @@ -252,17 +242,11 @@ public void testFilter() throws IOException { SimpleRowReader reader = new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema())); - long avroRowCount = 0; + long rowCount = 0; ServerStream stream = client.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { - assertTrue( - String.format( - "Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s", - avroRowCount, response.toString()), - response.hasAvroRows()); - avroRowCount += response.getAvroRows().getRowCount(); - + rowCount += response.getRowCount(); reader.processRows( response.getAvroRows(), new SimpleRowReader.AvroRowConsumer() { @@ -276,7 +260,7 @@ public void accept(GenericData.Record record) { }); } - assertEquals(1_333, avroRowCount); + assertEquals(1_333, rowCount); } @Test @@ -336,15 +320,10 @@ public void testColumnSelection() throws IOException { SimpleRowReader reader = new SimpleRowReader(avroSchema); - long avroRowCount = 0; + long rowCount = 0; ServerStream stream = client.readRowsCallable().call(readRowsRequest); for (ReadRowsResponse response : stream) { - assertTrue( - String.format( - "Response is missing 'avro_rows'. Read %d rows so far. ReadRows response:%n%s", - avroRowCount, response.toString()), - response.hasAvroRows()); - avroRowCount += response.getAvroRows().getRowCount(); + rowCount += response.getRowCount(); reader.processRows( response.getAvroRows(), new SimpleRowReader.AvroRowConsumer() { @@ -362,7 +341,7 @@ public void accept(GenericData.Record record) { }); } - assertEquals(1_333, avroRowCount); + assertEquals(1_333, rowCount); } @Test @@ -864,19 +843,19 @@ private long ReadStreamToOffset(Stream stream, long rowOffset) { ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadPosition(readPosition).build(); - long avroRowCount = 0; + long rowCount = 0; ServerStream serverStream = client.readRowsCallable().call(readRowsRequest); Iterator responseIterator = serverStream.iterator(); while (responseIterator.hasNext()) { ReadRowsResponse response = responseIterator.next(); - avroRowCount += response.getAvroRows().getRowCount(); - if (avroRowCount >= rowOffset) { + rowCount += response.getRowCount(); + if (rowCount >= rowOffset) { return rowOffset; } } - return avroRowCount; + return rowCount; } /** diff --git a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryTest.java b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryTest.java index 52da9b504eaa..bc602e41ed83 100644 --- a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryTest.java +++ b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryTest.java @@ -19,7 +19,6 @@ import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.ServerStream; -import com.google.cloud.bigquery.storage.v1beta1.AvroProto.AvroRows; import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient; import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageGrpc.BigQueryStorageImplBase; import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings; @@ -167,7 +166,7 @@ private int getRowCount(ReadRowsRequest request) { ServerStream serverStream = client.readRowsCallable().call(request); int rowCount = 0; for (ReadRowsResponse readRowsResponse : serverStream) { - rowCount += readRowsResponse.getAvroRows().getRowCount(); + rowCount += readRowsResponse.getRowCount(); } return rowCount; } @@ -233,7 +232,7 @@ static ReadRowsRequest createRequest(String streamName, long offset) { static ReadRowsResponse createResponse(int numberOfRows) { return ReadRowsResponse.newBuilder() - .setAvroRows(AvroRows.newBuilder().setRowCount(numberOfRows)) + .setRowCount(numberOfRows) .build(); } From 610760ba6b69513c661caed695af814599f373c3 Mon Sep 17 00:00:00 2001 From: Kenneth Jung Date: Wed, 3 Jul 2019 15:27:53 -0400 Subject: [PATCH 2/2] Fix checkstyle errors --- .../v1beta1/stub/readrows/ReadRowsResumptionStrategy.java | 3 +++ .../v1beta1/it/ITBigQueryStorageLongRunningTest.java | 1 - .../bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java | 6 +----- .../storage/v1beta1/stub/readrows/ReadRowsRetryTest.java | 4 +--- 4 files changed, 5 insertions(+), 9 deletions(-) diff --git a/google-cloud-clients/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-clients/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsResumptionStrategy.java index a6533133798f..ecad11e15f6c 100644 --- a/google-cloud-clients/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsResumptionStrategy.java +++ b/google-cloud-clients/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsResumptionStrategy.java @@ -19,6 +19,7 @@ import com.google.api.gax.retrying.StreamResumptionStrategy; import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse; +import javax.annotation.Nonnull; /** * An implementation of a {@link StreamResumptionStrategy} for the ReadRows API. This class tracks @@ -36,11 +37,13 @@ public class ReadRowsResumptionStrategy private long rowsProcessed = 0; @Override + @Nonnull public StreamResumptionStrategy createNew() { return new ReadRowsResumptionStrategy(); } @Override + @Nonnull public ReadRowsResponse processResponse(ReadRowsResponse response) { rowsProcessed += response.getRowCount(); return response; diff --git a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageLongRunningTest.java b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageLongRunningTest.java index fea5e0733bd1..5043c6044672 100644 --- a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageLongRunningTest.java +++ b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageLongRunningTest.java @@ -17,7 +17,6 @@ package com.google.cloud.bigquery.storage.v1beta1.it; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import com.google.api.gax.rpc.ServerStream; import com.google.cloud.ServiceOptions; diff --git a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java index aec24a4abc40..79727cd1f26f 100644 --- a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java +++ b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import com.google.api.gax.rpc.ServerStream; import com.google.cloud.RetryOption; @@ -185,10 +184,7 @@ public void testSimpleReadAndResume() { long rowCount = ReadStreamToOffset(session.getStreams(0), /* rowOffset = */ 34_846); StreamPosition readPosition = - StreamPosition.newBuilder() - .setStream(session.getStreams(0)) - .setOffset(rowCount) - .build(); + StreamPosition.newBuilder().setStream(session.getStreams(0)).setOffset(rowCount).build(); ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadPosition(readPosition).build(); diff --git a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryTest.java b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryTest.java index bc602e41ed83..714e30b39027 100644 --- a/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryTest.java +++ b/google-cloud-clients/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/stub/readrows/ReadRowsRetryTest.java @@ -231,9 +231,7 @@ static ReadRowsRequest createRequest(String streamName, long offset) { } static ReadRowsResponse createResponse(int numberOfRows) { - return ReadRowsResponse.newBuilder() - .setRowCount(numberOfRows) - .build(); + return ReadRowsResponse.newBuilder().setRowCount(numberOfRows).build(); } RpcExpectation expectRequest(String streamName, long offset) {