3
3
4
4
from jsonpickle import encode
5
5
from jsonpickle .unpickler import Unpickler
6
- from azure .storage .blob import BlockBlobService , Blob , PublicAccess
6
+ from azure .core import MatchConditions
7
+ from azure .core .exceptions import (
8
+ HttpResponseError ,
9
+ ResourceExistsError ,
10
+ ResourceNotFoundError ,
11
+ )
12
+ from azure .storage .blob .aio import (
13
+ BlobServiceClient ,
14
+ BlobClient ,
15
+ StorageStreamDownloader ,
16
+ )
7
17
from botbuilder .core import Storage
8
18
9
- # TODO: sanitize_blob_name
10
-
11
19
12
20
class BlobStorageSettings :
21
+ """The class for Azure Blob configuration for the Azure Bot Framework.
22
+
23
+ :param container_name: Name of the Blob container.
24
+ :type container_name: str
25
+ :param account_name: Name of the Blob Storage account. Required if not using connection_string.
26
+ :type account_name: str
27
+ :param account_key: Key of the Blob Storage account. Required if not using connection_string.
28
+ :type account_key: str
29
+ :param connection_string: Connection string of the Blob Storage account.
30
+ Required if not using account_name and account_key.
31
+ :type connection_string: str
32
+ """
33
+
13
34
def __init__ (
14
35
self ,
15
36
container_name : str ,
@@ -23,56 +44,105 @@ def __init__(
23
44
self .connection_string = connection_string
24
45
25
46
47
+ # New Azure Blob SDK only allows connection strings, but our SDK allows key+name.
48
+ # This is here for backwards compatibility.
49
+ def convert_account_name_and_key_to_connection_string (settings : BlobStorageSettings ):
50
+ if not settings .account_name or not settings .account_key :
51
+ raise Exception (
52
+ "account_name and account_key are both required for BlobStorageSettings if not using a connections string."
53
+ )
54
+ return (
55
+ f"DefaultEndpointsProtocol=https;AccountName={ settings .account_name } ;"
56
+ f"AccountKey={ settings .account_key } ;EndpointSuffix=core.windows.net"
57
+ )
58
+
59
+
26
60
class BlobStorage (Storage ):
61
+ """An Azure Blob based storage provider for a bot.
62
+
63
+ This class uses a single Azure Storage Blob Container.
64
+ Each entity or StoreItem is serialized into a JSON string and stored in an individual text blob.
65
+ Each blob is named after the store item key, which is encoded so that it conforms a valid blob name.
66
+ If an entity is an StoreItem, the storage object will set the entity's e_tag
67
+ property value to the blob's e_tag upon read. Afterward, an match_condition with the ETag value
68
+ will be generated during Write. New entities start with a null e_tag.
69
+
70
+ :param settings: Settings used to instantiate the Blob service.
71
+ :type settings: :class:`botbuilder.azure.BlobStorageSettings`
72
+ """
73
+
27
74
def __init__ (self , settings : BlobStorageSettings ):
75
+ if not settings .container_name :
76
+ raise Exception ("Container name is required." )
77
+
28
78
if settings .connection_string :
29
- client = BlockBlobService (connection_string = settings .connection_string )
30
- elif settings .account_name and settings .account_key :
31
- client = BlockBlobService (
32
- account_name = settings .account_name , account_key = settings .account_key
79
+ blob_service_client = BlobServiceClient .from_connection_string (
80
+ settings .connection_string
33
81
)
34
82
else :
35
- raise Exception (
36
- "Connection string should be provided if there are no account name and key"
83
+ blob_service_client = BlobServiceClient . from_connection_string (
84
+ convert_account_name_and_key_to_connection_string ( settings )
37
85
)
38
86
39
- self .client = client
40
- self .settings = settings
87
+ self .__container_client = blob_service_client .get_container_client (
88
+ settings .container_name
89
+ )
90
+
91
+ self .__initialized = False
92
+
93
+ async def _initialize (self ):
94
+ if self .__initialized is False :
95
+ # This should only happen once - assuming this is a singleton.
96
+ # ContainerClient.exists() method is available in an unreleased version of the SDK. Until then, we use:
97
+ try :
98
+ await self .__container_client .create_container ()
99
+ except ResourceExistsError :
100
+ pass
101
+ self .__initialized = True
102
+ return self .__initialized
41
103
42
104
async def read (self , keys : List [str ]) -> Dict [str , object ]:
105
+ """Retrieve entities from the configured blob container.
106
+
107
+ :param keys: An array of entity keys.
108
+ :type keys: Dict[str, object]
109
+ :return dict:
110
+ """
43
111
if not keys :
44
112
raise Exception ("Keys are required when reading" )
45
113
46
- self .client .create_container (self .settings .container_name )
47
- self .client .set_container_acl (
48
- self .settings .container_name , public_access = PublicAccess .Container
49
- )
114
+ await self ._initialize ()
115
+
50
116
items = {}
51
117
52
118
for key in keys :
53
- if self .client .exists (
54
- container_name = self .settings .container_name , blob_name = key
55
- ):
56
- items [key ] = self ._blob_to_store_item (
57
- self .client .get_blob_to_text (
58
- container_name = self .settings .container_name , blob_name = key
59
- )
60
- )
119
+ blob_client = self .__container_client .get_blob_client (key )
120
+
121
+ try :
122
+ items [key ] = await self ._inner_read_blob (blob_client )
123
+ except HttpResponseError as err :
124
+ if err .status_code == 404 :
125
+ continue
61
126
62
127
return items
63
128
64
129
async def write (self , changes : Dict [str , object ]):
130
+ """Stores a new entity in the configured blob container.
131
+
132
+ :param changes: The changes to write to storage.
133
+ :type changes: Dict[str, object]
134
+ :return:
135
+ """
65
136
if changes is None :
66
137
raise Exception ("Changes are required when writing" )
67
138
if not changes :
68
139
return
69
140
70
- self .client .create_container (self .settings .container_name )
71
- self .client .set_container_acl (
72
- self .settings .container_name , public_access = PublicAccess .Container
73
- )
141
+ await self ._initialize ()
74
142
75
143
for (name , item ) in changes .items ():
144
+ blob_reference = self .__container_client .get_blob_client (name )
145
+
76
146
e_tag = None
77
147
if isinstance (item , dict ):
78
148
e_tag = item .get ("e_tag" , None )
@@ -81,39 +151,46 @@ async def write(self, changes: Dict[str, object]):
81
151
e_tag = None if e_tag == "*" else e_tag
82
152
if e_tag == "" :
83
153
raise Exception ("blob_storage.write(): etag missing" )
154
+
84
155
item_str = self ._store_item_to_str (item )
85
- try :
86
- self .client .create_blob_from_text (
87
- container_name = self .settings .container_name ,
88
- blob_name = name ,
89
- text = item_str ,
90
- if_match = e_tag ,
156
+
157
+ if e_tag :
158
+ await blob_reference .upload_blob (
159
+ item_str , match_condition = MatchConditions .IfNotModified , etag = e_tag
91
160
)
92
- except Exception as error :
93
- raise error
161
+ else :
162
+ await blob_reference . upload_blob ( item_str , overwrite = True )
94
163
95
164
async def delete (self , keys : List [str ]):
165
+ """Deletes entity blobs from the configured container.
166
+
167
+ :param keys: An array of entity keys.
168
+ :type keys: Dict[str, object]
169
+ """
96
170
if keys is None :
97
171
raise Exception ("BlobStorage.delete: keys parameter can't be null" )
98
172
99
- self .client .create_container (self .settings .container_name )
100
- self .client .set_container_acl (
101
- self .settings .container_name , public_access = PublicAccess .Container
102
- )
173
+ await self ._initialize ()
103
174
104
175
for key in keys :
105
- if self .client .exists (
106
- container_name = self .settings .container_name , blob_name = key
107
- ):
108
- self .client .delete_blob (
109
- container_name = self .settings .container_name , blob_name = key
110
- )
111
-
112
- def _blob_to_store_item (self , blob : Blob ) -> object :
113
- item = json .loads (blob .content )
114
- item ["e_tag" ] = blob .properties .etag
115
- result = Unpickler ().restore (item )
116
- return result
176
+ blob_client = self .__container_client .get_blob_client (key )
177
+ try :
178
+ await blob_client .delete_blob ()
179
+ # We can't delete what's already gone.
180
+ except ResourceNotFoundError :
181
+ pass
117
182
118
183
def _store_item_to_str (self , item : object ) -> str :
119
184
return encode (item )
185
+
186
+ async def _inner_read_blob (self , blob_client : BlobClient ):
187
+ blob = await blob_client .download_blob ()
188
+
189
+ return await self ._blob_to_store_item (blob )
190
+
191
+ @staticmethod
192
+ async def _blob_to_store_item (blob : StorageStreamDownloader ) -> object :
193
+ item = json .loads (await blob .content_as_text ())
194
+ item ["e_tag" ] = blob .properties .etag .replace ('"' , "" )
195
+ result = Unpickler ().restore (item )
196
+ return result
0 commit comments