8000 Bump to v7.16.0 by lihsai0 · Pull Request #457 · qiniu/python-sdk · GitHub
[go: up one dir, main page]

Skip to content

Bump to v7.16.0 #457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 25, 2025
Prev Previous commit
Next Next commit
feat: add single flight and apply to QueryRegionsProvider
  • Loading branch information
lihsai0 committed Dec 11, 2024
commit 642c71397a29ed7ccf107ee013ba6e0420628c0a
14 changes: 13 additions & 1 deletion qiniu/http/regions_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .region import Region, ServiceName
from .default_client import qn_http_client
from .middleware import RetryDomainsMiddleware
from .single_flight import SingleFlight


class RegionsProvider:
Expand Down Expand Up @@ -70,6 +71,9 @@ def _get_region_from_query(data, **kwargs):
)


_query_regions_single_flight = SingleFlight()


class QueryRegionsProvider(RegionsProvider):
def __init__(
self,
Expand All @@ -95,7 +99,15 @@ def __init__(
self.max_retry_times_per_endpoint = max_retry_times_per_endpoint

def __iter__(self):
regions = self.__fetch_regions()
endpoints_md5 = io_md5([
to_bytes(e.host) for e in self.endpoints_provider
])
flight_key = ':'.join([
endpoints_md5,
self.access_key,
self.bucket_name
])
regions = _query_regions_single_flight.do(flight_key, self.__fetch_regions)
# change to `yield from` when min version of python update to >= 3.3
for r in regions:
yield r
Expand Down
50 changes: 50 additions & 0 deletions qiniu/http/single_flight.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import threading


class _FlightLock:
"""
Do not use dataclass which caused the event created only once
"""
def __init__(self):
self.event = threading.Event()
self.result = None
self.error = None


class SingleFlight:
def __init__(self):
self._locks = {}
self._lock = threading.Lock()

def do(self, key, fn, *args, **kwargs):
# here does not use `with` statement
# because need to wait by another object if it exists,
# and reduce the `acquire` times if it not exists
self._lock.acquire()
if key in self._locks:
flight_lock = self._locks[key]

self._lock.release()
flight_lock.event.wait()

if flight_lock.error:
raise flight_lock.error
return flight_lock.result

flight_lock = _FlightLock()
self._locks[key] = flight_lock
self._lock.release()

try:
flight_lock.result = fn(*args, **kwargs)
except Exception as e:
flight_lock.error = e
finally:
flight_lock.event.set()

with self._lock:
del self._locks[key]

if flight_lock.error:
raise flight_lock.error
return flight_lock.result
59 changes: 59 additions & 0 deletions tests/cases/test_http/test_single_flight.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import pytest
import time
from multiprocessing.pool import ThreadPool

from qiniu.http.single_flight import SingleFlight

class TestSingleFlight:
def test_single_flight_success(self):
sf = SingleFlight()

def fn():
return "result"

result = sf.do("key1", fn)
assert result == "result"

def test_single_flight_exception(self):
sf = SingleFlight()

def fn():
raise ValueError("error")

with pytest.raises(ValueError, match="error"):
sf.do("key2", fn)

def test_single_flight_concurrent(self):
sf = SingleFlight()
share_state = []
results = []

def fn():
time.sleep(1)
share_state.append('share_state')
return "result"

def worker(_n):
result = sf.do("key3", fn)
results.append(result)

ThreadPool(2).map(worker, range(5))

assert len(share_state) == 3
assert all(result == "result" for result in results)

def test_single_flight_different_keys(self):
sf = SingleFlight()
results = []

def fn():
time.sleep(1)
return "result"

def worker(n):
result = sf.do("key{}".format(n), fn)
results.append(result)

ThreadPool(2).map(worker, range(2))
assert len(results) == 2
assert all(result == "result" for result in results)
0