131 changes: 105 additions & 26 deletions shuffle-tools/1.2.0/src/app.py
@@ -9,6 +9,7 @@
import tempfile
import zipfile
import base64
import gzip
import ipaddress
import hashlib
from io import StringIO
@@ -53,6 +54,8 @@ def __init__(self, redis, logger, console_logger=None):
:param logger:
:param console_logger:
"""
self.cache_update_buffer = []
self.shared_cache = {}
super().__init__(redis, logger, console_logger)

def router(self):
@@ -661,6 +664,62 @@ def check_wildcard(self, wildcardstring, matching_string):

return False

def preload_cache(self, key):
org_id = self.full_execution["workflow"]["execution_org"]["id"]
url = f"{self.url}/api/v1/orgs/{org_id}/get_cache"
data = {
"workflow_id": self.full_execution["workflow"]["id"],
"execution_id": self.current_execution_id,
"authorization": self.authorization,
"org_id": org_id,
"key": key,
}
get_response = requests.post(url, json=data, verify=False)
response_data = get_response.json()
if "value" in response_data:
raw_value = response_data["value"]
if isinstance(raw_value, str):
try:
parsed = json.loads(raw_value)
except json.JSONDecodeError:
parsed = [raw_value]
else:
parsed = raw_value

if not isinstance(parsed, list):
parsed = [parsed]

response_data["value"] = parsed
return response_data
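
preload_cache normalizes the cached "value" into a Python list before handing it back, and the rest of this change relies on that shape when it mutates self.shared_cache["value"] in place. A minimal sketch of the intended call pattern, where the key and item below are illustrative placeholders rather than names from this change:

self.shared_cache = self.preload_cache(key="seen_values")
cached = self.shared_cache.get("value", [])  # a list after normalization
if "1.2.3.4" not in cached:
    cached.append("1.2.3.4")
    self.cache_update_buffer.append("1.2.3.4")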

def check_compression(self, obj, threshold=1_000_000):
data_bytes = json.dumps(obj).encode("utf-8")
if len(data_bytes) > threshold:
return True
return False

def compress_data(self, obj):
data_bytes = json.dumps(obj).encode("utf-8")
compressed_data = gzip.compress(data_bytes)
return base64.b64encode(compressed_data).decode("utf-8")
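
check_compression and compress_data mean filter_list can now return a gzip-compressed, base64-encoded blob instead of a plain JSON string once the payload crosses the ~1 MB threshold. A downstream consumer would have to reverse that encoding; a minimal sketch, assuming result holds the string produced by compress_data:

import base64
import gzip
import json

def decompress_result(result):
    # Reverse of compress_data: base64-decode, gunzip, then parse the JSON.
    raw = gzip.decompress(base64.b64decode(result))
    return json.loads(raw.decode("utf-8"))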

def update_cache(self, key):
org_id = self.full_execution["workflow"]["execution_org"]["id"]
url = f"{self.url}/api/v1/orgs/{org_id}/set_cache"
data = {
"workflow_id": self.full_execution["workflow"]["id"],
"execution_id": self.current_execution_id,
"authorization": self.authorization,
"org_id": org_id,
"key": key,
"value": json.dumps(self.shared_cache["value"]),
}

get_response = requests.post(url, json=data, verify=False)
self.cache_update_buffer = []
return get_response.json()


def filter_list(self, input_list, field, check, value, opposite):

# Remove hashtags on the fly
@@ -876,12 +935,20 @@ def filter_list(self, input_list, field, check, value, opposite):
failed_list.append(item)

elif check == "in cache key":
if item == input_list[0]:
self.shared_cache = self.preload_cache(key=value)

ret = self.check_cache_contains(value, tmp, "true")

if ret["success"] == True and ret["found"] == True:
new_list.append(item)
else:
failed_list.append(item)

if len(self.cache_update_buffer) > 400 or (item == input_list[-1] and len(self.cache_update_buffer) > 0):
self.update_cache(value)


#return {
# "success": True,
# "found": False,
@@ -931,13 +998,16 @@ def filter_list(self, input_list, field, check, value, opposite):
failed_list = tmplist

try:
return json.dumps(
{
data = {
"success": True,
"valid": new_list,
"invalid": failed_list,
}
)
if self.check_compression(data):
data = self.compress_data(data)
return data

return json.dumps(data)
# new_list = json.dumps(new_list)
except json.decoder.JSONDecodeError as e:
return json.dumps(
@@ -1737,7 +1807,6 @@ def escape_html(self, input_data):

def check_cache_contains(self, key, value, append):
org_id = self.full_execution["workflow"]["execution_org"]["id"]
url = "%s/api/v1/orgs/%s/get_cache" % (self.url, org_id)
data = {
"workflow_id": self.full_execution["workflow"]["id"],
"execution_id": self.current_execution_id,
@@ -1766,7 +1835,7 @@ def check_cache_contains(self, key, value, append):
value = json.dumps(value)
except Exception as e:
pass

if not isinstance(value, str):
value = str(value)

@@ -1778,11 +1847,13 @@ def check_cache_contains(self, key, value, append):
append = False

if "success" not in allvalues:
get_response = requests.post(url, json=data, verify=False)
#get_response = requests.post(url, json=data, verify=False)
pass

try:
if "success" not in allvalues:
allvalues = get_response.json()
#allvalues = get_response.json()
allvalues = self.shared_cache

try:
if allvalues["value"] == None or allvalues["value"] == "null":
@@ -1799,6 +1870,7 @@
set_response = requests.post(set_url, json=data, verify=False)
try:
allvalues = set_response.json()
self.shared_cache = self.preload_cache(key=key)
#allvalues["key"] = key
#return allvalues

@@ -1830,19 +1902,26 @@ def check_cache_contains(self, key, value, append):
if allvalues["value"] == None or allvalues["value"] == "null":
allvalues["value"] = "[]"

allvalues["value"] = str(allvalues["value"])
if isinstance(allvalues["value"], str):
try:
allvalues["value"] = json.loads(allvalues["value"])
except json.JSONDecodeError:
self.logger.info("[WARNING] Failed inner value cache parsing")
allvalues["value"] = [allvalues["value"]]

if not isinstance(allvalues["value"], list):
allvalues["value"] = [allvalues["value"]]

try:
parsedvalue = json.loads(allvalues["value"])
parsedvalue = json.loads(str(allvalues["value"]))
except json.decoder.JSONDecodeError as e:
parsedvalue = [str(allvalues["value"])]
except Exception as e:
parsedvalue = [str(allvalues["value"])]
parsedvalue = allvalues["value"]

try:
for item in parsedvalue:
#return "%s %s" % (item, value)
if item == value:
self.logger.info(f"{item} == {value}")
if str(item) == str(value):
if not append:
try:
newdata = json.loads(json.dumps(data))
@@ -1858,7 +1937,7 @@ def check_cache_contains(self, key, value, append):
"reason": "Found and not appending!",
"key": key,
"search": value,
"value": json.loads(allvalues["value"]),
"value": allvalues["value"],
}
else:
return {
@@ -1867,10 +1946,10 @@ def check_cache_contains(self, key, value, append):
"reason": "Found, was appending, but item already exists",
"key": key,
"search": value,
"value": json.loads(allvalues["value"]),
"value": allvalues["value"],
}
# Lol

# Lol
break
except Exception as e:
parsedvalue = [str(parsedvalue)]
@@ -1886,18 +1965,18 @@ def check_cache_contains(self, key, value, append):
"value": json.loads(allvalues["value"]),
}

new_value = parsedvalue
if new_value == None:
new_value = [value]
#parsedvalue.append(value)

new_value.append(value)
data["value"] = json.dumps(new_value)
#data["value"] = json.dumps(parsedvalue)

set_url = "%s/api/v1/orgs/%s/set_cache" % (self.url, org_id)
response = requests.post(set_url, json=data, verify=False)
if value not in allvalues["value"] and isinstance(allvalues["value"], list):
self.cache_update_buffer.append(value)
allvalues["value"].append(value)
#set_url = "%s/api/v1/orgs/%s/set_cache" % (self.url, org_id)
#response = requests.post(set_url, json=data, verify=False)
exception = ""
try:
allvalues = response.json()
#allvalues = response.json()
#return allvalues

return {
Expand All @@ -1906,7 +1985,7 @@ def check_cache_contains(self, key, value, append):
"reason": "Appended as it didn't exist",
"key": key,
"search": value,
"value": new_value,
"value": parsedvalue,
}
except Exception as e:
exception = e