Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions temporalio/nexus/_operation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ class Info:
Retrieved inside a Nexus operation handler via :py:func:`info`.
"""

namespace: str
"""The namespace of the worker handling this Nexus operation."""

task_queue: str
"""The task queue of the worker handling this Nexus operation."""

Expand Down
6 changes: 4 additions & 2 deletions temporalio/worker/_nexus.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(
*,
bridge_worker: Callable[[], temporalio.bridge.worker.Worker],
client: temporalio.client.Client,
namespace: str,
task_queue: str,
service_handlers: Sequence[Any],
data_converter: temporalio.converter.DataConverter,
Expand All @@ -76,6 +77,7 @@ def __init__(
) -> None:
self._bridge_worker = bridge_worker
self._client = client
self._namespace = namespace
self._task_queue = task_queue

self._metric_meter = metric_meter
Expand Down Expand Up @@ -242,7 +244,7 @@ async def _handle_cancel_operation_task(
request_deadline=request_deadline,
)
temporalio.nexus._operation_context._TemporalCancelOperationContext(
info=lambda: Info(task_queue=self._task_queue),
info=lambda: Info(namespace=self._namespace, task_queue=self._task_queue),
nexus_context=ctx,
client=self._client,
_runtime_metric_meter=self._metric_meter,
Expand Down Expand Up @@ -373,7 +375,7 @@ async def _start_operation(
temporalio.nexus._operation_context._TemporalStartOperationContext(
nexus_context=ctx,
client=self._client,
info=lambda: Info(task_queue=self._task_queue),
info=lambda: Info(namespace=self._namespace, task_queue=self._task_queue),
_runtime_metric_meter=self._metric_meter,
_worker_shutdown_event=self._worker_shutdown_event,
).set()
Expand Down
1 change: 1 addition & 0 deletions temporalio/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ def _init_from_config(self, client: temporalio.client.Client, config: WorkerConf
self._nexus_worker = _NexusWorker(
bridge_worker=lambda: self._bridge_worker,
client=config["client"], # type: ignore[reportTypedDictNotRequiredAccess]
namespace=client_config["namespace"],
task_queue=config["task_queue"], # type: ignore[reportTypedDictNotRequiredAccess]
service_handlers=nexus_service_handlers,
data_converter=client_config["data_converter"],
Expand Down
41 changes: 41 additions & 0 deletions tests/nexus/test_workflow_caller.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,47 @@ async def test_sync_operation_happy_path(client: Client, env: WorkflowEnvironmen
assert wf_output.op_output.value == "sync response"


@service_handler
class NexusInfoService:
@sync_operation
async def get_info(
self, _ctx: StartOperationContext, _input: None
) -> dict[str, str]:
info = nexus.info()
return {"namespace": info.namespace, "task_queue": info.task_queue}


@workflow.defn
class NexusInfoCallerWorkflow:
@workflow.run
async def run(self, task_queue: str) -> dict[str, str]:
nexus_client = workflow.create_nexus_client(
service=NexusInfoService,
endpoint=make_nexus_endpoint_name(task_queue),
)
return await nexus_client.execute_operation(NexusInfoService.get_info, None)


async def test_nexus_info_includes_namespace(client: Client, env: WorkflowEnvironment):
task_queue = str(uuid.uuid4())
async with Worker(
client,
nexus_service_handlers=[NexusInfoService()],
workflows=[NexusInfoCallerWorkflow],
task_queue=task_queue,
):
endpoint_name = make_nexus_endpoint_name(task_queue)
await env.create_nexus_endpoint(endpoint_name, task_queue)
result = await client.execute_workflow(
NexusInfoCallerWorkflow.run,
task_queue,
id=str(uuid.uuid4()),
task_queue=task_queue,
)
assert result["namespace"] == client.namespace
assert result["task_queue"] == task_queue


async def test_workflow_run_operation_happy_path(
client: Client, env: WorkflowEnvironment
):
Expand Down
Loading