Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,22 @@ def _create_pipeline_run_response(
)
response.pipeline_name = pipeline_name
if include_execution_stats:
execution_status_stats = self._calculate_execution_status_stats(
session=session, root_execution_id=pipeline_run.root_execution_id
response.execution_status_stats = self._get_execution_status_stats(
session=session,
root_execution_id=pipeline_run.root_execution_id,
)
response.execution_status_stats = {
status.value: count for status, count in execution_status_stats.items()
}
return response

def _get_execution_status_stats(
self,
session: orm.Session,
root_execution_id: bts.IdType,
) -> dict[str, int]:
stats = self._calculate_execution_status_stats(
session=session, root_execution_id=root_execution_id
)
return {status.value: count for status, count in stats.items()}

def _calculate_execution_status_stats(
self, session: orm.Session, root_execution_id: bts.IdType
) -> dict[bts.ContainerExecutionStatus, int]:
Expand Down
107 changes: 107 additions & 0 deletions tests/test_api_server_sql.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import json
from collections.abc import Callable

import pytest
import sqlalchemy
Expand Down Expand Up @@ -1633,3 +1634,109 @@ def test_returns_none_on_malformed_dict(self):
task_spec_dict={"bad": "data"}
)
assert result is None


def _initialize_db_and_get_session_factory() -> Callable[[], orm.Session]:
db_engine = database_ops.create_db_engine_and_migrate_db(database_uri="sqlite://")
return lambda: orm.Session(bind=db_engine)


def _create_execution_node(
*,
session: orm.Session,
task_spec: dict | None = None,
status: bts.ContainerExecutionStatus | None = None,
parent: bts.ExecutionNode | None = None,
) -> bts.ExecutionNode:
"""Helper to create an ExecutionNode with optional status and parent."""
node = bts.ExecutionNode(task_spec=task_spec or {})
if parent is not None:
node.parent_execution = parent
if status is not None:
node.container_execution_status = status
session.add(node)
session.flush()
return node


def _link_ancestor(
*,
session: orm.Session,
execution_node: bts.ExecutionNode,
ancestor_node: bts.ExecutionNode,
) -> None:
"""Create an ExecutionToAncestorExecutionLink."""
link = bts.ExecutionToAncestorExecutionLink(
ancestor_execution=ancestor_node,
execution=execution_node,
)
session.add(link)
session.flush()


def _create_pipeline_run(
*,
session: orm.Session,
root_execution: bts.ExecutionNode,
) -> bts.PipelineRun:
"""Helper to create a PipelineRun linked to a root execution node."""
run = bts.PipelineRun(root_execution=root_execution)
session.add(run)
session.flush()
return run


class TestPipelineRunServiceList:
def test_list_empty(self) -> None:
session_factory = _initialize_db_and_get_session_factory()
service = api_server_sql.PipelineRunsApiService_Sql()
with session_factory() as session:
result = service.list(session=session)
assert result.pipeline_runs == []
assert result.next_page_token is None

def test_list_returns_pipeline_runs(self) -> None:
session_factory = _initialize_db_and_get_session_factory()
service = api_server_sql.PipelineRunsApiService_Sql()
with session_factory() as session:
root = _create_execution_node(session=session)
root_id = root.id
_create_pipeline_run(session=session, root_execution=root)
session.commit()

with session_factory() as session:
result = service.list(session=session)
assert len(result.pipeline_runs) == 1
assert result.pipeline_runs[0].root_execution_id == root_id
assert result.pipeline_runs[0].created_by is None
assert result.pipeline_runs[0].execution_status_stats is None

def test_list_with_execution_stats(self) -> None:
session_factory = _initialize_db_and_get_session_factory()
service = api_server_sql.PipelineRunsApiService_Sql()
with session_factory() as session:
root = _create_execution_node(session=session)
root_id = root.id
child1 = _create_execution_node(
session=session,
parent=root,
status=bts.ContainerExecutionStatus.SUCCEEDED,
)
child2 = _create_execution_node(
session=session,
parent=root,
status=bts.ContainerExecutionStatus.RUNNING,
)
_link_ancestor(session=session, execution_node=child1, ancestor_node=root)
_link_ancestor(session=session, execution_node=child2, ancestor_node=root)
_create_pipeline_run(session=session, root_execution=root)
session.commit()

with session_factory() as session:
result = service.list(session=session, include_execution_stats=True)
assert len(result.pipeline_runs) == 1
assert result.pipeline_runs[0].root_execution_id == root_id
stats = result.pipeline_runs[0].execution_status_stats
assert stats is not None
assert stats["SUCCEEDED"] == 1
assert stats["RUNNING"] == 1
Loading