diff --git a/google/cloud/logging_v2/handlers/handlers.py b/google/cloud/logging_v2/handlers/handlers.py index e71f673f7..c8ee9dbb5 100644 --- a/google/cloud/logging_v2/handlers/handlers.py +++ b/google/cloud/logging_v2/handlers/handlers.py @@ -188,7 +188,7 @@ def __init__( resource = detect_resource(client.project) self.name = name self.client = client - self.transport = transport(client, name, resource=resource) + self.transport = transport(client, name, resource=resource, **kwargs) self.project_id = client.project self.resource = resource self.labels = labels diff --git a/google/cloud/logging_v2/handlers/transports/background_thread.py b/google/cloud/logging_v2/handlers/transports/background_thread.py index 7cf2799f5..ea45c4073 100644 --- a/google/cloud/logging_v2/handlers/transports/background_thread.py +++ b/google/cloud/logging_v2/handlers/transports/background_thread.py @@ -34,6 +34,7 @@ _DEFAULT_GRACE_PERIOD = 5.0 # Seconds _DEFAULT_MAX_BATCH_SIZE = 10 _DEFAULT_MAX_LATENCY = 0 # Seconds +_DEFAULT_REGISTER_EXIT_CALLBACK = True _WORKER_THREAD_NAME = "google.cloud.logging.Worker" _WORKER_TERMINATOR = object() _LOGGER = logging.getLogger(__name__) @@ -79,6 +80,7 @@ def __init__( grace_period=_DEFAULT_GRACE_PERIOD, max_batch_size=_DEFAULT_MAX_BATCH_SIZE, max_latency=_DEFAULT_MAX_LATENCY, + register_exit_callback=_DEFAULT_REGISTER_EXIT_CALLBACK, ): """ Args: @@ -93,11 +95,15 @@ def __init__( than the grace_period. This means this is effectively the longest amount of time the background thread will hold onto log entries before sending them to the server. + register_exit_callback (Optional[bool]): Whether to register the atexit callback + or not. Starting Python 3.12 atexit does not allow to create new threads, what + happens when using gRPC. """ self._cloud_logger = cloud_logger self._grace_period = grace_period self._max_batch_size = max_batch_size self._max_latency = max_latency + self._register_exit_callback = register_exit_callback self._queue = queue.Queue(0) self._operational_lock = threading.Lock() self._thread = None @@ -162,7 +168,8 @@ def start(self): ) self._thread.daemon = True self._thread.start() - atexit.register(self._main_thread_terminated) + if self._register_exit_callback: + atexit.register(self._main_thread_terminated) def stop(self, *, grace_period=None): """Signals the background thread to stop. @@ -264,6 +271,7 @@ def __init__( batch_size=_DEFAULT_MAX_BATCH_SIZE, max_latency=_DEFAULT_MAX_LATENCY, resource=_GLOBAL_RESOURCE, + register_exit_callback=_DEFAULT_REGISTER_EXIT_CALLBACK, **kwargs, ): """ @@ -280,6 +288,9 @@ def __init__( than the grace_period. This means this is effectively the longest amount of time the background thread will hold onto log entries before sending them to the server. + register_exit_callback (Optional[bool]): Whether to register the atexit callback + or not. Starting Python 3.12 atexit does not allow to create new threads, what + happens when using gRPC. resource (Optional[Resource|dict]): The default monitored resource to associate with logs when not specified """ @@ -290,6 +301,7 @@ def __init__( grace_period=grace_period, max_batch_size=batch_size, max_latency=max_latency, + register_exit_callback=register_exit_callback, ) self.worker.start() diff --git a/tests/unit/handlers/transports/test_background_thread.py b/tests/unit/handlers/transports/test_background_thread.py index d4954ff7b..1f441ee8b 100644 --- a/tests/unit/handlers/transports/test_background_thread.py +++ b/tests/unit/handlers/transports/test_background_thread.py @@ -230,6 +230,25 @@ def test_start(self): self._start_with_thread_patch(worker) self.assertIs(current_thread, worker._thread) + def test_start_not_registering_exit_callback(self): + from google.cloud.logging_v2.handlers.transports import background_thread + + worker = self._make_one(_Logger(self.NAME), register_exit_callback=False) + + _, atexit_mock = self._start_with_thread_patch(worker) + + self.assertTrue(worker.is_alive) + self.assertIsNotNone(worker._thread) + self.assertTrue(worker._thread.daemon) + self.assertEqual(worker._thread._target, worker._thread_main) + self.assertEqual(worker._thread._name, background_thread._WORKER_THREAD_NAME) + atexit_mock.assert_not_called() + + # Calling start again should not start a new thread. + current_thread = worker._thread + self._start_with_thread_patch(worker) + self.assertIs(current_thread, worker._thread) + def test_stop(self): from google.cloud.logging_v2.handlers.transports import background_thread