diff --git a/.gitignore b/.gitignore
index 0866e96..8966fc8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -169,5 +169,6 @@ log/
 calibrate/
 experiment.py
 nohup.out
-cache.jsonl
 quick_test.py
+*.jsonl
+**/config.yaml
\ No newline at end of file
diff --git a/pyduino/config.template.yaml b/pyduino/config.template.yaml
new file mode 100644
index 0000000..cc5ff9d
--- /dev/null
+++ b/pyduino/config.template.yaml
@@ -0,0 +1,84 @@
+hyperparameters:
+  reset_density: true #Whether or not to use gotod
+  log_name: #Log folder name (delete to use the default)
+  density_param: DensidadeAtual #Parameter name for density counts.
+  brilho_param: Brilho #When Brilho = 0, optimization is turned off.
+  maximize: false
+  rng_seed: 2 #Random seed for parameter initialization
+  ranges:
+    branco:
+      - 1
+      - 100
+    others:
+      - 0
+      - 100
+slave:
+  port: "5000" #Must be a string
+  network: "192.168.1.1/24"
+  exclude: #Leave blank if there are none to exclude
+system:
+  #About the partition system (this is done in `init_spectra.py`):
+  #`all` will create a single manager for all reactors.
+  #`single` will create a manager for each reactor.
+  partition: all
+  log_level: "DEBUG" #DEBUG, INFO, WARNING, ERROR, CRITICAL
+  sync_clocks: false #Whether or not to sync the clocks of the slaves.
+  initial_state: "preset_state.d/all.csv"
+  reboot_wait_time: 5 #Time in seconds to wait after a reboot.
+  relevant_parameters:
+    # - brilho
+    - branco
+    - full
+    - "440"
+    - "470"
+    - "495"
+    - "530"
+    - "595"
+    - "634"
+    - "660"
+    - "684"
+  irradiance:
+    #brilho: 12
+    "branco": 10.35
+    "full": 11.50
+    "440": 7.72
+    "470": 8.50
+    "495": 8.30
+    "530": 8.56
+    "595": 2.17
+    "634": 3.44
+    "660": 4.52
+    "684": 5.74
+  standard_parameters:
+    #brilho: float
+    branco: float
+    full: float
+    "440": float
+    "470": float
+    "495": float
+    "530": float
+    "595": float
+    "634": float
+    "660": float
+    "684": float
+    cor: int
+    modopainel: int
+    brilho: float
+    bomdia: int
+    boanoite: int
+    tau: int
+    step: int
+    modotemp: int
+    temp: int
+    densidade: float
+    mododil: int
+    ar: int
+    ima: int
+    modoco2: int
+    co2: int
+    dtco2: int
+tensorboard:
+  additional_parameters:
+    - Brilho
+    - Temp
+    - pH
\ No newline at end of file
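The template above only documents the expected keys; the machine-specific copy it gets turned into (now ignored via `**/config.yaml`) is what the package actually reads, presumably through `pyduino/paths.py`. A minimal sketch of inspecting such a copy — the `pyduino/config.yaml` path and the use of `yaml.safe_load` are assumptions for illustration, since the real loader is not part of this diff:

import yaml

# Hypothetical location of the concrete config written from config.template.yaml.
with open("pyduino/config.yaml") as f:
    config = yaml.safe_load(f)

hyper = config["hyperparameters"]
print(hyper["density_param"])             # e.g. "DensidadeAtual"
print(hyper["ranges"]["branco"])          # e.g. [1, 100]
print(config["system"]["relevant_parameters"])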
diff --git a/pyduino/log.py b/pyduino/log.py
index 6df5851..5abdabc 100644
--- a/pyduino/log.py
+++ b/pyduino/log.py
@@ -4,7 +4,6 @@
 from datetime import datetime
 import io
 from glob import glob
-from uuid import uuid1
 from tabulate import tabulate
 from collections import OrderedDict
 from datetime import datetime
@@ -37,7 +36,7 @@ def to_markdown_table(data: OrderedDict) -> str:
 def y_to_table(y):
     return tabulate(list(y.items()), tablefmt="pipe")
 
-class log:
+class Log:
     @property
     def timestamp(self):
         """str: Current date."""
@@ -49,15 +48,15 @@ def prefix(self):
 
     def __init__(self,subdir,path="./log",name=None):
         """
-        Logs data into csvs with timestamps.
+        Logs data into JSONL files with timestamps.
 
         Example:
-            log_obj = log(['reactor_0','reactor_1'],path='./log',name='experiment_0')
+            log_obj = Log(['reactor_0','reactor_1'],path='./log',name='experiment_0')
 
         log/YEAR/MONTH/
         ├─ experiment_0/
-        │  ├─ reactor_0.csv
-        │  ├─ reactor_1.csv
+        │  ├─ reactor_0.jsonl
+        │  ├─ reactor_1.jsonl
 
         Args:
             subdir (:obj:`list` of :obj:`str`): List of the names for the subdirectories of `path`.
@@ -75,7 +74,7 @@ def __init__(self,subdir,path="./log",name=None):
             self.subdir = subdir
         else:
             raise ValueError("Invalid type for subdir. Must be either a list of strings or a glob string.")
-        self.subdir = list(map(lambda x: str(x)+".csv" if len(os.path.splitext(str(x))[1])==0 else str(x),self.subdir))
+        self.subdir = list(map(lambda x: str(x)+".jsonl" if len(os.path.splitext(str(x))[1])==0 else str(x),self.subdir))
         self.first_timestamp = None
         self.data_frames = {}
 
@@ -88,7 +87,7 @@ def __init__(self,subdir,path="./log",name=None):
             self.subdir = subdir
         else:
             raise ValueError("Invalid type for subdir. Must be either a list of strings or a glob string.")
-        self.subdir = list(map(lambda x: str(x)+".csv" if len(os.path.splitext(str(x))[1])==0 else str(x),self.subdir))
+        self.subdir = list(map(lambda x: str(x)+".jsonl" if len(os.path.splitext(str(x))[1])==0 else str(x),self.subdir))
         self.first_timestamp = None
         self.data_frames = {}
 
@@ -100,19 +99,18 @@ def backup_config_file(self):
         with open(config_file) as cfile, open(filename,'w') as wfile:
             wfile.write(cfile.read())
 
-    def log_rows(self,rows,subdir,add_timestamp=True,tags=None,**kwargs):
+    def log_rows(self,rows,subdir,add_timestamp=True,tags=None):
         """
-        Logs rows into csv format.
+        Logs rows into jsonl format.
 
         Args:
             rows (:obj:`list` of :obj:`dict`): List of dictionary-encoded rows or pandas dataframe.
             subdir (str): Subdirectory name. Intended to be an element of `self.subdir`.
             add_timestamp (bool,optional): Whether or not to include a timestamp column.
             tags (:obj:`dict` of :obj:`str`): Dictionary of strings to be inserted as constant columns.
-            **kwargs: Additional arguments passed to `pandas.to_csv`.
         """
         t = self.timestamp
-        path = os.path.join(self.path,self.start_timestamp,f"{subdir}.csv")
+        path = os.path.join(self.path,self.start_timestamp,f"{subdir}.jsonl")
 
         df = pd.DataFrame()
         if isinstance(rows,list):
@@ -125,7 +123,7 @@ def log_rows(self,rows,subdir,add_timestamp=True,tags=None,**kwargs):
         if os.path.exists(path):
             if self.first_timestamp is None:
                 with open(path) as file:
-                    head = pd.read_csv(io.StringIO(file.readline()+file.readline()),index_col=False,**kwargs)
+                    head = pd.read_json(io.StringIO(file.readline()+file.readline()), orient="records", lines=True)
                 self.first_timestamp = datetime_from_str(head.log_timestamp[0])
         else:
             self.first_timestamp = t
@@ -136,17 +134,13 @@ def log_rows(self,rows,subdir,add_timestamp=True,tags=None,**kwargs):
             for key,value in tags.items():
                 df.loc[:,key] = value
 
-        df.to_csv(
-            path,
-            mode="a",
-            header=not os.path.exists(path),
-            index=False,
-            **kwargs
-        )
+        with open(path, mode="a") as log_file:
+            log_file.write(df.to_json(orient="records", lines=True) + "\n") #Trailing newline so appended batches stay one record per line.
+
         return df
     def log_many_rows(self,data,**kwargs):
         """
-        Logs rows into csv format.
+        Logs rows into jsonl format.
 
         Args:
             data (:obj:`dict` of :obj:`dict`): Dictionary encoded data frame.
@@ -154,7 +148,7 @@ def log_many_rows(self,data,**kwargs):
         """
         self.data_frames = {}
         for _id,row in data.items():
-            df = self.log_rows(rows=[row],subdir=_id,sep='\t',**kwargs)
+            df = self.log_rows(rows=[row],subdir=_id,**kwargs)
             self.data_frames[_id] = df
         self.data_frames = pd.concat(list(self.data_frames.values()))
 
@@ -164,7 +158,7 @@ def log_optimal(self,column,maximum=True,**kwargs):
         """
         i=self.data_frames.loc[:,column].astype(float).argmax() if maximum else self.data_frames.loc[:,column].astype(float).argmin()
         self.df_opt = self.data_frames.iloc[i,:]
-        self.log_rows(rows=[self.df_opt.to_dict()],subdir='opt',sep='\t',**kwargs)
+        self.log_rows(rows=[self.df_opt.to_dict()],subdir='opt',**kwargs)
 
     def log_average(self, cols: list, **kwargs):
         """
@@ -178,38 +172,38 @@ def log_average(self, cols: list, **kwargs):
         df.loc[:, cols] = df.loc[:, cols].astype(float)
         df.elapsed_time_hours = df.elapsed_time_hours.round(decimals=2)
         self.df_avg = df.loc[:, cols + ['elapsed_time_hours']].groupby("elapsed_time_hours").mean().reset_index()
-        self.log_rows(rows=self.df_avg, subdir='avg', sep='\t', **kwargs)
+        self.log_rows(rows=self.df_avg, subdir='avg', **kwargs)
 
-    def cache_data(self,rows,path="./cache.csv",**kwargs):
+    def cache_data(self,rows,path="./cache.jsonl",**kwargs):
         """
-        Dumps rows into a single csv.
+        Dumps rows into a single jsonl.
 
         Args:
             rows (:obj:`list` of :obj:`dict`): List of dictionary-encoded rows.
-            path (str): Path to the csv file.
+            path (str): Path to the jsonl file.
         """
-        pd.DataFrame(rows).T.to_csv(path,**kwargs)
+        pd.DataFrame(rows).T.to_json(path, orient="records", lines=True, **kwargs)
 
-    def transpose(self,columns,destination,sep='\t',skip=1,**kwargs):
+    def transpose(self,columns,destination,skip=1,**kwargs):
         """
-        Maps reactor csv to column csvs with columns given by columns.
+        Maps reactor jsonl files to per-column jsonl files with columns given by `columns`.
 
         Args:
             columns (:obj:list of :obj:str): List of columns to extract.
             destination (str): Destination path. Creates directories as needed and overwrites any existing files.
-            sep (str, optional): Column separator. Defaults to '\t'.
             skip (int, optional): How many rows to jump while reading the input files. Defaults to 1.
         """
         dfs = []
         for file in self.paths:
-            df = pd.read_csv(file,index_col=False,sep=sep,**kwargs)
+            df = pd.read_json(file, orient="records", lines=True, **kwargs)
             df['FILE'] = file
             dfs.append(df.iloc[::skip,:])
         df = pd.concat(dfs)
 
         for column in columns:
             Path(destination).mkdir(parents=True,exist_ok=True)
-            df.loc[:,['ID','FILE',column,'elapsed_time_hours']].to_csv(os.path.join(destination,f"{column}.csv"),sep=sep)
+            df.loc[:,['ID','FILE',column,'elapsed_time_hours']].to_json(os.path.join(destination,f"{column}.jsonl"), orient="records", lines=True)
 
 class LogAggregator:
@@ -225,20 +219,19 @@ def __init__(self,log_paths,timestamp_col="log_timestamp",elapsed_time_col="elap
         self.glob_list = log_paths
         self.timestamp_col = timestamp_col
         self.elapsed_time_col = elapsed_time_col
-    def agg(self,destination,skip=1,sep='\t',**kwargs):
+    def agg(self,destination,skip=1,**kwargs):
         """
         Aggregator
 
         Args:
             destination (str): Destination path. Creates directories as needed and overwrites any existing files.
             skip (int, optional): How many rows to jump while reading the input files. Defaults to 1.
-            sep (str, optional): Column separator. Defaults to '\t'.
         """
         dfs = {}
         for path in self.glob_list:
             for file in glob(path):
                 basename = os.path.basename(file)
-                df = pd.read_csv(file,index_col=False,sep=sep,dtype={self.elapsed_time_col:float},**kwargs)
+                df = pd.read_json(file, orient="records", lines=True, dtype={self.elapsed_time_col:float},**kwargs)
                 df = df.iloc[::skip,:]
                 df['FILE'] = file
                 if dfs.get(basename,None) is not None:
@@ -256,5 +249,5 @@ def agg(self,destination,skip=1,sep='\t',**kwargs):
         for filename, df in dfs.items():
             Path(destination).mkdir(parents=True,exist_ok=True)
             path = os.path.join(destination,filename)
-            df.to_csv(path,sep=sep,index=False)
+            df.to_json(path, orient="records", lines=True)
 
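Since `Log.log_rows` now appends JSON Lines instead of CSV, each append has to end with a newline (hence the explicit `+ "\n"` above); `to_json(..., lines=True)` does not emit one itself, so without it the last record of one batch and the first record of the next would share a line and `pd.read_json(..., lines=True)` would fail. A small self-contained sketch of that round trip, with made-up column values:

import io
import pandas as pd

batch1 = pd.DataFrame([{"ID": "reactor_0", "DensidadeAtual": 0.41}])
batch2 = pd.DataFrame([{"ID": "reactor_0", "DensidadeAtual": 0.43}])

buf = io.StringIO()
for batch in (batch1, batch2):
    # Emulate two successive appends to the same .jsonl file.
    buf.write(batch.to_json(orient="records", lines=True) + "\n")

buf.seek(0)
print(pd.read_json(buf, orient="records", lines=True))  # two rows, one per record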
diff --git a/pyduino/pyduino2.py b/pyduino/pyduino2.py
index 5e6402c..b157c54 100644
--- a/pyduino/pyduino2.py
+++ b/pyduino/pyduino2.py
@@ -3,7 +3,7 @@
 from time import sleep, time
 import pandas as pd
 from collections import OrderedDict
-from pyduino.log import log
+from pyduino.log import Log
 import pandas as pd
 from multiprocessing import Pool, Process
 from functools import partial
@@ -227,11 +227,10 @@ def send(self,command,await_response=True,**kwargs):
             r._send(command)
         return out
 
-    def send_parallel(self,command,delay,await_response=True):
-        out = []
+    def send_parallel(self,command,await_response=True):
         with Pool(7) as p:
             out = p.map(partial(send_wrapper,command=command,await_response=await_response),list(self.reactors.items()))
-        return out
+        return dict(out)
 
     def set(self, data=None, **kwargs):
         for k,r in self.reactors.items():
@@ -268,9 +267,18 @@ def log_init(self,**kwargs):
         Args:
             name (str): Name of the subdirectory in the log folder where the files will be saved.
         """
-        self.log = log(subdir=list(self.reactors.keys()),**kwargs)
+        self.log = Log(subdir=list(self.reactors.keys()),**kwargs)
         print(f"Log will be saved on: {bcolors.OKGREEN}{self.log.prefix}{bcolors.ENDC}")
 
+    @property
+    def brilho(self):
+        """
+        Convenience property to get brilho from all reactors.
+        """
+        out = self.send_parallel(f"get({self.brilho_param.lower()})")
+        out = {k: float(v.strip()) for k,v in out.items()}
+        return out
+
     def dados(self,save_cache=True):
         """
         Get data from Arduinos.
@@ -284,7 +292,7 @@ def dados(self,save_cache=True):
 
         len_empty = None
         while len_empty!=0:
-            rows = self.send_parallel("dados",delay=20,await_response=True)
+            rows = self.send_parallel("dados",await_response=True).items()
             #Checking if any reactor didn't respond.
             empty = list(filter(lambda x: x[1] is None,rows))
             len_empty = len(empty)
@@ -305,7 +313,7 @@ def dados(self,save_cache=True):
 
         rows = dict(map(lambda x: (x[0],OrderedDict(zip(self.header,x[1].split(" ")))),rows))
         if save_cache:
-            self.log.cache_data(rows,sep='\t',index=False) #Index set to False because ID already exists in rows.
+            self.log.cache_data(rows)
         return rows
 
     def log_dados(self,save_cache=True):
@@ -322,10 +330,10 @@ def log_dados(self,save_cache=True):
 
         rows = list(map(lambda x: (x[0],OrderedDict(zip(header,x[1].split(" ")))),rows))
         for _id,row in rows:
-            self.log.log_rows(rows=[row],subdir=_id,sep='\t')
+            self.log.log_rows(rows=[row],subdir=_id)
         rows = dict(rows)
         if save_cache:
-            self.log.cache_data(rows,sep='\t',index=False) #Index set to False because ID already exists in rows.
+            self.log.cache_data(rows)
         return rows
 
     def set_preset_state(self,path="preset_state.csv",sep="\t",chunksize=4, params=PATHS.REACTOR_PARAMETERS, **kwargs):
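`send_parallel` now returns a mapping of reactor id to response instead of a bare list, which is what the new `brilho` property and the reworked `dados` rely on. A usage sketch — the manager class name, its constructor, and the reactor ids are assumptions for illustration only:

manager = ReactorManager()                   # assumed constructor; not shown in this diff
responses = manager.send_parallel("dados", await_response=True)
print(responses["reactor_0"])                # raw response string of a single reactor

# The brilho property wraps the same call and parses the values to float:
print(manager.brilho)                        # e.g. {"reactor_0": 100.0, "reactor_1": 0.0}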
""" dfs = {} for path in self.glob_list: for file in glob(path): basename = os.path.basename(file) - df = pd.read_csv(file,index_col=False,sep=sep,dtype={self.elapsed_time_col:float},**kwargs) + df = pd.read_json(file, orient="records", lines=True, dtype={self.elapsed_time_col:float},**kwargs) df = df.iloc[::skip,:] df['FILE'] = file if dfs.get(basename,None) is not None: @@ -256,5 +249,5 @@ def agg(self,destination,skip=1,sep='\t',**kwargs): for filename, df in dfs.items(): Path(destination).mkdir(parents=True,exist_ok=True) path = os.path.join(destination,filename) - df.to_csv(path,sep=sep,index=False) + df.to_json(path, orient="records", lines=True) diff --git a/pyduino/pyduino2.py b/pyduino/pyduino2.py index 5e6402c..b157c54 100644 --- a/pyduino/pyduino2.py +++ b/pyduino/pyduino2.py @@ -3,7 +3,7 @@ from time import sleep, time import pandas as pd from collections import OrderedDict -from pyduino.log import log +from pyduino.log import Log import pandas as pd from multiprocessing import Pool, Process from functools import partial @@ -227,11 +227,10 @@ def send(self,command,await_response=True,**kwargs): r._send(command) return out - def send_parallel(self,command,delay,await_response=True): - out = [] + def send_parallel(self,command,await_response=True): with Pool(7) as p: out = p.map(partial(send_wrapper,command=command,await_response=await_response),list(self.reactors.items())) - return out + return dict(out) def set(self, data=None, **kwargs): for k,r in self.reactors.items(): @@ -268,9 +267,18 @@ def log_init(self,**kwargs): Args: name (str): Name of the subdirectory in the log folder where the files will be saved. """ - self.log = log(subdir=list(self.reactors.keys()),**kwargs) + self.log = Log(subdir=list(self.reactors.keys()),**kwargs) print(f"Log will be saved on: {bcolors.OKGREEN}{self.log.prefix}{bcolors.ENDC}") + @property + def brilho(self): + """ + Convenience method to get brilho from reactors. + """ + out = self.send_parallel(f"get({self.brilho_param.lower()})") + out = {k: float(v.strip()) for k,v in out.items()} + return out + def dados(self,save_cache=True): """ Get data from Arduinos. @@ -284,7 +292,7 @@ def dados(self,save_cache=True): len_empty = None while len_empty!=0: - rows = self.send_parallel("dados",delay=20,await_response=True) + rows = self.send_parallel("dados",await_response=True).items() #Checking if any reactor didn't respond. empty = list(filter(lambda x: x[1] is None,rows)) len_empty = len(empty) @@ -305,7 +313,7 @@ def dados(self,save_cache=True): rows = dict(map(lambda x: (x[0],OrderedDict(zip(self.header,x[1].split(" ")))),rows)) if save_cache: - self.log.cache_data(rows,sep='\t',index=False) #Index set to False because ID already exists in rows. + self.log.cache_data(rows) #Index set to False because ID already exists in rows. return rows def log_dados(self,save_cache=True): @@ -322,10 +330,10 @@ def log_dados(self,save_cache=True): rows = list(map(lambda x: (x[0],OrderedDict(zip(header,x[1].split(" ")))),rows)) for _id,row in rows: - self.log.log_rows(rows=[row],subdir=_id,sep='\t') + self.log.log_rows(rows=[row],subdir=_id) rows = dict(rows) if save_cache: - self.log.cache_data(rows,sep='\t',index=False) #Index set to False because ID already exists in rows. + self.log.cache_data(rows) #Index set to False because ID already exists in rows. 
diff --git a/pyduino/utils.py b/pyduino/utils.py
index fd58049..1111a4a 100644
--- a/pyduino/utils.py
+++ b/pyduino/utils.py
@@ -131,4 +131,25 @@ def Q(self,x: int):
         return (self.N - abs((x%(2*self.N))-self.N))*(self.p_f - self.p_i)/self.N + self.p_i
 
     def y(self,x: int):
-        return self.Q(x + self.a)
\ No newline at end of file
+        return self.Q(x + self.a)
+
+def partition(X: np.ndarray, n: int) -> list:
+    """
+    Partitions the array `X` into blocks of `n` rows each, except possibly the last one.
+
+    Args:
+        X (numpy.array): Input 2D array
+        n (int): Number of rows per block
+
+    Returns:
+        list: A list containing the array partitions.
+    """
+    assert X.ndim == 2, "X must be a matrix"
+    r = X.shape[0] % n
+    #Number of blocks needed to cover all rows of X
+    m = X.shape[0] // n + (r > 0)
+    X_enlarged = np.pad(X, ((0, n*m - X.shape[0]), (0,0)), constant_values=0)
+    X_split = np.array_split(X_enlarged, m)
+    if r > 0:
+        X_split[-1] = X_split[-1][:r,:]
+    return X_split
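A worked example of the pad/split/trim logic in `partition`: 5 rows split into blocks of 3 — the zero rows added by `np.pad` are dropped again before the blocks are returned:

import numpy as np
from pyduino.utils import partition

X = np.arange(10).reshape(5, 2)
blocks = partition(X, 3)
print([b.shape for b in blocks])   # [(3, 2), (2, 2)]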
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 31c47fe..4d78758 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,7 +1,8 @@
 import sys
 import os
 sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
-from pyduino.utils import get_param
+from pyduino.utils import get_param, partition
+import numpy as np
 import dotenv
 dotenv.load_dotenv()
 dotenv.load_dotenv(dotenv_path=".env.local")
@@ -25,4 +26,23 @@ def test_get_param():
     key = 'param1'
     ids = {'C', 'A'}
     expected_output = {'C': 5, 'A': 1}
-    assert get_param(data, key, ids) == expected_output
\ No newline at end of file
+    assert get_param(data, key, ids) == expected_output
+
+def test_partition():
+    # Test case 1: Array size is divisible by n
+    X = np.array([[1, 2], [3, 4], [5, 6], [7, 8]])
+    n = 2
+    expected_output = [np.array([[1, 2], [3, 4]]), np.array([[5, 6], [7, 8]])]
+    assert np.array_equal(partition(X, n), expected_output)
+
+    # Test case 2: Array size is not divisible by n
+    X = np.array([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])
+    n = 3
+    expected_output = [np.array([[1, 2], [3, 4], [5, 6]]), np.array([[7, 8], [9, 10]])]
+    assert all(np.array_equal(o, e) for o,e in zip(partition(X, n), expected_output))
+
+    # Test case 3: Array size is smaller than n
+    X = np.array([[1, 2]])
+    n = 3
+    expected_output = [np.array([[1, 2]])]
+    assert np.array_equal(partition(X, n), expected_output)
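The new test is plain assert-style, so pytest is presumably the intended runner; if so, the partition case can be exercised on its own:

import pytest

# Assumes pytest is installed and this is run from the repository root.
pytest.main(["-q", "tests/test_utils.py::test_partition"])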