Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 38 additions & 19 deletions ravendb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,50 @@
from ravendb.documents.operations.refresh.configuration import RefreshConfiguration
from ravendb.documents.operations.replication.definitions import (
ExternalReplication,
ExternalReplicationBase,
ReplicationNode,
PullReplicationAsSink,
PullReplicationDefinition,
ReplicationNode,
ExternalReplicationBase,
PullReplicationMode,
PreventDeletionsMode,
ReplicationHubAccess,
DetailedReplicationHubAccess,
ReplicationHubAccessResult,
PullReplicationDefinitionAndCurrentConnections,
)
from ravendb.documents.operations.replication.pull_replication import (
PutPullReplicationAsHubOperation,
UpdatePullReplicationAsSinkOperation,
UpdateExternalReplicationOperation,
RegisterReplicationHubAccessOperation,
UnregisterReplicationHubAccessOperation,
GetReplicationHubAccessOperation,
GetPullReplicationTasksInfoOperation,
)
from ravendb.documents.operations.ongoing_tasks import (
OngoingTaskPullReplicationAsSink,
OngoingTaskPullReplicationAsHub,
)
from ravendb.documents.operations.revisions import (
RevisionsCollectionConfiguration,
RevisionsConfiguration,
RevisionsResult,
ConfigureRevisionsOperation,
ConfigureRevisionsOperationResult,
GetRevisionsOperation,
EnforceRevisionsConfigurationOperation,
AdoptOrphanedRevisionsOperation,
DeleteRevisionsOperation,
RevertRevisionsByIdOperation,
ConfigureRevisionsBinCleanerOperation,
ConfigureRevisionsBinCleanerOperationResult,
RevisionsBinConfiguration,
RevisionsOperationParameters,
RevisionsOperationContinuationParameters,
)
from ravendb.serverwide.operations.revisions import (
ConfigureRevisionsForConflictsOperation,
ConfigureRevisionsForConflictsResult,
)
from ravendb.documents.operations.statistics import (
GetCollectionStatisticsOperation,
Expand Down Expand Up @@ -340,7 +376,6 @@

# todo: Serverwide
# ReorderDatabaseMembersOperation
# ConfigureRevisionsForConflictsOperation
# UpdateDatabaseOperation
# GetServerWideBackupConfigurationOperation
# SetDatabaseDynamicDistributionOperation
Expand Down Expand Up @@ -395,17 +430,7 @@
# SeedIdentityForOperation
# IOperationProgress
# IOperationResult
# PullReplicationDefinitionAndCurrentConnections
# DetailedReplicationHubAccess
# GetReplicationHubAccessOperation
# PreventDeletionsMode
# PullReplicationMode
# RegisterReplicationHubAccessOperation
# ReplicationHubAccess
# ReplicationHubAccessResult
# ReplicationHubAccessResponse
# UnregisterReplicationHubAccessOperation
# UpdatePullReplicationAsSinkOperation
# GetConflictsCommand
# PutAttachmentCommandHelper
# SetupDocumentBase
Expand All @@ -420,9 +445,6 @@
# LazyRevisionOperation
# LazyRevisionOperations
# StreamOperation
# ConfigureRevisionsOperation
# GetRevisionsOperation
# RevisionsResult
# GetConnectionStringsOperation
# RemoveConnectionStringOperation
# SqlEtlTable
Expand All @@ -435,9 +457,6 @@
# DisableDatabaseToggleResult
# ConfigureExpirationOperation
# DeleteOngoingTaskOperation
# GetPullReplicationHubTasksInfoOperation
# OngoingTaskPullReplicationAsSink
# OngoingTaskPullReplicationAsHub
# OngoingTaskType
# RunningBackup
# NextBackup
Expand Down
183 changes: 183 additions & 0 deletions ravendb/documents/operations/ongoing_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ravendb.tools.utils import Utils
from ravendb.util.util import RaftIdGenerator
from ravendb.http.raven_command import RavenCommand
from ravendb.documents.operations.replication.definitions import PullReplicationMode

if TYPE_CHECKING:
from ravendb.documents.conventions import DocumentConventions
Expand Down Expand Up @@ -279,6 +280,186 @@ def from_json(cls, json_dict: dict) -> "OngoingTaskEmbeddingsGeneration":
)


