Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions src/actinia/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,13 @@ def __update(
for key in actinia_json_dict:
setattr(self, key, actinia_json_dict[key])

def poll(self, quiet=False):
def poll(self, quiet=False, retries=0):
"""
Update job by polling.

Args:
quiet: Bool if the method should log process status
retries: Integer number of retries in case of ReadTimeout
"""
if self.status not in ["accepted", "running"]:
log.warning("The job is not running and can not be updated.")
Expand All @@ -67,7 +71,9 @@ def poll(self, quiet=False):
"timeout": self.__actinia.timeout,
}
url = self.urls["status"]
resp = request_and_check("GET", url, status_code=(200, 400), **kwargs)
resp = request_and_check(
"GET", url, status_code=(200, 400), retries=retries, **kwargs
)

if "process_results" not in resp:
resp["process_results"] = {}
Expand All @@ -77,19 +83,20 @@ def poll(self, quiet=False):
if not quiet:
log.info(f"Status of {self.name} job is {self.status}.")

def poll_until_finished(self, waiting_time=5, quiet=False):
def poll_until_finished(self, waiting_time=5, quiet=False, retries=0):
"""
Polling job until finished or error.

Args:
waiting_time: Time to wait in seconds for next poll
quiet: Bool if the method should log each process status or only
changed
retries: Integer number of retries in case of ReadTimeout
"""
status_accepted_running = True
status = None
while status_accepted_running:
self.poll(quiet=True)
self.poll(quiet=True, retries=retries)
if self.status not in ["accepted", "running"]:
status_accepted_running = False
msg = (
Expand Down
42 changes: 32 additions & 10 deletions src/actinia/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@
from datetime import datetime


def request_and_check(method, url, status_code=(200,), **kwargs):
"""Function to send a GET request to an URL and check the status code.
def request_and_check(method, url, status_code=(200,), retries=0, **kwargs):
"""Send a request with the given method to a URL and check the status code.

Parameters:
method (string): Request method (GET, POST, PUT, DELETE, ...)
url (string): URL as string
status_code (tuple): Tuple of acceptable status codes to check
if it is set; default is 200
retries (int): Maximal number of retries in case of read timeouts
default is 0.
**kwargs:
auth (tuple): Tuple of user and password
timeout (tuple): Tuple of connection timeout and read timeout
Expand All @@ -49,15 +51,35 @@ def request_and_check(method, url, status_code=(200,), **kwargs):
Returns:
(dict): returns text of the response as dictionary

Throws an error if the request does not have the status_code
Throws an error if the request does not have the right status_code or
the response content cannot be paresd as JSON.
"""
resp = requests.request(method, url, **kwargs)
# Use resp.raise_for_status() ?
if resp.status_code == 401:
raise Exception("Wrong user or password. Please check your inputs.")
elif resp.status_code not in status_code:
raise Exception(f"Error {resp.status_code}: {resp.text}")
return json.loads(resp.text)
attempt = 0
while attempt <= retries:
attempt += 1
resp = requests.request(method, url, **kwargs)
try:
if resp.status_code not in status_code:
resp.raise_for_status()
except requests.exceptions.ReadTimeout as e:
if attempt >= retries:
raise e
continue
except requests.exceptions.RequestException as e:
if resp.status_code == 401:
raise Exception(
"Wrong user or password. Please check your inputs."
) from e
raise requests.exceptions.RequestException(
f"Error {resp.status_code}: {resp.text}", e
) from None
try:
return json.loads(resp.text)
except json.JSONDecodeError:
raise RuntimeError(
"Invalid value returned. Cannot parse JSON from response:",
resp.text,
) from None


def set_job_names(name, default_name="unknown_job"):
Expand Down