-
-
Notifications
You must be signed in to change notification settings - Fork 243
/
momento_cachedb.py
108 lines (85 loc) · 3.1 KB
/
momento_cachedb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import json
import logging
import os
from datetime import timedelta
from typing import Any, Dict, List
from langroid.cachedb.base import CacheDBConfig
from langroid.exceptions import LangroidImportError
try:
import momento
from momento.responses import CacheGet
except ImportError:
raise LangroidImportError(package="momento", extra="momento")
from dotenv import load_dotenv
from langroid.cachedb.base import CacheDB
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class MomentoCacheConfig(CacheDBConfig):
    """Settings for the Momento-backed cache."""

    # Time-to-live for cache entries, in seconds (7 days).
    ttl: int = 7 * 24 * 60 * 60
    # Name of the Momento cache that entries are stored in.
    cachename: str = "langroid_momento_cache"
class MomentoCache(CacheDB):
    """Momento implementation of the CacheDB.

    Values are serialized with ``json.dumps`` on write and parsed with
    ``json.loads`` on read, so only JSON-serializable values round-trip.
    """

    def __init__(self, config: MomentoCacheConfig):
        """
        Initialize a MomentoCache with the given config.

        Loads environment variables via ``load_dotenv()``, builds a Momento
        client whose default TTL is ``config.ttl`` seconds, and issues a
        create-cache request for ``config.cachename``.

        Args:
            config (MomentoCacheConfig): The configuration to use.

        Raises:
            ValueError: If MOMENTO_AUTH_TOKEN is unset or empty.
        """
        self.config = config
        load_dotenv()
        # An empty token is as unusable as a missing one, so reject both
        # up front with the same explicit error.
        if not os.getenv("MOMENTO_AUTH_TOKEN"):
            raise ValueError("MOMENTO_AUTH_TOKEN not set in .env file")
        self.client = momento.CacheClient(
            configuration=momento.Configurations.Laptop.v1(),
            credential_provider=momento.CredentialProvider.from_environment_variable(
                "MOMENTO_AUTH_TOKEN"
            ),
            default_ttl=timedelta(seconds=self.config.ttl),
        )
        # NOTE(review): the response of create_cache is not inspected here;
        # presumably an already-existing cache is reported, not raised --
        # confirm against the Momento SDK docs.
        self.client.create_cache(self.config.cachename)

    def clear(self) -> None:
        """Clear keys from current db by flushing the configured cache."""
        self.client.flush_cache(self.config.cachename)

    def store(self, key: str, value: Any) -> None:
        """
        Store a value associated with a key.

        Args:
            key (str): The key under which to store the value.
            value (Any): The value to store; it is serialized with
                ``json.dumps`` before being written.
        """
        self.client.set(self.config.cachename, key, json.dumps(value))

    def retrieve(self, key: str) -> Dict[str, Any] | str | None:
        """
        Retrieve the value associated with a key.

        Args:
            key (str): The key to retrieve the value for.

        Returns:
            The JSON-decoded value stored under ``key`` on a cache hit,
            otherwise ``None``.
        """
        response = self.client.get(self.config.cachename, key)
        if isinstance(response, CacheGet.Hit):
            return json.loads(response.value_string)  # type: ignore
        if isinstance(response, CacheGet.Error):
            # Still reported to the caller as "not found", but logged so a
            # backend failure can be told apart from a genuine miss.
            logger.warning("Momento cache get failed for key %r: %s", key, response)
        return None

    def delete_keys(self, keys: List[str]) -> None:
        """
        Delete the keys from the cache.

        Args:
            keys (List[str]): The keys to delete.
        """
        for key in keys:
            self.client.delete(self.config.cachename, key)

    def delete_keys_pattern(self, pattern: str) -> None:
        """
        Delete the keys from the cache with the given pattern.

        Not supported by this backend.

        Args:
            pattern (str): The pattern to match.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            "\n"
            "MomentoCache does not support delete_keys_pattern.\n"
            "Please use RedisCache instead.\n"
        )