class OngoingTaskPullReplicationAsHub(OngoingTask):
"""Ongoing task information for a single pull-replication hub connection."""

def __init__(
self,
task_id: Optional[int] = None,
responsible_node: Optional[NodeId] = None,
task_state: Optional[OngoingTaskState] = None,
task_connection_status: Optional[OngoingTaskConnectionStatus] = None,
task_name: Optional[str] = None,
error: Optional[str] = None,
mentor_node: Optional[str] = None,
pin_to_mentor_node: Optional[bool] = None,
from_to_string: Optional[str] = None,
destination_url: Optional[str] = None,
destination_database: Optional[str] = None,
delay_replication_for=None,
handler_id: Optional[str] = None,
last_accepted_change_vector_from_destination: Optional[str] = None,
source_database_change_vector: Optional[str] = None,
last_sent_etag: Optional[int] = None,
last_database_etag: Optional[int] = None,
):
super().__init__(
task_id=task_id,
task_type=OngoingTaskType.PULL_REPLICATION_AS_HUB,
responsible_node=responsible_node,
task_state=task_state,
task_connection_status=task_connection_status,
task_name=task_name,
error=error,
mentor_node=mentor_node,
pin_to_mentor_node=pin_to_mentor_node,
)
self.from_to_string = from_to_string
self.destination_url = destination_url
self.destination_database = destination_database
self.delay_replication_for = delay_replication_for
self.handler_id = handler_id
self.last_accepted_change_vector_from_destination = last_accepted_change_vector_from_destination
self.source_database_change_vector = source_database_change_vector
self.last_sent_etag = last_sent_etag
self.last_database_etag = last_database_etag

def to_json(self) -> dict:
result = super().to_json()
result["FromToString"] = self.from_to_string
result["DestinationUrl"] = self.destination_url
result["DestinationDatabase"] = self.destination_database
result["DelayReplicationFor"] = (
Utils.timedelta_to_str(self.delay_replication_for) if self.delay_replication_for is not None else None
)
result["HandlerId"] = self.handler_id
result["LastAcceptedChangeVectorFromDestination"] = self.last_accepted_change_vector_from_destination
result["SourceDatabaseChangeVector"] = self.source_database_change_vector
result["LastSentEtag"] = self.last_sent_etag
result["LastDatabaseEtag"] = self.last_database_etag
return result

@classmethod
def from_json(cls, json_dict: dict) -> Optional["OngoingTaskPullReplicationAsHub"]:
if json_dict is None:
return None
task_state_str = json_dict.get("TaskState")
task_connection_status_str = json_dict.get("TaskConnectionStatus")
delay = json_dict.get("DelayReplicationFor")
return cls(
task_id=json_dict.get("TaskId"),
responsible_node=NodeId.from_json(json_dict.get("ResponsibleNode")),
task_state=OngoingTaskState(task_state_str) if task_state_str else None,
task_connection_status=(
OngoingTaskConnectionStatus(task_connection_status_str) if task_connection_status_str else None
),
task_name=json_dict.get("TaskName"),
error=json_dict.get("Error"),
mentor_node=json_dict.get("MentorNode"),
pin_to_mentor_node=json_dict.get("PinToMentorNode"),
from_to_string=json_dict.get("FromToString"),
destination_url=json_dict.get("DestinationUrl"),
destination_database=json_dict.get("DestinationDatabase"),
delay_replication_for=Utils.string_to_timedelta(delay) if delay else None,
handler_id=json_dict.get("HandlerId"),
last_accepted_change_vector_from_destination=json_dict.get("LastAcceptedChangeVectorFromDestination"),
source_database_change_vector=json_dict.get("SourceDatabaseChangeVector"),
last_sent_etag=json_dict.get("LastSentEtag"),
last_database_etag=json_dict.get("LastDatabaseEtag"),
)


class OngoingTaskPullReplicationAsSink(OngoingTask):
"""Ongoing task information for a pull-replication sink task."""

