diff --git a/CHANGELOG.md b/CHANGELOG.md index 2bd3109..eb5433f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.43.0] - 2026-04-22 +### Changed +- Standardised case management return types to return `dict` instead of typed objects, consistent with the rest of the SDK + - `get_case()` now returns `dict[str, Any]` instead of `Case` + - `get_cases()` (batch) now returns `dict[str, Any]` instead of `CaseList` + - `patch_case()` now returns `dict[str, Any]` instead of `Case` + +### Removed +- `Case`, `CaseList`, and `SoarPlatformInfo` model classes. + ## [0.42.0] - 2026-04-15 ### Added - `fetch_parser_candidates()` method to retrieve parser candidates for a given log type diff --git a/README.md b/README.md index caeee03..52d8183 100644 --- a/README.md +++ b/README.md @@ -1376,19 +1376,19 @@ case_ids = {alert.get('caseName') for alert in alert_list if alert.get('caseName # Get case details using the batch API if case_ids: - cases = chronicle.get_cases(list(case_ids)) + result = chronicle.get_cases(list(case_ids)) # Process cases - for case in cases.cases: - print(f"Case: {case.display_name}") - print(f"Priority: {case.priority}") - print(f"Status: {case.status}") - print(f"Stage: {case.stage}") + for case in result.get("cases", []): + print(f"Case: {case['displayName']}") + print(f"Priority: {case['priority']}") + print(f"Status: {case['status']}") + print(f"Stage: {case['stage']}") # Access SOAR platform information if available - if case.soar_platform_info: - print(f"SOAR Case ID: {case.soar_platform_info.case_id}") - print(f"SOAR Platform: {case.soar_platform_info.platform_type}") + if case.get("soarPlatformInfo"): + print(f"SOAR Case ID: {case['soarPlatformInfo']['caseId']}") + print(f"SOAR Platform: {case['soarPlatformInfo']['responsePlatformType']}") ``` The alerts response includes: @@ -1404,24 +1404,6 @@ You can filter alerts using the snapshot query parameter with fields like: - `feedback_summary.priority` - `feedback_summary.status` -### Case Management Helpers - -The `CaseList` class provides helper methods for working with cases: - -```python -# Get details for specific cases (uses the batch API) -cases = chronicle.get_cases(["case-id-1", "case-id-2"]) - -# Filter cases by priority -high_priority = cases.filter_by_priority("PRIORITY_HIGH") - -# Filter cases by status -open_cases = cases.filter_by_status("STATUS_OPEN") - -# Look up a specific case -case = cases.get_case("case-id-1") -``` - > **Note**: The case management API uses the `legacy:legacyBatchGetCases` endpoint to retrieve multiple cases in a single request. You can retrieve up to 1000 cases in a single batch. ### Case Management @@ -1459,10 +1441,10 @@ Retrieve detailed information about a specific case: ```python # Get case by ID case = chronicle.get_case("12345") -print(f"Case: {case.display_name}") -print(f"Priority: {case.priority}") -print(f"Status: {case.status}") -print(f"Stage: {case.stage}") +print(f"Case: {case['displayName']}") +print(f"Priority: {case['priority']}") +print(f"Status: {case['status']}") +print(f"Stage: {case['stage']}") # Get case with expanded fields case_expanded = chronicle.get_case("12345", expand="tags,products") diff --git a/pyproject.toml b/pyproject.toml index f83e410..b2a02e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "secops" -version = "0.42.0" +version = "0.43.0" description = "Python SDK for wrapping the Google SecOps API for common use cases" readme = "README.md" requires-python = ">=3.10" diff --git a/src/secops/chronicle/__init__.py b/src/secops/chronicle/__init__.py index ebf9121..610273e 100644 --- a/src/secops/chronicle/__init__.py +++ b/src/secops/chronicle/__init__.py @@ -114,8 +114,6 @@ AdvancedConfig, AlertCount, AlertState, - Case, - CaseList, DailyScheduleDetails, DataExport, DataExportStage, @@ -141,7 +139,6 @@ PrevalenceData, PythonVersion, ScheduleType, - SoarPlatformInfo, TargetMode, TileType, TimeInterval, @@ -364,8 +361,6 @@ "AdvancedConfig", "AlertCount", "AlertState", - "Case", - "CaseList", "DailyScheduleDetails", "Date", "DayOfWeek", @@ -379,7 +374,6 @@ "OneTimeScheduleDetails", "PrevalenceData", "ScheduleType", - "SoarPlatformInfo", "TimeInterval", "Timeline", "TimelineBucket", diff --git a/src/secops/chronicle/case.py b/src/secops/chronicle/case.py index a6e84df..4942161 100644 --- a/src/secops/chronicle/case.py +++ b/src/secops/chronicle/case.py @@ -19,9 +19,7 @@ from secops.chronicle.models import ( APIVersion, - Case, CaseCloseReason, - CaseList, CasePriority, ) from secops.chronicle.utils.format_utils import ( @@ -95,7 +93,7 @@ def get_cases( ) -def get_cases_from_list(client, case_ids: list[str]) -> CaseList: +def get_cases_from_list(client, case_ids: list[str]) -> dict[str, Any]: """Get cases from Chronicle. Args: @@ -103,7 +101,7 @@ def get_cases_from_list(client, case_ids: list[str]) -> CaseList: case_ids: List of case IDs to retrieve Returns: - CaseList object with case details + Dictionary containing cases data Raises: APIError: If the API request fails @@ -112,7 +110,7 @@ def get_cases_from_list(client, case_ids: list[str]) -> CaseList: if len(case_ids) > 1000: raise ValueError("Maximum of 1000 cases can be retrieved in a batch") - data = chronicle_request( + return chronicle_request( client, method="GET", endpoint_path="legacy:legacyBatchGetCases", @@ -121,13 +119,6 @@ def get_cases_from_list(client, case_ids: list[str]) -> CaseList: error_message="Failed to get cases", ) - cases = [] - if "cases" in data: - for case_data in data["cases"]: - cases.append(Case.from_dict(case_data)) - - return CaseList(cases) - def execute_bulk_add_tag( client, case_ids: list[int], tags: list[str] @@ -345,7 +336,9 @@ def execute_bulk_reopen( ) -def get_case(client, case_name: str, expand: str | None = None) -> Case: +def get_case( + client, case_name: str, expand: str | None = None +) -> dict[str, Any]: """Get a single case details. Args: @@ -357,7 +350,7 @@ def get_case(client, case_name: str, expand: str | None = None) -> Case: expand: Optional expand field for getting related resources Returns: - Case object with case details + Dictionary containing case details Raises: APIError: If the API request fails @@ -370,7 +363,7 @@ def get_case(client, case_name: str, expand: str | None = None) -> Case: } ) - data = chronicle_request( + return chronicle_request( client, method="GET", endpoint_path=f"cases/{endpoint_path}", @@ -379,8 +372,6 @@ def get_case(client, case_name: str, expand: str | None = None) -> Case: error_message="Failed to get case", ) - return Case.from_dict(data) - def list_cases( client, @@ -478,7 +469,7 @@ def patch_case( case_name: str, case_data: dict[str, Any], update_mask: str | None = None, -) -> Case: +) -> dict[str, Any]: """Update a case using partial update (PATCH). Args: @@ -491,7 +482,7 @@ def patch_case( update_mask: Optional comma-separated list of fields to update Returns: - Updated Case object + Dictionary containing the updated case Raises: APIError: If the API request fails @@ -519,7 +510,7 @@ def patch_case( } ) - data = chronicle_request( + return chronicle_request( client, method="PATCH", endpoint_path=f"cases/{endpoint_path}", @@ -528,5 +519,3 @@ def patch_case( params=params or None, error_message="Failed to patch case", ) - - return Case.from_dict(data) diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index 99981d3..45f704c 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -176,7 +176,6 @@ APIVersion, AlertState, CaseCloseReason, - CaseList, CasePriority, DashboardChart, DashboardQuery, @@ -1142,7 +1141,7 @@ def list_iocs( prioritized_only, ) - def get_cases(self, case_ids: list[str]) -> CaseList: + def get_cases(self, case_ids: list[str]) -> dict[str, Any]: """Get case information for the specified case IDs. Uses the legacy:legacyBatchGetCases endpoint to retrieve multiple cases @@ -1152,7 +1151,7 @@ def get_cases(self, case_ids: list[str]) -> CaseList: case_ids: List of case IDs to retrieve (maximum 1000) Returns: - A CaseList object containing the requested cases + Dictionary containing cases data Raises: APIError: If the API request fails @@ -1160,7 +1159,9 @@ def get_cases(self, case_ids: list[str]) -> CaseList: """ return get_cases_from_list(self, case_ids) - def get_case(self, case_name: str, expand: str | None = None) -> "Case": + def get_case( + self, case_name: str, expand: str | None = None + ) -> dict[str, Any]: """Get a single case details. Args: @@ -1168,7 +1169,7 @@ def get_case(self, case_name: str, expand: str | None = None) -> "Case": expand: Optional expand field for getting related resources Returns: - Case object with case details + Dictionary containing case details Raises: APIError: If the API request fails @@ -1224,7 +1225,7 @@ def patch_case( case_name: str, case_data: dict[str, Any], update_mask: str | None = None, - ) -> "Case": + ) -> dict[str, Any]: """Update a case using partial update (PATCH). Args: @@ -1233,7 +1234,7 @@ def patch_case( update_mask: Optional comma-separated list of fields to update Returns: - Updated Case object + Dictionary containing the updated case Raises: APIError: If the API request fails diff --git a/src/secops/chronicle/models.py b/src/secops/chronicle/models.py index 5f598f3..fcc7b65 100644 --- a/src/secops/chronicle/models.py +++ b/src/secops/chronicle/models.py @@ -899,93 +899,6 @@ class DataExport: export_all_logs: bool = False -class SoarPlatformInfo: - """SOAR platform information for a case.""" - - def __init__(self, case_id: str, platform_type: str): - self.case_id = case_id - self.platform_type = platform_type - - @classmethod - def from_dict(cls, data: dict) -> "SoarPlatformInfo": - """Create from API response dict.""" - return cls( - case_id=data.get("caseId"), - platform_type=data.get("responsePlatformType"), - ) - - -class Case: - """Represents a Chronicle case.""" - - def __init__( - self, - id: str, # pylint: disable=redefined-builtin - display_name: str, - stage: str, - priority: str, - status: str, - soar_platform_info: SoarPlatformInfo | None = None, - alert_ids: list[str] | None = None, - ): - self.id = id - self.display_name = display_name - self.stage = stage - self.priority = priority - self.status = status - self.soar_platform_info = soar_platform_info - self.alert_ids = alert_ids or [] - - @classmethod - def from_dict(cls, data: dict) -> "Case": - """Create from API response dict.""" - return cls( - id=data.get("id"), - display_name=data.get("displayName"), - stage=data.get("stage"), - priority=data.get("priority"), - status=data.get("status"), - soar_platform_info=( - SoarPlatformInfo.from_dict(data["soarPlatformInfo"]) - if data.get("soarPlatformInfo") - else None - ), - alert_ids=data.get("alertIds", []), - ) - - -class CaseList: - """Collection of Chronicle cases with helper methods.""" - - def __init__(self, cases: list[Case]): - self.cases = cases - self._case_map = {case.id: case for case in cases} - - def get_case(self, case_id: str) -> Case | None: - """Get a case by ID.""" - return self._case_map.get(case_id) - - def filter_by_priority(self, priority: str) -> list[Case]: - """Get cases with specified priority.""" - return [case for case in self.cases if case.priority == priority] - - def filter_by_status(self, status: str) -> list[Case]: - """Get cases with specified status.""" - return [case for case in self.cases if case.status == status] - - def filter_by_stage(self, stage: str) -> list[Case]: - """Get cases with specified stage.""" - return [case for case in self.cases if case.stage == stage] - - @classmethod - def from_dict(cls, data: dict) -> "CaseList": - """Create from API response dict.""" - cases = [ - Case.from_dict(case_data) for case_data in data.get("cases", []) - ] - return cls(cases) - - # Dashboard Models diff --git a/src/secops/cli/commands/case.py b/src/secops/cli/commands/case.py index c3d31b6..14dfbe4 100644 --- a/src/secops/cli/commands/case.py +++ b/src/secops/cli/commands/case.py @@ -267,32 +267,7 @@ def handle_case_get_batch_command(args, chronicle): try: case_ids = [case_id.strip() for case_id in args.ids.split(",")] result = chronicle.get_cases(case_ids) - - # Convert CaseList to dictionary for output - cases_dict = { - "cases": [ - { - "id": case.id, - "display_name": case.display_name, - "stage": case.stage, - "priority": case.priority, - "status": case.status, - "soar_platform_info": ( - { - "case_id": case.soar_platform_info.case_id, - "platform_type": ( - case.soar_platform_info.platform_type - ), - } - if case.soar_platform_info - else None - ), - "alert_ids": case.alert_ids, - } - for case in result.cases - ] - } - output_formatter(cases_dict, args.output) + output_formatter(result, args.output) except Exception as e: # pylint: disable=broad-exception-caught print(f"Error: {e}", file=sys.stderr) sys.exit(1) @@ -301,22 +276,8 @@ def handle_case_get_batch_command(args, chronicle): def handle_case_get_command(args, chronicle): """Handle case get command.""" try: - case = chronicle.get_case(args.id, expand=args.expand) - case_dict = { - "id": case.id, - "display_name": case.display_name, - "stage": case.stage, - "priority": case.priority, - "status": case.status, - } - if case.soar_platform_info: - case_dict["soar_platform_info"] = { - "case_id": case.soar_platform_info.case_id, - "platform_type": case.soar_platform_info.platform_type, - } - if case.alert_ids: - case_dict["alert_ids"] = case.alert_ids - output_formatter(case_dict, args.output) + result = chronicle.get_case(args.id, expand=args.expand) + output_formatter(result, args.output) except APIError as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) @@ -344,17 +305,10 @@ def handle_case_patch_command(args, chronicle): """Handle case patch command.""" try: case_data = json.loads(args.data) - case = chronicle.patch_case( + result = chronicle.patch_case( args.id, case_data, update_mask=args.update_mask ) - case_dict = { - "id": case.id, - "display_name": case.display_name, - "stage": case.stage, - "priority": case.priority, - "status": case.status, - } - output_formatter(case_dict, args.output) + output_formatter(result, args.output) except json.JSONDecodeError as e: print(f"Error: Invalid JSON data - {e}", file=sys.stderr) sys.exit(1) diff --git a/tests/chronicle/test_case.py b/tests/chronicle/test_case.py index 83a0c3c..01235bf 100644 --- a/tests/chronicle/test_case.py +++ b/tests/chronicle/test_case.py @@ -31,7 +31,6 @@ patch_case, ) from secops.chronicle import case as case_module -from secops.chronicle.models import Case from secops.exceptions import APIError @@ -427,10 +426,10 @@ def test_get_case_with_id(chronicle_client, mock_case_data): assert call_args[1]["endpoint_path"] == "cases/12345" assert not call_args[1]["params"] - assert isinstance(result, Case) - assert result.id == "12345" - assert result.display_name == "Test Case" - assert result.priority == "PRIORITY_HIGH" + assert isinstance(result, dict) + assert result["id"] == "12345" + assert result["displayName"] == "Test Case" + assert result["priority"] == "PRIORITY_HIGH" def test_get_case_with_full_name(chronicle_client, mock_case_data): @@ -449,8 +448,8 @@ def test_get_case_with_full_name(chronicle_client, mock_case_data): call_args = mock_request.call_args assert call_args[1]["endpoint_path"] == "cases/12345" - assert isinstance(result, Case) - assert result.id == "12345" + assert isinstance(result, dict) + assert result["id"] == "12345" def test_get_case_with_expand(chronicle_client, mock_case_data): @@ -462,7 +461,7 @@ def test_get_case_with_expand(chronicle_client, mock_case_data): call_args = mock_request.call_args assert call_args[1]["params"] == {"expand": "tags,products"} - assert isinstance(result, Case) + assert isinstance(result, dict) def test_get_case_api_error(chronicle_client): @@ -724,8 +723,8 @@ def test_patch_case_with_id(chronicle_client, mock_case_data): assert str(call_args[1]["json"]["priority"]) == "PRIORITY_CRITICAL" assert call_args[1]["params"] == {"updateMask": "priority"} - assert isinstance(result, Case) - assert result.priority == "PRIORITY_CRITICAL" + assert isinstance(result, dict) + assert result["priority"] == "PRIORITY_CRITICAL" def test_patch_case_with_full_name(chronicle_client, mock_case_data): @@ -743,7 +742,7 @@ def test_patch_case_with_full_name(chronicle_client, mock_case_data): mock_request.assert_called_once() call_args = mock_request.call_args assert call_args[1]["endpoint_path"] == "cases/12345" - assert isinstance(result, Case) + assert isinstance(result, dict) def test_patch_case_without_update_mask(chronicle_client, mock_case_data): @@ -757,7 +756,7 @@ def test_patch_case_without_update_mask(chronicle_client, mock_case_data): call_args = mock_request.call_args assert call_args[1]["params"] is None - assert isinstance(result, Case) + assert isinstance(result, dict) def test_patch_case_multiple_fields(chronicle_client, mock_case_data): @@ -784,8 +783,8 @@ def test_patch_case_multiple_fields(chronicle_client, mock_case_data): call_args = mock_request.call_args assert str(call_args[1]["json"]["priority"]) == "PRIORITY_LOW" assert call_args[1]["json"]["stage"] == "Closed" - assert result.priority == "PRIORITY_LOW" - assert result.stage == "Closed" + assert result["priority"] == "PRIORITY_LOW" + assert result["stage"] == "Closed" def test_patch_case_api_error(chronicle_client): diff --git a/tests/chronicle/test_case_integration.py b/tests/chronicle/test_case_integration.py index ee081fa..1f6f2de 100644 --- a/tests/chronicle/test_case_integration.py +++ b/tests/chronicle/test_case_integration.py @@ -30,7 +30,7 @@ def test_list_and_get_cases_workflow(): TODO: Remove 401 skip logic once SOAR IAM role issue is fixed. """ - client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + client = SecOpsClient() chronicle = client.chronicle(**CHRONICLE_CONFIG) try: @@ -68,10 +68,11 @@ def test_list_and_get_cases_workflow(): if case_id: case = chronicle.get_case(case_id) assert case is not None - assert hasattr(case, "id") - assert hasattr(case, "display_name") - assert hasattr(case, "priority") - assert hasattr(case, "status") + assert isinstance(case, dict) + assert "name" in case + assert "displayName" in case + assert "priority" in case + assert "status" in case else: pytest.skip("No cases available for testing") @@ -90,7 +91,7 @@ def test_case_update_workflow(): TODO: Remove 401 skip logic once SOAR IAM role issue is fixed. """ - client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + client = SecOpsClient() chronicle = client.chronicle(**CHRONICLE_CONFIG) # Use dedicated test case ID @@ -99,7 +100,7 @@ def test_case_update_workflow(): try: # Get original case state original_case = chronicle.get_case(case_id) - original_priority = original_case.priority + original_priority = original_case["priority"] # Determine new priority (toggle between HIGH and MEDIUM) new_priority = ( @@ -117,11 +118,11 @@ def test_case_update_workflow(): ) assert updated_case is not None - assert updated_case.priority == new_priority + assert updated_case["priority"] == new_priority # Verify by fetching again verified_case = chronicle.get_case(case_id) - assert verified_case.priority == new_priority + assert verified_case["priority"] == new_priority finally: # Cleanup: Restore original priority @@ -147,7 +148,7 @@ def test_bulk_operations_workflow(): TODO: Remove 401 skip logic once SOAR IAM role issue is fixed. """ - client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + client = SecOpsClient() chronicle = client.chronicle(**CHRONICLE_CONFIG) # Use dedicated test case ID @@ -188,7 +189,7 @@ def test_bulk_assign(): TODO: Remove 401 skip logic once SOAR IAM role issue is fixed. """ - client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + client = SecOpsClient() chronicle = client.chronicle(**CHRONICLE_CONFIG) # Use dedicated test case ID @@ -218,7 +219,7 @@ def test_bulk_close_reopen_workflow(): TODO: Remove 401 skip logic once SOAR IAM role issue is fixed. """ - client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + client = SecOpsClient() chronicle = client.chronicle(**CHRONICLE_CONFIG) # Use dedicated test case ID @@ -240,7 +241,7 @@ def test_bulk_close_reopen_workflow(): # Verify cases are closed by fetching one if case_ids: case = chronicle.get_case(str(case_ids[0])) - assert case.status == "CLOSED" + assert case["status"] == "CLOSED" print(f"Verified case {case_ids[0]} is CLOSED") finally: @@ -256,7 +257,7 @@ def test_bulk_close_reopen_workflow(): # Verify case is reopened if case_ids: case = chronicle.get_case(str(case_ids[0])) - assert case.status == "OPENED" + assert case["status"] == "OPENED" print(f"Verified case {case_ids[0]} is OPENED") except Exception as e: diff --git a/tests/chronicle/test_client.py b/tests/chronicle/test_client.py index 1db629d..f34bfb2 100644 --- a/tests/chronicle/test_client.py +++ b/tests/chronicle/test_client.py @@ -19,7 +19,7 @@ import pytest from secops.chronicle.client import ChronicleClient -from secops.chronicle.models import APIVersion, CaseList +from secops.chronicle.models import APIVersion from secops.exceptions import APIError @@ -323,15 +323,16 @@ def test_get_cases(chronicle_client): # Verify the correct parameter name was used assert call_args[1]["params"] == {"names": ["case-123"]} - assert isinstance(result, CaseList) - case = result.get_case("case-123") - assert case.display_name == "Test Case" - assert case.priority == "PRIORITY_HIGH" - assert case.soar_platform_info.case_id == "soar-123" + assert isinstance(result, dict) + assert len(result["cases"]) == 1 + case = result["cases"][0] + assert case["displayName"] == "Test Case" + assert case["priority"] == "PRIORITY_HIGH" + assert case["soarPlatformInfo"]["caseId"] == "soar-123" -def test_get_cases_filtering(chronicle_client): - """Test CaseList filtering methods.""" +def test_get_cases_multiple(chronicle_client): + """Test getting multiple cases returns raw dict.""" mock_response = Mock() mock_response.status_code = 200 mock_response.json.return_value = { @@ -356,13 +357,10 @@ def test_get_cases_filtering(chronicle_client): ): result = chronicle_client.get_cases(["case-1", "case-2"]) - high_priority = result.filter_by_priority("PRIORITY_HIGH") - assert len(high_priority) == 1 - assert high_priority[0].id == "case-1" - - open_cases = result.filter_by_status("OPEN") - assert len(open_cases) == 1 - assert open_cases[0].id == "case-1" + assert isinstance(result, dict) + assert len(result["cases"]) == 2 + assert result["cases"][0]["id"] == "case-1" + assert result["cases"][1]["id"] == "case-2" def test_get_cases_error(chronicle_client): diff --git a/tests/cli/test_case_integration.py b/tests/cli/test_case_integration.py index e562ac1..3cb6e3d 100644 --- a/tests/cli/test_case_integration.py +++ b/tests/cli/test_case_integration.py @@ -100,7 +100,7 @@ def test_cli_list_and_get_cases_workflow(cli_env, common_args): assert get_result.returncode == 0 get_output = json.loads(get_result.stdout) - assert "id" in get_output or "display_name" in get_output + assert "name" in get_output or "display_name" in get_output assert "priority" in get_output assert "status" in get_output except (json.JSONDecodeError, KeyError):