-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathexamplelib.py
More file actions
193 lines (157 loc) · 5.22 KB
/
examplelib.py
File metadata and controls
193 lines (157 loc) · 5.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import os
import time
import yaml
from urllib3.exceptions import ReadTimeoutError
from kubernetes import config, watch
import kubevirt
class ExampleLibException(Exception):
    """Common base class for the exceptions defined in this module."""
class WaitForException(ExampleLibException):
    """Base class for errors raised while waiting on watched events."""
class WaitForFailureMatch(WaitForException):
    """
    Raised when an event satisfies the caller's failure condition.

    Attributes:
        event: The event that matched the failure condition.
    """

    def __init__(self, event):
        # Forward the payload to the base class so that ``args`` is
        # populated; otherwise repr() is empty and copy/pickle cannot
        # rebuild the exception (they call ``cls(*self.args)``).
        super(WaitForFailureMatch, self).__init__(event)
        self.event = event

    def __str__(self):
        return "Failure condition satisfied with item: %s" % self.event
class WaitForTimeout(WaitForException):
    """
    Raised when waiting for events exceeds the allowed time.

    Attributes:
        source: The event source that was being watched.
        timeout: The timeout (seconds) that was reached.
    """

    def __init__(self, source, timeout):
        # Forward the values to the base class so that ``args`` is
        # populated; otherwise repr() is empty and copy/pickle cannot
        # rebuild the exception (they call ``cls(*self.args)``).
        super(WaitForTimeout, self).__init__(source, timeout)
        self.source = source
        self.timeout = timeout

    def __str__(self):
        return "Waiting for events from %s reached timeout: %ss" % (
            self.source, self.timeout
        )
class Watch(object):
    """
    Watch collection on changes.
    """

    def __init__(self, event_source, *args, **kwargs):
        """
        Create instance of Watch object

        Args:
            event_source (func): API call to get objects collection
            args (tuple), kwargs(dict): Parameters for `event_source` function
        """
        self._es = event_source
        self._args = args
        self._kw = kwargs

    def _event_source(self, **kwargs):
        """
        Call `event_source` with the stored arguments.

        Keyword arguments given here override the ones stored at
        construction time; the stored dict itself is not modified.
        """
        kw = self._kw.copy()
        kw.update(kwargs)
        return self._es(*self._args, **kw)

    def watch(self, request_timeout=None):
        """
        Generator returning events from `event_source`

        Args:
            request_timeout (int): Timeout for request (seconds); when set
                it is passed to the stream as `_request_timeout`

        Returns:
            Generator: Event objects
        """
        kw = dict()
        if request_timeout:
            kw['_request_timeout'] = request_timeout
        w = watch.Watch()
        try:
            for e in w.stream(self._event_source, **kw):
                yield e
        finally:
            # Stop the underlying watcher however the generator ends:
            # exhaustion, close()/garbage collection, or an error such as
            # a read timeout propagating out of the stream.
            w.stop()

    def _wait_for_x(
        self, timeout, filter_condition, success_condition,
        failure_condition
    ):
        """
        Consume events until a condition matches or `timeout` elapses.

        Args:
            timeout (int): Overall time limit (seconds)
            filter_condition (func): Predicate selecting events to examine
            success_condition (func): Predicate for success
            failure_condition (func): Predicate for failure

        Returns:
            The `object` entry of the first event matching
            `success_condition`

        Raises:
            WaitForTimeout: When `timeout` elapses without a match
            WaitForFailureMatch: On `failure_condition`
        """
        step = 5  # seconds per watch request before re-checking the deadline
        endtime = timeout + time.time()
        # Compare the current time (not the timeout duration) with the
        # deadline, otherwise the loop never terminates on its own.
        while time.time() < endtime:
            try:
                for e in self.watch(request_timeout=step):
                    if filter_condition(e):
                        if success_condition(e):
                            return e['object']
                        if failure_condition(e):
                            raise WaitForFailureMatch(e)
                    # A steady flow of non-matching events never triggers
                    # a read timeout, so enforce the deadline here too.
                    if time.time() >= endtime:
                        break
            except ReadTimeoutError:
                # No event within `step` seconds; re-check the deadline
                # and open a new watch request.
                continue
        raise WaitForTimeout(self._es, timeout)

    def wait_for_item(
        self, name, timeout, success_condition,
        failure_condition=lambda e: False
    ):
        """
        Wait for change on specific object by name.

        Args:
            name (str): Name of object
            timeout (int): Overall time limit for waiting (seconds)
            success_condition (func): Predicate for success and exit loop,
                taking event as argument
            failure_condition (func): Predicate to fail loop,
                taking event as argument

        Returns:
            The `object` entry of the event which matches
            `success_condition`

        Raises:
            WaitForTimeout: When `timeout` elapses without a match
            WaitForFailureMatch: On `failure_condition`
        """
        return self._wait_for_x(
            timeout,
            lambda e: get_name(e['object']) == name,
            success_condition, failure_condition
        )

    def wait_for(
        self, timeout, success_condition,
        failure_condition=lambda e: False
    ):
        """
        Wait for any change on given `event_source`.

        Args:
            timeout (int): Overall time limit for waiting (seconds)
            success_condition (func): Predicate for success and exit loop,
                taking event as argument
            failure_condition (func): Predicate to fail loop,
                taking event as argument

        Returns:
            The `object` entry of the event which matches
            `success_condition`

        Raises:
            WaitForTimeout: When `timeout` elapses without a match
            WaitForFailureMatch: On `failure_condition`
        """
        return self._wait_for_x(
            timeout, lambda e: True, success_condition,
            failure_condition
        )
def get_client(kubeconfig=None):
    """
    Load a kubeconfig file and return a KubeVirt API client.

    When no path is given, the ``KUBECONFIG`` environment variable is
    used, falling back to ``~/.kube/config`` if it is not set.

    Args:
        kubeconfig (str): Path to kubeconfig

    Returns:
        kubevirt.DefaultApi: Instance of KubeVirt client
    """
    if kubeconfig is None:
        default_path = os.path.expanduser("~/.kube/config")
        kubeconfig = os.environ.get("KUBECONFIG", default_path)
    loader = config.kube_config._get_kube_config_loader_for_yaml_file(
        kubeconfig
    )
    # Apply the loaded settings to the kubevirt configuration object.
    loader.load_and_set(kubevirt.configuration)
    return kubevirt.DefaultApi()
def read_yaml_file(path):
    """
    Read and parse YAML from given file.

    A relative `path` is resolved against the directory containing this
    module; an absolute `path` is used as-is.

    Args:
        path (str): Path to YAML file

    Returns:
        dict: content of file
    """
    path = os.path.join(os.path.dirname(__file__), path)
    with open(path) as fh:
        # safe_load instead of yaml.load(fh): calling load() without an
        # explicit Loader is rejected by PyYAML 6+ (and warns on 5.1+),
        # and the full loader can construct arbitrary Python objects.
        return yaml.safe_load(fh)
def get_name(obj):
    """
    Return the name stored in the metadata of `obj`.

    Args:
        obj: Either a dict (looked up via ``['metadata']['name']``) or an
            object exposing ``metadata.name``

    Returns:
        The name; for dicts, None when the metadata or name key is absent
    """
    if not isinstance(obj, dict):
        return obj.metadata.name
    metadata = obj.get('metadata', {})
    return metadata.get('name')
def get_status(obj):
    """
    Return the phase stored in the status of `obj`.

    Args:
        obj: Either a dict (looked up via ``['status']['phase']``) or an
            object exposing ``status.phase``

    Returns:
        The phase; for dicts, None when the status or phase key is absent
    """
    if not isinstance(obj, dict):
        return obj.status.phase
    status = obj.get('status', {})
    return status.get('phase')