-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathactor_storage_example.py
More file actions
120 lines (96 loc) · 4.21 KB
/
actor_storage_example.py
File metadata and controls
120 lines (96 loc) · 4.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""
Example: using the Actor storage convenience methods (dataset, KV storage, queue)
with mocked environment variables.

The mocked variables simulate the environment in which your Actor would run.
"""
import os
import json
from datetime import datetime, timedelta
from scrapeless import Actor
from scrapeless.types import IPaginationParams, IKVValueData, IQueuePushParams
# Mocked environment for local testing.
# On a real Actor runtime the platform provides these variables itself.
os.environ.update({
    'SCRAPELESS_INPUT': json.dumps({
        'keywords': ['apple', 'banana', 'orange'],
        'maxResults': 10,
    }),
    'SCRAPELESS_DATASET_ID': 'mock_dataset_id',
    'SCRAPELESS_KV_NAMESPACE_ID': 'mock_kv_namespace_id',
    'SCRAPELESS_BUCKET_ID': 'mock_bucket_id',
    'SCRAPELESS_QUEUE_ID': 'mock_queue_id',
})
def _run_dataset_operations(actor, input_data):
    """Demonstrate dataset writes and reads through the Actor helpers.

    Builds one item per entry in ``input_data['keywords']``, adds the items
    with ``actor.add_items`` and reads a page back with ``actor.get_items``.
    Any exception is caught and printed so the following sections still run.
    """
    print('\n--- Dataset operations ---')
    try:
        # Get dataset ID from environment
        dataset_id = os.getenv('SCRAPELESS_DATASET_ID')

        # Add items to dataset (using Actor convenience method)
        items = [
            {'keyword': keyword, 'timestamp': datetime.now().isoformat()}
            for keyword in input_data['keywords']
        ]
        add_result = actor.add_items(items)
        print('Added items to dataset:', add_result)

        # Get items from dataset (using Actor convenience method)
        pagination = IPaginationParams(
            page=1,
            page_size=10,
        )
        get_result = actor.get_items(pagination)
        print('Items from dataset:', get_result)

        # Using direct storage service
        # Note: In a real environment, the results would match. Here we can't truly compare.
        print(f'Would use direct API call with dataset_id: {dataset_id}')
    except Exception as e:
        # Best-effort demo: report the failure and carry on.
        print(f'Dataset operations error: {e}')


def _run_kv_operations(actor):
    """Demonstrate setting and reading a KV value through the Actor helpers.

    Stores the current timestamp under the key ``'last_run'`` and reads it
    back. Any exception is caught and printed so the next section still runs.
    """
    print('\n--- KV storage operations ---')
    try:
        # Get KV namespace ID from environment
        namespace_id = os.getenv('SCRAPELESS_KV_NAMESPACE_ID')

        # Set value (using Actor convenience method)
        data = IKVValueData(
            key='last_run',
            value=datetime.now().isoformat(),
        )
        set_value = actor.set_value(data)
        print('Set KV value:', set_value)

        # Get value (using Actor convenience method)
        get_value = actor.get_value('last_run')
        print('Get KV value:', get_value)

        print(f'Would use direct API call with namespace_id: {namespace_id}')
    except Exception as e:
        # Best-effort demo: report the failure and carry on.
        print(f'KV storage operations error: {e}')


def _run_queue_operations(actor):
    """Demonstrate pushing, pulling and acknowledging a queue message.

    Pushes one message with a deadline one hour from now, pulls from the
    queue, and acknowledges the first pulled message if there is one.
    Any exception is caught and printed.
    """
    print('\n--- Queue operations ---')
    try:
        # Get queue ID from environment
        queue_id = os.getenv('SCRAPELESS_QUEUE_ID')

        # Push message to queue (using Actor convenience method)
        deadline = datetime.now() + timedelta(hours=1)
        push_params = IQueuePushParams(
            name='product-next',
            payload=json.dumps({'a': 'a'}),
            retry=2,
            timeout=3000,
            deadline=deadline,
        )
        push_result = actor.push_message(push_params)
        print('Pushed message to queue:', push_result)

        # Pull message from queue (using Actor convenience method)
        pull_result = actor.pull_message()
        print('Pulled message from queue:', pull_result)

        # Acknowledge message (using Actor convenience method).
        # A truthiness test already covers both None and an empty result.
        # NOTE(review): assumes pull_result is a sequence of mappings that
        # carry an 'id' key - confirm against the SDK.
        if pull_result:
            ack_result = actor.ack_message(pull_result[0]['id'])
            print('Acknowledged message:', ack_result)

        print(f'Would use direct API call with queue_id: {queue_id}')
    except Exception as e:
        # Best-effort demo: report the failure and carry on.
        print(f'Queue operations error: {e}')


def run_actor_example():
    """Run the dataset, KV storage and queue demonstrations in order.

    Creates an ``Actor``, prints its input, then runs each storage section.
    Each section handles its own errors; anything that escapes (for example
    a failure while creating the Actor or reading its input) is caught here
    and printed, so this function does not raise ``Exception`` subclasses.
    """
    try:
        print('Starting Actor storage example')

        # Initialize Actor
        actor = Actor()

        # Get input data from environment
        input_data = actor.input()
        print('Actor input:', input_data)

        _run_dataset_operations(actor, input_data)
        _run_kv_operations(actor)
        _run_queue_operations(actor)
    except Exception as e:
        print(f'Actor example error: {e}')
# Run the example only when this file is executed as a script, so that
# importing the module does not trigger the storage calls.
if __name__ == '__main__':
    run_actor_example()