diff --git a/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java b/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java index d82b68db74f3..1474640b5bba 100644 --- a/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java +++ b/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java @@ -18,16 +18,19 @@ import static com.google.common.base.MoreObjects.firstNonNull; +import com.google.api.gax.core.ApiFuture; +import com.google.api.gax.core.ApiFutureCallback; +import com.google.api.gax.core.ApiFutures; import com.google.cloud.MonitoredResource; import com.google.cloud.logging.Logging.WriteOption; -import com.google.api.gax.core.ApiFutures; -import com.google.api.gax.core.ApiFutureCallback; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedList; +import java.util.IdentityHashMap; import java.util.List; +import java.util.Set; import java.util.logging.ErrorManager; import java.util.logging.Filter; import java.util.logging.Formatter; @@ -120,6 +123,10 @@ public class LoggingHandler extends Handler { // https://github.com/GoogleCloudPlatform/google-cloud-java/issues/1740 . private final Level baseLevel; + private final Object writeLock = new Object(); + private final Set> pendingWrites = + Collections.newSetFromMap(new IdentityHashMap, Boolean>()); + /** * Creates an handler that publishes messages to Stackdriver Logging. */ @@ -376,6 +383,9 @@ public void publish(LogRecord record) { if (entry != null) { write(entry, writeOptions); } + if (record.getLevel().intValue() >= flushLevel.intValue()) { + flush(); + } } finally { inPublishCall.remove(); } @@ -457,28 +467,60 @@ void write(LogEntry entry, WriteOption... options) { reportError(null, ex, ErrorManager.FLUSH_FAILURE); } break; + case ASYNC: default: - ApiFutures.addCallback(getLogging().writeAsync(entryList, options), new ApiFutureCallback() { - @Override - public void onSuccess(Void v) {} - - @Override - public void onFailure(Throwable t) { - if (t instanceof Exception) { - reportError(null, (Exception) t, ErrorManager.FLUSH_FAILURE); - } else { - reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE); - } - } - }); + final ApiFuture writeFuture = getLogging().writeAsync(entryList, options); + synchronized(writeLock) { + pendingWrites.add(writeFuture); + } + ApiFutures.addCallback( + writeFuture, + new ApiFutureCallback() { + private void removeFromPending() { + synchronized(writeLock) { + pendingWrites.remove(writeFuture); + } + } + + @Override + public void onSuccess(Void v) { + removeFromPending(); + } + + @Override + public void onFailure(Throwable t) { + try { + if (t instanceof Exception) { + reportError(null, (Exception) t, ErrorManager.FLUSH_FAILURE); + } else { + reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE); + } + } finally { + removeFromPending(); + } + } + }); break; } } @Override public void flush() { - // BUG(1795): flush is broken, need support from batching implementation. + // BUG(1795): We should force batcher to issue RPC call for buffered messages, + // so the code below doesn't wait uselessly. + + ArrayList> writesToFlush = new ArrayList<>(); + synchronized(writeLock) { + writesToFlush.addAll(pendingWrites); + } + for (ApiFuture write : writesToFlush) { + try { + Uninterruptibles.getUninterruptibly(write); + } catch (Exception e) { + // Ignore exceptions, they are propagated to the error manager. + } + } } /** diff --git a/google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingHandlerTest.java b/google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingHandlerTest.java index 7ac494013234..a9d31cbc726e 100644 --- a/google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingHandlerTest.java +++ b/google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingHandlerTest.java @@ -18,8 +18,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import com.google.api.gax.core.ApiFutures; +import com.google.api.gax.core.SettableApiFuture; import com.google.cloud.MonitoredResource; import com.google.cloud.logging.LogEntry.Builder; import com.google.cloud.logging.Logging.WriteOption; @@ -380,6 +383,41 @@ public void testFlushLevel() { handler.publish(newLogRecord(Level.WARNING, MESSAGE)); } + @Test + public void testFlush() throws InterruptedException { + final SettableApiFuture mockRpc = SettableApiFuture.create(); + + EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(options.getService()).andReturn(logging); + logging.writeAsync(ImmutableList.of(INFO_ENTRY), DEFAULT_OPTIONS); + EasyMock.expectLastCall().andReturn(mockRpc); + EasyMock.replay(options, logging); + final LoggingHandler handler = new LoggingHandler(LOG_NAME, options); + handler.setFormatter(new TestFormatter()); + + // no messages, nothing to flush. + handler.flush(); + + // send a message + handler.publish(newLogRecord(Level.INFO, MESSAGE)); + Thread flushWaiter = new Thread(new Runnable() { + @Override + public void run() { + handler.flush(); + } + }); + flushWaiter.start(); + + // flushWaiter should be waiting for mockRpc to complete. + flushWaiter.join(1000); + assertTrue(flushWaiter.isAlive()); + + // With the RPC completed, flush should return, and the thread should terminate. + mockRpc.set(null); + flushWaiter.join(1000); + assertFalse(flushWaiter.isAlive()); + } + @Test public void testSyncWrite() { EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes();