8000 K8s Java client upgrade by utk-12 · Pull Request #3075 · azkaban/azkaban · GitHub
[go: up one dir, main page]

Skip to content

K8s Java client upgrade #3075

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Using GenericKubernetesApi for service deletion and adding config val…
…ues for K8s watch timeout
  • Loading branch information
utk-12 committed Apr 21, 2022
commit e0b13c987a5b9fa387df530778ac282c078a3887
6 changes: 6 additions & 0 deletions az-core/src/main/java/azkaban/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,12 @@ public static class ContainerizedDispatchManagerProperties {
public static final String KUBERNETES_WATCH_ENABLED = KUBERNETES_WATCH_PREFIX + "enabled";
public static final String KUBERNETES_WATCH_EVENT_CACHE_MAX_ENTRIES =
KUBERNETES_WATCH_PREFIX + "cache.max.entries";
public static final String KUBERNETES_WATCH_CALL_TIMEOUT = KUBERNETES_WATCH_PREFIX + "call"
+ ".timeout";
public static final String KUBERNETES_WATCH_CONNECT_TIMEOUT = KUBERNETES_WATCH_PREFIX +
"connect.timeout";
public static final String KUBERNETES_WATCH_READ_TIMEOUT = KUBERNETES_WATCH_PREFIX + "read"
+ ".timeout";

// Periodicity of lookup and cleanup of stale executions.
public static final String CONTAINERIZED_STALE_EXECUTION_CLEANUP_INTERVAL_MIN =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ public ContainerCleanupManager(final Props azkProps, final ExecutorLoader execut
.put(Status.EXECUTION_STOPPED, new Pair<>(Duration.ofMinutes(maxExecStoppedValidity), UPDATE_TIME))
.put(Status.FAILED_FINISHING, new Pair<>(Duration.ofMinutes(runningFlowValidity), START_TIME))
.build();

}

public void cleanUpStaleFlows() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import io.kubernetes.client.custom.QuantityFormatException;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.ApiResponse;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Container.ImagePullPolicyEnum;
import io.kubernetes.client.openapi.models.V1DeleteOptions;
Expand All @@ -63,9 +62,11 @@
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceList;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.KubeConfig;
import io.kubernetes.client.util.Yaml;
import io.kubernetes.client.util.generic.GenericKubernetesApi;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -1232,20 +1233,23 @@ private void deletePod(final int executionId) throws ExecutorManagerException {
private void deleteService(final int executionId) throws ExecutorManagerException {
final String serviceName = getServiceName(executionId);
try {
final ApiResponse<V1Service> deleteResult = this.coreV1Api.deleteNamespacedServiceWithHttpInfo(
serviceName,
this.namespace,
null,
null,
null,
null,
null,
new V1DeleteOptions());
logger.info("ExecId: {}, Action: Service Deletion, Service Name: {}, code: {}, message: {}",
executionId,
serviceName,
deleteResult.getStatusCode(),
deleteResult.getData());
// Using GenericKubernetesApi due to a Known issue in K8s Java client and OpenAPIv2:
// See more here: https://github.com/kubernetes-client/java/wiki/6.-Known-Issues

GenericKubernetesApi<V1Service, V1ServiceList> serviceClient =
new GenericKubernetesApi<>(V1Service.class, V1ServiceList.class, "",
"v1", "services", this.client);
V1Service deletedService = serviceClient.delete(
this.namespace, serviceName).throwsApiException().getObject();
if (deletedService != null) {
logger.info(
"Received after-deletion status of the service deletion request for " +serviceName+
" will be deleting in background!");
}
else{
logger.info("ExecId: {}, Action: Service Deletion, Service Name: {}",
executionId, serviceName);
}
} catch (final ApiException e) {
logger.error("ExecId: {}, Unable to delete service in Kubernetes: {}", executionId,
e.getResponseBody());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import static java.util.Objects.requireNonNull;

import azkaban.Constants.ContainerizedDispatchManagerProperties;
import azkaban.executor.container.ContainerizedWatch;
import azkaban.utils.Props;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.reflect.TypeToken;
import io.kubernetes.client.openapi.ApiClient;
Expand Down Expand Up @@ -50,6 +52,12 @@ public class KubernetesWatch implements ContainerizedWatch {
private final CoreV1Api coreV1Api;
private final PodWatchParams podWatchParams;
private final Thread watchRunner;
private final int DEFAULT_KUBERNETES_WATCH_CALL_TIMEOUT = 86400;
private final int DEFAULT_KUBERNETES_WATCH_CONNECT_TIMEOUT = 30;
private final int DEFAULT_KUBERNETES_WATCH_READ_TIMEOUT = 86400;
private final int callTimeout;
private final int connectTimeout;
private final int readTimeout;
private Watch<V1Pod> podWatch;
private RawPodWatchEventListener podWatchEventListener;
private int podWatchInitCount = 0;
Expand All @@ -66,7 +74,7 @@ public class KubernetesWatch implements ContainerizedWatch {
* @param podWatchParams
*/
@Inject
public KubernetesWatch(ApiClient apiClient,
public KubernetesWatch(final Props azkProps, ApiClient apiClient,
RawPodWatchEventListener podWatchEventListener,
PodWatchParams podWatchParams) {
requireNonNull(apiClient);
Expand All @@ -77,13 +85,22 @@ public KubernetesWatch(ApiClient apiClient,
this.podWatchEventListener = podWatchEventListener;
this.podWatchParams = podWatchParams;
this.apiClient = apiClient;
// no timeout for request completion
this.callTimeout =
azkProps.getInt(ContainerizedDispatchManagerProperties.KUBERNETES_WATCH_CALL_TIMEOUT,
DEFAULT_KUBERNETES_WATCH_CALL_TIMEOUT);
this.connectTimeout =
azkProps.getInt(ContainerizedDispatchManagerProperties.KUBERNETES_WATCH_CONNECT_TIMEOUT,
DEFAULT_KUBERNETES_WATCH_CONNECT_TIMEOUT);
this.readTimeout =
azkProps.getInt(ContainerizedDispatchManagerProperties.KUBERNETES_WATCH_READ_TIMEOUT,
DEFAULT_KUBERNETES_WATCH_READ_TIMEOUT);
// Setting read and call timeouts. So pod watch will be reinitialized.
OkHttpClient httpClient =
this.apiClient.getHttpClient().newBuilder()
.protocols(Arrays.asList(Protocol.HTTP_2,Protocol.HTTP_1_1))
.connectTimeout(60, TimeUnit.SECONDS)
.callTimeout(1200, TimeUnit.SECONDS)
.readTimeout(600, TimeUnit.SECONDS)
.connectTimeout(this.connectTimeout, TimeUnit.SECONDS)
.callTimeout(this.callTimeout, TimeUnit.SECONDS)
.readTimeout(this.readTimeout, TimeUnit.SECONDS)
.build();
this.apiClient.setHttpClient(httpClient);
this.coreV1Api = new CoreV1Api(this.apiClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void setUp() throws Exception {
private KubernetesWatch kubernetesWatchWithMockListener() {
Props azkProps = localProperties();
ApiClient localApiClient = WatchUtils.createApiClient(azkProps);
return new KubernetesWatch(localApiClient, new AzPodStausExtractingListener(),
return new KubernetesWatch(azkProps, localApiClient, new AzPodStausExtractingListener(),
WatchUtils.createPodWatchParams(azkProps)
);
}
Expand Down Expand Up @@ -501,7 +501,7 @@ public PreInitializedWatch(ApiClient apiClient,
Watch<V1Pod> preInitPodWatch,
PodWatchParams podWatchParams,
int maxInitCount) {
super(apiClient, podWatchEventListener, podWatchParams);
super(new Props(), apiClient, podWatchEventListener, podWatchParams);
requireNonNull(preInitPodWatch, "pre init pod watch must not be null");
this.preInitPodWatch = preInitPodWatch;
this.maxInitCount = maxInitCount;
Expand Down
0