Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions cloud_pipelines_backend/launchers/kubernetes_launchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
# Annotation keys under the "cloud-pipelines.net/launchers/generic/" prefix
# that describe resource settings.
# NOTE(review): the code consuming the shared-memory key is not visible in
# this view; presumably it is looked up in the `annotations` mapping - confirm.
RESOURCES_SHARED_MEMORY_ANNOTATION_KEY = (
"cloud-pipelines.net/launchers/generic/resources.shared_memory"
)
# Looked up in `annotations`; a truthy value is stored as the
# "ephemeral-storage" entry of the container resource requests and also
# causes an emptyDir volume with a "/tmp" volume mount to be added.
RESOURCES_EPHEMERAL_STORAGE_ANNOTATION_KEY = (
"cloud-pipelines.net/launchers/generic/resources.ephemeral_storage"
)
# NOTE(review): the code consuming the accelerators key is not visible in
# this view - confirm its value format against the launcher code.
RESOURCES_ACCELERATORS_ANNOTATION_KEY = (
"cloud-pipelines.net/launchers/generic/resources.accelerators"
)
Expand Down Expand Up @@ -360,15 +363,16 @@ def get_output_path(output_name: str) -> str:

annotations = annotations or {}

resources: k8s_client_lib.V1ResourceRequirements = (
main_container_spec.resources or k8s_client_lib.V1ResourceRequirements()
)
main_container_spec.resources = resources
resources.requests = resources.requests or {}
resources.limits = resources.limits or {}

cpu_resource_request = annotations.get(RESOURCES_CPU_ANNOTATION_KEY)
memory_resource_request = annotations.get(RESOURCES_MEMORY_ANNOTATION_KEY)
if cpu_resource_request or memory_resource_request:
resources: k8s_client_lib.V1ResourceRequirements = (
main_container_spec.resources or k8s_client_lib.V1ResourceRequirements()
)
main_container_spec.resources = resources
resources.requests = resources.requests or {}
resources.limits = resources.limits or {}
if cpu_resource_request:
resources.requests["cpu"] = cpu_resource_request
if memory_resource_request:
Expand All @@ -393,6 +397,24 @@ def get_output_path(output_name: str) -> str:
volume_map[volume.name] = volume
volume_mounts.append(volume_mount)

# Optional ephemeral storage: the requested size comes from the
# RESOURCES_EPHEMERAL_STORAGE_ANNOTATION_KEY annotation (None when absent).
ephemeral_storage_size = annotations.get(
RESOURCES_EPHEMERAL_STORAGE_ANNOTATION_KEY
)
# A missing or empty annotation value skips the whole section.
if ephemeral_storage_size:
ephemeral_volume_name = "ephemeral-storage"
# The emptyDir source is created with no arguments, so the annotation value
# is not passed to the volume itself; it is only used for the resource
# request below.
# NOTE(review): consider whether a size limit on the emptyDir is wanted - confirm.
volume = k8s_client_lib.V1Volume(
name=ephemeral_volume_name,
empty_dir=k8s_client_lib.V1EmptyDirVolumeSource(),
)
# The volume is mounted at the fixed path "/tmp".
volume_mount = k8s_client_lib.V1VolumeMount(
name=ephemeral_volume_name,
mount_path="/tmp",
)
# Register under the volume name (an existing entry with the same name is
# replaced) and add the mount to the collected mounts.
volume_map[volume.name] = volume
volume_mounts.append(volume_mount)
# Record the annotation value as the "ephemeral-storage" resource request.
# Only `requests` is set here; `limits` is left untouched.
resources.requests["ephemeral-storage"] = ephemeral_storage_size

# Optionally adding the IPC_LOCK capability that may be needed for InfiniBand.
# ContainerSpec.securityContext.capabilities.add=["IPC_LOCK"]
enable_capability_ipc_lock: str = annotations.get(
Expand Down
Loading