def __init__(
self,
task_id: Optional[int] = None,
responsible_node: Optional[NodeId] = None,
task_state: Optional[OngoingTaskState] = None,
task_connection_status: Optional[OngoingTaskConnectionStatus] = None,
task_name: Optional[str] = None,
error: Optional[str] = None,
mentor_node: Optional[str] = None,
pin_to_mentor_node: Optional[bool] = None,
hub_name: Optional[str] = None,
mode: Optional[PullReplicationMode] = None,
destination_url: Optional[str] = None,
topology_discovery_urls: Optional[list] = None,
destination_database: Optional[str] = None,
connection_string_name: Optional[str] = None,
certificate_public_key: Optional[str] = None,
access_name: Optional[str] = None,
allowed_hub_to_sink_paths: Optional[list] = None,
allowed_sink_to_hub_paths: Optional[list] = None,
):
super().__init__(
task_id=task_id,
task_type=OngoingTaskType.PULL_REPLICATION_AS_SINK,
responsible_node=responsible_node,
task_state=task_state,
task_connection_status=task_connection_status,
task_name=task_name,
error=error,
mentor_node=mentor_node,
pin_to_mentor_node=pin_to_mentor_node,
)
self.hub_name = hub_name
self.mode = mode
self.destination_url = destination_url
self.topology_discovery_urls = topology_discovery_urls
self.destination_database = destination_database
self.connection_string_name = connection_string_name
self.certificate_public_key = certificate_public_key
self.access_name = access_name
self.allowed_hub_to_sink_paths = allowed_hub_to_sink_paths
self.allowed_sink_to_hub_paths = allowed_sink_to_hub_paths

def to_json(self) -> dict:
result = super().to_json()
result["HubName"] = self.hub_name
result["Mode"] = self.mode.value if self.mode else None
result["DestinationUrl"] = self.destination_url
result["TopologyDiscoveryUrls"] = self.topology_discovery_urls
result["DestinationDatabase"] = self.destination_database
result["ConnectionStringName"] = self.connection_string_name
result["CertificatePublicKey"] = self.certificate_public_key
result["AccessName"] = self.access_name
result["AllowedHubToSinkPaths"] = self.allowed_hub_to_sink_paths
result["AllowedSinkToHubPaths"] = self.allowed_sink_to_hub_paths
return result

@classmethod
def from_json(cls, json_dict: dict) -> Optional["OngoingTaskPullReplicationAsSink"]:
if json_dict is None:
return None
task_state_str = json_dict.get("TaskState")
task_connection_status_str = json_dict.get("TaskConnectionStatus")
mode_str = json_dict.get("Mode")
return cls(
task_id=json_dict.get("TaskId"),
responsible_node=NodeId.from_json(json_dict.get("ResponsibleNode")),
task_state=OngoingTaskState(task_state_str) if task_state_str else None,
task_connection_status=(
OngoingTaskConnectionStatus(task_connection_status_str) if task_connection_status_str else None
),
task_name=json_dict.get("TaskName"),
error=json_dict.get("Error"),
mentor_node=json_dict.get("MentorNode"),
pin_to_mentor_node=json_dict.get("PinToMentorNode"),
hub_name=json_dict.get("HubName"),
mode=PullReplicationMode(mode_str) if mode_str else None,
destination_url=json_dict.get("DestinationUrl"),
topology_discovery_urls=json_dict.get("TopologyDiscoveryUrls"),
destination_database=json_dict.get("DestinationDatabase"),
connection_string_name=json_dict.get("ConnectionStringName"),
certificate_public_key=json_dict.get("CertificatePublicKey"),
access_name=json_dict.get("AccessName"),
allowed_hub_to_sink_paths=json_dict.get("AllowedHubToSinkPaths"),
allowed_sink_to_hub_paths=json_dict.get("AllowedSinkToHubPaths"),
)


class ToggleOngoingTaskStateOperation(MaintenanceOperation[ModifyOngoingTaskResult]):
def __init__(
self, task_name_or_id: Union[int, str], type_of_task: Optional[OngoingTaskType], disable: Optional[bool]
Expand Down Expand Up @@ -454,6 +635,8 @@ def _deserialize_task(
return OngoingTaskGenAi.from_json(json_dict)
elif self._task_type == OngoingTaskType.EMBEDDINGS_GENERATION:
return OngoingTaskEmbeddingsGeneration.from_json(json_dict)
elif self._task_type == OngoingTaskType.PULL_REPLICATION_AS_SINK:
return OngoingTaskPullReplicationAsSink.from_json(json_dict)
else:
# todo: handle more types of tasks
return OngoingTask.from_json(json_dict)
Expand Down
Loading
Loading