Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1693,6 +1693,18 @@
],
"sqlState" : "42702"
},
"EXECUTOR_KUBERNETES_SERVICE_COOL_DOWN_PERIOD_INVALID" : {
"message" : [
"The executor Kubernetes service cool down period of <period> seconds configured via <key> must not be negative."
],
"sqlState" : "42000"
},
"EXECUTOR_KUBERNETES_SERVICE_REQUIRES_BLOCK_MANAGER_PORT" : {
"message" : [
"Enabling the executor Kubernetes service requires <blockManagerPortConfigKey> to be set to a positive number, for instance <defaultShuffleServicePort>."
],
"sqlState" : "42000"
},
"EXEC_IMMEDIATE_DUPLICATE_ARGUMENT_ALIASES" : {
"message" : [
"The USING clause of this EXECUTE IMMEDIATE command contained multiple arguments with same alias (<aliases>), which is invalid; please update the command to specify unique aliases and then try it again."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,35 @@ private[spark] object Config extends Logging {
.toSequence
.createWithDefault(Nil)

// The raw key is declared separately so it can be embedded verbatim in the doc text of
// KUBERNETES_EXECUTOR_SERVICE_ENABLED, which must be defined before the corresponding
// ConfigEntry (KUBERNETES_EXECUTOR_SERVICE_COOL_DOWN_PERIOD) exists.
val KUBERNETES_EXECUTOR_SERVICE_COOL_DOWN_PERIOD_KEY =
  "spark.kubernetes.executor.service.coolDownPeriod"

// Whether to create a dedicated Kubernetes service per executor pod.
val KUBERNETES_EXECUTOR_SERVICE_ENABLED =
  ConfigBuilder("spark.kubernetes.executor.service.enabled")
    .doc("If true, a Kubernetes service is created for the executor. " +
      // Fix: trailing space added after 'decommissioned' — the original concatenation
      // rendered as "decommissionedexecutor" in the generated documentation.
      "An executor is usually connected to via the pod IP. Connecting to a decommissioned " +
      "executor fails after a 'connection timeout', which is set via NETWORK_TIMEOUT and " +
      "defaults to 2 minutes. Connecting to the executor via a Kubernetes service instantly " +
      "fails with 'connection refused' error. " +
      "For this to work, the executor kubernetes service outlives its executor pod by at least " +
      KUBERNETES_EXECUTOR_SERVICE_COOL_DOWN_PERIOD_KEY + " seconds. " +
      "This kubernetes service provides access to the executor's " +
      "block manager, so BLOCK_MANAGER_PORT has to be given a value greater than zero.")
    .version("4.2.0")
    .booleanConf
    .createWithDefault(false)

// Number of seconds an executor's Kubernetes service outlives its executor pod.
// Must be non-negative; validated in ExecutorServiceFeatureStep.
val KUBERNETES_EXECUTOR_SERVICE_COOL_DOWN_PERIOD =
  ConfigBuilder(KUBERNETES_EXECUTOR_SERVICE_COOL_DOWN_PERIOD_KEY)
    .doc(s"The number of seconds the executor kubernetes service enabled via " +
      KUBERNETES_EXECUTOR_SERVICE_ENABLED.key + " lives beyond the lifetime of the " +
      "corresponding executor pod. The service has to live longer than the executor, " +
      "because connecting to a non-existing kubernetes service fails after a 'connection " +
      "timeout', which defeats its very purpose. " +
      s"See ${KUBERNETES_EXECUTOR_SERVICE_ENABLED.key} for more information.")
    .version("4.2.0")
    .intConf
    .createWithDefault(300)

val KUBERNETES_EXECUTOR_DECOMMISSION_LABEL =
ConfigBuilder("spark.kubernetes.executor.decommissionLabel")
.doc("Label to apply to a pod which is being decommissioned." +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ object Constants {
val SPARK_POD_DRIVER_ROLE = "driver"
val SPARK_POD_EXECUTOR_ROLE = "executor"
val SPARK_EXECUTOR_INACTIVE_LABEL = "spark-exec-inactive"
// Label tracking the lifecycle state of a per-executor Kubernetes service.
val SPARK_EXECUTOR_SERVICE_STATE_LABEL = "spark-exec-service-state"
// State while the owning executor pod is still considered alive.
val SPARK_EXECUTOR_SERVICE_ALIVE_STATE = "alive"
// State after the executor is gone; the service awaits deletion once its
// cooldown deadline passes (see ExecutorPodsAllocator).
val SPARK_EXECUTOR_SERVICE_COOLDOWN_STATE = "cooldown"

// Credentials secrets
val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
Expand Down Expand Up @@ -109,6 +112,11 @@ object Constants {
val CONNECT_GRPC_BINDING_PORT = "spark.connect.grpc.binding.port"
val EXIT_EXCEPTION_ANNOTATION = "spark.exit-exception"
val POD_DELETION_COST = "controller.kubernetes.io/pod-deletion-cost"
// Annotation declaring whether an additional Kubernetes resource should be
// owner-referenced by the driver pod or by its executor pod; consumed by
// ExecutorPodsAllocator when creating executor resources.
val OWNER_REFERENCE_ANNOTATION = "spark.owner-reference"
val OWNER_REFERENCE_ANNOTATION_DRIVER_VALUE = "driver"
val OWNER_REFERENCE_ANNOTATION_EXECUTOR_VALUE = "executor"
// Cool down period in seconds stamped on an executor service at creation time.
val COOLDOWN_PERIOD_ANNOTATION = "spark.cooldown-period"
// Absolute deadline (ISO-8601 instant) after which a cooldown service is deleted.
val COOLDOWN_DEADLINE_ANNOTATION = "spark.cooldown-deadline"

// Hadoop Configuration
val HADOOP_CONF_VOLUME = "hadoop-properties"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import scala.jdk.CollectionConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, ServiceBuilder}

import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_SERVICE_COOL_DOWN_PERIOD
import org.apache.spark.deploy.k8s.Constants.{COOLDOWN_PERIOD_ANNOTATION, OWNER_REFERENCE_ANNOTATION, OWNER_REFERENCE_ANNOTATION_DRIVER_VALUE, OWNER_REFERENCE_ANNOTATION_EXECUTOR_VALUE, SPARK_APP_ID_LABEL, SPARK_EXECUTOR_ID_LABEL, SPARK_EXECUTOR_SERVICE_ALIVE_STATE, SPARK_EXECUTOR_SERVICE_STATE_LABEL}
import org.apache.spark.internal.config.{BLOCK_MANAGER_PORT, SHUFFLE_SERVICE_PORT}

/**
 * Feature step that creates a dedicated Kubernetes service per executor pod, exposing the
 * executor's block manager port. Enabled via `spark.kubernetes.executor.service.enabled`.
 *
 * The service is labelled with the "alive" state so that the allocator can later flip it to
 * "cooldown" and delete it once the configured cool down period has elapsed after the
 * executor pod disappeared. The service's owner reference target (driver vs. executor pod)
 * is communicated via the OWNER_REFERENCE_ANNOTATION.
 *
 * Fails fast (at construction) when the cool down period is negative or when
 * BLOCK_MANAGER_PORT is not set to a positive, fixed port.
 */
class ExecutorServiceFeatureStep(conf: KubernetesExecutorConf) extends KubernetesFeatureConfigStep {
  // Only these pod labels participate in the service's pod selector.
  // (Renamed from snake_case `service_selector_labels` to follow Scala naming conventions.)
  private val serviceSelectorLabels = Set(SPARK_APP_ID_LABEL, SPARK_EXECUTOR_ID_LABEL)
  private lazy val selector = conf.labels
    .filter { case (key, _) => serviceSelectorLabels.contains(key) }
  // Service labels: the selector labels plus the "alive" state marker that the
  // allocator later rewrites to "cooldown".
  private lazy val labels = selector ++
    Map(SPARK_EXECUTOR_SERVICE_STATE_LABEL -> SPARK_EXECUTOR_SERVICE_ALIVE_STATE)

  private lazy val sparkAppSelector = getLabel(SPARK_APP_ID_LABEL)
  private lazy val sparkExecId = getLabel(SPARK_EXECUTOR_ID_LABEL)
  // name length is 8 + 38 + 6 + 10 = 62
  // which fits in KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH = 63
  private lazy val serviceName = s"svc-$sparkAppSelector-exec-$sparkExecId"

  // The service lives for this number of seconds beyond its executor pod's lifetime.
  private val coolDownPeriod = conf.sparkConf.get(KUBERNETES_EXECUTOR_SERVICE_COOL_DOWN_PERIOD)
  SparkException.require(coolDownPeriod >= 0,
    "EXECUTOR_KUBERNETES_SERVICE_COOL_DOWN_PERIOD_INVALID",
    Map(
      "period" -> coolDownPeriod.toString,
      "key" -> KUBERNETES_EXECUTOR_SERVICE_COOL_DOWN_PERIOD.key))

  // The service targets the block manager, so BLOCK_MANAGER_PORT must be a fixed,
  // positive port rather than the default "random port" value.
  private val blockManagerPortName = "spark-block-manager"
  private val blockManagerPort = conf.sparkConf.get(BLOCK_MANAGER_PORT)
  SparkException.require(blockManagerPort > 0,
    "EXECUTOR_KUBERNETES_SERVICE_REQUIRES_BLOCK_MANAGER_PORT",
    Map(
      "blockManagerPortConfigKey" -> BLOCK_MANAGER_PORT.key,
      "defaultShuffleServicePort" -> SHUFFLE_SERVICE_PORT.defaultValue.get.toString))

  /** Returns the value of a required pod label, failing fast when it is absent. */
  private def getLabel(label: String): String = {
    conf.labels.getOrElse(label,
      throw new SparkException(s"This feature step requires label $label"))
  }

  /**
   * Disables service links on the pod (to avoid ARG_MAX blow-ups from per-service
   * environment variables) and exposes the service name to the executor container.
   */
  override def configurePod(pod: SparkPod): SparkPod = {
    SparkPod(
      new PodBuilder(pod.pod)
        .editSpec()
        // otherwise, executor pods get 8 environment variables for each other executor service
        // with some thousands executor pods you would see ARG_MAX limit issues in endpoint.sh
        .withEnableServiceLinks(false)
        .endSpec()
        .build(),
      // tell the executor entry point its Kubernetes service name
      new ContainerBuilder(pod.container)
        .addNewEnv()
        .withName("EXECUTOR_SERVICE_NAME")
        .withValue(serviceName)
        .endEnv()
        .build())
  }

  /** Builds the per-executor service exposing the block manager port. */
  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
    // With a positive cool down period the service must outlive the executor pod, so it is
    // owned by the driver; otherwise it can be garbage-collected with the executor itself.
    val owner = if (coolDownPeriod > 0) {
      OWNER_REFERENCE_ANNOTATION_DRIVER_VALUE
    } else {
      OWNER_REFERENCE_ANNOTATION_EXECUTOR_VALUE
    }

    val annotation = Map(
      OWNER_REFERENCE_ANNOTATION -> owner,
      COOLDOWN_PERIOD_ANNOTATION -> coolDownPeriod.toString
    )
    val service = new ServiceBuilder()
      .withNewMetadata()
      .withName(serviceName)
      .withLabels(labels.asJava)
      .withAnnotations(annotation.asJava)
      .endMetadata()
      .withNewSpec()
      .withSelector(selector.asJava)
      .addNewPort()
      .withName(blockManagerPortName)
      .withPort(blockManagerPort)
      .withNewTargetPort(blockManagerPort)
      .endPort()
      .endSpec()
      .build()

    Seq(service)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@
*/
package org.apache.spark.scheduler.cluster.k8s

import java.time.Instant
import java.time.{Instant, ZoneOffset}
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.Try
import scala.util.control.NonFatal

import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, Pod, PodBuilder}
import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, Pod, PodBuilder, ServiceBuilder}
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException}
import io.fabric8.kubernetes.client.dsl.base.{PatchContext, PatchType}

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
Expand Down Expand Up @@ -231,6 +233,74 @@ class ExecutorPodsAllocator(
if (snapshots.nonEmpty) {
val existingExecs = lastSnapshot.executorPods.keySet
_deletedExecutorIds = _deletedExecutorIds.intersect(existingExecs)

// schedule all services of not-alive executors that have a cooldown period for deletion
val aliveExecs = existingExecs ++ newlyCreatedExecutors.keySet.diff(k8sKnownExecIds.toSet)
Utils.tryLogNonFatalError {
val start = clock.getTimeMillis()
kubernetesClient
.services()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_EXECUTOR_SERVICE_STATE_LABEL, SPARK_EXECUTOR_SERVICE_ALIVE_STATE)
.withLabelNotIn(SPARK_EXECUTOR_ID_LABEL, aliveExecs.toSeq.sorted.map(_.toString): _*)
.resources().forEach { service =>
val svc = service.get()
val cooldownString =
svc.getMetadata.getAnnotations.get(COOLDOWN_PERIOD_ANNOTATION)
if (cooldownString != null && cooldownString.toIntOption.isDefined) {
val cooldown = cooldownString.toInt
val deadline =
Instant.ofEpochMilli(currentTime + cooldown * 1000).atZone(ZoneOffset.UTC)
logInfo(s"Executor got deleted, removal of " +
s"service ${svc.getMetadata.getName} scheduled in ${cooldown}s")
Utils.tryLogNonFatalError {
service.patch(
PatchContext.of(PatchType.STRATEGIC_MERGE),
new ServiceBuilder()
.withNewMetadata()
.addToLabels(
SPARK_EXECUTOR_SERVICE_STATE_LABEL,
SPARK_EXECUTOR_SERVICE_COOLDOWN_STATE
)
.addToAnnotations(COOLDOWN_DEADLINE_ANNOTATION, deadline.toString)
.endMetadata()
.build()
)
}
}
}
val end = clock.getTimeMillis()
logInfo(s"Processed all services with alive state label in ${end - start}ms")
}
}

// delete services that passed their cooldown deadline
Utils.tryLogNonFatalError {
val start = clock.getTimeMillis()
kubernetesClient
.services()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_EXECUTOR_SERVICE_STATE_LABEL, SPARK_EXECUTOR_SERVICE_COOLDOWN_STATE)
.resources().forEach { service =>
val svc = service.get
Option(svc.getMetadata.getAnnotations.get(COOLDOWN_DEADLINE_ANNOTATION))
.flatMap(s => Try(Instant.parse(s)).toOption)
.filter(_.toEpochMilli <= currentTime)
.foreach { deadline =>
logInfo(s"Service deadline $deadline has passed current time $currentTime, " +
s"deleting service ${svc.getMetadata.getName}")
try {
service.delete()
} catch {
case NonFatal(e) =>
logWarning(s"Failed to delete service $service", e)
}
}
}
val end = clock.getTimeMillis()
logInfo(s"Processed all services with cooldown state label in ${end - start}ms")
}

val notDeletedPods = lastSnapshot.executorPods
Expand Down Expand Up @@ -459,10 +529,23 @@ class ExecutorPodsAllocator(
.build()
val resources = replacePVCsIfNeeded(
podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
val refAnnotation = OWNER_REFERENCE_ANNOTATION
val driverValue = OWNER_REFERENCE_ANNOTATION_DRIVER_VALUE
val executorValue = OWNER_REFERENCE_ANNOTATION_EXECUTOR_VALUE
val getOwnerReference = (r: HasMetadata) =>
r.getMetadata.getAnnotations.getOrDefault(refAnnotation, executorValue)
val (driverResources, executorResources) =
resources
.filter(r => Set(driverValue, executorValue).contains(getOwnerReference(r)))
.partition(r => getOwnerReference(r) == driverValue)
val createdExecutorPod =
kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
try {
addOwnerReference(createdExecutorPod, resources)
addOwnerReference(createdExecutorPod, executorResources)
if (driverResources.nonEmpty && driverPod.nonEmpty) {
addOwnerReference(driverPod.get, driverResources)
}
kubernetesClient.resourceList(resources: _*).forceConflicts().serverSideApply()
resources
.filter(_.getKind == "PersistentVolumeClaim")
.foreach { resource =>
Expand All @@ -484,6 +567,7 @@ class ExecutorPodsAllocator(
.inNamespace(namespace)
.resource(createdExecutorPod)
.delete()
kubernetesClient.resourceList(resources: _*).delete()
throw e
}
}
Expand Down Expand Up @@ -542,6 +626,16 @@ class ExecutorPodsAllocator(
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
}

// delete all services with cooldown periods
Utils.tryLogNonFatalError {
kubernetesClient
.services()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_EXECUTOR_SERVICE_STATE_LABEL)
.delete()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,20 @@ private[spark] class KubernetesExecutorBuilder {
}
}

val optionalFeatures = Seq(
Some(conf.get(Config.KUBERNETES_EXECUTOR_SERVICE_ENABLED))
.filter(enabled => enabled)
.map(_ => new ExecutorServiceFeatureStep(conf))
).flatten

val allFeatures = Seq(
new BasicExecutorFeatureStep(conf, secMgr, resourceProfile),
new ExecutorKubernetesCredentialsFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new HadoopConfExecutorFeatureStep(conf),
new LocalDirsFeatureStep(conf)) ++ userFeatures
new LocalDirsFeatureStep(conf)) ++ optionalFeatures ++ userFeatures

val features = allFeatures.filterNot(f =>
conf.get(Config.KUBERNETES_EXECUTOR_POD_EXCLUDED_FEATURE_STEPS).contains(f.getClass.getName))
Expand Down
Loading