From d37fc9ed59d10401a72a37b9740f7661f12c7b8b Mon Sep 17 00:00:00 2001 From: Gracjan Sadowicz Date: Thu, 25 Jun 2026 01:42:55 +0200 Subject: [PATCH 1/5] RDBC-1079 Python API: Revisions Operations Port the missing revisions operations from the C# client (v7.2): EnforceRevisionsConfigurationOperation, AdoptOrphanedRevisionsOperation, DeleteRevisionsOperation, RevertRevisionsByIdOperation, ConfigureRevisionsBinCleanerOperation, and the server-wide ConfigureRevisionsForConflictsOperation, plus supporting models. Wires top-level exports and adds TestBase tests verified against a licensed 7.2 server. --- ravendb/__init__.py | 21 +- ravendb/documents/operations/revisions.py | 339 +++++++++++++++++- ravendb/serverwide/operations/revisions.py | 63 ++++ .../test_revisions_operations.py | 257 +++++++++++++ ravendb/tests/test_imports.py | 20 +- 5 files changed, 690 insertions(+), 10 deletions(-) create mode 100644 ravendb/serverwide/operations/revisions.py create mode 100644 ravendb/tests/jvm_migrated_tests/client_tests/revisions_tests/test_revisions_operations.py diff --git a/ravendb/__init__.py b/ravendb/__init__.py index ea8bde2c..038c0d7b 100644 --- a/ravendb/__init__.py +++ b/ravendb/__init__.py @@ -173,6 +173,23 @@ from ravendb.documents.operations.revisions import ( RevisionsCollectionConfiguration, RevisionsConfiguration, + RevisionsResult, + ConfigureRevisionsOperation, + ConfigureRevisionsOperationResult, + GetRevisionsOperation, + EnforceRevisionsConfigurationOperation, + AdoptOrphanedRevisionsOperation, + DeleteRevisionsOperation, + RevertRevisionsByIdOperation, + ConfigureRevisionsBinCleanerOperation, + ConfigureRevisionsBinCleanerOperationResult, + RevisionsBinConfiguration, + RevisionsOperationParameters, + RevisionsOperationContinuationParameters, +) +from ravendb.serverwide.operations.revisions import ( + ConfigureRevisionsForConflictsOperation, + ConfigureRevisionsForConflictsResult, ) from ravendb.documents.operations.statistics import ( GetCollectionStatisticsOperation, @@ -340,7 +357,6 @@ # todo: Serverwide # ReorderDatabaseMembersOperation -# ConfigureRevisionsForConflictsOperation # UpdateDatabaseOperation # GetServerWideBackupConfigurationOperation # SetDatabaseDynamicDistributionOperation @@ -420,9 +436,6 @@ # LazyRevisionOperation # LazyRevisionOperations # StreamOperation -# ConfigureRevisionsOperation -# GetRevisionsOperation -# RevisionsResult # GetConnectionStringsOperation # RemoveConnectionStringOperation # SqlEtlTable diff --git a/ravendb/documents/operations/revisions.py b/ravendb/documents/operations/revisions.py index a84731c1..76123be0 100644 --- a/ravendb/documents/operations/revisions.py +++ b/ravendb/documents/operations/revisions.py @@ -8,12 +8,18 @@ import requests from ravendb.documents.commands.revisions import GetRevisionsCommand -from ravendb.documents.operations.definitions import IOperation, MaintenanceOperation -from ravendb.http.raven_command import RavenCommand +from ravendb.documents.operations.definitions import ( + IOperation, + MaintenanceOperation, + OperationIdResult, + VoidOperation, +) +from ravendb.http.raven_command import RavenCommand, VoidRavenCommand from ravendb.util.util import RaftIdGenerator from ravendb.http.topology import RaftCommand from ravendb.documents.session.entity_to_json import EntityToJsonStatic from ravendb.documents.conventions import DocumentConventions +from ravendb.tools.utils import Utils if TYPE_CHECKING: from ravendb.http.http_cache import HttpCache @@ -216,3 +222,332 @@ def set_response(self, response: Optional[str], from_cache: bool) -> None: def get_raft_unique_request_id(self) -> str: return RaftIdGenerator().new_id() + + +class RevisionsOperationContinuationParameters: + """State for resuming an interrupted revisions operation (etags are node-local).""" + + def __init__( + self, + start_from_etags: Dict[str, int] = None, + etag_barriers: Dict[str, int] = None, + node_tags: Dict[str, str] = None, + ): + self.start_from_etags = start_from_etags + self.etag_barriers = etag_barriers + self.node_tags = node_tags + + def to_json(self) -> Dict[str, Any]: + return { + "StartFromEtags": self.start_from_etags, + "EtagBarriers": self.etag_barriers, + "NodeTags": self.node_tags, + } + + +class RevisionsOperationParameters: + """Base parameters shared by enforce-configuration and adopt-orphaned operations.""" + + def __init__( + self, + collections: List[str] = None, + continuation_parameters: RevisionsOperationContinuationParameters = None, + ): + self.collections = collections + self.continuation_parameters = continuation_parameters + + def to_json(self) -> Dict[str, Any]: + return { + "Collections": self.collections, + "ContinuationParameters": ( + self.continuation_parameters.to_json() if self.continuation_parameters else None + ), + } + + +class EnforceRevisionsConfigurationOperation(IOperation[OperationIdResult]): + """Applies the current revisions configuration to all existing revisions (long-running: + send via ``store.operations.send_async`` and await completion).""" + + class Parameters(RevisionsOperationParameters): + def __init__( + self, + include_force_created: bool = False, + max_ops_per_second: int = None, + collections: List[str] = None, + continuation_parameters: RevisionsOperationContinuationParameters = None, + ): + super().__init__(collections, continuation_parameters) + if max_ops_per_second is not None and max_ops_per_second <= 0: + raise ValueError("max_ops_per_second must be greater than 0") + self.include_force_created = include_force_created + self.max_ops_per_second = max_ops_per_second + + def to_json(self) -> Dict[str, Any]: + json_dict = super().to_json() + json_dict["IncludeForceCreated"] = self.include_force_created + json_dict["MaxOpsPerSecond"] = self.max_ops_per_second + return json_dict + + def __init__(self, parameters: Optional["EnforceRevisionsConfigurationOperation.Parameters"] = None): + self._parameters = parameters if parameters is not None else EnforceRevisionsConfigurationOperation.Parameters() + + def get_command( + self, store: DocumentStore, conventions: DocumentConventions, cache: HttpCache + ) -> RavenCommand[OperationIdResult]: + return self.EnforceRevisionsConfigurationCommand(self._parameters) + + class EnforceRevisionsConfigurationCommand(RavenCommand[OperationIdResult]): + def __init__(self, parameters: "EnforceRevisionsConfigurationOperation.Parameters"): + super().__init__(OperationIdResult) + self._parameters = parameters + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: ServerNode) -> requests.Request: + url = f"{node.url}/databases/{node.database}/admin/revisions/config/enforce" + request = requests.Request("POST", url) + request.data = self._parameters.to_json() + return request + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + self._throw_invalid_response() + self.result = OperationIdResult.from_json(json.loads(response)) + + +class AdoptOrphanedRevisionsOperation(IOperation[OperationIdResult]): + """Re-attaches orphaned revisions to their documents (long-running: send via + ``store.operations.send_async`` and await completion).""" + + class Parameters(RevisionsOperationParameters): + pass + + def __init__(self, parameters: Optional["AdoptOrphanedRevisionsOperation.Parameters"] = None): + self._parameters = parameters if parameters is not None else AdoptOrphanedRevisionsOperation.Parameters() + + def get_command( + self, store: DocumentStore, conventions: DocumentConventions, cache: HttpCache + ) -> RavenCommand[OperationIdResult]: + return self.AdoptOrphanedRevisionsCommand(self._parameters) + + class AdoptOrphanedRevisionsCommand(RavenCommand[OperationIdResult]): + def __init__(self, parameters: "AdoptOrphanedRevisionsOperation.Parameters"): + super().__init__(OperationIdResult) + self._parameters = parameters + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: ServerNode) -> requests.Request: + url = f"{node.url}/databases/{node.database}/admin/revisions/orphaned/adopt" + request = requests.Request("POST", url) + request.data = self._parameters.to_json() + return request + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + self._throw_invalid_response() + self.result = OperationIdResult.from_json(json.loads(response)) + + +class DeleteRevisionsOperation(MaintenanceOperation["DeleteRevisionsOperation.Result"]): + """Explicitly deletes revisions for one or more documents - by document id(s), by a + date range, or by specific revision change-vectors.""" + + class Result: + def __init__(self, total_deletes: int = None): + self.total_deletes = total_deletes + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> DeleteRevisionsOperation.Result: + return cls(json_dict["TotalDeletes"]) + + class Parameters: + def __init__( + self, + document_ids: List[str] = None, + remove_force_created_revisions: bool = False, + revisions_change_vectors: List[str] = None, + from_date: datetime.datetime = None, + to_date: datetime.datetime = None, + ): + self.document_ids = document_ids + self.remove_force_created_revisions = remove_force_created_revisions + self.revisions_change_vectors = revisions_change_vectors + self.from_date = from_date + self.to_date = to_date + + def validate(self) -> None: + if not self.document_ids: + raise ValueError("Document ids cannot be None or empty") + + for document_id in self.document_ids: + if not document_id or document_id.isspace(): + raise ValueError("Document id cannot be None or whitespace") + + if self.revisions_change_vectors: + if len(self.document_ids) != 1: + raise ValueError("The number of document ids must be 1 when using revisions change vectors") + if self.from_date is not None or self.to_date is not None: + raise ValueError("Can't use revisions change vectors and date range in the same request") + elif self.from_date is not None and self.to_date is not None and self.to_date <= self.from_date: + raise ValueError("To date must be greater than From date") + + def to_json(self) -> Dict[str, Any]: + return { + "DocumentIds": self.document_ids, + "RevisionsChangeVectors": self.revisions_change_vectors, + "From": Utils.datetime_to_string(self.from_date) if self.from_date is not None else None, + "To": Utils.datetime_to_string(self.to_date) if self.to_date is not None else None, + "RemoveForceCreatedRevisions": self.remove_force_created_revisions, + } + + def __init__( + self, + document_id: str = None, + revisions_change_vectors: List[str] = None, + from_date: datetime.datetime = None, + to_date: datetime.datetime = None, + remove_force_created_revisions: bool = False, + document_ids: List[str] = None, + parameters: Optional["DeleteRevisionsOperation.Parameters"] = None, + ): + if parameters is None: + if document_ids is None and document_id is not None: + document_ids = [document_id] + parameters = DeleteRevisionsOperation.Parameters( + document_ids, + remove_force_created_revisions, + revisions_change_vectors, + from_date, + to_date, + ) + parameters.validate() + self._parameters = parameters + + def get_command(self, conventions: DocumentConventions) -> RavenCommand[DeleteRevisionsOperation.Result]: + return self.DeleteRevisionsCommand(self._parameters) + + class DeleteRevisionsCommand(RavenCommand["DeleteRevisionsOperation.Result"], RaftCommand): + def __init__(self, parameters: "DeleteRevisionsOperation.Parameters"): + super().__init__(DeleteRevisionsOperation.Result) + self._parameters = parameters + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: ServerNode) -> requests.Request: + url = f"{node.url}/databases/{node.database}/admin/revisions" + request = requests.Request("DELETE", url) + request.data = self._parameters.to_json() + return request + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + self._throw_invalid_response() + self.result = DeleteRevisionsOperation.Result.from_json(json.loads(response)) + + def get_raft_unique_request_id(self) -> str: + return RaftIdGenerator().new_id() + + +class RevertRevisionsByIdOperation(VoidOperation): + """Reverts one or more documents to a specific revision identified by its change-vector.""" + + def __init__(self, id_to_change_vector: Dict[str, str] = None, id_: str = None, change_vector: str = None): + if id_to_change_vector is None: + if not id_: + raise ValueError("Id cannot be None or empty") + if not change_vector: + raise ValueError("Change vector cannot be None or empty") + id_to_change_vector = {id_: change_vector} + + if not id_to_change_vector: + raise ValueError("id_to_change_vector cannot be None or empty") + + self._id_to_change_vector = id_to_change_vector + + def get_command(self, store: DocumentStore, conventions: DocumentConventions, cache: HttpCache) -> VoidRavenCommand: + return self.RevertRevisionsByIdCommand(self._id_to_change_vector) + + class RevertRevisionsByIdCommand(VoidRavenCommand): + def __init__(self, id_to_change_vector: Dict[str, str]): + super().__init__() + self._id_to_change_vector = id_to_change_vector + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: ServerNode) -> requests.Request: + url = f"{node.url}/databases/{node.database}/revisions/revert/docs" + request = requests.Request("POST", url) + request.data = {"IdToChangeVector": self._id_to_change_vector} + return request + + +class RevisionsBinConfiguration: + """Configuration for the automatic revisions-bin cleaner.""" + + def __init__( + self, + disabled: bool = False, + minimum_entries_age_to_keep_in_min: int = 43200, + cleaner_frequency_in_sec: int = 300, + ): + self.disabled = disabled + self.minimum_entries_age_to_keep_in_min = minimum_entries_age_to_keep_in_min + self.cleaner_frequency_in_sec = cleaner_frequency_in_sec + + def to_json(self) -> Dict[str, Any]: + return { + "Disabled": self.disabled, + "MinimumEntriesAgeToKeepInMin": self.minimum_entries_age_to_keep_in_min, + "CleanerFrequencyInSec": self.cleaner_frequency_in_sec, + } + + +class ConfigureRevisionsBinCleanerOperationResult: + def __init__(self, raft_command_index: int = None): + self.raft_command_index = raft_command_index + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> ConfigureRevisionsBinCleanerOperationResult: + return cls(json_dict["RaftCommandIndex"]) + + +class ConfigureRevisionsBinCleanerOperation(MaintenanceOperation[ConfigureRevisionsBinCleanerOperationResult]): + """Enables / configures the automatic revisions-bin cleaner for the database.""" + + def __init__(self, configuration: RevisionsBinConfiguration): + if configuration is None: + raise ValueError("Configuration cannot be None") + self._configuration = configuration + + def get_command( + self, conventions: DocumentConventions + ) -> RavenCommand[ConfigureRevisionsBinCleanerOperationResult]: + return self.ConfigureRevisionsBinCleanerCommand(self._configuration) + + class ConfigureRevisionsBinCleanerCommand(RavenCommand[ConfigureRevisionsBinCleanerOperationResult], RaftCommand): + def __init__(self, configuration: RevisionsBinConfiguration): + super().__init__(ConfigureRevisionsBinCleanerOperationResult) + self._configuration = configuration + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: ServerNode) -> requests.Request: + url = f"{node.url}/databases/{node.database}/admin/revisions/bin-cleaner/config" + request = requests.Request("POST", url) + request.data = self._configuration.to_json() + return request + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + self._throw_invalid_response() + self.result = ConfigureRevisionsBinCleanerOperationResult.from_json(json.loads(response)) + + def get_raft_unique_request_id(self) -> str: + return RaftIdGenerator().new_id() diff --git a/ravendb/serverwide/operations/revisions.py b/ravendb/serverwide/operations/revisions.py new file mode 100644 index 00000000..d72f3a3f --- /dev/null +++ b/ravendb/serverwide/operations/revisions.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +import json +from typing import Any, Dict, Optional, TYPE_CHECKING + +import requests + +from ravendb.documents.operations.revisions import RevisionsCollectionConfiguration +from ravendb.http.raven_command import RavenCommand +from ravendb.http.topology import RaftCommand +from ravendb.serverwide.operations.common import ServerOperation +from ravendb.util.util import RaftIdGenerator + +if TYPE_CHECKING: + from ravendb.documents.conventions import DocumentConventions + from ravendb.http.server_node import ServerNode + + +class ConfigureRevisionsForConflictsResult: + def __init__(self, raft_command_index: Optional[int] = None): + self.raft_command_index = raft_command_index + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> ConfigureRevisionsForConflictsResult: + return cls(json_dict["RaftCommandIndex"]) + + +class ConfigureRevisionsForConflictsOperation(ServerOperation[ConfigureRevisionsForConflictsResult]): + """Sets the revisions configuration that is applied to conflicting documents of a database.""" + + def __init__(self, database: str, configuration: RevisionsCollectionConfiguration): + if configuration is None: + raise ValueError("Configuration cannot be None") + self._database = database + self._configuration = configuration + + def get_command(self, conventions: "DocumentConventions") -> RavenCommand[ConfigureRevisionsForConflictsResult]: + return self.ConfigureRevisionsForConflictsCommand(self._database, self._configuration) + + class ConfigureRevisionsForConflictsCommand(RavenCommand[ConfigureRevisionsForConflictsResult], RaftCommand): + def __init__(self, database: str, configuration: RevisionsCollectionConfiguration): + super().__init__(ConfigureRevisionsForConflictsResult) + if database is None: + raise ValueError("Database cannot be None") + self._database = database + self._configuration = configuration + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: "ServerNode") -> requests.Request: + url = f"{node.url}/databases/{self._database}/admin/revisions/conflicts/config" + request = requests.Request("POST", url) + request.data = self._configuration.to_json() + return request + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + self._throw_invalid_response() + self.result = ConfigureRevisionsForConflictsResult.from_json(json.loads(response)) + + def get_raft_unique_request_id(self) -> str: + return RaftIdGenerator().new_id() diff --git a/ravendb/tests/jvm_migrated_tests/client_tests/revisions_tests/test_revisions_operations.py b/ravendb/tests/jvm_migrated_tests/client_tests/revisions_tests/test_revisions_operations.py new file mode 100644 index 00000000..9e241274 --- /dev/null +++ b/ravendb/tests/jvm_migrated_tests/client_tests/revisions_tests/test_revisions_operations.py @@ -0,0 +1,257 @@ +from datetime import datetime + +from ravendb.documents.operations.revisions import ( + AdoptOrphanedRevisionsOperation, + ConfigureRevisionsBinCleanerOperation, + ConfigureRevisionsOperation, + DeleteRevisionsOperation, + EnforceRevisionsConfigurationOperation, + RevertRevisionsByIdOperation, + RevisionsBinConfiguration, + RevisionsCollectionConfiguration, + RevisionsConfiguration, +) +from ravendb.serverwide.operations.revisions import ConfigureRevisionsForConflictsOperation +from ravendb.infrastructure.orders import Company +from ravendb.primitives import constants +from ravendb.tests.test_base import TestBase + + +class TestRevisionsOperations(TestBase): + def setUp(self): + super().setUp() + + def _create_company_with_revisions(self, number_of_updates: int = 4) -> Company: + company = Company(name="Company Name") + with self.store.open_session() as session: + session.store(company) + session.save_changes() + + for i in range(number_of_updates): + with self.store.open_session() as session: + loaded = session.load(company.Id, Company) + loaded.name = f"Company {i}" + session.save_changes() + + return company + + def test_enforce_revisions_configuration_purges_excess_revisions(self): + self.setup_revisions(self.store, False, 100) + company = self._create_company_with_revisions(4) + + with self.store.open_session() as session: + self.assertEqual(5, session.advanced.revisions.get_count_for(company.Id)) + + # Tighten the configuration - this only affects future modifications... + configuration = RevisionsConfiguration() + default_config = RevisionsCollectionConfiguration() + default_config.minimum_revisions_to_keep = 2 + configuration.default_config = default_config + self.store.maintenance.send(ConfigureRevisionsOperation(configuration)) + + with self.store.open_session() as session: + self.assertEqual(5, session.advanced.revisions.get_count_for(company.Id)) + + # ...until we enforce it on the existing revisions. + operation = self.store.operations.send_async(EnforceRevisionsConfigurationOperation()) + operation.wait_for_completion() + + with self.store.open_session() as session: + self.assertEqual(2, session.advanced.revisions.get_count_for(company.Id)) + + def test_enforce_revisions_configuration_with_parameters(self): + self.setup_revisions(self.store, False, 100) + company = self._create_company_with_revisions(4) + + configuration = RevisionsConfiguration() + default_config = RevisionsCollectionConfiguration() + default_config.minimum_revisions_to_keep = 1 + configuration.default_config = default_config + self.store.maintenance.send(ConfigureRevisionsOperation(configuration)) + + parameters = EnforceRevisionsConfigurationOperation.Parameters( + include_force_created=True, collections=["Companies"] + ) + operation = self.store.operations.send_async(EnforceRevisionsConfigurationOperation(parameters)) + operation.wait_for_completion() + + with self.store.open_session() as session: + self.assertEqual(1, session.advanced.revisions.get_count_for(company.Id)) + + def test_delete_revisions_by_document_id(self): + self.setup_revisions(self.store, False, 100) + company = self._create_company_with_revisions(4) + + with self.store.open_session() as session: + self.assertEqual(5, session.advanced.revisions.get_count_for(company.Id)) + + result = self.store.maintenance.send(DeleteRevisionsOperation(document_id=company.Id)) + self.assertEqual(5, result.total_deletes) + + with self.store.open_session() as session: + self.assertEqual(0, session.advanced.revisions.get_count_for(company.Id)) + + def test_delete_revisions_by_change_vectors(self): + self.setup_revisions(self.store, False, 100) + company = self._create_company_with_revisions(4) + + with self.store.open_session() as session: + metadata = session.advanced.revisions.get_metadata_for(company.Id) + self.assertEqual(5, len(metadata)) + change_vectors = [m[constants.Documents.Metadata.CHANGE_VECTOR] for m in metadata[:2]] + + result = self.store.maintenance.send( + DeleteRevisionsOperation(document_id=company.Id, revisions_change_vectors=change_vectors) + ) + self.assertEqual(2, result.total_deletes) + + with self.store.open_session() as session: + self.assertEqual(3, session.advanced.revisions.get_count_for(company.Id)) + + def test_delete_revisions_validation_is_client_side(self): + self.assertRaises(ValueError, lambda: DeleteRevisionsOperation(document_ids=[])) + self.assertRaises( + ValueError, + lambda: DeleteRevisionsOperation( + document_id="companies/1", revisions_change_vectors=["cv"], from_date=datetime(2020, 1, 1) + ), + ) + + def test_revert_revisions_by_id(self): + self.setup_revisions(self.store, False, 100) + + company = Company(name="Old Name") + with self.store.open_session() as session: + session.store(company) + session.save_changes() + + with self.store.open_session() as session: + loaded = session.load(company.Id, Company) + loaded.name = "New Name" + session.save_changes() + + with self.store.open_session() as session: + metadata = session.advanced.revisions.get_metadata_for(company.Id) + self.assertEqual(2, len(metadata)) + # Metadata is ordered newest-first, so the original ("Old Name") revision is last. + old_change_vector = metadata[1][constants.Documents.Metadata.CHANGE_VECTOR] + + self.store.operations.send(RevertRevisionsByIdOperation(id_=company.Id, change_vector=old_change_vector)) + + with self.store.open_session() as session: + loaded = session.load(company.Id, Company) + self.assertEqual("Old Name", loaded.name) + + def test_configure_revisions_bin_cleaner(self): + configuration = RevisionsBinConfiguration() + configuration.disabled = False + configuration.minimum_entries_age_to_keep_in_min = 10 + configuration.cleaner_frequency_in_sec = 100 + + result = self.store.maintenance.send(ConfigureRevisionsBinCleanerOperation(configuration)) + + self.assertIsNotNone(result) + self.assertIsNotNone(result.raft_command_index) + self.assertGreater(result.raft_command_index, 0) + + def test_configure_revisions_for_conflicts(self): + configuration = RevisionsCollectionConfiguration() + configuration.minimum_revisions_to_keep = 5 + + result = self.store.maintenance.server.send( + ConfigureRevisionsForConflictsOperation(self.store.database, configuration) + ) + + self.assertIsNotNone(result) + self.assertIsNotNone(result.raft_command_index) + self.assertGreater(result.raft_command_index, 0) + + def test_adopt_orphaned_revisions_completes(self): + self.setup_revisions(self.store, False, 100) + + company = Company(name="Company Name") + with self.store.open_session() as session: + session.store(company) + session.save_changes() + + with self.store.open_session() as session: + session.delete(company.Id) + session.save_changes() + + # The operation must run to completion against the server (0 adoptions is a valid result); + # wait_for_completion raises if the operation faults. + operation = self.store.operations.send_async(AdoptOrphanedRevisionsOperation()) + operation.wait_for_completion() + + def test_enforce_revisions_configuration_with_max_ops_per_second(self): + self.setup_revisions(self.store, False, 100) + company = self._create_company_with_revisions(4) + + configuration = RevisionsConfiguration() + default_config = RevisionsCollectionConfiguration() + default_config.minimum_revisions_to_keep = 2 + configuration.default_config = default_config + self.store.maintenance.send(ConfigureRevisionsOperation(configuration)) + + parameters = EnforceRevisionsConfigurationOperation.Parameters(max_ops_per_second=10000) + operation = self.store.operations.send_async(EnforceRevisionsConfigurationOperation(parameters)) + operation.wait_for_completion() + + with self.store.open_session() as session: + self.assertEqual(2, session.advanced.revisions.get_count_for(company.Id)) + + def test_enforce_parameters_reject_non_positive_max_ops_per_second(self): + self.assertRaises(ValueError, lambda: EnforceRevisionsConfigurationOperation.Parameters(max_ops_per_second=0)) + + def test_delete_revisions_by_date_range(self): + self.setup_revisions(self.store, False, 100) + company = self._create_company_with_revisions(4) + + result = self.store.maintenance.send( + DeleteRevisionsOperation( + document_id=company.Id, from_date=datetime(2000, 1, 1), to_date=datetime(2100, 1, 1) + ) + ) + self.assertEqual(5, result.total_deletes) + with self.store.open_session() as session: + self.assertEqual(0, session.advanced.revisions.get_count_for(company.Id)) + + def test_delete_revisions_for_multiple_documents(self): + self.setup_revisions(self.store, False, 100) + first = self._create_company_with_revisions(4) + second = self._create_company_with_revisions(4) + + result = self.store.maintenance.send(DeleteRevisionsOperation(document_ids=[first.Id, second.Id])) + self.assertEqual(10, result.total_deletes) + with self.store.open_session() as session: + self.assertEqual(0, session.advanced.revisions.get_count_for(first.Id)) + self.assertEqual(0, session.advanced.revisions.get_count_for(second.Id)) + + def test_revert_multiple_revisions_by_id(self): + self.setup_revisions(self.store, False, 100) + + id_to_old_change_vector = {} + for _ in range(2): + company = Company(name="Old Name") + with self.store.open_session() as session: + session.store(company) + session.save_changes() + with self.store.open_session() as session: + session.load(company.Id, Company).name = "New Name" + session.save_changes() + with self.store.open_session() as session: + metadata = session.advanced.revisions.get_metadata_for(company.Id) + id_to_old_change_vector[company.Id] = metadata[1][constants.Documents.Metadata.CHANGE_VECTOR] + + self.store.operations.send(RevertRevisionsByIdOperation(id_to_change_vector=id_to_old_change_vector)) + + with self.store.open_session() as session: + for company_id in id_to_old_change_vector: + self.assertEqual("Old Name", session.load(company_id, Company).name) + + def test_configure_revisions_bin_cleaner_disabled(self): + result = self.store.maintenance.send( + ConfigureRevisionsBinCleanerOperation(RevisionsBinConfiguration(disabled=True)) + ) + self.assertIsNotNone(result.raft_command_index) + self.assertGreater(result.raft_command_index, 0) diff --git a/ravendb/tests/test_imports.py b/ravendb/tests/test_imports.py index 2edda75b..bdba36c2 100644 --- a/ravendb/tests/test_imports.py +++ b/ravendb/tests/test_imports.py @@ -29,7 +29,9 @@ def test_imports_at_top_level(self): from ravendb import GetBuildNumberOperation # from ravendb import ReorderDatabaseMembersOperation - # from ravendb import ConfigureRevisionsForConflictsOperation + from ravendb import ConfigureRevisionsForConflictsOperation + from ravendb import ConfigureRevisionsForConflictsResult + # from ravendb import UpdateDatabaseOperation # from ravendb import GetServerWideBackupConfigurationOperation # from ravendb import SetDatabaseDynamicDistributionOperation @@ -215,9 +217,19 @@ def test_imports_at_top_level(self): from ravendb import PatchResult from ravendb import PatchStatus - # from ravendb import ConfigureRevisionsOperation - # from ravendb import GetRevisionsOperation - # from ravendb import RevisionsResult + from ravendb import ConfigureRevisionsOperation + from ravendb import ConfigureRevisionsOperationResult + from ravendb import GetRevisionsOperation + from ravendb import RevisionsResult + from ravendb import EnforceRevisionsConfigurationOperation + from ravendb import AdoptOrphanedRevisionsOperation + from ravendb import DeleteRevisionsOperation + from ravendb import RevertRevisionsByIdOperation + from ravendb import ConfigureRevisionsBinCleanerOperation + from ravendb import ConfigureRevisionsBinCleanerOperationResult + from ravendb import RevisionsBinConfiguration + from ravendb import RevisionsOperationParameters + from ravendb import RevisionsOperationContinuationParameters from ravendb import RevisionsCollectionConfiguration from ravendb import RevisionsConfiguration from ravendb import DetailedDatabaseStatistics From 8bd85785a85648fabac066e8191184de4e513af9 Mon Sep 17 00:00:00 2001 From: Gracjan Sadowicz Date: Thu, 25 Jun 2026 01:42:55 +0200 Subject: [PATCH 2/5] RDBC-1076 Python API: Pull Replication Port the Pull Replication client API from the C# client (v7.2): hub, sink, external-replication and hub-access (register/unregister/get) operations, read-side pull-replication task info, supporting models/enums and ReplicationHubNotFoundException. Wires top-level exports and adds TestBase tests (incl. secured, certificate-based coverage) verified against a licensed 7.2 server. License-gated tests use the repo RAVENDB_LICENSE skipIf (skip on CI). --- ravendb/__init__.py | 36 +- ravendb/documents/operations/ongoing_tasks.py | 183 ++++++++++ .../operations/replication/definitions.py | 327 +++++++++++++++++- .../replication/pull_replication.py | 289 ++++++++++++++++ ravendb/exceptions/exception_dispatcher.py | 3 + ravendb/exceptions/raven_exceptions.py | 5 + .../replication_tests/__init__.py | 0 .../test_pull_replication.py | 202 +++++++++++ .../test_pull_replication_secured.py | 123 +++++++ ravendb/tests/test_imports.py | 32 +- 10 files changed, 1160 insertions(+), 40 deletions(-) create mode 100644 ravendb/documents/operations/replication/pull_replication.py create mode 100644 ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/__init__.py create mode 100644 ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/test_pull_replication.py create mode 100644 ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/test_pull_replication_secured.py diff --git a/ravendb/__init__.py b/ravendb/__init__.py index 038c0d7b..172d57b8 100644 --- a/ravendb/__init__.py +++ b/ravendb/__init__.py @@ -165,10 +165,29 @@ from ravendb.documents.operations.refresh.configuration import RefreshConfiguration from ravendb.documents.operations.replication.definitions import ( ExternalReplication, + ExternalReplicationBase, + ReplicationNode, PullReplicationAsSink, PullReplicationDefinition, - ReplicationNode, - ExternalReplicationBase, + PullReplicationMode, + PreventDeletionsMode, + ReplicationHubAccess, + DetailedReplicationHubAccess, + ReplicationHubAccessResult, + PullReplicationDefinitionAndCurrentConnections, +) +from ravendb.documents.operations.replication.pull_replication import ( + PutPullReplicationAsHubOperation, + UpdatePullReplicationAsSinkOperation, + UpdateExternalReplicationOperation, + RegisterReplicationHubAccessOperation, + UnregisterReplicationHubAccessOperation, + GetReplicationHubAccessOperation, + GetPullReplicationTasksInfoOperation, +) +from ravendb.documents.operations.ongoing_tasks import ( + OngoingTaskPullReplicationAsSink, + OngoingTaskPullReplicationAsHub, ) from ravendb.documents.operations.revisions import ( RevisionsCollectionConfiguration, @@ -411,17 +430,7 @@ # SeedIdentityForOperation # IOperationProgress # IOperationResult -# PullReplicationDefinitionAndCurrentConnections -# DetailedReplicationHubAccess -# GetReplicationHubAccessOperation -# PreventDeletionsMode -# PullReplicationMode -# RegisterReplicationHubAccessOperation -# ReplicationHubAccess -# ReplicationHubAccessResult # ReplicationHubAccessResponse -# UnregisterReplicationHubAccessOperation -# UpdatePullReplicationAsSinkOperation # GetConflictsCommand # PutAttachmentCommandHelper # SetupDocumentBase @@ -448,9 +457,6 @@ # DisableDatabaseToggleResult # ConfigureExpirationOperation # DeleteOngoingTaskOperation -# GetPullReplicationHubTasksInfoOperation -# OngoingTaskPullReplicationAsSink -# OngoingTaskPullReplicationAsHub # OngoingTaskType # RunningBackup # NextBackup diff --git a/ravendb/documents/operations/ongoing_tasks.py b/ravendb/documents/operations/ongoing_tasks.py index 6561fa89..62b869c6 100644 --- a/ravendb/documents/operations/ongoing_tasks.py +++ b/ravendb/documents/operations/ongoing_tasks.py @@ -12,6 +12,7 @@ from ravendb.tools.utils import Utils from ravendb.util.util import RaftIdGenerator from ravendb.http.raven_command import RavenCommand +from ravendb.documents.operations.replication.definitions import PullReplicationMode if TYPE_CHECKING: from ravendb.documents.conventions import DocumentConventions @@ -279,6 +280,186 @@ def from_json(cls, json_dict: dict) -> "OngoingTaskEmbeddingsGeneration": ) +class OngoingTaskPullReplicationAsHub(OngoingTask): + """Ongoing task information for a single pull-replication hub connection.""" + + def __init__( + self, + task_id: Optional[int] = None, + responsible_node: Optional[NodeId] = None, + task_state: Optional[OngoingTaskState] = None, + task_connection_status: Optional[OngoingTaskConnectionStatus] = None, + task_name: Optional[str] = None, + error: Optional[str] = None, + mentor_node: Optional[str] = None, + pin_to_mentor_node: Optional[bool] = None, + from_to_string: Optional[str] = None, + destination_url: Optional[str] = None, + destination_database: Optional[str] = None, + delay_replication_for=None, + handler_id: Optional[str] = None, + last_accepted_change_vector_from_destination: Optional[str] = None, + source_database_change_vector: Optional[str] = None, + last_sent_etag: Optional[int] = None, + last_database_etag: Optional[int] = None, + ): + super().__init__( + task_id=task_id, + task_type=OngoingTaskType.PULL_REPLICATION_AS_HUB, + responsible_node=responsible_node, + task_state=task_state, + task_connection_status=task_connection_status, + task_name=task_name, + error=error, + mentor_node=mentor_node, + pin_to_mentor_node=pin_to_mentor_node, + ) + self.from_to_string = from_to_string + self.destination_url = destination_url + self.destination_database = destination_database + self.delay_replication_for = delay_replication_for + self.handler_id = handler_id + self.last_accepted_change_vector_from_destination = last_accepted_change_vector_from_destination + self.source_database_change_vector = source_database_change_vector + self.last_sent_etag = last_sent_etag + self.last_database_etag = last_database_etag + + def to_json(self) -> dict: + result = super().to_json() + result["FromToString"] = self.from_to_string + result["DestinationUrl"] = self.destination_url + result["DestinationDatabase"] = self.destination_database + result["DelayReplicationFor"] = ( + Utils.timedelta_to_str(self.delay_replication_for) if self.delay_replication_for is not None else None + ) + result["HandlerId"] = self.handler_id + result["LastAcceptedChangeVectorFromDestination"] = self.last_accepted_change_vector_from_destination + result["SourceDatabaseChangeVector"] = self.source_database_change_vector + result["LastSentEtag"] = self.last_sent_etag + result["LastDatabaseEtag"] = self.last_database_etag + return result + + @classmethod + def from_json(cls, json_dict: dict) -> Optional["OngoingTaskPullReplicationAsHub"]: + if json_dict is None: + return None + task_state_str = json_dict.get("TaskState") + task_connection_status_str = json_dict.get("TaskConnectionStatus") + delay = json_dict.get("DelayReplicationFor") + return cls( + task_id=json_dict.get("TaskId"), + responsible_node=NodeId.from_json(json_dict.get("ResponsibleNode")), + task_state=OngoingTaskState(task_state_str) if task_state_str else None, + task_connection_status=( + OngoingTaskConnectionStatus(task_connection_status_str) if task_connection_status_str else None + ), + task_name=json_dict.get("TaskName"), + error=json_dict.get("Error"), + mentor_node=json_dict.get("MentorNode"), + pin_to_mentor_node=json_dict.get("PinToMentorNode"), + from_to_string=json_dict.get("FromToString"), + destination_url=json_dict.get("DestinationUrl"), + destination_database=json_dict.get("DestinationDatabase"), + delay_replication_for=Utils.string_to_timedelta(delay) if delay else None, + handler_id=json_dict.get("HandlerId"), + last_accepted_change_vector_from_destination=json_dict.get("LastAcceptedChangeVectorFromDestination"), + source_database_change_vector=json_dict.get("SourceDatabaseChangeVector"), + last_sent_etag=json_dict.get("LastSentEtag"), + last_database_etag=json_dict.get("LastDatabaseEtag"), + ) + + +class OngoingTaskPullReplicationAsSink(OngoingTask): + """Ongoing task information for a pull-replication sink task.""" + + def __init__( + self, + task_id: Optional[int] = None, + responsible_node: Optional[NodeId] = None, + task_state: Optional[OngoingTaskState] = None, + task_connection_status: Optional[OngoingTaskConnectionStatus] = None, + task_name: Optional[str] = None, + error: Optional[str] = None, + mentor_node: Optional[str] = None, + pin_to_mentor_node: Optional[bool] = None, + hub_name: Optional[str] = None, + mode: Optional[PullReplicationMode] = None, + destination_url: Optional[str] = None, + topology_discovery_urls: Optional[list] = None, + destination_database: Optional[str] = None, + connection_string_name: Optional[str] = None, + certificate_public_key: Optional[str] = None, + access_name: Optional[str] = None, + allowed_hub_to_sink_paths: Optional[list] = None, + allowed_sink_to_hub_paths: Optional[list] = None, + ): + super().__init__( + task_id=task_id, + task_type=OngoingTaskType.PULL_REPLICATION_AS_SINK, + responsible_node=responsible_node, + task_state=task_state, + task_connection_status=task_connection_status, + task_name=task_name, + error=error, + mentor_node=mentor_node, + pin_to_mentor_node=pin_to_mentor_node, + ) + self.hub_name = hub_name + self.mode = mode + self.destination_url = destination_url + self.topology_discovery_urls = topology_discovery_urls + self.destination_database = destination_database + self.connection_string_name = connection_string_name + self.certificate_public_key = certificate_public_key + self.access_name = access_name + self.allowed_hub_to_sink_paths = allowed_hub_to_sink_paths + self.allowed_sink_to_hub_paths = allowed_sink_to_hub_paths + + def to_json(self) -> dict: + result = super().to_json() + result["HubName"] = self.hub_name + result["Mode"] = self.mode.value if self.mode else None + result["DestinationUrl"] = self.destination_url + result["TopologyDiscoveryUrls"] = self.topology_discovery_urls + result["DestinationDatabase"] = self.destination_database + result["ConnectionStringName"] = self.connection_string_name + result["CertificatePublicKey"] = self.certificate_public_key + result["AccessName"] = self.access_name + result["AllowedHubToSinkPaths"] = self.allowed_hub_to_sink_paths + result["AllowedSinkToHubPaths"] = self.allowed_sink_to_hub_paths + return result + + @classmethod + def from_json(cls, json_dict: dict) -> Optional["OngoingTaskPullReplicationAsSink"]: + if json_dict is None: + return None + task_state_str = json_dict.get("TaskState") + task_connection_status_str = json_dict.get("TaskConnectionStatus") + mode_str = json_dict.get("Mode") + return cls( + task_id=json_dict.get("TaskId"), + responsible_node=NodeId.from_json(json_dict.get("ResponsibleNode")), + task_state=OngoingTaskState(task_state_str) if task_state_str else None, + task_connection_status=( + OngoingTaskConnectionStatus(task_connection_status_str) if task_connection_status_str else None + ), + task_name=json_dict.get("TaskName"), + error=json_dict.get("Error"), + mentor_node=json_dict.get("MentorNode"), + pin_to_mentor_node=json_dict.get("PinToMentorNode"), + hub_name=json_dict.get("HubName"), + mode=PullReplicationMode(mode_str) if mode_str else None, + destination_url=json_dict.get("DestinationUrl"), + topology_discovery_urls=json_dict.get("TopologyDiscoveryUrls"), + destination_database=json_dict.get("DestinationDatabase"), + connection_string_name=json_dict.get("ConnectionStringName"), + certificate_public_key=json_dict.get("CertificatePublicKey"), + access_name=json_dict.get("AccessName"), + allowed_hub_to_sink_paths=json_dict.get("AllowedHubToSinkPaths"), + allowed_sink_to_hub_paths=json_dict.get("AllowedSinkToHubPaths"), + ) + + class ToggleOngoingTaskStateOperation(MaintenanceOperation[ModifyOngoingTaskResult]): def __init__( self, task_name_or_id: Union[int, str], type_of_task: Optional[OngoingTaskType], disable: Optional[bool] @@ -454,6 +635,8 @@ def _deserialize_task( return OngoingTaskGenAi.from_json(json_dict) elif self._task_type == OngoingTaskType.EMBEDDINGS_GENERATION: return OngoingTaskEmbeddingsGeneration.from_json(json_dict) + elif self._task_type == OngoingTaskType.PULL_REPLICATION_AS_SINK: + return OngoingTaskPullReplicationAsSink.from_json(json_dict) else: # todo: handle more types of tasks return OngoingTask.from_json(json_dict) diff --git a/ravendb/documents/operations/replication/definitions.py b/ravendb/documents/operations/replication/definitions.py index 51d2d7d8..333f71b0 100644 --- a/ravendb/documents/operations/replication/definitions.py +++ b/ravendb/documents/operations/replication/definitions.py @@ -1,28 +1,290 @@ -# todo: implement -from datetime import datetime -from typing import Optional, List, Dict +from __future__ import annotations + +import enum +from datetime import datetime, timedelta +from typing import Optional, List, Dict, Any from ravendb.tools.utils import Utils +class PullReplicationMode(enum.Enum): + """Direction of data flow between a pull-replication hub and sink (the C# ``[Flags]`` + combination serializes as the comma-separated string the server expects/returns).""" + + NONE = "None" + HUB_TO_SINK = "HubToSink" + SINK_TO_HUB = "SinkToHub" + HUB_TO_SINK_AND_SINK_TO_HUB = "HubToSink, SinkToHub" + + def __str__(self) -> str: + return self.value + + +class PreventDeletionsMode(enum.Enum): + NONE = "None" + PREVENT_SINK_TO_HUB_DELETIONS = "PreventSinkToHubDeletions" + + def __str__(self) -> str: + return self.value + + +class ReplicationType(enum.Enum): + EXTERNAL = "External" + PULL_AS_SINK = "PullAsSink" + PULL_AS_HUB = "PullAsHub" + INTERNAL = "Internal" + MIGRATION = "Migration" + + def __str__(self) -> str: + return self.value + + class ReplicationNode: - pass + def __init__(self, url: str = None, database: str = None, disabled: bool = False): + self.url = url.rstrip("/") if url else url + self.database = database + self.disabled = disabled + + def get_replication_type(self) -> Optional[ReplicationType]: + return None + + def to_json(self) -> Dict[str, Any]: + replication_type = self.get_replication_type() + return { + "Database": self.database, + "Url": self.url, + "Disabled": self.disabled, + "Type": replication_type.value if replication_type is not None else None, + } class ExternalReplicationBase(ReplicationNode): - pass + def __init__( + self, + database: str = None, + connection_string_name: str = None, + name: str = None, + mentor_node: str = None, + pin_to_mentor_node: bool = False, + task_id: int = 0, + url: str = None, + disabled: bool = False, + ): + super().__init__(url, database, disabled) + self.task_id = task_id + self.name = name + self.connection_string_name = connection_string_name + self.mentor_node = mentor_node + self.pin_to_mentor_node = pin_to_mentor_node + + def to_json(self) -> Dict[str, Any]: + json_dict = super().to_json() + json_dict.update( + { + "TaskId": self.task_id, + "Name": self.name, + "MentorNode": self.mentor_node, + "PinToMentorNode": self.pin_to_mentor_node, + "ConnectionStringName": self.connection_string_name, + } + ) + return json_dict + + def _fill_from_json(self, json_dict: Dict[str, Any]) -> None: + self.url = json_dict.get("Url") + self.database = json_dict.get("Database") + self.disabled = json_dict.get("Disabled", False) + self.task_id = json_dict.get("TaskId", 0) + self.name = json_dict.get("Name") + self.connection_string_name = json_dict.get("ConnectionStringName") + self.mentor_node = json_dict.get("MentorNode") + self.pin_to_mentor_node = json_dict.get("PinToMentorNode", False) class ExternalReplication(ExternalReplicationBase): - pass + def __init__( + self, + database: str = None, + connection_string_name: str = None, + delay_replication_for: timedelta = None, + name: str = None, + mentor_node: str = None, + pin_to_mentor_node: bool = False, + task_id: int = 0, + ): + super().__init__( + database=database, + connection_string_name=connection_string_name, + name=name, + mentor_node=mentor_node, + pin_to_mentor_node=pin_to_mentor_node, + task_id=task_id, + ) + self.delay_replication_for = delay_replication_for + def get_replication_type(self) -> ReplicationType: + return ReplicationType.EXTERNAL -class PullReplicationAsSink(ExternalReplicationBase): - pass + def to_json(self) -> Dict[str, Any]: + json_dict = super().to_json() + json_dict["DelayReplicationFor"] = ( + Utils.timedelta_to_str(self.delay_replication_for) if self.delay_replication_for is not None else None + ) + return json_dict class PullReplicationDefinition: - pass + """A pull-replication hub definition (the source side of a pull-replication task).""" + + def __init__( + self, + name: str = None, + delay_replication_for: timedelta = None, + mentor_node: str = None, + disabled: bool = False, + mode: PullReplicationMode = None, + task_id: int = 0, + with_filtering: bool = False, + prevent_deletions_mode: PreventDeletionsMode = None, + pin_to_mentor_node: bool = False, + ): + self.name = name + self.delay_replication_for = delay_replication_for + self.mentor_node = mentor_node + self.disabled = disabled + self.mode = mode if mode is not None else PullReplicationMode.HUB_TO_SINK + self.task_id = task_id + self.with_filtering = with_filtering + self.prevent_deletions_mode = prevent_deletions_mode + self.pin_to_mentor_node = pin_to_mentor_node + + def to_json(self) -> Dict[str, Any]: + return { + "Name": self.name, + "DelayReplicationFor": ( + Utils.timedelta_to_str(self.delay_replication_for) if self.delay_replication_for is not None else None + ), + "MentorNode": self.mentor_node, + "PinToMentorNode": self.pin_to_mentor_node, + "Disabled": self.disabled, + "Mode": self.mode.value if self.mode else None, + "TaskId": self.task_id, + "WithFiltering": self.with_filtering, + "PreventDeletionsMode": self.prevent_deletions_mode.value if self.prevent_deletions_mode else None, + } + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> Optional[PullReplicationDefinition]: + if json_dict is None: + return None + delay = json_dict.get("DelayReplicationFor") + mode = json_dict.get("Mode") + prevent = json_dict.get("PreventDeletionsMode") + return cls( + name=json_dict.get("Name"), + delay_replication_for=Utils.string_to_timedelta(delay) if delay else None, + mentor_node=json_dict.get("MentorNode"), + disabled=json_dict.get("Disabled", False), + mode=PullReplicationMode(mode) if mode else None, + task_id=json_dict.get("TaskId", 0), + with_filtering=json_dict.get("WithFiltering", False), + prevent_deletions_mode=PreventDeletionsMode(prevent) if prevent else None, + pin_to_mentor_node=json_dict.get("PinToMentorNode", False), + ) + + +class PullReplicationAsSink(ExternalReplicationBase): + """A pull-replication sink definition (the destination side of a pull-replication task).""" + + def __init__( + self, + database: str = None, + connection_string_name: str = None, + hub_name: str = None, + mode: PullReplicationMode = None, + allowed_hub_to_sink_paths: List[str] = None, + allowed_sink_to_hub_paths: List[str] = None, + certificate_with_private_key: str = None, + certificate_password: str = None, + access_name: str = None, + name: str = None, + mentor_node: str = None, + pin_to_mentor_node: bool = False, + task_id: int = 0, + ): + super().__init__( + database=database, + connection_string_name=connection_string_name, + name=name, + mentor_node=mentor_node, + pin_to_mentor_node=pin_to_mentor_node, + task_id=task_id, + ) + self.hub_name = hub_name + self.mode = mode if mode is not None else PullReplicationMode.HUB_TO_SINK + self.allowed_hub_to_sink_paths = allowed_hub_to_sink_paths + self.allowed_sink_to_hub_paths = allowed_sink_to_hub_paths + self.certificate_with_private_key = certificate_with_private_key + self.certificate_password = certificate_password + self.access_name = access_name + + def get_replication_type(self) -> ReplicationType: + return ReplicationType.PULL_AS_SINK + + def to_json(self) -> Dict[str, Any]: + json_dict = super().to_json() + json_dict.update( + { + "Mode": self.mode.value if self.mode else None, + "HubName": self.hub_name, + "AllowedHubToSinkPaths": self.allowed_hub_to_sink_paths, + "AllowedSinkToHubPaths": self.allowed_sink_to_hub_paths, + "CertificateWithPrivateKey": self.certificate_with_private_key, + "CertificatePassword": self.certificate_password, + "AccessName": self.access_name, + } + ) + return json_dict + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> Optional[PullReplicationAsSink]: + if json_dict is None: + return None + obj = cls() + obj._fill_from_json(json_dict) + mode = json_dict.get("Mode") + obj.mode = PullReplicationMode(mode) if mode else None + obj.hub_name = json_dict.get("HubName") + obj.allowed_hub_to_sink_paths = json_dict.get("AllowedHubToSinkPaths") + obj.allowed_sink_to_hub_paths = json_dict.get("AllowedSinkToHubPaths") + obj.certificate_with_private_key = json_dict.get("CertificateWithPrivateKey") + obj.certificate_password = json_dict.get("CertificatePassword") + obj.access_name = json_dict.get("AccessName") + return obj + + +class ReplicationHubAccess: + """Grants a sink (identified by a certificate) access to a replication hub.""" + + def __init__( + self, + name: str = None, + certificate_base64: str = None, + allowed_hub_to_sink_paths: List[str] = None, + allowed_sink_to_hub_paths: List[str] = None, + ): + self.name = name + self.certificate_base64 = certificate_base64 + self.allowed_hub_to_sink_paths = allowed_hub_to_sink_paths + self.allowed_sink_to_hub_paths = allowed_sink_to_hub_paths + + def to_json(self) -> Dict[str, Any]: + return { + "Name": self.name, + "CertificateBase64": self.certificate_base64, + "AllowedHubToSinkPaths": self.allowed_hub_to_sink_paths, + "AllowedSinkToHubPaths": self.allowed_sink_to_hub_paths, + } class DetailedReplicationHubAccess: @@ -60,3 +322,50 @@ def to_json(self) -> Dict: "AllowedHubToSinkPaths": self.allowed_hub_to_sink_paths, "AllowedSinkToHubPaths": self.allowed_sink_to_hub_paths, } + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> Optional[DetailedReplicationHubAccess]: + if json_dict is None: + return None + not_before = json_dict.get("NotBefore") + not_after = json_dict.get("NotAfter") + return cls( + name=json_dict.get("Name"), + thumbprint=json_dict.get("Thumbprint"), + certificate=json_dict.get("Certificate"), + not_before=Utils.string_to_datetime(not_before) if not_before else None, + not_after=Utils.string_to_datetime(not_after) if not_after else None, + subject=json_dict.get("Subject"), + issuer=json_dict.get("Issuer"), + allowed_hub_to_sink_paths=json_dict.get("AllowedHubToSinkPaths"), + allowed_sink_to_hub_paths=json_dict.get("AllowedSinkToHubPaths"), + ) + + +class ReplicationHubAccessResult: + def __init__(self, results: List[DetailedReplicationHubAccess] = None): + self.results = results + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> ReplicationHubAccessResult: + results = json_dict.get("Results") or [] + return cls([DetailedReplicationHubAccess.from_json(item) for item in results]) + + +class PullReplicationDefinitionAndCurrentConnections: + def __init__(self, definition: PullReplicationDefinition = None, ongoing_tasks: List = None): + self.definition = definition + self.ongoing_tasks = ongoing_tasks + + @classmethod + def from_json(cls, json_dict: Dict[str, Any]) -> Optional[PullReplicationDefinitionAndCurrentConnections]: + if json_dict is None: + return None + from ravendb.documents.operations.ongoing_tasks import OngoingTaskPullReplicationAsHub + + definition = json_dict.get("Definition") + ongoing = json_dict.get("OngoingTasks") or [] + return cls( + definition=PullReplicationDefinition.from_json(definition) if definition else None, + ongoing_tasks=[OngoingTaskPullReplicationAsHub.from_json(item) for item in ongoing], + ) diff --git a/ravendb/documents/operations/replication/pull_replication.py b/ravendb/documents/operations/replication/pull_replication.py new file mode 100644 index 00000000..69fd06b4 --- /dev/null +++ b/ravendb/documents/operations/replication/pull_replication.py @@ -0,0 +1,289 @@ +from __future__ import annotations + +import http +import json +from typing import List, Optional, TYPE_CHECKING + +import requests + +from ravendb.documents.operations.definitions import MaintenanceOperation, VoidMaintenanceOperation +from ravendb.documents.operations.replication.definitions import ( + DetailedReplicationHubAccess, + ExternalReplication, + PullReplicationAsSink, + PullReplicationDefinition, + PullReplicationDefinitionAndCurrentConnections, + ReplicationHubAccess, + ReplicationHubAccessResult, +) +from ravendb.exceptions.raven_exceptions import ReplicationHubNotFoundException +from ravendb.http.raven_command import RavenCommand, RavenCommandResponseType, VoidRavenCommand +from ravendb.http.topology import RaftCommand +from ravendb.serverwide.operations.common import ModifyOngoingTaskResult +from ravendb.tools.utils import Utils +from ravendb.util.util import RaftIdGenerator + +if TYPE_CHECKING: + from ravendb.documents.conventions import DocumentConventions + from ravendb.http.server_node import ServerNode + + +class PutPullReplicationAsHubOperation(MaintenanceOperation[ModifyOngoingTaskResult]): + """Creates or updates a pull-replication hub task on the (source) database.""" + + def __init__( + self, + pull_replication_definition: Optional[PullReplicationDefinition] = None, + name: str = None, + ): + if pull_replication_definition is None: + if not name or name.isspace(): + raise ValueError("Name cannot be None or whitespace") + pull_replication_definition = PullReplicationDefinition(name) + + if not pull_replication_definition.name or pull_replication_definition.name.isspace(): + raise ValueError("PullReplicationDefinition name cannot be None or whitespace") + + self._pull_replication_definition = pull_replication_definition + + def get_command(self, conventions: "DocumentConventions") -> RavenCommand[ModifyOngoingTaskResult]: + return self.PutPullReplicationAsHubCommand(self._pull_replication_definition) + + class PutPullReplicationAsHubCommand(RavenCommand[ModifyOngoingTaskResult], RaftCommand): + def __init__(self, pull_replication_definition: PullReplicationDefinition): + super().__init__(ModifyOngoingTaskResult) + self._pull_replication_definition = pull_replication_definition + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: "ServerNode") -> requests.Request: + url = f"{node.url}/databases/{node.database}/admin/tasks/pull-replication/hub" + request = requests.Request("PUT", url) + request.data = self._pull_replication_definition.to_json() + return request + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + self._throw_invalid_response() + self.result = ModifyOngoingTaskResult.from_json(json.loads(response)) + + def get_raft_unique_request_id(self) -> str: + return RaftIdGenerator.new_id() + + +class UpdatePullReplicationAsSinkOperation(MaintenanceOperation[ModifyOngoingTaskResult]): + """Creates or updates a pull-replication sink task on the (destination) database.""" + + def __init__(self, pull_replication: PullReplicationAsSink, use_server_certificate: bool = False): + if pull_replication is None: + raise ValueError("PullReplicationAsSink cannot be None") + self._pull_replication = pull_replication + self._use_server_certificate = use_server_certificate + + def get_command(self, conventions: "DocumentConventions") -> RavenCommand[ModifyOngoingTaskResult]: + return self.UpdatePullReplicationAsSinkCommand(self._pull_replication, self._use_server_certificate) + + class UpdatePullReplicationAsSinkCommand(RavenCommand[ModifyOngoingTaskResult], RaftCommand): + def __init__(self, pull_replication: PullReplicationAsSink, use_server_certificate: bool): + super().__init__(ModifyOngoingTaskResult) + self._pull_replication = pull_replication + self._use_server_certificate = use_server_certificate + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: "ServerNode") -> requests.Request: + url = f"{node.url}/databases/{node.database}/admin/tasks/sink-pull-replication" + sink_json = self._pull_replication.to_json() + if self._use_server_certificate: + sink_json["CertificateWithPrivateKey"] = None + request = requests.Request("POST", url) + request.data = {"PullReplicationAsSink": sink_json} + return request + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + self._throw_invalid_response() + self.result = ModifyOngoingTaskResult.from_json(json.loads(response)) + + def get_raft_unique_request_id(self) -> str: + return RaftIdGenerator.new_id() + + +class UpdateExternalReplicationOperation(MaintenanceOperation[ModifyOngoingTaskResult]): + """Creates or updates an external-replication task on the database.""" + + def __init__(self, new_watcher: ExternalReplication): + if new_watcher is None: + raise ValueError("ExternalReplication cannot be None") + self._new_watcher = new_watcher + + def get_command(self, conventions: "DocumentConventions") -> RavenCommand[ModifyOngoingTaskResult]: + return self.UpdateExternalReplicationCommand(self._new_watcher) + + class UpdateExternalReplicationCommand(RavenCommand[ModifyOngoingTaskResult], RaftCommand): + def __init__(self, new_watcher: ExternalReplication): + super().__init__(ModifyOngoingTaskResult) + self._new_watcher = new_watcher + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: "ServerNode") -> requests.Request: + url = f"{node.url}/databases/{node.database}/admin/tasks/external-replication" + request = requests.Request("POST", url) + request.data = {"Watcher": self._new_watcher.to_json()} + return request + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + self._throw_invalid_response() + self.result = ModifyOngoingTaskResult.from_json(json.loads(response)) + + def get_raft_unique_request_id(self) -> str: + return RaftIdGenerator.new_id() + + +class RegisterReplicationHubAccessOperation(MaintenanceOperation[None]): + """Registers a sink's certificate access against an existing replication hub.""" + + def __init__(self, hub_name: str, access: ReplicationHubAccess): + if not hub_name or hub_name.isspace(): + raise ValueError("Hub name cannot be None or whitespace") + if access is None: + raise ValueError("Access cannot be None") + self._hub_name = hub_name + self._access = access + + def get_command(self, conventions: "DocumentConventions") -> RavenCommand[None]: + return self.RegisterReplicationHubAccessCommand(self._hub_name, self._access) + + class RegisterReplicationHubAccessCommand(RavenCommand[None], RaftCommand): + def __init__(self, hub_name: str, access: ReplicationHubAccess): + super().__init__() + self._hub_name = hub_name + self._access = access + self._response_type = RavenCommandResponseType.RAW + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: "ServerNode") -> requests.Request: + url = ( + f"{node.url}/databases/{node.database}/admin/tasks/pull-replication/hub/access" + f"?name={Utils.quote_key(self._hub_name)}" + ) + request = requests.Request("PUT", url) + request.data = self._access.to_json() + return request + + def set_response_raw(self, response: requests.Response, stream: bytes) -> None: + if response.status_code == http.HTTPStatus.NOT_FOUND: + raise ReplicationHubNotFoundException( + f"The replication hub {self._hub_name} was not found on the database. " + f"Did you forget to define it first?" + ) + + def get_raft_unique_request_id(self) -> str: + return RaftIdGenerator.new_id() + + +class UnregisterReplicationHubAccessOperation(VoidMaintenanceOperation): + """Revokes a previously-registered sink certificate (by thumbprint) from a replication hub.""" + + def __init__(self, hub_name: str, thumbprint: str): + if not hub_name or hub_name.isspace(): + raise ValueError("Hub name cannot be None or whitespace") + if not thumbprint or thumbprint.isspace(): + raise ValueError("Thumbprint cannot be None or whitespace") + self._hub_name = hub_name + self._thumbprint = thumbprint + + def get_command(self, conventions: "DocumentConventions") -> VoidRavenCommand: + return self.UnregisterReplicationHubAccessCommand(self._hub_name, self._thumbprint) + + class UnregisterReplicationHubAccessCommand(VoidRavenCommand, RaftCommand): + def __init__(self, hub_name: str, thumbprint: str): + super().__init__() + self._hub_name = hub_name + self._thumbprint = thumbprint + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: "ServerNode") -> requests.Request: + url = ( + f"{node.url}/databases/{node.database}/admin/tasks/pull-replication/hub/access" + f"?name={Utils.quote_key(self._hub_name)}&thumbprint={Utils.quote_key(self._thumbprint)}" + ) + return requests.Request("DELETE", url) + + def get_raft_unique_request_id(self) -> str: + return RaftIdGenerator.new_id() + + +class GetReplicationHubAccessOperation(MaintenanceOperation[List[DetailedReplicationHubAccess]]): + """Lists the sinks (certificates) registered to access a replication hub.""" + + def __init__(self, hub_name: str, start: int = 0, page_size: int = 25): + if not hub_name or hub_name.isspace(): + raise ValueError("Hub name cannot be None or whitespace") + self._hub_name = hub_name + self._start = start + self._page_size = page_size + + def get_command(self, conventions: "DocumentConventions") -> RavenCommand[List[DetailedReplicationHubAccess]]: + return self.GetReplicationHubAccessCommand(self._hub_name, self._start, self._page_size) + + class GetReplicationHubAccessCommand(RavenCommand[List[DetailedReplicationHubAccess]]): + def __init__(self, hub_name: str, start: int, page_size: int): + super().__init__(DetailedReplicationHubAccess) + self._hub_name = hub_name + self._start = start + self._page_size = page_size + + def is_read_request(self) -> bool: + return True + + def create_request(self, node: "ServerNode") -> requests.Request: + url = ( + f"{node.url}/databases/{node.database}/admin/tasks/pull-replication/hub/access" + f"?name={Utils.quote_key(self._hub_name)}&start={self._start}&pageSize={self._page_size}" + ) + return requests.Request("GET", url) + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + self.result = [] + return + self.result = ReplicationHubAccessResult.from_json(json.loads(response)).results + + +class GetPullReplicationTasksInfoOperation(MaintenanceOperation[PullReplicationDefinitionAndCurrentConnections]): + """Retrieves a pull-replication hub definition together with its current sink connections.""" + + def __init__(self, task_id: int): + self._task_id = task_id + + def get_command( + self, conventions: "DocumentConventions" + ) -> RavenCommand[PullReplicationDefinitionAndCurrentConnections]: + return self.GetPullReplicationTasksInfoCommand(self._task_id) + + class GetPullReplicationTasksInfoCommand(RavenCommand[PullReplicationDefinitionAndCurrentConnections]): + def __init__(self, task_id: int): + super().__init__(PullReplicationDefinitionAndCurrentConnections) + self._task_id = task_id + + def is_read_request(self) -> bool: + return False + + def create_request(self, node: "ServerNode") -> requests.Request: + url = f"{node.url}/databases/{node.database}/tasks/pull-replication/hub?key={self._task_id}" + return requests.Request("GET", url) + + def set_response(self, response: Optional[str], from_cache: bool) -> None: + if response is None: + return + self.result = PullReplicationDefinitionAndCurrentConnections.from_json(json.loads(response)) diff --git a/ravendb/exceptions/exception_dispatcher.py b/ravendb/exceptions/exception_dispatcher.py index 4d80bed6..b06ca2a2 100644 --- a/ravendb/exceptions/exception_dispatcher.py +++ b/ravendb/exceptions/exception_dispatcher.py @@ -20,6 +20,7 @@ RateLimitException, RavenException, RefusedToAnswerException, + ReplicationHubNotFoundException, SchemaValidationException, TooManyRequestsException, TooManyTokensException, @@ -54,6 +55,8 @@ "BulkInsertProtocolViolationException": BulkInsertProtocolViolationException, # schema validation "SchemaValidationException": SchemaValidationException, + # replication + "ReplicationHubNotFoundException": ReplicationHubNotFoundException, # cluster "NodeIsPassiveException": NodeIsPassiveException, "NoLoaderException": NoLoaderException, diff --git a/ravendb/exceptions/raven_exceptions.py b/ravendb/exceptions/raven_exceptions.py index 34c507c0..6b6efee9 100644 --- a/ravendb/exceptions/raven_exceptions.py +++ b/ravendb/exceptions/raven_exceptions.py @@ -89,3 +89,8 @@ class MissingAiAgentParameterException(RavenException): class SchemaValidationException(RavenException): def __init__(self, message: str = None): super().__init__(message) + + +class ReplicationHubNotFoundException(RavenException): + def __init__(self, message: str = None, cause: BaseException = None): + super().__init__(message, cause) diff --git a/ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/__init__.py b/ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/test_pull_replication.py b/ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/test_pull_replication.py new file mode 100644 index 00000000..82246084 --- /dev/null +++ b/ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/test_pull_replication.py @@ -0,0 +1,202 @@ +import http +import os +import unittest +from datetime import timedelta +from types import SimpleNamespace + +from ravendb import ( + ExternalReplication, + GetReplicationHubAccessOperation, + GetPullReplicationTasksInfoOperation, + PreventDeletionsMode, + PullReplicationAsSink, + PullReplicationDefinition, + PullReplicationMode, + PutPullReplicationAsHubOperation, + RegisterReplicationHubAccessOperation, + ReplicationHubAccess, + UnregisterReplicationHubAccessOperation, + UpdateExternalReplicationOperation, + UpdatePullReplicationAsSinkOperation, +) +from ravendb.documents.operations.etl.configuration import RavenConnectionString +from ravendb.documents.operations.connection_string.put_connection_string_operation import ( + PutConnectionStringOperation, +) +from ravendb.documents.operations.ongoing_tasks import ( + GetOngoingTaskInfoOperation, + OngoingTaskPullReplicationAsSink, + OngoingTaskType, +) +from ravendb.exceptions.raven_exceptions import ReplicationHubNotFoundException +from ravendb.tests.test_base import TestBase + + +class TestPullReplication(TestBase): + def setUp(self): + super().setUp() + + def _put_hub(self, name, **kwargs): + definition = PullReplicationDefinition(name, **kwargs) + result = self.store.maintenance.send(PutPullReplicationAsHubOperation(definition)) + self.assertIsNotNone(result) + self.assertIsNotNone(result.task_id) + return result + + def _put_connection_string(self, name, database): + connection_string = RavenConnectionString( + name=name, database=database, topology_discovery_urls=list(self.store.urls) + ) + self.store.maintenance.send(PutConnectionStringOperation(connection_string)) + return name + + @unittest.skipIf(os.environ.get("RAVENDB_LICENSE") is None, "Insufficient license permissions. Skipping on CI/CD.") + def test_can_put_pull_replication_as_hub(self): + result = self._put_hub("hub-1") + self.assertGreater(result.task_id, 0) + + @unittest.skipIf(os.environ.get("RAVENDB_LICENSE") is None, "Insufficient license permissions. Skipping on CI/CD.") + def test_can_put_pull_replication_as_hub_by_name(self): + result = self.store.maintenance.send(PutPullReplicationAsHubOperation(name="hub-by-name")) + self.assertIsNotNone(result.task_id) + + @unittest.skipIf(os.environ.get("RAVENDB_LICENSE") is None, "Insufficient license permissions. Skipping on CI/CD.") + def test_can_get_pull_replication_tasks_info(self): + result = self._put_hub("hub-info", delay_replication_for=timedelta(seconds=30), disabled=True) + + info = self.store.maintenance.send(GetPullReplicationTasksInfoOperation(result.task_id)) + self.assertIsNotNone(info) + self.assertIsNotNone(info.definition) + self.assertEqual("hub-info", info.definition.name) + self.assertEqual(PullReplicationMode.HUB_TO_SINK, info.definition.mode) + self.assertEqual(timedelta(seconds=30), info.definition.delay_replication_for) + self.assertTrue(info.definition.disabled) + self.assertIsNotNone(info.ongoing_tasks) + + @unittest.skipIf(os.environ.get("RAVENDB_LICENSE") is None, "Insufficient license permissions. Skipping on CI/CD.") + def test_can_update_pull_replication_as_sink(self): + connection_string_name = self._put_connection_string("cs-sink", "sink-remote-db") + sink = PullReplicationAsSink( + database=self.store.database, + connection_string_name=connection_string_name, + hub_name="remote-hub", + name="my-sink", + ) + + result = self.store.maintenance.send(UpdatePullReplicationAsSinkOperation(sink)) + self.assertIsNotNone(result) + self.assertIsNotNone(result.task_id) + + info = self.store.maintenance.send( + GetOngoingTaskInfoOperation(result.task_id, OngoingTaskType.PULL_REPLICATION_AS_SINK) + ) + self.assertIsInstance(info, OngoingTaskPullReplicationAsSink) + self.assertEqual("remote-hub", info.hub_name) + self.assertEqual("my-sink", info.task_name) + self.assertEqual(PullReplicationMode.HUB_TO_SINK, info.mode) + self.assertEqual(connection_string_name, info.connection_string_name) + self.assertEqual("sink-remote-db", info.destination_database) + + @unittest.skipIf(os.environ.get("RAVENDB_LICENSE") is None, "Insufficient license permissions. Skipping on CI/CD.") + def test_can_update_external_replication(self): + connection_string_name = self._put_connection_string("cs-ext", "external-remote-db") + external = ExternalReplication( + database=self.store.database, + connection_string_name=connection_string_name, + name="my-external-replication", + ) + + result = self.store.maintenance.send(UpdateExternalReplicationOperation(external)) + self.assertIsNotNone(result) + self.assertIsNotNone(result.task_id) + + @unittest.skipIf(os.environ.get("RAVENDB_LICENSE") is None, "Insufficient license permissions. Skipping on CI/CD.") + def test_get_replication_hub_access_is_empty_for_new_hub(self): + self._put_hub("hub-no-access") + access = self.store.maintenance.send(GetReplicationHubAccessOperation("hub-no-access")) + self.assertEqual([], access) + + def test_register_hub_access_maps_404_to_replication_hub_not_found(self): + # The register command reads a raw response; a 404 (hub not found) must surface as + # ReplicationHubNotFoundException, matching the C# client. + command = RegisterReplicationHubAccessOperation("missing-hub", ReplicationHubAccess(name="s")).get_command(None) + self.assertRaises( + ReplicationHubNotFoundException, + command.set_response_raw, + SimpleNamespace(status_code=http.HTTPStatus.NOT_FOUND), + None, + ) + # A successful (non-404) response must not raise. + command.set_response_raw(SimpleNamespace(status_code=http.HTTPStatus.OK), None) + + @unittest.skipIf(os.environ.get("RAVENDB_LICENSE") is None, "Insufficient license permissions. Skipping on CI/CD.") + def test_unregister_replication_hub_access_is_noop_for_unknown_thumbprint(self): + self._put_hub("hub-unregister") + # Removing an unknown thumbprint from an existing hub must not raise. + self.store.maintenance.send( + UnregisterReplicationHubAccessOperation("hub-unregister", "0123456789ABCDEF0123456789ABCDEF01234567") + ) + + def test_operations_validate_arguments_client_side(self): + self.assertRaises(ValueError, lambda: PutPullReplicationAsHubOperation()) + self.assertRaises(ValueError, lambda: RegisterReplicationHubAccessOperation("", ReplicationHubAccess())) + self.assertRaises(ValueError, lambda: RegisterReplicationHubAccessOperation("hub", None)) + self.assertRaises(ValueError, lambda: UnregisterReplicationHubAccessOperation("hub", "")) + self.assertRaises(ValueError, lambda: GetReplicationHubAccessOperation("")) + self.assertRaises(ValueError, lambda: UpdatePullReplicationAsSinkOperation(None)) + + def test_pull_replication_definition_json_round_trip(self): + definition = PullReplicationDefinition( + "round-trip", + delay_replication_for=timedelta(minutes=5), + mode=PullReplicationMode.HUB_TO_SINK_AND_SINK_TO_HUB, + with_filtering=True, + prevent_deletions_mode=PreventDeletionsMode.PREVENT_SINK_TO_HUB_DELETIONS, + disabled=True, + ) + + parsed = PullReplicationDefinition.from_json(definition.to_json()) + self.assertEqual("round-trip", parsed.name) + self.assertEqual(PullReplicationMode.HUB_TO_SINK_AND_SINK_TO_HUB, parsed.mode) + self.assertTrue(parsed.with_filtering) + self.assertTrue(parsed.disabled) + self.assertEqual(PreventDeletionsMode.PREVENT_SINK_TO_HUB_DELETIONS, parsed.prevent_deletions_mode) + self.assertEqual(timedelta(minutes=5), parsed.delay_replication_for) + + def test_pull_replication_as_sink_json_round_trip(self): + sink = PullReplicationAsSink( + database="db", + connection_string_name="cs", + hub_name="h", + name="s", + mode=PullReplicationMode.HUB_TO_SINK, + allowed_hub_to_sink_paths=["users/*"], + ) + + parsed = PullReplicationAsSink.from_json(sink.to_json()) + self.assertEqual("h", parsed.hub_name) + self.assertEqual("cs", parsed.connection_string_name) + self.assertEqual("db", parsed.database) + self.assertEqual("s", parsed.name) + self.assertEqual(PullReplicationMode.HUB_TO_SINK, parsed.mode) + self.assertEqual(["users/*"], parsed.allowed_hub_to_sink_paths) + + def test_replication_node_to_json_includes_type(self): + sink_json = PullReplicationAsSink(database="db", connection_string_name="cs", hub_name="h").to_json() + self.assertEqual("PullAsSink", sink_json["Type"]) + external_json = ExternalReplication(database="db", connection_string_name="cs").to_json() + self.assertEqual("External", external_json["Type"]) + + def test_update_sink_with_server_certificate_omits_private_key(self): + node = SimpleNamespace(url="http://localhost:8080", database="db") + sink = PullReplicationAsSink( + database="db", connection_string_name="cs", hub_name="h", name="s", certificate_with_private_key="SECRET" + ) + + with_server_cert = UpdatePullReplicationAsSinkOperation(sink, use_server_certificate=True).get_command(None) + sent = with_server_cert.create_request(node).data["PullReplicationAsSink"] + self.assertIsNone(sent["CertificateWithPrivateKey"]) + + without = UpdatePullReplicationAsSinkOperation(sink, use_server_certificate=False).get_command(None) + sent = without.create_request(node).data["PullReplicationAsSink"] + self.assertEqual("SECRET", sent["CertificateWithPrivateKey"]) diff --git a/ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/test_pull_replication_secured.py b/ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/test_pull_replication_secured.py new file mode 100644 index 00000000..6721f0f6 --- /dev/null +++ b/ravendb/tests/jvm_migrated_tests/client_tests/replication_tests/test_pull_replication_secured.py @@ -0,0 +1,123 @@ +import base64 +import hashlib +import os +import re +import unittest + +from ravendb import ( + GetPullReplicationTasksInfoOperation, + GetReplicationHubAccessOperation, + PreventDeletionsMode, + PullReplicationAsSink, + PullReplicationDefinition, + PullReplicationMode, + PutPullReplicationAsHubOperation, + RegisterReplicationHubAccessOperation, + ReplicationHubAccess, + UnregisterReplicationHubAccessOperation, + UpdatePullReplicationAsSinkOperation, +) +from ravendb.documents.operations.connection_string.put_connection_string_operation import ( + PutConnectionStringOperation, +) +from ravendb.documents.operations.etl.configuration import RavenConnectionString +from ravendb.documents.operations.ongoing_tasks import ( + GetOngoingTaskInfoOperation, + OngoingTaskPullReplicationAsSink, + OngoingTaskType, +) +from ravendb.exceptions.raven_exceptions import RavenException +from ravendb.tests.test_base import TestBase + + +def _certificate_base64_and_thumbprint(pem_path): + """Extract the certificate (base64 DER) and its SHA-1 thumbprint from a PEM file.""" + with open(pem_path, "r") as pem_file: + pem = pem_file.read() + match = re.search(r"-----BEGIN CERTIFICATE-----(.+?)-----END CERTIFICATE-----", pem, re.DOTALL) + certificate_base64 = "".join(match.group(1).split()) + der = base64.b64decode(certificate_base64) + return certificate_base64, hashlib.sha1(der).hexdigest().upper() + + +@unittest.skipIf(os.environ.get("RAVENDB_LICENSE") is None, "Insufficient license permissions. Skipping on CI/CD.") +class TestPullReplicationSecured(TestBase): + """Pull-replication surface that the server only permits over SSL (filtering, the + SinkToHub direction, and certificate-based hub access) - exercised against a secured store.""" + + def setUp(self): + super().setUp() + + def test_can_define_hub_with_filtering_and_combined_mode(self): + store = self.secured_document_store + definition = PullReplicationDefinition( + "filtered-hub", + mode=PullReplicationMode.HUB_TO_SINK_AND_SINK_TO_HUB, + with_filtering=True, + prevent_deletions_mode=PreventDeletionsMode.PREVENT_SINK_TO_HUB_DELETIONS, + ) + + result = store.maintenance.send(PutPullReplicationAsHubOperation(definition)) + self.assertIsNotNone(result.task_id) + + info = store.maintenance.send(GetPullReplicationTasksInfoOperation(result.task_id)) + self.assertEqual("filtered-hub", info.definition.name) + self.assertEqual(PullReplicationMode.HUB_TO_SINK_AND_SINK_TO_HUB, info.definition.mode) + self.assertTrue(info.definition.with_filtering) + self.assertEqual(PreventDeletionsMode.PREVENT_SINK_TO_HUB_DELETIONS, info.definition.prevent_deletions_mode) + + def test_register_get_and_unregister_replication_hub_access(self): + store = self.secured_document_store + store.maintenance.send( + PutPullReplicationAsHubOperation(PullReplicationDefinition("access-hub", with_filtering=True)) + ) + + certificate_base64, thumbprint = _certificate_base64_and_thumbprint(self.test_client_certificate_url) + access = ReplicationHubAccess( + name="sink-access", certificate_base64=certificate_base64, allowed_hub_to_sink_paths=["users/*"] + ) + store.maintenance.send(RegisterReplicationHubAccessOperation("access-hub", access)) + + registered = store.maintenance.send(GetReplicationHubAccessOperation("access-hub")) + self.assertEqual(1, len(registered)) + self.assertEqual(thumbprint.lower(), registered[0].thumbprint.lower()) + self.assertEqual(1, len(registered[0].allowed_hub_to_sink_paths)) + + store.maintenance.send(UnregisterReplicationHubAccessOperation("access-hub", thumbprint)) + self.assertEqual([], store.maintenance.send(GetReplicationHubAccessOperation("access-hub"))) + + def test_register_replication_hub_access_for_missing_hub_raises(self): + store = self.secured_document_store + certificate_base64, _ = _certificate_base64_and_thumbprint(self.test_client_certificate_url) + operation = RegisterReplicationHubAccessOperation( + "hub-that-does-not-exist", ReplicationHubAccess(name="sink", certificate_base64=certificate_base64) + ) + # The client maps a 404 to ReplicationHubNotFoundException; the 7.2 server reports a + # missing hub as a 500 instead, so assert the base RavenException (covers either). + self.assertRaises(RavenException, store.maintenance.send, operation) + + def test_can_define_sink_with_server_certificate(self): + store = self.secured_document_store + connection_string = RavenConnectionString( + name="cs-secured-sink", database="sink-remote-db", topology_discovery_urls=list(store.urls) + ) + store.maintenance.send(PutConnectionStringOperation(connection_string)) + + sink = PullReplicationAsSink( + database=store.database, + connection_string_name="cs-secured-sink", + hub_name="remote-hub", + name="server-cert-sink", + ) + result = store.maintenance.send(UpdatePullReplicationAsSinkOperation(sink, use_server_certificate=True)) + self.assertIsNotNone(result.task_id) + + info = store.maintenance.send( + GetOngoingTaskInfoOperation(result.task_id, OngoingTaskType.PULL_REPLICATION_AS_SINK) + ) + self.assertIsInstance(info, OngoingTaskPullReplicationAsSink) + # use_server_certificate sends no client key; the secured server still accepts and + # stores the sink (it authenticates with its own certificate at connection time). + self.assertEqual("remote-hub", info.hub_name) + self.assertEqual("server-cert-sink", info.task_name) + self.assertEqual("cs-secured-sink", info.connection_string_name) diff --git a/ravendb/tests/test_imports.py b/ravendb/tests/test_imports.py index bdba36c2..0f00b9fd 100644 --- a/ravendb/tests/test_imports.py +++ b/ravendb/tests/test_imports.py @@ -150,22 +150,22 @@ def test_imports_at_top_level(self): # from ravendb import SeedIdentityForOperation # from ravendb import IOperationProgress # from ravendb import IOperationResult - # from ravendb import UpdateExternalReplicationOperation + from ravendb import UpdateExternalReplicationOperation + from ravendb import PullReplicationDefinitionAndCurrentConnections + from ravendb import PutPullReplicationAsHubOperation + from ravendb import DetailedReplicationHubAccess + from ravendb import GetReplicationHubAccessOperation + from ravendb import PreventDeletionsMode + from ravendb import PullReplicationMode + from ravendb import RegisterReplicationHubAccessOperation + from ravendb import ReplicationHubAccess + from ravendb import ReplicationHubAccessResult + from ravendb import UnregisterReplicationHubAccessOperation + from ravendb import UpdatePullReplicationAsSinkOperation + from ravendb import GetPullReplicationTasksInfoOperation - # from ravendb import PullReplicationDefinitionAndCurrentConnections - # from ravendb import PutPullReplicationAsHubOperation - - # from ravendb import DetailedReplicationHubAccess - # from ravendb import GetReplicationHubAccessOperation # from ravendb import IExternalReplication - # from ravendb import PreventDeletionsMode - # from ravendb import PullReplicationMode - # from ravendb import RegisterReplicationHubAccessOperation - # from ravendb import ReplicationHubAccess - # from ravendb import ReplicationHubAccessResult # from ravendb import ReplicationHubAccessResponse - # from ravendb import UnregisterReplicationHubAccessOperation - # from ravendb import UpdatePullReplicationAsSinkOperation # from ravendb import GetConflictsCommand from ravendb import SetIndexesLockOperation from ravendb import SetIndexesPriorityOperation @@ -259,9 +259,9 @@ def test_imports_at_top_level(self): # from ravendb import DisableDatabaseToggleResult # from ravendb import ConfigureExpirationOperation # from ravendb import DeleteOngoingTaskOperation - # from ravendb import GetPullReplicationHubTasksInfoOperation - # from ravendb import OngoingTaskPullReplicationAsSink - # from ravendb import OngoingTaskPullReplicationAsHub + from ravendb import OngoingTaskPullReplicationAsSink + from ravendb import OngoingTaskPullReplicationAsHub + # from ravendb import OngoingTaskType # from ravendb import RunningBackup # from ravendb import NextBackup From 54479b73981e7d28cf51d66a600c788f1f60ee96 Mon Sep 17 00:00:00 2001 From: Gracjan Sadowicz Date: Thu, 25 Jun 2026 01:42:55 +0200 Subject: [PATCH 3/5] Quiet noisy background-thread logs in the test base Silence the subscription worker reporting expected connection drops on teardown so test output stays readable. --- ravendb/tests/test_base.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ravendb/tests/test_base.py b/ravendb/tests/test_base.py index f70110d5..27b6f2a1 100644 --- a/ravendb/tests/test_base.py +++ b/ravendb/tests/test_base.py @@ -1,5 +1,6 @@ import atexit import datetime +import logging import threading import time import unittest @@ -402,6 +403,9 @@ def setConvention(self, conventions): self.conventions = conventions def setUp(self): + # Silence noisy background-thread logs (e.g. the subscription worker reporting + # expected connection drops on teardown) so test output stays readable. + logging.disable(logging.ERROR) RavenTestDriver.__init__(self) self._locator = TestBase.TestServiceLocator() self._secured_locator = TestBase.TestSecuredServiceLocator() From b75596e214475a4f0f63feb0410170416bab70c9 Mon Sep 17 00:00:00 2001 From: Gracjan Sadowicz Date: Thu, 25 Jun 2026 01:53:30 +0200 Subject: [PATCH 4/5] Replace deprecated ssl.PROTOCOL_TLSv1_2 in the TCP client Use ssl.PROTOCOL_TLS_CLIENT and explicitly disable CA verification and hostname checking, preserving the existing exact-certificate-pinning behavior. Verified against a real secured server (secured subscription tests pass). --- ravendb/util/tcp_utils.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/ravendb/util/tcp_utils.py b/ravendb/util/tcp_utils.py index 1e624bf4..0957f606 100644 --- a/ravendb/util/tcp_utils.py +++ b/ravendb/util/tcp_utils.py @@ -20,7 +20,13 @@ def connect( is_ssl_socket = server_certificate_base64 and client_certificate_pem_path if is_ssl_socket: - context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + # PROTOCOL_TLS_CLIENT replaces the deprecated PROTOCOL_TLSv1_2. Its defaults turn on + # CA verification and hostname checking, which this code intentionally does not use - + # the server is validated below by exact-certificate pinning - so disable both to keep + # the previous behavior. (check_hostname must be cleared before setting CERT_NONE.) + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE context.load_cert_chain(client_certificate_pem_path, password=certificate_private_key_password) s = context.wrap_socket(s) From f396fe872142a18cdb0ee6675ed7a46c00e9bc73 Mon Sep 17 00:00:00 2001 From: Gracjan Sadowicz Date: Thu, 25 Jun 2026 01:59:47 +0200 Subject: [PATCH 5/5] Migrate test off deprecated CmpXchg.value to RavenDocumentQuery.cmp_xchg CmpXchg.value emits a DeprecationWarning; RavenDocumentQuery.cmp_xchg() builds the same query function without it. --- .../test_queries_with_custom_functions.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/ravendb/tests/jvm_migrated_tests/client_tests/queries_tests/test_queries_with_custom_functions.py b/ravendb/tests/jvm_migrated_tests/client_tests/queries_tests/test_queries_with_custom_functions.py index 906a25d2..a64d7123 100644 --- a/ravendb/tests/jvm_migrated_tests/client_tests/queries_tests/test_queries_with_custom_functions.py +++ b/ravendb/tests/jvm_migrated_tests/client_tests/queries_tests/test_queries_with_custom_functions.py @@ -1,5 +1,5 @@ from ravendb.documents.operations.compare_exchange.operations import PutCompareExchangeValueOperation -from ravendb.documents.session.misc import CmpXchg +from ravendb.documents.queries.raven_document_query import RavenDocumentQuery from ravendb.infrastructure.entities import User from ravendb.tests.test_base import TestBase @@ -27,8 +27,8 @@ def test_query_cmp_xchg_where(self): with self.store.open_session() as session: q = ( session.advanced.document_query(object_type=User) - .where_equals("name", CmpXchg.value("Hera")) - .where_equals("last_name", CmpXchg.value("Tom")) + .where_equals("name", RavenDocumentQuery.cmp_xchg("Hera")) + .where_equals("last_name", RavenDocumentQuery.cmp_xchg("Tom")) ) self.assertEqual("from 'Users' where name = cmpxchg($p0) and last_name = cmpxchg($p1)", q.index_query.query) @@ -38,7 +38,9 @@ def test_query_cmp_xchg_where(self): self.assertEqual("Zeus", query_result[0].name) user = list( - session.advanced.document_query(object_type=User).where_not_equals("name", CmpXchg.value("Hera")) + session.advanced.document_query(object_type=User).where_not_equals( + "name", RavenDocumentQuery.cmp_xchg("Hera") + ) ) self.assertEqual(1, len(user))