Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pip-delete-this-directory.txt
# Extraneous
logs/
data/
output/
output_*

# Demo data
analyzed_videos/
Expand Down
53 changes: 53 additions & 0 deletions demos/minimal/actors/sample_generator_expiration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from actors.sample_generator_zmq import Generator
import numpy as np
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class Generator_exp(Generator):
"""Sample actor to generate data to pass into a sample processor with TTL for each Redis key

Intended for use along with sample_processor.py.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.data = None
self.name = "Generator_exp"
self.frame_num = 0

def runStep(self):
"""Generates additional data after initial setup data is exhausted.

Data is of a different form as the setup data in that although it is
the same size (5x1 vector), it is uniformly distributed in [1, 10]
instead of in [1, 100]. Therefore, the average over time should
converge to 5.5.
"""

if self.frame_num < np.shape(self.data)[0]:
if self.store_loc:
data_id = self.client.put(
self.data[self.frame_num], str(f"Gen_raw: {self.frame_num}")
)
else:
data_id = self.client.put(self.data[self.frame_num], ex=40)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the only line we're changing relative to the other generator? Could we not inherit and overload the one method? Just seems like a lot of code duplication.

# logger.info('Put data in store')
try:
if self.store_loc:
self.q_out.put([[data_id, str(self.frame_num)]])
else:
self.q_out.put(data_id) # AsyncQueue.put()
# logger.info("Sent message on")

self.frame_num += 1
except Exception as e:
logger.error(
f"--------------------------------Generator Exception: {e}"
)
else:
self.data = np.concatenate(
(self.data, np.asmatrix(np.random.randint(10, size=(1, 5)))), axis=0
)
21 changes: 21 additions & 0 deletions demos/minimal/minimal_expiration.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
actors:
Generator_exp:
package: actors.sample_generator_expiration
class: Generator_exp

Processor:
package: actors.sample_processor_zmq
class: Processor

# connections:
# Generator.q_out: [Processor.q_in]

connections:
Generator-Processor:
sources:
- Generator_exp.q_out
sinks:
- Processor.q_in

# redis_config:
# port: 6370
5 changes: 3 additions & 2 deletions improv/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def connect_to_server(self):

return self.client

def put(self, object):
def put(self, object, ex=60):
"""
Put a single object referenced by its string name
into the store. If the store already has a value stored at this key,
Expand All @@ -99,6 +99,7 @@ def put(self, object):
Args:
object: the object to store in Redis
object_key (str): the key under which the object should be stored
ex (int): TTL; the number of seconds after which the object got expired
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

English phrasing: Maybe something like

ex (int): time to live; number of seconds until object will expire


Returns:
object: the object that was a
Expand All @@ -108,7 +109,7 @@ def put(self, object):
pickle.dumps(object, protocol=5), level=self.compression_level
)

self.client.set(object_key, data, nx=True)
self.client.set(object_key, data, nx=True, ex=ex)

return object_key

Expand Down