diff --git a/.gitignore b/.gitignore index bedf9d58..0cb7c648 100644 --- a/.gitignore +++ b/.gitignore @@ -42,6 +42,8 @@ pip-delete-this-directory.txt # Extraneous logs/ data/ +output/ +output_* # Demo data analyzed_videos/ diff --git a/demos/minimal/actors/sample_generator_expiration.py b/demos/minimal/actors/sample_generator_expiration.py new file mode 100644 index 00000000..810ea175 --- /dev/null +++ b/demos/minimal/actors/sample_generator_expiration.py @@ -0,0 +1,53 @@ +from actors.sample_generator_zmq import Generator +import numpy as np +import logging + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class Generator_exp(Generator): + """Sample actor to generate data to pass into a sample processor with TTL for each Redis key + + Intended for use along with sample_processor.py. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.data = None + self.name = "Generator_exp" + self.frame_num = 0 + + def runStep(self): + """Generates additional data after initial setup data is exhausted. + + Data is of a different form as the setup data in that although it is + the same size (5x1 vector), it is uniformly distributed in [1, 10] + instead of in [1, 100]. Therefore, the average over time should + converge to 5.5. + """ + + if self.frame_num < np.shape(self.data)[0]: + if self.store_loc: + data_id = self.client.put( + self.data[self.frame_num], str(f"Gen_raw: {self.frame_num}") + ) + else: + data_id = self.client.put(self.data[self.frame_num], ex=40) + # logger.info('Put data in store') + try: + if self.store_loc: + self.q_out.put([[data_id, str(self.frame_num)]]) + else: + self.q_out.put(data_id) # AsyncQueue.put() + # logger.info("Sent message on") + + self.frame_num += 1 + except Exception as e: + logger.error( + f"--------------------------------Generator Exception: {e}" + ) + else: + self.data = np.concatenate( + (self.data, np.asmatrix(np.random.randint(10, size=(1, 5)))), axis=0 + ) diff --git a/demos/minimal/minimal_expiration.yaml b/demos/minimal/minimal_expiration.yaml new file mode 100644 index 00000000..cda2b798 --- /dev/null +++ b/demos/minimal/minimal_expiration.yaml @@ -0,0 +1,21 @@ +actors: + Generator_exp: + package: actors.sample_generator_expiration + class: Generator_exp + + Processor: + package: actors.sample_processor_zmq + class: Processor + +# connections: +# Generator.q_out: [Processor.q_in] + +connections: + Generator-Processor: + sources: + - Generator_exp.q_out + sinks: + - Processor.q_in + +# redis_config: +# port: 6370 \ No newline at end of file diff --git a/improv/store.py b/improv/store.py index ef4868ef..aceaf09d 100644 --- a/improv/store.py +++ b/improv/store.py @@ -88,7 +88,7 @@ def connect_to_server(self): return self.client - def put(self, object): + def put(self, object, ex=60): """ Put a single object referenced by its string name into the store. If the store already has a value stored at this key, @@ -99,6 +99,7 @@ def put(self, object): Args: object: the object to store in Redis object_key (str): the key under which the object should be stored + ex (int): TTL; the number of seconds after which the object got expired Returns: object: the object that was a @@ -108,7 +109,7 @@ def put(self, object): pickle.dumps(object, protocol=5), level=self.compression_level ) - self.client.set(object_key, data, nx=True) + self.client.set(object_key, data, nx=True, ex=ex) return object_key