@@ -34,6 +34,7 @@ def __init__(
3434 self ._secret = secret
3535 self ._token = None
3636 self ._task_group = None
37+ self ._display_name = None
3738
3839
3940 async def get_token (self ):
@@ -80,20 +81,20 @@ async def fetch_token(self):
8081 def setup (self , task_group : anyio .abc .TaskGroup ):
8182 self ._task_group = task_group
8283
83- # scheduleRefreshToken
84+ # scheduleRefreshToken
8485 def schedule_refresh_token (self , ttl : float ):
8586 if self ._task_group is None :
8687 raise RuntimeError ("Task group is not set" )
8788 self ._task_group .start_soon (self ._schedule_refresh_token , ttl )
8889
8990 async def _schedule_refresh_token (self , ttl : float ):
9091 next = max (ttl - 600 , ttl / 2 )
91- logger .tdebug ("cluster.refresh_token.schedule" , id = self ._id , next = next )
92+ logger .tdebug ("cluster.refresh_token.schedule" , id = self ._id , name = self . display_name , next = next )
9293 await anyio .sleep (ttl )
9394 try :
9495 await self .refresh_token ()
9596 except :
96- logger .ttraceback ("cluster.refresh_token" , id = self ._id )
97+ logger .ttraceback ("cluster.refresh_token" , id = self ._id , name = self . display_name )
9798
9899 async def refresh_token (self ):
99100 async with aiohttp .ClientSession (
@@ -119,6 +120,14 @@ async def get_socketio_token(self):
119120 "token" : await self .get_token (),
120121 }
121122
123+ @property
124+ def display_name (self ):
125+ return self ._display_name or self ._id
126+
127+ @display_name .setter
128+ def display_name (self , display_name : str ):
129+ self ._display_name = display_name
130+
122131class ClusterCounter :
123132 def __init__ (
124133 self ,
@@ -153,14 +162,23 @@ def __init__(
153162 self ._failed_keepalive = 0
154163 self ._retry_times = 0
155164 self ._task_group = None
165+ self ._display_name = None
156166 self ._manager = manager
157167
158168 @property
159169 def id (self ):
160170 return self ._token ._id
161171
172+ @property
173+ def display_name (self ):
174+ return self ._token .display_name or self ._token ._id
175+
176+ @display_name .setter
177+ def display_name (self , display_name : str ):
178+ self ._token .display_name = display_name
179+
162180 def __repr__ (self ) -> str :
163- return f'<Cluster { self .id } >'
181+ return f'<Cluster { self .display_name } >'
164182
165183 async def setup (
166184 self ,
@@ -171,33 +189,33 @@ async def setup(
171189
172190 @self .sio .on ("warden-error" ) # type: ignore
173191 async def _ (message : Any ):
174- logger .twarning ("cluster.warden" , id = self .id , msg = message )
192+ logger .twarning ("cluster.warden" , id = self .id , name = self . display_name , msg = message )
175193 await get_db ().insert_cluster_info (self .id , "server-push" , "warden-error" , message )
176194
177195 @self .sio .on ("exception" ) # type: ignore
178196 async def _ (message : Any ):
179- logger .terror ("cluster.exception" , id = self .id , msg = message )
197+ logger .terror ("cluster.exception" , id = self .id , name = self . display_name , msg = message )
180198 await get_db ().insert_cluster_info (self .id , "server-push" , "exception" , message )
181199
182200 @self .sio .on ("message" ) # type: ignore
183201 async def _ (message : Any ):
184- logger .tinfo ("cluster.message" , id = self .id , msg = message )
202+ logger .tinfo ("cluster.message" , id = self .id , name = self . display_name , msg = message )
185203 await get_db ().insert_cluster_info (self .id , "server-push" , "message" , message )
186204
187205 @self .sio .on ("connect" ) # type: ignore
188206 async def _ ():
189- logger .tinfo ("cluster.connected" , id = self .id )
207+ logger .tinfo ("cluster.connected" , id = self .id , name = self . display_name )
190208 if not self ._enabled :
191209 return
192210 self ._enabled = False
193- logger .tinfo ("cluster.reconnect" , id = self .id )
211+ logger .tinfo ("cluster.reconnect" , id = self .id , name = self . display_name )
194212 await get_db ().insert_cluster_info (self .id , "socketio" , "reconnect" )
195213 await self .enable ()
196214
197215
198216 @self .sio .on ("disconnect" ) # type: ignore
199217 async def _ ():
200- logger .tinfo ("cluster.disconnected" , id = self .id )
218+ logger .tinfo ("cluster.disconnected" , id = self .id , name = self . display_name )
201219
202220 task_group .start_soon (self .keepalive )
203221
@@ -228,7 +246,7 @@ async def keepalive(self):
228246 logger .terror ("cluster.kicked" , id = self .id )
229247 await self .disable ()
230248 else :
231- logger .twarning ("cluster.keepalive" , id = self .id , failed = self ._failed_keepalive )
249+ logger .twarning ("cluster.keepalive" , id = self .id , name = self . display_name , failed = self ._failed_keepalive )
232250 else :
233251 self ._failed_keepalive = 0
234252
@@ -239,17 +257,17 @@ async def keepalive(self):
239257 await get_db ().upsert_cluster_counter (self .id , current_counter .hits , current_counter .bytes )
240258
241259 resp_time = time .time () - datetime .datetime .fromisoformat (res .ack ).timestamp ()
242- logger .tsuccess ("cluster.keepalive" , id = self .id , hits = units .format_number (current_counter .hits ), bytes = units .format_number (current_counter .bytes ), delay = F"{ resp_time * 1000 :.4f} " )
260+ logger .tsuccess ("cluster.keepalive" , id = self .id , name = self . display_name , hits = units .format_number (current_counter .hits ), bytes = units .format_number (current_counter .bytes ), delay = F"{ resp_time * 1000 :.4f} " )
243261 except :
244262 logger .debug_traceback ()
245263 await anyio .sleep (60 )
246264
247265 async def request_cert (self ):
248- logger .tinfo ("cluster.request_cert" , id = self .id )
266+ logger .tinfo ("cluster.request_cert" , id = self .id , name = self . display_name )
249267 res = await self .emit ("request-cert" )
250268
251269 if res .err is not None :
252- logger .terror ("cluster.request_cert.error" , id = self .id , err = res .err )
270+ logger .terror ("cluster.request_cert.error" , id = self .id , name = self . display_name , err = res .err )
253271 return None
254272
255273 dir = Path (cfg .get ("cert.dir" ))
@@ -330,7 +348,7 @@ async def get_files(self) -> list[BMCLAPIFile]:
330348 reader .read_long () / 1000.0 ,
331349 ))
332350 self ._last_modified = max (results , key = lambda x : x .mtime ).mtime
333- logger .tdebug ("cluster.get_files" , id = self .id , count = len (results ), size = units .format_bytes (sum ([f .size for f in results ])), last_modified = units .format_datetime_from_timestamp (self ._last_modified ))
351+ logger .tdebug ("cluster.get_files" , id = self .id , name = self . display_name , count = len (results ), size = units .format_bytes (sum ([f .size for f in results ])), last_modified = units .format_datetime_from_timestamp (self ._last_modified ))
334352 return results
335353
336354 async def get_configuration (self ) -> OpenBMCLAPIConfiguration :
@@ -365,7 +383,7 @@ async def enable(self):
365383 logger .debug ("cluster want enable again" )
366384 return
367385 self ._want_enable = True
368- logger .tinfo ("cluster.want_enable" , id = self .id )
386+ logger .tinfo ("cluster.want_enable" , id = self .id , name = self . display_name )
369387 async with sem :
370388 try :
371389 res = await self .emit ("enable" , {
@@ -380,15 +398,15 @@ async def enable(self):
380398 }
381399 }, 300 )
382400 if res .err is not None :
383- logger .terror ("cluster.enable" , id = self .id , err = res .err )
401+ logger .terror ("cluster.enable" , id = self .id , name = self . display_name , err = res .err )
384402 return
385403
386404 self ._enabled = True
387405 self ._retry_times = 0
388- logger .tinfo ("cluster.enable" , id = self .id )
406+ logger .tinfo ("cluster.enable" , id = self .id , name = self . display_name )
389407 self ._keepalive_lock .release ()
390408 except TimeoutError :
391- logger .ttraceback ("cluster.enable.timeout" , id = self .id )
409+ logger .ttraceback ("cluster.enable.timeout" , id = self .id , name = self . display_name )
392410 except :
393411 logger .traceback ()
394412 finally :
@@ -406,7 +424,7 @@ async def disable(self):
406424 finally :
407425 self ._enabled = False
408426 self ._failed_keepalive = 0
409- logger .tinfo ("cluster.disable" , id = self .id )
427+ logger .tinfo ("cluster.disable" , id = self .id , name = self . display_name )
410428
411429 if self ._stop :
412430 return
@@ -418,7 +436,7 @@ def retry(self):
418436 return
419437 self ._retry_times += 1
420438 next = min (3600 , self ._retry_times * 300 )
421- logger .tinfo ("cluster.retry" , id = self .id , time = next )
439+ logger .tinfo ("cluster.retry" , id = self .id , time = next , name = self . display_name )
422440 utils .schedule_once (self ._task_group , self .enable , delay = next )
423441
424442 async def stop_serve (self ):
@@ -573,6 +591,7 @@ def __init__(
573591 self ._storages = StorageManager ()
574592 self ._task_group = None
575593 self ._certificate_type : Optional [CertificateType ] = None
594+ self ._cluster_name = False
576595
577596 def add_cluster (
578597 self ,
@@ -586,7 +605,7 @@ def get_cluster(
586605 id : str
587606 ) -> Cluster :
588607 return self ._clusters [id ]
589-
608+
590609 @property
591610 def storages (self ):
592611 return self ._storages
@@ -612,6 +631,8 @@ async def _(msg: Any):
612631 for cluster in self .clusters :
613632 await cluster .enable ()
614633
634+ await self .fetch_cluster_name ()
635+
615636 for cluster in self .clusters :
616637 await cluster .setup (task_group )
617638
@@ -656,10 +677,6 @@ async def sync(self):
656677
657678 utils .schedule_once (self ._task_group , self .sync , 600 )
658679
659- async def _sync (self ):
660- await anyio .sleep (600 )
661- await self .sync ()
662-
663680 async def serve (self ):
664681 async with anyio .create_task_group () as task_group :
665682 for cluster in self .clusters :
@@ -669,6 +686,29 @@ async def stop(self):
669686 for cluster in self .clusters :
670687 await cluster .stop_serve ()
671688
689+ async def fetch_cluster_name (self ):
690+ assert self ._task_group is not None
691+ async with aiohttp .ClientSession (
692+ cfg .bd_url
693+ ) as session :
694+ async with session .get (
695+ "/openbmclapi/metric/rank"
696+ ) as response :
697+ for resp_cluster in await response .json ():
698+ print (resp_cluster )
699+ id = resp_cluster ["_id" ]
700+ name = resp_cluster ["name" ]
701+ if id in self ._clusters :
702+ self ._clusters [id ].display_name = name
703+ if self ._cluster_name :
704+ return
705+ utils .schedule_once (self ._task_group , self ._fetch_cluster_name , 600 )
706+
707+
708+ async def _fetch_cluster_name (self ):
709+ self ._cluster_name = False
710+ await self .fetch_cluster_name ()
711+
672712 def hit (self , cluster_id : str , bytes : int ):
673713 self ._clusters [cluster_id ].counter .hits += 1
674714 self ._clusters [cluster_id ].counter .bytes += bytes
0 commit comments