From fe54cadb1b9cfb38b34a8813f1383f230af00248 Mon Sep 17 00:00:00 2001 From: Frithjof Gressmann Date: Fri, 9 Oct 2020 01:59:40 +0100 Subject: [PATCH] Deprecate self.store in favor of self.storage --- CHANGELOG.md | 5 +- docs/guide/components.md | 31 +++--- src/machinable/core/component.py | 115 +++++++++-------------- src/machinable/storage/component.py | 53 ++++------- src/machinable/store/record.py | 2 +- src/machinable/store/store.py | 98 +++++++++---------- tests/execution/execution_test.py | 10 +- tests/storage/storage_test.py | 6 +- tests/store/store_test.py | 21 ++--- tests/test_project/nodes/observations.py | 14 +-- 10 files changed, 149 insertions(+), 206 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 330ca7d2..100f644c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,9 +2,10 @@ -# Unreleased +# v2.6.0 -- Support static host info methods in Registration +- Deprecates self.store in favor of self.storage +- Support static host info methods in registration - New `find_experiments` method to simplify recursive search for experiments in a given directory ## v2.5.2 diff --git a/docs/guide/components.md b/docs/guide/components.md index 9ffbc72f..d054d6e9 100755 --- a/docs/guide/components.md +++ b/docs/guide/components.md @@ -98,13 +98,13 @@ For convenience, the dict interface can be accessed using the `.` object notatio Flags are configuration values that are associated with the particular execution, for example the random seeds or worker IDs. They are accessible via the `self.flags` object, that supports the `.` object notation. You can add your own flags through basic assignment, e.g. ``self.flags.counter = 1``. To avoid name collision, all native machinable flags use UPPERCASE (e.g. ``self.flags.SEED``). -## self.store +## self.storage -The interface `self.store` allows for the storing of data and results of the components. Note that you don't have to specify where the data is being stored. 
machinable will manage unique directories automatically. The data can later be retrieved using the [Storage](./storage.md) interface. +`self.storage` provides access to the storage directory of the component (each component directory name is unique and managed automatically so you don't have to specify where the data is being stored). The data can later be retrieved using the [storage interfaces](./storage.md). **Log** -`self.store.log` or `self.log` provides a standard logger interface that outputs to the console and a log file. +`self.storage.log` or `self.log` provides a standard logger interface that outputs to the console and a log file. ``` python self.log.info('Component created') @@ -113,7 +113,7 @@ self.log.debug('Component initialized') **Records** -`self.store.record` or `self.record` provides an interface for tabular logging, that is, storing recurring data points at each iteration. The results become available as a table where each row represents each iteration. +`self.storage.record` or `self.record` provides an interface for tabular logging, that is, storing recurring data points at each iteration. The results become available as a table where each row represents each iteration. ``` python for iteration in range(10): @@ -129,27 +129,22 @@ for iteration in range(10): If you use the `on_execute_iteration` event, iteration information and `record.save()` will be triggered automatically at the end of each iteration. -Sometimes it is useful to have multiple tabular loggers, for example to record training and validation performance separately. You can create custom record loggers using `self.store.get_record_writer(scope)` which returns a new instance of a record writer that you can use just like the main record writer. +Sometimes it is useful to have multiple tabular loggers, for example to record training and validation performance separately. 
You can create custom record loggers using `self.storage.get_record_writer(scope)` which returns a new instance of a record writer that you can use just like the main record writer. -**Store** +**Custom data** -You can use `self.store.write()` to write any other Python object, for example: +Any other data can be stored in the `data/` subdirectory. -```python -self.store.write('final_accuracy', [0.85, 0.92]) -``` -Note that to protect unintended data loss, overwriting will fail unless the ``overwrite`` argument is explicitly set. - -For larger data structures, it can be more suitable to write data in specific file formats by appending a file extension, i.e.: +You can use `self.storage.save_data()` to write any other Python object, for example: ``` python -self.store.write('data.txt', 'a string') -self.store.write('data.p', generic_object) -self.store.write('data.json', jsonable_object) -self.store.write('data.npy', numpy_array) +self.storage.save_data('data.txt', 'a string') +self.storage.save_data('data.p', generic_object) +self.storage.save_data('data.json', jsonable_object) +self.storage.save_data('data.npy', numpy_array) ``` -Refer to the store [reference](./components.md#store) for more details. +To protect against unintended data loss, you can set `overwrite=False`. 
## Config methods diff --git a/src/machinable/core/component.py b/src/machinable/core/component.py index d5e5d848..ecacfa14 100644 --- a/src/machinable/core/component.py +++ b/src/machinable/core/component.py @@ -144,9 +144,9 @@ def unserialize(cls, serialized): @staticmethod def save(component): - if component.node is not None or component.store is None: + if component.node is not None or component.storage is None: return False - component.store.write( + component.storage.write( "state.json", { "component": component.component_state.serialize(), @@ -185,7 +185,7 @@ def __init__(self, config: dict = None, flags: dict = None, node=None): self._node: Optional[Component] = node self._components: Optional[List[Component]] = None - self._store: Optional[Store] = None + self._storage = None self._events = Events() self._actor_config = None self._storage_config = None @@ -261,25 +261,30 @@ def components(self, value): self._components = value @property - def store(self) -> Store: - if self._store is None and isinstance(self.node, Component): + def store(self): + # deprecated alias + return self.storage + + @property + def storage(self): + if self._storage is None and isinstance(self.node, Component): # forward to node store if available - return self.node.store - return self._store + return self.node.storage + return self._storage - @store.setter - def store(self, value): - self._store = value + @storage.setter + def storage(self, value): + self._storage = value @property def record(self) -> Record: """Record writer instance""" - return self.store.record + return self.storage.record @property def log(self) -> Log: """Log writer instance""" - return self.store.log + return self.storage.log @property def events(self) -> Events: @@ -308,26 +313,25 @@ def dispatch( self.on_init_storage(storage_config) self._storage_config = storage_config - self.store = Store(component=self, config=storage_config) + self.storage = Store(component=self, config=storage_config) if not 
storage_config["url"].startswith("mem://"): OutputRedirection.apply( self._storage_config["output_redirection"], - self.store.get_stream, + self.storage.get_stream, "output.log", ) - if not self.store.exists("host.json", _meta=True): - self.store.write("host.json", get_host_info(), _meta=True) - if not self.store.exists("component.json", _meta=True): - self.store.write("component.json", self.serialize(), _meta=True) - if not self.store.exists("components.json", _meta=True): - self.store.write( + if not self.storage.has_file("host.json"): + self.storage.save_file("host.json", get_host_info()) + if not self.storage.has_file("component.json"): + self.storage.save_file("component.json", self.serialize()) + if not self.storage.has_file("components.json"): + self.storage.save_file( "components.json", [component.serialize() for component in self.components] if self.components else [], - _meta=True, ) self.component_state.save(self) @@ -442,12 +446,12 @@ def execute(self): if self.on_after_execute_iteration(iteration) is not False: # trigger records.save() automatically if ( - self.store - and self.store.has_records() - and not self.store.record.empty() + self.storage + and self.storage.has_records() + and not self.storage.record.empty() ): - self.store.record["_iteration"] = iteration - self.store.record.save() + self.record["_iteration"] = iteration + self.record.save() except (KeyboardInterrupt, StopIteration): callback = StopIteration @@ -500,9 +504,7 @@ def refresh_status(self, log_errors=False): """ try: self.component_status["heartbeat_at"] = str(pendulum.now()) - self.store.write( - "status.json", self.component_status, overwrite=True, _meta=True - ) + self.storage.save_file("status.json", self.component_status) except (IOError, Exception) as ex: if log_errors: self.log.error( @@ -513,29 +515,6 @@ def refresh_status(self, log_errors=False): return True - def get_url(self, append=""): - """Returns the storage URL of the component""" - return os.path.join( - 
self._storage_config["url"], - os.path.join( - self._storage_config.get("directory", ""), - self._storage_config["experiment"], - self._storage_config.get("component", ""), - append, - ), - ) - - def local_directory(self, append=""): - """Returns the local storage filesystem path, or False if non-local - - # Returns - Local filesystem path, or False if non-local - """ - if not self._storage_config["url"].startswith("osfs://"): - return False - - return os.path.join(self.get_url().split("osfs://")[-1], append) - def set_seed(self, seed=None) -> bool: """Applies a global random seed @@ -564,16 +543,16 @@ def save_checkpoint(self, path: str = None, timestep=None) -> Union[bool, str]: timestep: int = len(self.component_state.checkpoints) if path is None: - if not self.store: + if not self.storage: raise ValueError("You need to specify a checkpoint path") - fs_prefix, basepath = self.store.config["url"].split("://") + fs_prefix, basepath = self.storage.config["url"].split("://") if fs_prefix != "osfs": # todo: support non-local filesystems via automatic sync raise NotImplementedError( "Checkpointing to non-os file systems is currently not supported." 
) - checkpoint_path = self.store.get_path("checkpoints", create=True) + checkpoint_path = self.storage.get_path("checkpoints", create=True) path = os.path.join(os.path.expanduser(basepath), checkpoint_path) checkpoint = self.on_save(path, timestep) @@ -591,8 +570,8 @@ def restore_checkpoint(self, checkpoint): # Arguments filepath: Checkpoint filepath """ - if self.store is not None: - self.store.log.info(f"Restoring checkpoint `{checkpoint}`") + if self.storage is not None: + self.log.info(f"Restoring checkpoint `{checkpoint}`") return self.on_restore(checkpoint) def serialize(self): @@ -811,26 +790,24 @@ def dispatch(self, components_config, storage_config, actor_config=None): payload["components"] = [ config_map(component) for component in components ] - elif key == "store" or key == "_store": - if key == "_store": - payload["store"] = storage_config + elif key == "storage" or key == "_storage": + if key == "_storage": + payload["storage"] = storage_config else: - store = Store(component=self, config=storage_config) - store.write("host.json", get_host_info(), _meta=True) - store.write( - "component", + storage = Store(component=self, config=storage_config) + storage.save_file("host.json", get_host_info()) + storage.save_file( + "component.json", {"config": self.node["config"], "flags": self.node["flags"]}, - _meta=True, ) - store.write( - "components", + storage.save_file( + "components.json", [ {"config": c["config"], "flags": c["flags"]} for c in components ], - _meta=True, ) - payload["store"] = store + payload["storage"] = storage else: raise ValueError( f"Unrecognized argument: '{key}'. 
" diff --git a/src/machinable/storage/component.py b/src/machinable/storage/component.py index c0a365e9..e9906fa7 100644 --- a/src/machinable/storage/component.py +++ b/src/machinable/storage/component.py @@ -19,12 +19,13 @@ def __init__( self._cache = cache or {} self._cache["experiment"] = experiment - def file(self, filepath, default=sentinel, reload=None): + def read_file(self, filepath, default=sentinel, reload=None): """Returns the content of a file in the storage # Arguments filepath: Relative filepath - reload: If True, cache will be ignored. If datetime, file will be reloaded if cached version is older than the date + reload: If True, cache will be ignored. If datetime, file will be reloaded + if cached version is older than the date """ if reload is None: finished_at = self.finished_at @@ -90,48 +91,32 @@ def experiment(self): return self._cache["experiment"] - def store(self, name=None): - """Retrieves element from the store - - This is the counterpart to the ``store.write`` method. + def read_data(self, name=None, default=sentinel): + """Retrieves a data object from the storage # Arguments - name: Key or filename of the object that is to be retrieved. If None, a list of available objects is returned + name: Name of the data object. 
If None, a list of available objects is returned """ - if isinstance(name, str) and os.path.splitext(name)[1] != "": - return self._model.file(os.path.join("store", name)) - - if "store" in self._cache: - store = self._cache["store"] - else: - try: - store = self._model.file("store.json") - except FileNotFoundError: - store = {} - - with open_fs(self.url) as filesystem: - store["__files"] = filesystem.listdir("store") - - if self.is_finished(): - self._cache["store"] = store - if name is None: - return store + with open_fs(self.url) as filesystem: + return filesystem.listdir("data") - return store[name] + return self.read_file(os.path.join("data", name), default) @property def config(self): """Returns the component config""" if "config" not in self._cache: - self._cache["config"] = config_map(self.file("component.json")["config"]) + self._cache["config"] = config_map( + self.read_file("component.json")["config"] + ) return self._cache["config"] @property def flags(self): """Returns the component flags""" if "flags" not in self._cache: - self._cache["flags"] = config_map(self.file("component.json")["flags"]) + self._cache["flags"] = config_map(self.read_file("component.json")["flags"]) return self._cache["flags"] @property @@ -143,7 +128,7 @@ def tuning(self): def components(self): if "components" not in self._cache: self._cache["components"] = [ - config_map(component) for component in self.file("components.json") + config_map(component) for component in self.read_file("components.json") ] return self._cache["components"] @@ -152,7 +137,7 @@ def components(self): def host(self): """Returns information of the host""" if "host" not in self._cache: - self._cache["host"] = config_map(self.file("host.json")) + self._cache["host"] = config_map(self.read_file("host.json")) return self._cache["host"] @property @@ -161,7 +146,7 @@ def state(self): if "state" in self._cache: return self._cache["state"] - state = config_map(self.file("state.json")) + state = 
config_map(self.read_file("state.json")) if self.is_finished(): self._cache["state"] = state @@ -172,7 +157,7 @@ def log(self): if "log" in self._cache: return self._cache["log"] - log = self.file("log.txt") + log = self.read_file("log.txt") if self.is_finished(): self._cache["log"] = log @@ -184,7 +169,7 @@ def output(self): if "output" in self._cache: return self._cache["output"] - output = self.file("output.log") + output = self.read_file("output.log") if self.is_finished(): self._cache["output"] = output @@ -220,7 +205,7 @@ def get_records(self, scope=None): if "records." + scope in self._cache: return self._cache["records." + scope] - records = RecordCollection(self.file(f"records/{scope}.p")) + records = RecordCollection(self.read_file(f"records/{scope}.p")) if self.is_finished(): self._cache["records." + scope] = records diff --git a/src/machinable/store/record.py b/src/machinable/store/record.py index 3baaf1db..74494ea6 100644 --- a/src/machinable/store/record.py +++ b/src/machinable/store/record.py @@ -176,7 +176,7 @@ def save(self, echo=False, force=False): if self.scope == "default": if hasattr(self.store, "events"): - self.store.events.trigger("store.on_change", "record.save") + self.store.events.trigger("storage.on_change", "record.save") if echo: msg(os.path.join(self.store.config["url"], self.store.get_path())) diff --git a/src/machinable/store/store.py b/src/machinable/store/store.py index 1359dbbb..e1892aba 100644 --- a/src/machinable/store/store.py +++ b/src/machinable/store/store.py @@ -19,7 +19,7 @@ class Store: """Store interface ::: tip - Becomes available as ``self.store`` + Becomes available as ``self.storage`` ::: # Arguments @@ -43,19 +43,15 @@ def __init__(self, component, config): self._log = None self.created_at = pendulum.now().timestamp() - # restore if existing - with open_fs(self.config["url"]) as filesystem: - self._store = filesystem.load_file(self.get_path("store.json"), default={}) - - def get_record_writer(self, scope): + def 
get_record_writer(self, scope=None): """Creates or returns an instance of a record writer # Arguments - scope: Name of the record writer - - # Returns - machinable.store.record.Record + scope: Name of the record writer. If None, a dict of all registered writers will be returned """ + if scope is None: + return self._record_writers + if scope not in self._record_writers: self._record_writers[scope] = Record( store=self, config=self.config["records"], scope=scope @@ -63,26 +59,15 @@ def get_record_writer(self, scope): return self._record_writers[scope] - @property - def record_writers(self): - """Returns a mapping of all record writers (scope => RecordWriter)""" - return self._record_writers - @property def record(self): """Record interface - - # Returns - machinable.store.record.Record """ return self.get_record_writer("default") @property def log(self): """Log interface - - # Returns - machinable.store.log.Log """ if self._log is None: self._log = Log(store=self, config=self.config["log"]) @@ -100,7 +85,7 @@ def has_records(self, scope="default"): """ return scope in self._record_writers - def has_logs(self): + def has_log(self): """Determines whether log writer exists # Returns @@ -108,16 +93,24 @@ def has_logs(self): """ return self._log is not None - def write(self, name, data, overwrite=False, _meta=False): + def has_file(self, name): + with open_fs(self.config["url"]) as filesystem: + return filesystem.exists(name) + + def write(self, name, data, overwrite=True, _meta=False): + # deprecated alias + return self.save_data(name, data, overwrite, _meta) + + def save_file(self, filepath, data): + return self.save_data(filepath, data, overwrite=True, _meta=True) + + def save_data(self, name, data, overwrite=True, _meta=False): """Stores a data object # Arguments - name: String, name identifier. - You can provide an extension to instruct machinable to write the data in its own file and not as part - of a dictionary with other stored values. 
- Supported formats are .json (JSON), .npy (numpy), .p (pickle), .txt (txt) + name: String, name identifier. Supported formats are .json (JSON), .npy (numpy), .p (pickle), .txt (txt) data: The data object - overwrite: Boolean, if True existing values will be overwritten + overwrite: Boolean, if False existing values won't be overwritten if existing """ mode = "w" if overwrite else "a" path = os.path.dirname(name) @@ -125,23 +118,13 @@ def write(self, name, data, overwrite=False, _meta=False): _, ext = os.path.splitext(name) if not _meta: - path = "store/" + path + path = "data/" + path self.filesystem.makedir(self.get_path(path), recreate=True) filepath = os.path.join(path, name) # todo: check overwrite for files - if ext == "": - # automatic handling - if name in self._store and not overwrite: - raise ValueError( - f"'{name}' already exist. " - f"Use overwrite=True if you intent to overwrite existing data" - ) - self._store[name] = data - with self.get_stream("store.json", "w") as f: - f.write(json.dumps(self._store, ensure_ascii=False, default=serialize)) - elif ext == ".json": + if ext == ".json": # json with self.get_stream(filepath, mode) as f: f.write(json.dumps(data, ensure_ascii=False, default=serialize)) @@ -171,23 +154,34 @@ def write(self, name, data, overwrite=False, _meta=False): if hasattr(self.component, "events"): self.component.events.trigger( - "store.on_change", "store.save", {"name": name, "data": data} + "storage.on_change", "storage.save", {"name": name, "data": data} ) - def read(self, name, default=sentinel, _meta=False): - if not _meta: - name = "store/" + name - with open_fs(self.config["url"]) as filesystem: - return filesystem.load_file(self.get_path(name), default) + def get_url(self, append=""): + """Returns the storage URL of the component""" + return os.path.join( + self.config["url"], + os.path.join( + self.config.get("directory", ""), + self.config["experiment"], + self.config.get("component", ""), + append, + ), + ) - def 
exists(self, name, _meta=False): - if not _meta: - name = "store/" + name - with open_fs(self.config["url"]) as filesystem: - return filesystem.exists(name) + def local_directory(self, append=""): + """Returns the local storage filesystem path, or False if non-local + + # Returns + Local filesystem path, or False if non-local + """ + if not self.config["url"].startswith("osfs://"): + return False + + return os.path.join(self.get_url().split("osfs://")[-1], append) def get_stream(self, path, mode="r", *args, **kwargs): - """Returns a file stream on the store write + """Returns a file stream on the storage # Arguments path: Relative file path diff --git a/tests/execution/execution_test.py b/tests/execution/execution_test.py index 81ab7549..61553132 100644 --- a/tests/execution/execution_test.py +++ b/tests/execution/execution_test.py @@ -13,17 +13,19 @@ def test_execution_decorators(): t = Experiment().components("thenode", "thechildren") @execute - def run(component, components, store): + def run(component, components, storage): assert component.config.alpha == 0 - store.log.info("Custom training with learning_rate=" + str(component.config.a)) + storage.log.info( + "Custom training with learning_rate=" + str(component.config.a) + ) assert components[0].config.alpha == 0 assert run(t, seed=1, project="./test_project").failures == 0 @Execution - def run_2(component, components, store): + def run_2(component, components, storage): assert component.config.alpha == 0 - store.log.info("Execution decorator") + storage.log.info("Execution decorator") assert components[0].config.alpha == 0 assert run_2(t, seed=1, project="./test_project").submit().failures == 0 diff --git a/tests/storage/storage_test.py b/tests/storage/storage_test.py index 68bac597..312cc5c2 100644 --- a/tests/storage/storage_test.py +++ b/tests/storage/storage_test.py @@ -49,11 +49,7 @@ def test_component_storage(): assert comp.flags.NAME == "nodes.observations" assert comp.config.to_test == "observations" 
assert len(comp.components) == 0 - assert comp.store("data.json")["observation_id"] > 0 - assert comp.store("test") == 2 - assert comp.store("key") == "value" - assert "test" in comp.store() - assert len(comp.store()["__files"]) + assert comp.read_data("data.json")["observation_id"] > 0 assert len(comp.host) == 10 assert len(comp.get_records()) == 2 diff --git a/tests/store/store_test.py b/tests/store/store_test.py index 74434967..04ec74c1 100644 --- a/tests/store/store_test.py +++ b/tests/store/store_test.py @@ -13,26 +13,19 @@ def test_store_writer(): ) # write - store.write("test.txt", "test me") + store.save_data("test.txt", "test me") f = os.path.join( - store.config["experiment"], store.config["component"], "store", "test.txt" + store.config["experiment"], store.config["component"], "data", "test.txt" ) assert store.filesystem.readtext(f) == "test me" - store.write("test.npy", np.ones([5])) - store.write("test.p", np.ones([5])) - store.write("test.json", [1, 2, 3]) - store.write("dir/test.txt", "subdirectory") + store.save_data("test.npy", np.ones([5])) + store.save_data("test.p", np.ones([5])) + store.save_data("test.json", [1, 2, 3]) + store.save_data("dir/test.txt", "subdirectory") f = os.path.join( - store.config["experiment"], store.config["component"], "store", "dir/test.txt" + store.config["experiment"], store.config["component"], "data", "dir/test.txt" ) assert store.filesystem.readtext(f) == "subdirectory" - f = os.path.join( - store.config["experiment"], store.config["component"], "store.json" - ) - store.write("test", True) - assert store.filesystem.readtext(f) == '{"test": true}' - store.write("bla", 1) - assert store.filesystem.readtext(f) == '{"test": true, "bla": 1}' # observations store.record["test"] = 1 diff --git a/tests/test_project/nodes/observations.py b/tests/test_project/nodes/observations.py index a3bbdac6..76bf3dc9 100644 --- a/tests/test_project/nodes/observations.py +++ b/tests/test_project/nodes/observations.py @@ -29,12 +29,12 @@ 
def on_execute_iteration(self, iteration): if self.config.get("test") is True: # custom records - self.store.get_record_writer("validation")["iteration"] = iteration - self.store.get_record_writer("validation").save() + self.storage.get_record_writer("validation")["iteration"] = iteration + self.storage.get_record_writer("validation").save() - self.store.write("test", 2, overwrite=True) + self.storage.save_data("test_data.json", 2) if iteration == 1: - self.store.write("test.txt", f"hello from observation {self.config.id}") - self.store.write("data.json", {"observation_id": self.config.id}) - self.store.write("key", "value") - self.store.write("test", 1, overwrite=True) + self.storage.save_data( + "test.txt", f"hello from observation {self.config.id}" + ) + self.storage.save_data("data.json", {"observation_id": self.config.id})