Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,21 @@ def create(
session.refresh(pipeline_run)
return PipelineRunResponse.from_db(pipeline_run)

def get(self, session: orm.Session, id: bts.IdType) -> PipelineRunResponse:
def get(
self,
session: orm.Session,
id: bts.IdType,
include_execution_stats: bool = False,
) -> PipelineRunResponse:
pipeline_run = session.get(bts.PipelineRun, id)
if not pipeline_run:
raise errors.ItemNotFoundError(f"Pipeline run {id} not found.")
return PipelineRunResponse.from_db(pipeline_run)
response = PipelineRunResponse.from_db(pipeline_run)
if include_execution_stats:
response = self._populate_execution_stats(
session=session, response=response
)
return response

def terminate(
self,
Expand Down Expand Up @@ -274,12 +284,22 @@ def _create_pipeline_run_response(
)
response.pipeline_name = pipeline_name
if include_execution_stats:
stats, summary = self._get_execution_stats_and_summary(
session=session,
root_execution_id=pipeline_run.root_execution_id,
response = self._populate_execution_stats(
session=session, response=response
)
response.execution_status_stats = stats
response.execution_summary = summary
return response

def _populate_execution_stats(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I prefer functions that return value, rather than methods that modify things in-palce.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yuechao-qin Seems like this comment is not addressed

self,
session: orm.Session,
response: PipelineRunResponse,
) -> PipelineRunResponse:
stats, summary = self._get_execution_stats_and_summary(
session=session,
root_execution_id=response.root_execution_id,
)
response.execution_status_stats = stats
response.execution_summary = summary
return response

def _get_execution_stats_and_summary(
Expand Down
64 changes: 64 additions & 0 deletions tests/test_api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1740,3 +1740,67 @@ def test_list_with_execution_stats(self) -> None:
assert summary.total_executions == 2
assert summary.ended_executions == 1
assert summary.has_ended is False


class TestPipelineRunServiceGet:
def test_get_not_found(self):
session_factory = _initialize_db_and_get_session_factory()
service = api_server_sql.PipelineRunsApiService_Sql()
with session_factory() as session:
with pytest.raises(errors.ItemNotFoundError):
service.get(session=session, id="nonexistent-id")

def test_get_returns_pipeline_run(self):
session_factory = _initialize_db_and_get_session_factory()
service = api_server_sql.PipelineRunsApiService_Sql()
with session_factory() as session:
root = _create_execution_node(session=session)
root_id = root.id
run = _create_pipeline_run(session=session, root_execution=root)
run_id = run.id
session.commit()

with session_factory() as session:
result = service.get(session=session, id=run_id)
assert result.id == run_id
assert result.root_execution_id == root_id
assert result.execution_status_stats is None
assert result.execution_summary is None

def test_get_with_execution_stats(self):
session_factory = _initialize_db_and_get_session_factory()
service = api_server_sql.PipelineRunsApiService_Sql()
with session_factory() as session:
root = _create_execution_node(session=session)
root_id = root.id
child1 = _create_execution_node(
session=session,
parent=root,
status=bts.ContainerExecutionStatus.SUCCEEDED,
)
child2 = _create_execution_node(
session=session,
parent=root,
status=bts.ContainerExecutionStatus.RUNNING,
)
_link_ancestor(session=session, execution_node=child1, ancestor_node=root)
_link_ancestor(session=session, execution_node=child2, ancestor_node=root)
run = _create_pipeline_run(session=session, root_execution=root)
run_id = run.id
session.commit()

with session_factory() as session:
result = service.get(
session=session, id=run_id, include_execution_stats=True
)
assert result.id == run_id
assert result.root_execution_id == root_id
stats = result.execution_status_stats
assert stats is not None
assert stats["SUCCEEDED"] == 1
assert stats["RUNNING"] == 1
summary = result.execution_summary
assert summary is not None
assert summary.total_executions == 2
assert summary.ended_executions == 1
assert summary.has_ended is False