Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1255,13 +1255,14 @@ void testJobDetailsContainsSlotSharingGroupId() throws Exception {
new JobDetailsInfo.JobVertexDetailsInfo(
new JobVertexID(),
slotSharingGroupId,
null,
"jobVertex1",
2,
1,
ExecutionState.RUNNING,
1,
2,
1,
1L,
2L,
1L,
Collections.singletonMap(ExecutionState.RUNNING, 0),
jobVertexMetrics));
final JobDetailsInfo jobDetailsInfo =
Expand Down
171 changes: 87 additions & 84 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -169,38 +169,6 @@
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
}
}, {
"url" : "/applications/:applicationid/jobmanager/config",
"method" : "GET",
"status-code" : "200 OK",
"file-upload" : false,
"path-parameters" : {
"pathParameters" : [ {
"key" : "applicationid"
} ]
},
"query-parameters" : {
"queryParameters" : [ ]
},
"request" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
},
"response" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
"properties" : {
"key" : {
"type" : "string"
},
"value" : {
"type" : "string"
}
}
}
}
}, {
"url" : "/applications/:applicationid/exceptions",
"method" : "GET",
Expand Down Expand Up @@ -251,6 +219,38 @@
}
}
}
}, {
"url" : "/applications/:applicationid/jobmanager/config",
"method" : "GET",
"status-code" : "200 OK",
"file-upload" : false,
"path-parameters" : {
"pathParameters" : [ {
"key" : "applicationid"
} ]
},
"query-parameters" : {
"queryParameters" : [ ]
},
"request" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
},
"response" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
"properties" : {
"key" : {
"type" : "string"
},
"value" : {
"type" : "string"
}
}
}
}
}, {
"url" : "/cluster",
"method" : "DELETE",
Expand Down Expand Up @@ -1370,6 +1370,9 @@
"slotSharingGroupId" : {
"type" : "any"
},
"slotSharingGroupName" : {
"type" : "string"
},
"name" : {
"type" : "string"
},
Expand Down Expand Up @@ -2834,6 +2837,57 @@
}
}
}
}, {
"url" : "/jobs/:jobid/rescales/config",
"method" : "GET",
"status-code" : "200 OK",
"file-upload" : false,
"path-parameters" : {
"pathParameters" : [ {
"key" : "jobid"
} ]
},
"query-parameters" : {
"queryParameters" : [ ]
},
"request" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
},
"response" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleConfigInfo",
"properties" : {
"rescaleHistoryMax" : {
"type" : "integer"
},
"schedulerExecutionMode" : {
"type" : "string",
"enum" : [ "REACTIVE" ]
},
"submissionResourceWaitTimeoutInMillis" : {
"type" : "integer"
},
"submissionResourceStabilizationTimeoutInMillis" : {
"type" : "integer"
},
"slotIdleTimeoutInMillis" : {
"type" : "integer"
},
"executingCooldownTimeoutInMillis" : {
"type" : "integer"
},
"executingResourceStabilizationTimeoutInMillis" : {
"type" : "integer"
},
"maximumDelayForTriggeringRescaleInMillis" : {
"type" : "integer"
},
"rescaleOnFailedCheckpointCount" : {
"type" : "integer"
}
}
}
}, {
"url" : "/jobs/:jobid/rescaling",
"method" : "PATCH",
Expand Down Expand Up @@ -4781,56 +4835,5 @@
}
}
}
}, {
"url" : "/jobs/:jobid/rescales/config",
"method" : "GET",
"status-code" : "200 OK",
"file-upload" : false,
"path-parameters" : {
"pathParameters" : [ {
"key" : "jobid"
} ]
},
"query-parameters" : {
"queryParameters" : [ ]
},
"request" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
},
"response" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleConfigInfo",
"properties" : {
"executingCooldownTimeoutInMillis" : {
"type" : "integer"
},
"executingResourceStabilizationTimeoutInMillis" : {
"type" : "integer"
},
"maximumDelayForTriggeringRescaleInMillis" : {
"type" : "integer"
},
"rescaleHistoryMax" : {
"type" : "integer"
},
"rescaleOnFailedCheckpointCount" : {
"type" : "integer"
},
"schedulerExecutionMode" : {
"type" : "string",
"enum" : [ "REACTIVE" ]
},
"slotIdleTimeoutInMillis" : {
"type" : "integer"
},
"submissionResourceStabilizationTimeoutInMillis" : {
"type" : "integer"
},
"submissionResourceWaitTimeoutInMillis" : {
"type" : "integer"
}
}
}
} ]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ <h4 class="content-wrap">
<xhtml:div class="detail">{{ operator }}</xhtml:div>
<xhtml:div class="detail description" *ngIf="!pending">{{ description }}</xhtml:div>
<xhtml:div class="node-label">Parallelism: {{ parallelism }}</xhtml:div>
<xhtml:div class="node-label" *ngIf="slotSharingGroupName && !pending">
Slot Sharing Group: {{ slotSharingGroupName }}
</xhtml:div>
<xhtml:div
class="node-label metric"
title="Maximum back pressured percentage across all subtasks"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ export class NodeComponent {
backPressuredPercentage: number | undefined = NaN;
busyPercentage: number | undefined = NaN;
dataSkewPercentage: number | undefined = NaN;
slotSharingGroupId: string | null | undefined;
slotSharingGroupName: string | null | undefined;
pending: boolean = true;
backgroundColor: string;
borderColor: string;
Expand Down Expand Up @@ -67,6 +69,8 @@ export class NodeComponent {
this.operatorStrategy = this.decodeHTML(value.operator_strategy);
this.parallelism = value.parallelism;
this.lowWatermark = value.lowWatermark;
this.slotSharingGroupId = value.detail?.slotSharingGroupId;
this.slotSharingGroupName = value.detail?.slotSharingGroupName;
if (value?.job_vertex_id) {
this.pending = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ export interface VerticesItem {
duration: number;
tasks: TasksStatus;
metrics: MetricsStatus;
slotSharingGroupId?: string | null;
slotSharingGroupName?: string | null;
}

export interface VerticesItemRange extends VerticesItem {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,16 @@ private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
counts.getAccumulateIdleTime(),
counts.getAccumulateBusyTime());

// Get slot sharing group name, use slot sharing group ID as fallback if name is not set
String slotSharingGroupName = ejv.getSlotSharingGroup().getSlotSharingGroupName();
if (slotSharingGroupName == null || slotSharingGroupName.isEmpty()) {
slotSharingGroupName = ejv.getSlotSharingGroup().getSlotSharingGroupId().toString();
}

return new JobDetailsInfo.JobVertexDetailsInfo(
ejv.getJobVertexId(),
ejv.getSlotSharingGroup().getSlotSharingGroupId(),
slotSharingGroupName,
ejv.getName(),
ejv.getMaxParallelism(),
ejv.getParallelism(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ public static final class JobVertexDetailsInfo {

public static final String FIELD_NAME_SLOT_SHARING_GROUP_ID = "slotSharingGroupId";

public static final String FIELD_NAME_SLOT_SHARING_GROUP_NAME = "slotSharingGroupName";

public static final String FIELD_NAME_JOB_VERTEX_NAME = "name";

public static final String FIELD_NAME_MAX_PARALLELISM = "maxParallelism";
Expand All @@ -390,6 +392,9 @@ public static final class JobVertexDetailsInfo {
@JsonSerialize(using = SlotSharingGroupIDSerializer.class)
private final SlotSharingGroupId slotSharingGroupId;

@JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_NAME)
private final String slotSharingGroupName;

@JsonProperty(FIELD_NAME_JOB_VERTEX_NAME)
private final String name;

Expand Down Expand Up @@ -425,6 +430,7 @@ public JobVertexDetailsInfo(
@JsonDeserialize(using = SlotSharingGroupIDDeserializer.class)
@JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID)
SlotSharingGroupId slotSharingGroupId,
@JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_NAME) String slotSharingGroupName,
@JsonProperty(FIELD_NAME_JOB_VERTEX_NAME) String name,
@JsonProperty(FIELD_NAME_MAX_PARALLELISM) int maxParallelism,
@JsonProperty(FIELD_NAME_PARALLELISM) int parallelism,
Expand All @@ -437,6 +443,7 @@ public JobVertexDetailsInfo(
@JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS) IOMetricsInfo jobVertexMetrics) {
this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId);
this.slotSharingGroupName = slotSharingGroupName;
this.name = Preconditions.checkNotNull(name);
this.maxParallelism = maxParallelism;
this.parallelism = parallelism;
Expand All @@ -458,6 +465,11 @@ public SlotSharingGroupId getSlotSharingGroupId() {
return slotSharingGroupId;
}

@JsonIgnore
public String getSlotSharingGroupName() {
return slotSharingGroupName;
}

@JsonIgnore
public String getName() {
return name;
Expand Down Expand Up @@ -519,6 +531,7 @@ public boolean equals(Object o) {
&& duration == that.duration
&& Objects.equals(jobVertexID, that.jobVertexID)
&& Objects.equals(slotSharingGroupId, that.slotSharingGroupId)
&& Objects.equals(slotSharingGroupName, that.slotSharingGroupName)
&& Objects.equals(name, that.name)
&& executionState == that.executionState
&& Objects.equals(tasksPerState, that.tasksPerState)
Expand All @@ -530,6 +543,7 @@ public int hashCode() {
return Objects.hash(
jobVertexID,
slotSharingGroupId,
slotSharingGroupName,
name,
maxParallelism,
parallelism,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ private JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(Random ra
return new JobDetailsInfo.JobVertexDetailsInfo(
new JobVertexID(),
new SlotSharingGroupId(),
"slotSharingGroup" + random.nextLong(),
"jobVertex" + random.nextLong(),
2 * parallelism,
parallelism,
Expand Down