Commit 91d4412 · TTB-Network/python-openbmclapi

chore: 优化缓存 (optimize caching)

1 parent c30770e · commit 91d4412

File tree: 8 files changed, +106 -132 lines changed

core/cluster.py

Lines changed: 3 additions & 1 deletion
@@ -387,8 +387,10 @@ async def enable(self):
             self._retry_times = 0
             logger.tinfo("cluster.enable", id=self.id)
             self._keepalive_lock.release()
+        except TimeoutError:
+            logger.ttraceback("cluster.enable.timeout", id=self.id)
         except:
-            logger.terror("cluster.enable.timeout", id=self.id)
+            logger.traceback()
         finally:
             self._want_enable = False
             if not self._enabled:
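
The enable path now distinguishes a registration timeout from every other failure: TimeoutError gets its own localized traceback entry (logger.ttraceback with the new traceback.cluster.enable.timeout key), while the bare except falls back to a generic logger.traceback(). A minimal, hypothetical sketch of the same pattern outside the project, where asyncio.wait_for and print stand in for the cluster's enable request and logger:

import asyncio
import traceback

async def enable(register, timeout: float = 120.0):
    # Sketch only: register() stands in for the cluster's enable call.
    try:
        await asyncio.wait_for(register(), timeout)
    except TimeoutError:
        # A timeout gets its own, more specific log entry ...
        print("cluster.enable.timeout")
    except Exception:
        # ... while any other failure is logged with a full traceback.
        traceback.print_exc()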

core/dashboard.py

Lines changed: 3 additions & 0 deletions
@@ -37,6 +37,9 @@ async def put(self, event: str, data: Any):
 async def setup(
     app: fastapi.FastAPI
 ):
+    # TODO: 面板
+    return
+
     @app.get("/favicon.ico")
     def favicon():
         return fastapi.responses.FileResponse(

core/storage/alist.py

Lines changed: 2 additions & 2 deletions
@@ -45,8 +45,8 @@ def __init__(
         self._username = username
         self._password = password
         self._redirect_urls: utils.UnboundTTLCache[str, abc.ResponseFile] = utils.UnboundTTLCache(
-            maxsize=int(units.parse_number_units(kwargs.get("cache_size", "10000"))),
-            ttl=units.parse_time_units(kwargs.get("cache_ttl", "5m"))
+            maxsize=units.parse_number_units(kwargs.get("cache_size", "inf")),
+            ttl=units.parse_time_units(kwargs.get("cache_ttl", "1h"))
         )
         self._token = None
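
The cache defaults here change from a bounded 10000-entry cache with a 5-minute TTL to an effectively unbounded cache with a 1-hour TTL; the same defaults are applied to core/storage/s3.py and core/storage/webdav.py below. Assuming units.parse_number_units("inf") yields math.inf, entries are then evicted only by expiry, never by size. A minimal sketch of that behaviour using cachetools directly (path and URL invented for illustration):

import math
import time

import cachetools

# An effectively unbounded TTL cache: no size-based eviction,
# entries simply expire one hour after insertion.
redirect_cache = cachetools.TTLCache(
    maxsize=math.inf,   # what an "inf" cache_size presumably parses to
    ttl=60 * 60,        # "1h" cache_ttl, expressed in seconds
    timer=time.monotonic,
)

redirect_cache["/files/ab/abcdef"] = "https://alist.example/d/files/ab/abcdef"
assert "/files/ab/abcdef" in redirect_cache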

core/storage/minio.py

Lines changed: 88 additions & 59 deletions
@@ -1,6 +1,9 @@
+from datetime import timedelta
+import datetime
 import io
 import tempfile
 import time
+from typing import Optional
 import urllib.parse as urlparse
 import aiohttp
 import anyio.abc
@@ -11,6 +14,7 @@
 
 from .abc import CPath, FileInfo, Storage
 from miniopy_async import Minio
+from miniopy_async.api import BaseURL, presign_v4
 from miniopy_async.datatypes import Object
 from tianxiu2b2t import units
 
@@ -39,14 +43,10 @@ def __init__(
         self.public_endpoint = kwargs.get("public_endpoint")
 
         url = urlparse.urlparse(self.endpoint)
-
-        self._cache_files: UnboundTTLCache[str, FileInfo] = UnboundTTLCache(
-            maxsize=int(units.parse_number_units(kwargs.get("cache_size", "10000"))),
-            ttl=units.parse_time_units(kwargs.get("cache_files_ttl", "120s"))
-        )
+        self.ttl = units.parse_time_units(kwargs.get("cache_ttl", "1h"))
         self._cache: UnboundTTLCache[str, ResponseFile] = UnboundTTLCache(
-            maxsize=int(units.parse_number_units(kwargs.get("cache_size", "10000"))),
-            ttl=units.parse_time_units(kwargs.get("cache_ttl", "5m"))
+            maxsize=units.parse_number_units(kwargs.get("cache_size", "inf")),
+            ttl=self.ttl
         )
 
         self.minio = Minio(
@@ -136,69 +136,98 @@ async def get_file(self, path: str) -> ResponseFile:
         file = self._cache.get(cpath)
         if file is not None:
             return file
-        async with aiohttp.ClientSession() as session:
-            fileinfo = self._cache_files.get(cpath)
-            resp = None
-            if not fileinfo:
-                resp = await self.minio.get_object(
-                    self.bucket,
-                    cpath[1:],
-                    session,
-                )
-                fileinfo = FileInfo(
-                    name=cname,
-                    size=int(resp.headers.get("content-length") or 0),
-                    path=cpath,
-                )
-                self._cache_files[cpath] = fileinfo
-            if self.custom_host is None and self.public_endpoint is None:
-                if resp is None:
-                    resp = await self.minio.get_object(
-                        self.bucket,
-                        cpath[1:],
-                        session,
-                    )
-                file = ResponseFileMemory(
-                    data=await resp.read(),
-                    size=fileinfo.size
-                )
-                resp.release()
-                resp = None
-            if file is not None:
-                self._cache[cpath] = file
-                return file
-            if self.custom_host is not None:
+        stat = await self.minio.stat_object(
+            self.bucket,
+            cpath[1:],
+        )
+        if stat.size == 0:
+            file = ResponseFileMemory(
+                b"",
+                0
+            )
+        elif self.custom_host is not None:
             file = ResponseFileRemote(
-                f"{self.custom_host}{cpath}",
-                fileinfo.size
+                f"{self.custom_host}/{self.bucket}{cpath}",
+                int(stat.size or 0),
             )
         elif self.public_endpoint is not None:
-            url = await self.minio.get_presigned_url(
+            url = await get_presigned_url(
+                self.minio,
                 "GET",
                 self.bucket,
                 cpath[1:],
-            )
-            urlobj = urlparse.urlparse(url)
-            # replace host
-            pub_urlobj = urlparse.urlparse(self.public_endpoint)
-            url: str = urlparse.urlunparse(
-                (
-                    pub_urlobj.scheme or urlobj.scheme,
-                    pub_urlobj.netloc or urlobj.netloc,
-                    urlobj.path,
-                    urlobj.params,
-                    urlobj.query,
-                    urlobj.fragment,
-                )
+                region=self.region,
+                change_host=self.public_endpoint,
             )
             file = ResponseFileRemote(
                 url,
-                fileinfo.size
+                int(stat.size or 0),
             )
-        if file is None:
-            return ResponseFileNotFound()
+        else:
+            async with aiohttp.ClientSession() as session:
+                async with (await self.minio.get_object(
+                    self.bucket,
+                    cpath[1:],
+                    session
+                )) as resp:
+                    file = ResponseFileMemory(
+                        await resp.read(),
+                        int(stat.size or 0),
+                    )
         self._cache[cpath] = file
         return file
+
+
+async def get_presigned_url(
+    minio: Minio,
+    method: str,
+    bucket_name: str,
+    object_name: str,
+    region: Optional[str] = None,
+    expires: timedelta = timedelta(days=7),
+    request_date: Optional[datetime.datetime] = None,
+    extra_query_params=None,
+    change_host=None,
+):
+    if expires.total_seconds() < 1 or expires.total_seconds() > 604800:
+        raise ValueError("expires must be between 1 second to 7 days")
+    region = region or await minio._get_region(bucket_name, None)
+    query_params = extra_query_params or {}
+    creds = minio._provider.retrieve() if minio._provider else None
+    if creds and creds.session_token:
+        query_params["X-Amz-Security-Token"] = creds.session_token
+    url = None
+    if change_host:
+        url = BaseURL(
+            change_host,
+            region,
+        ).build(
+            method,
+            region,
+            bucket_name=bucket_name,
+            object_name=object_name,
+            query_params=query_params,
+        )
+    else:
+        url = minio._base_url.build(
+            method,
+            region,
+            bucket_name=bucket_name,
+            object_name=object_name,
+            query_params=query_params,
+        )
+    if creds:
+        url = presign_v4(
+            method,
+            url,
+            region,
+            creds,
+            request_date or datetime.datetime.now(datetime.UTC),
+            int(expires.total_seconds()),
+        )
+    return urlparse.urlunsplit(url)
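
The old public_endpoint branch presigned against the internal endpoint and then swapped the host into the URL; since a SigV4 signature covers the Host header, that rewrite would produce URLs the public host rejects, which is presumably why the new get_presigned_url helper builds and signs the URL against the public host directly (via miniopy_async's BaseURL and presign_v4). A hypothetical usage sketch of the helper, with endpoint, credentials, bucket, and object key all invented for illustration:

from datetime import timedelta

from miniopy_async import Minio

from core.storage.minio import get_presigned_url  # helper added in this commit

async def presign_example():
    client = Minio(
        "minio.internal:9000",      # internal endpoint the node talks to
        access_key="ACCESS_KEY",    # placeholder credentials
        secret_key="SECRET_KEY",
        secure=False,
    )
    # Sign against the public host so the Host header baked into the
    # signature matches the URL that clients will actually request.
    url = await get_presigned_url(
        client,
        "GET",
        "openbmclapi",              # hypothetical bucket
        "files/ab/abcdef1234",      # hypothetical object key
        expires=timedelta(hours=1),
        change_host="https://files.example.com",
    )
    return url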

core/storage/s3.py

Lines changed: 4 additions & 4 deletions
@@ -63,12 +63,12 @@ def __init__(
         self.public_endpoint = kwargs.get("public_endpoint", "")
         self.session = aioboto3.Session()
         self._cache: UnboundTTLCache[str, abc.ResponseFile] = UnboundTTLCache(
-            maxsize=int(units.parse_number_units(kwargs.get("cache_size", "10000"))),
-            ttl=units.parse_time_units(kwargs.get("cache_ttl", "5m"))
+            maxsize=units.parse_number_units(kwargs.get("cache_size", "inf")),
+            ttl=units.parse_time_units(kwargs.get("cache_ttl", "1h"))
         )
         self._cache_files: UnboundTTLCache[str, abc.FileInfo] = UnboundTTLCache(
-            maxsize=int(units.parse_number_units(kwargs.get("cache_size", "10000"))),
-            ttl=units.parse_time_units(kwargs.get("cache_files_ttl", "120s"))
+            maxsize=units.parse_number_units(kwargs.get("cache_size", "inf")),
+            ttl=units.parse_time_units(kwargs.get("cache_ttl", "1h"))
         )
         self._config = {
             "endpoint_url": self.endpoint,

core/storage/webdav.py

Lines changed: 4 additions & 4 deletions
@@ -31,12 +31,12 @@ def __init__(
         self.username = username
         self.password = password
         self._cache_redirects: utils.UnboundTTLCache[str, abc.ResponseFile] = utils.UnboundTTLCache(
-            maxsize=int(units.parse_number_units(kwargs.get("cache_size", "10000"))),
-            ttl=units.parse_time_units(kwargs.get("cache_ttl", "5m"))
+            maxsize=units.parse_number_units(kwargs.get("cache_size", "inf")),
+            ttl=units.parse_time_units(kwargs.get("cache_ttl", "1h"))
         )
         self._cache_files: utils.UnboundTTLCache[str, abc.FileInfo] = utils.UnboundTTLCache(
-            maxsize=int(units.parse_number_units(kwargs.get("cache_size", "10000"))),
-            ttl=units.parse_time_units(kwargs.get("cache_files_ttl", "120s"))
+            maxsize=units.parse_number_units(kwargs.get("cache_size", "inf")),
+            ttl=units.parse_time_units(kwargs.get("cache_ttl", "1h"))
         )
         self._mkdir_lock = utils.Lock()

core/utils.py

Lines changed: 1 addition & 61 deletions
@@ -47,18 +47,6 @@ def get_monotic(self):
     def get_perf_counter(self):
         return time.perf_counter() - self._perf_counter / 1e9
 
-class TLSHandshake:
-    def __init__(
-        self,
-        version: int,
-        sni: Optional[str]
-    ):
-        self.version = version
-        self.sni = sni
-
-    def __repr__(self) -> str:
-        return f"TLSHandshake(version={self.version}, sni={self.sni})"
-
 class AvroParser:
     def __init__(
         self,
@@ -248,37 +236,6 @@ def split_workload(
 ):
     return [items[i::n] for i in range(n)]
 
-def parse_tls_handshake(
-    data: bytes
-):
-    version = 0
-    sni = None
-    try:
-        buf = io.BytesIO(data)
-        type = buf.read(1)[0]
-        if type != 22:
-            raise
-        version = int.from_bytes(buf.read(2))
-        # skip 40 bytes
-        buf.read(40)
-        buf.read(buf.read(1)[0])
-        buf.read(int.from_bytes(buf.read(2), 'big'))
-        buf.read(buf.read(1)[0])
-        extensions_length = int.from_bytes(buf.read(2), 'big')
-        current_extension_cur = 0
-        while current_extension_cur < extensions_length:
-            extension_type = int.from_bytes(buf.read(2), 'big')
-            extension_length = int.from_bytes(buf.read(2), 'big')
-            extension_data = buf.read(extension_length)
-            if extension_type == 0x00: # SNI
-                sni = extension_data[5:].decode("utf-8")
-                break
-            current_extension_cur += extension_length + 4
-    except:
-        return None
-
-    return TLSHandshake(version, sni)
-
 def schedule_once(
     task_group: anyio.abc.TaskGroup,
     func: Callable[..., Awaitable],
@@ -290,23 +247,6 @@ async def _schedule_once():
         await func(*args, **kwargs)
     task_group.start_soon(_schedule_once)
 
-async def _gather(
-    coro: Coroutine,
-    results: dict[Coroutine, Any]
-):
-    results[coro] = await coro
-
-async def gather(
-    *coro: Coroutine
-):
-    results: dict[Coroutine, Any] = {}
-    async with anyio.create_task_group() as task_group:
-        for c in coro:
-            task_group.start_soon(_gather, c, results)
-    res = []
-    for c in coro:
-        res.append(results[c])
-    return res
 
 def get_hash_obj(
     hash: str
@@ -451,7 +391,7 @@ def __init__(
         self.end = end
 
 class UnboundTTLCache(cachetools.TTLCache[K, V]):
-    def __init__(self, maxsize: Optional[int], ttl: float, timer=time.monotonic):
+    def __init__(self, maxsize: Optional[float], ttl: float, timer=time.monotonic):
         cachetools.TTLCache.__init__(self, maxsize or math.inf, ttl, timer)
 
     @property
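
Widening maxsize from Optional[int] to Optional[float] is what lets the storages above pass math.inf (parsed from the new "inf" default) without a type error; None still falls back to math.inf as before. For reference, the class after this change plus a small usage sketch (the TypeVars are spelled out here so the snippet stands alone):

import math
import time
from typing import Optional, TypeVar

import cachetools

K = TypeVar("K")
V = TypeVar("V")

class UnboundTTLCache(cachetools.TTLCache[K, V]):
    # maxsize may be a float so math.inf (an "inf" cache_size) is a valid
    # bound; None keeps the previous behaviour of an unbounded cache.
    def __init__(self, maxsize: Optional[float], ttl: float, timer=time.monotonic):
        cachetools.TTLCache.__init__(self, maxsize or math.inf, ttl, timer)

cache = UnboundTTLCache(maxsize=math.inf, ttl=3600)
cache["hash"] = b"cached bytes"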

locale/zh_cn.json

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
     "debug.web.forward.tls_port": "TLS 端口在 [${port}] 上开启",
     "debug.web.uvicorn.port": "Web 服务器监听端口 [${port}]",
     "error.cluster.enable": "节点 [${id}] 上线失败 [${err}]",
-    "error.cluster.enable.timeout": "节点 [${id}] 注册超时",
+    "traceback.cluster.enable.timeout": "节点 [${id}] 注册超时",
     "error.cluster.exception": "节点 [${id}] 异常:[${msg}]",
     "error.cluster.kicked": "节点 [${id}] 被主控踢出下线",
     "error.cluster.refresh_token": "刷新节点 [${id}] Token 失败",
