12
12
13
13
import base64
14
14
import collections .abc
15
+ import dataclasses
15
16
import json
16
17
import logging
17
18
import re
23
24
24
25
import asyncclick as click
25
26
27
+ from devtools .helpers .smartcamerarequests import SMARTCAMERA_REQUESTS
26
28
from devtools .helpers .smartrequests import SmartRequest , get_component_requests
27
29
from kasa import (
28
30
AuthenticationError ,
46
48
from kasa .smartprotocol import SmartProtocol , _ChildProtocolWrapper
47
49
48
50
Call = namedtuple ("Call" , "module method" )
49
- SmartCall = namedtuple ("SmartCall" , "module request should_succeed child_device_id" )
50
51
FixtureResult = namedtuple ("FixtureResult" , "filename, folder, data" )
51
52
52
53
SMART_FOLDER = "kasa/tests/fixtures/smart/"
54
+ SMARTCAMERA_FOLDER = "kasa/tests/fixtures/smartcamera/"
53
55
SMART_CHILD_FOLDER = "kasa/tests/fixtures/smart/child/"
54
56
IOT_FOLDER = "kasa/tests/fixtures/"
55
57
58
60
_LOGGER = logging .getLogger (__name__ )
59
61
60
62
63
+ @dataclasses .dataclass
64
+ class SmartCall :
65
+ """Class for smart and smartcamera calls."""
66
+
67
+ module : str
68
+ request : dict
69
+ should_succeed : bool
70
+ child_device_id : str
71
+ supports_multiple : bool = True
72
+
73
+
61
74
def scrub (res ):
62
75
"""Remove identifiers from the given dict."""
63
76
keys_to_scrub = [
@@ -136,7 +149,7 @@ def scrub(res):
136
149
v = base64 .b64encode (b"#MASKED_SSID#" ).decode ()
137
150
elif k in ["nickname" ]:
138
151
v = base64 .b64encode (b"#MASKED_NAME#" ).decode ()
139
- elif k in ["alias" , "device_alias" ]:
152
+ elif k in ["alias" , "device_alias" , "device_name" ]:
140
153
v = "#MASKED_NAME#"
141
154
elif isinstance (res [k ], int ):
142
155
v = 0
@@ -477,6 +490,44 @@ def format_exception(e):
477
490
return exception_str
478
491
479
492
493
+ async def _make_final_calls (
494
+ protocol : SmartProtocol ,
495
+ calls : list [SmartCall ],
496
+ name : str ,
497
+ batch_size : int ,
498
+ * ,
499
+ child_device_id : str ,
500
+ ) -> dict [str , dict ]:
501
+ """Call all successes again.
502
+
503
+ After trying each call individually make the calls again either as a
504
+ multiple request or as single requests for those that don't support
505
+ multiple queries.
506
+ """
507
+ multiple_requests = {
508
+ key : smartcall .request [key ]
509
+ for smartcall in calls
510
+ if smartcall .supports_multiple and (key := next (iter (smartcall .request )))
511
+ }
512
+ final = await _make_requests_or_exit (
513
+ protocol ,
514
+ multiple_requests ,
515
+ name + " - multiple" ,
516
+ batch_size ,
517
+ child_device_id = child_device_id ,
518
+ )
519
+ single_calls = [smartcall for smartcall in calls if not smartcall .supports_multiple ]
520
+ for smartcall in single_calls :
521
+ final [smartcall .module ] = await _make_requests_or_exit (
522
+ protocol ,
523
+ smartcall .request ,
524
+ f"{ name } + { smartcall .module } " ,
525
+ batch_size ,
526
+ child_device_id = child_device_id ,
527
+ )
528
+ return final
529
+
530
+
480
531
async def _make_requests_or_exit (
481
532
protocol : SmartProtocol ,
482
533
requests : dict ,
@@ -534,69 +585,28 @@ async def get_smart_camera_test_calls(protocol: SmartProtocol):
534
585
test_calls : list [SmartCall ] = []
535
586
successes : list [SmartCall ] = []
536
587
537
- requests = {
538
- "getAlertTypeList" : {"msg_alarm" : {"name" : "alert_type" }},
539
- "getNightVisionCapability" : {"image_capability" : {"name" : ["supplement_lamp" ]}},
540
- "getDeviceInfo" : {"device_info" : {"name" : ["basic_info" ]}},
541
- "getDetectionConfig" : {"motion_detection" : {"name" : ["motion_det" ]}},
542
- "getPersonDetectionConfig" : {"people_detection" : {"name" : ["detection" ]}},
543
- "getVehicleDetectionConfig" : {"vehicle_detection" : {"name" : ["detection" ]}},
544
- "getBCDConfig" : {"sound_detection" : {"name" : ["bcd" ]}},
545
- "getPetDetectionConfig" : {"pet_detection" : {"name" : ["detection" ]}},
546
- "getBarkDetectionConfig" : {"bark_detection" : {"name" : ["detection" ]}},
547
- "getMeowDetectionConfig" : {"meow_detection" : {"name" : ["detection" ]}},
548
- "getGlassDetectionConfig" : {"glass_detection" : {"name" : ["detection" ]}},
549
- "getTamperDetectionConfig" : {"tamper_detection" : {"name" : "tamper_det" }},
550
- "getLensMaskConfig" : {"lens_mask" : {"name" : ["lens_mask_info" ]}},
551
- "getLdc" : {"image" : {"name" : ["switch" , "common" ]}},
552
- "getLastAlarmInfo" : {"msg_alarm" : {"name" : ["chn1_msg_alarm_info" ]}},
553
- "getLedStatus" : {"led" : {"name" : ["config" ]}},
554
- "getTargetTrackConfig" : {"target_track" : {"name" : ["target_track_info" ]}},
555
- "getPresetConfig" : {"preset" : {"name" : ["preset" ]}},
556
- "getFirmwareUpdateStatus" : {"cloud_config" : {"name" : "upgrade_status" }},
557
- "getMediaEncrypt" : {"cet" : {"name" : ["media_encrypt" ]}},
558
- "getConnectionType" : {"network" : {"get_connection_type" : []}},
559
- "getAlarmConfig" : {"msg_alarm" : {}},
560
- "getAlarmPlan" : {"msg_alarm_plan" : {}},
561
- "getSirenTypeList" : {"siren" : {}},
562
- "getSirenConfig" : {"siren" : {}},
563
- "getAlertConfig" : {
564
- "msg_alarm" : {
565
- "name" : ["chn1_msg_alarm_info" , "capability" ],
566
- "table" : ["usr_def_audio" ],
567
- }
568
- },
569
- "getLightTypeList" : {"msg_alarm" : {}},
570
- "getSirenStatus" : {"siren" : {}},
571
- "getLightFrequencyInfo" : {"image" : {"name" : "common" }},
572
- "getLightFrequencyCapability" : {"image" : {"name" : "common" }},
573
- "getRotationStatus" : {"image" : {"name" : ["switch" ]}},
574
- "getNightVisionModeConfig" : {"image" : {"name" : "switch" }},
575
- "getWhitelampStatus" : {"image" : {"get_wtl_status" : ["null" ]}},
576
- "getWhitelampConfig" : {"image" : {"name" : "switch" }},
577
- "getMsgPushConfig" : {"msg_push" : {"name" : ["chn1_msg_push_info" ]}},
578
- "getSdCardStatus" : {"harddisk_manage" : {"table" : ["hd_info" ]}},
579
- "getCircularRecordingConfig" : {"harddisk_manage" : {"name" : "harddisk" }},
580
- "getRecordPlan" : {"record_plan" : {"name" : ["chn1_channel" ]}},
581
- "getAudioConfig" : {"audio_config" : {"name" : ["speaker" , "microphone" ]}},
582
- "getFirmwareAutoUpgradeConfig" : {"auto_upgrade" : {"name" : ["common" ]}},
583
- "getVideoQualities" : {"video" : {"name" : ["main" ]}},
584
- "getVideoCapability" : {"video_capability" : {"name" : "main" }},
585
- }
586
588
test_calls = []
587
- for method , params in requests .items ():
589
+ for request in SMARTCAMERA_REQUESTS :
590
+ method = next (iter (request ))
591
+ if method == "get" :
592
+ module = method + "_" + next (iter (request [method ]))
593
+ else :
594
+ module = method
588
595
test_calls .append (
589
596
SmartCall (
590
- module = method ,
591
- request = { method : params } ,
597
+ module = module ,
598
+ request = request ,
592
599
should_succeed = True ,
593
600
child_device_id = "" ,
601
+ supports_multiple = (method != "get" ),
594
602
)
595
603
)
596
604
597
605
# Now get the child device requests
606
+ child_request = {
607
+ "getChildDeviceList" : {"childControl" : {"start_index" : 0 }},
608
+ }
598
609
try :
599
- child_request = {"getChildDeviceList" : {"childControl" : {"start_index" : 0 }}}
600
610
child_response = await protocol .query (child_request )
601
611
except Exception :
602
612
_LOGGER .debug ("Device does not have any children." )
@@ -607,6 +617,7 @@ async def get_smart_camera_test_calls(protocol: SmartProtocol):
607
617
request = child_request ,
608
618
should_succeed = True ,
609
619
child_device_id = "" ,
620
+ supports_multiple = True ,
610
621
)
611
622
)
612
623
child_list = child_response ["getChildDeviceList" ]["child_device_list" ]
@@ -660,11 +671,14 @@ async def get_smart_camera_test_calls(protocol: SmartProtocol):
660
671
click .echo (f"Skipping { component_id } .." , nl = False )
661
672
click .echo (click .style ("UNSUPPORTED" , fg = "yellow" ))
662
673
else : # Not a smart protocol device so assume camera protocol
663
- for method , params in requests .items ():
674
+ for request in SMARTCAMERA_REQUESTS :
675
+ method = next (iter (request ))
676
+ if method == "get" :
677
+ method = method + "_" + next (iter (request [method ]))
664
678
test_calls .append (
665
679
SmartCall (
666
680
module = method ,
667
- request = { method : params } ,
681
+ request = request ,
668
682
should_succeed = True ,
669
683
child_device_id = child_id ,
670
684
)
@@ -804,7 +818,9 @@ async def get_smart_test_calls(protocol: SmartProtocol):
804
818
click .echo (click .style ("UNSUPPORTED" , fg = "yellow" ))
805
819
# Add the extra calls for each child
806
820
for extra_call in extra_test_calls :
807
- extra_child_call = extra_call ._replace (child_device_id = child_device_id )
821
+ extra_child_call = dataclasses .replace (
822
+ extra_call , child_device_id = child_device_id
823
+ )
808
824
test_calls .append (extra_child_call )
809
825
810
826
return test_calls , successes
@@ -879,35 +895,32 @@ async def get_smart_fixtures(
879
895
finally :
880
896
await protocol .close ()
881
897
882
- device_requests : dict [str , dict ] = {}
898
+ device_requests : dict [str , list [ SmartCall ] ] = {}
883
899
for success in successes :
884
- device_request = device_requests .setdefault (success .child_device_id , {} )
885
- device_request .update (success . request )
900
+ device_request = device_requests .setdefault (success .child_device_id , [] )
901
+ device_request .append (success )
886
902
887
903
scrubbed_device_ids = {
888
904
device_id : f"SCRUBBED_CHILD_DEVICE_ID_{ index } "
889
905
for index , device_id in enumerate (device_requests .keys ())
890
906
if device_id != ""
891
907
}
892
908
893
- final = await _make_requests_or_exit (
894
- protocol ,
895
- device_requests ["" ],
896
- "all successes at once" ,
897
- batch_size ,
898
- child_device_id = "" ,
909
+ final = await _make_final_calls (
910
+ protocol , device_requests ["" ], "All successes" , batch_size , child_device_id = ""
899
911
10000
)
900
912
fixture_results = []
901
913
for child_device_id , requests in device_requests .items ():
902
914
if child_device_id == "" :
903
915
continue
904
- response = await _make_requests_or_exit (
916
+ response = await _make_final_calls (
905
917
protocol ,
906
918
requests ,
907
- "all child successes at once " ,
919
+ "All child successes" ,
908
920
batch_size ,
909
921
child_device_id = child_device_id ,
910
922
)
923
+
911
924
scrubbed = scrubbed_device_ids [child_device_id ]
912
925
if "get_device_info" in response and "device_id" in response ["get_device_info" ]:
913
926
response ["get_device_info" ]["device_id" ] = scrubbed
@@ -963,23 +976,27 @@ async def get_smart_fixtures(
963
976
click .echo (click .style ("## device info file ##" , bold = True ))
964
977
965
978
if "get_device_info" in final :
979
+ # smart protocol
966
980
hw_version = final ["get_device_info" ]["hw_ver" ]
967
981
sw_version = final ["get_device_info" ]["fw_ver" ]
968
982
if discovery_info :
969
983
model = discovery_info ["device_model" ]
970
984
else :
971
985
model = final ["get_device_info" ]["model" ] + "(XX)"
972
986
sw_version = sw_version .split (" " , maxsplit = 1 )[0 ]
987
+ copy_folder = SMART_FOLDER
973
988
else :
989
+ # smart camera protocol
974
990
hw_version = final ["getDeviceInfo" ]["device_info" ]["basic_info" ]["hw_version" ]
975
991
sw_version = final ["getDeviceInfo" ]["device_info" ]["basic_info" ]["sw_version" ]
976
992
model = final ["getDeviceInfo" ]["device_info" ]["basic_info" ]["device_model" ]
977
993
region = final ["getDeviceInfo" ]["device_info" ]["basic_info" ]["region" ]
978
994
sw_version = sw_version .split (" " , maxsplit = 1 )[0 ]
979
995
model = f"{ model } ({ region } )"
996
+ copy_folder = SMARTCAMERA_FOLDER
980
997
981
998
save_filename = f"{ model } _{ hw_version } _{ sw_version } .json"
982
- copy_folder = SMART_FOLDER
999
+
983
1000
fixture_results .insert (
984
1001
0 , FixtureResult (filename = save_filename, folder = copy_folder , data = final )
985
1002
)
0 commit comments