Commit

merge
icarosadero committed Jun 23, 2024
1 parent 1499226 commit dfda8d5
Showing 7 changed files with 192 additions and 74 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -169,5 +169,6 @@ log/
calibrate/
experiment.py
nohup.out
cache.jsonl
quick_test.py
*.jsonl
**/config.yaml
84 changes: 84 additions & 0 deletions pyduino/config.template.yaml
@@ -0,0 +1,84 @@
hyperparameters:
reset_density: true #Whether or not to use gotod
log_name: #Log folder name (delete to use the default)
density_param: DensidadeAtual #Parameter name for density counts.
brilho_param: Brilho #When Brilho = 0, optimization is turned off.
maximize: false
  rng_seed: 2 #Random seed for parameter initialization
ranges:
branco:
- 1
- 100
others:
- 0
- 100
slave:
port: "5000" #Must be a string
network: "192.168.1.1/24"
exclude: #Leave blank if there are none to exclude
system:
  #About the partition system (this is done in `init_spectra.py`):
#`all` will create a single manager for all reactors.
#`single` will create a manager for each reactor.
partition: all
log_level: "DEBUG" #DEBUG, INFO, WARNING, ERROR, CRITICAL
sync_clocks: false #Whether or not to sync the clocks of the slaves.
initial_state: "preset_state.d/all.csv"
reboot_wait_time: 5 #Time in seconds to wait after a reboot.
relevant_parameters:
# - brilho
- branco
- full
- "440"
- "470"
- "495"
- "530"
- "595"
- "634"
- "660"
- "684"
irradiance:
#brilho: 12
"branco": 10.35
"full": 11.50
"440": 7.72
"470": 8.50
"495": 8.30
"530": 8.56
"595": 2.17
"634": 3.44
"660": 4.52
"684": 5.74
standard_parameters:
#brilho: float
branco: float
full: float
"440": float
"470": float
"495": float
"530": float
"595": float
"634": float
"660": float
"684": float
cor: int
modopainel: int
brilho: float
bomdia: int
boanoite: int
tau: int
step: int
modotemp: int
temp: int
densidade: float
mododil: int
ar: int
ima: int
modoco2: int
co2: int
dtco2: int
tensorboard:
additional_parameters:
- Brilho
- Temp
- pH
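
The new template groups tuning knobs under hyperparameters, slave networking under slave, and runtime behaviour under system. A minimal sketch of how a copy of it could be loaded, assuming PyYAML is available and assuming the template is copied to pyduino/config.yaml (the **/config.yaml ignore rule added above suggests the live config sits next to the template; neither the path nor the copy step is part of this commit):

import yaml

# Hypothetical usage: path and copy step are assumptions, not part of this commit.
with open("pyduino/config.yaml") as f:
    config = yaml.safe_load(f)

hyper = config["hyperparameters"]
print(hyper["density_param"])         # "DensidadeAtual"
print(config["system"]["partition"])  # "all" or "single"
print(config["slave"]["port"])        # "5000" (kept as a string, as the comment requires)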
65 changes: 29 additions & 36 deletions pyduino/log.py
@@ -4,7 +4,6 @@
from datetime import datetime
import io
from glob import glob
from uuid import uuid1
from tabulate import tabulate
from collections import OrderedDict
from datetime import datetime
@@ -37,7 +36,7 @@ def to_markdown_table(data: OrderedDict) -> str:
def y_to_table(y):
return tabulate(list(y.items()), tablefmt="pipe")

class log:
class Log:
@property
def timestamp(self):
"""str: Current date."""
@@ -49,15 +48,15 @@ def prefix(self):

def __init__(self,subdir,path="./log",name=None):
"""
Logs data into csvs with timestamps.
Logs data into jsonls with timestamps.
Example:
log_obj = log(['reactor_0','reactor_1'],path='./log',name='experiment_0')
log/YEAR/MONTH/
├─ experiment_0/
│ ├─ reactor_0.csv
│ ├─ reactor_1.csv
│ ├─ reactor_0.jsonl
│ ├─ reactor_1.jsonl
Args:
subdir (:obj:`list` of :obj:`str`): List of the names for the subdirectories of `path`.
@@ -75,7 +74,7 @@ def __init__(self,subdir,path="./log",name=None):
self.subdir = subdir
else:
raise ValueError("Invalid type for subdir. Must be either a list of strings or a glob string.")
self.subdir = list(map(lambda x: str(x)+".csv" if len(os.path.splitext(str(x))[1])==0 else str(x),self.subdir))
self.subdir = list(map(lambda x: str(x)+".jsonl" if len(os.path.splitext(str(x))[1])==0 else str(x),self.subdir))
self.first_timestamp = None
self.data_frames = {}

@@ -88,7 +87,7 @@ def __init__(self,subdir,path="./log",name=None):
self.subdir = subdir
else:
raise ValueError("Invalid type for subdir. Must be either a list of strings or a glob string.")
self.subdir = list(map(lambda x: str(x)+".csv" if len(os.path.splitext(str(x))[1])==0 else str(x),self.subdir))
self.subdir = list(map(lambda x: str(x)+".jsonl" if len(os.path.splitext(str(x))[1])==0 else str(x),self.subdir))
self.first_timestamp = None
self.data_frames = {}

@@ -100,19 +99,18 @@ def backup_config_file(self):
with open(config_file) as cfile, open(filename,'w') as wfile:
wfile.write(cfile.read())

def log_rows(self,rows,subdir,add_timestamp=True,tags=None,**kwargs):
def log_rows(self,rows,subdir,add_timestamp=True,tags=None):
"""
Logs rows into csv format.
Logs rows into jsonl format.
Args:
rows (:obj:`list` of :obj:`dict`): List of dictionary-encoded rows or pandas dataframe.
subdir (str): Subdirectory name. Intended to be an element of `self.subdir`.
add_timestamp (bool,optional): Whether or not to include a timestamp column.
tags (:obj:`dict` of :obj:`str`): Dictionary of strings to be inserted as constant columns.
**kwargs: Additional arguments passed to `pandas.to_csv`.
"""
t = self.timestamp
path = os.path.join(self.path,self.start_timestamp,f"{subdir}.csv")
path = os.path.join(self.path,self.start_timestamp,f"{subdir}.jsonl")

df = pd.DataFrame()
if isinstance(rows,list):
@@ -125,7 +123,7 @@ def log_rows(self,rows,subdir,add_timestamp=True,tags=None,**kwargs):
if os.path.exists(path):
if self.first_timestamp is None:
with open(path) as file:
head = pd.read_csv(io.StringIO(file.readline()+file.readline()),index_col=False,**kwargs)
head = pd.read_json(io.StringIO(file.readline()+file.readline()), orient="records", lines=True)
self.first_timestamp = datetime_from_str(head.log_timestamp[0])
else:
self.first_timestamp = t
@@ -136,25 +134,21 @@ def log_rows(self,rows,subdir,add_timestamp=True,tags=None,**kwargs):
for key,value in tags.items():
df.loc[:,key] = value

df.to_csv(
path,
mode="a",
header=not os.path.exists(path),
index=False,
**kwargs
)
with open(path, mode="a") as log_file:
log_file.write(df.to_json(orient="records", lines=True))

return df
def log_many_rows(self,data,**kwargs):
"""
Logs rows into csv format.
Logs rows into jsonl format.
Args:
data (:obj:`dict` of :obj:`dict`): Dictionary encoded data frame.
**kwargs: Additional arguments passed to `self.log_rows`.
"""
self.data_frames = {}
for _id,row in data.items():
df = self.log_rows(rows=[row],subdir=_id,sep='\t',**kwargs)
df = self.log_rows(rows=[row],subdir=_id,**kwargs)
self.data_frames[_id] = df
self.data_frames = pd.concat(list(self.data_frames.values()))

@@ -164,7 +158,7 @@ def log_optimal(self,column,maximum=True,**kwargs):
"""
i=self.data_frames.loc[:,column].astype(float).argmax() if maximum else self.data_frames.loc[:,column].astype(float).argmin()
self.df_opt = self.data_frames.iloc[i,:]
self.log_rows(rows=[self.df_opt.to_dict()],subdir='opt',sep='\t',**kwargs)
self.log_rows(rows=[self.df_opt.to_dict()],subdir='opt',**kwargs)

def log_average(self, cols: list, **kwargs):
"""
@@ -178,38 +172,38 @@ def log_average(self, cols: list, **kwargs):
df.loc[:, cols] = df.loc[:, cols].astype(float)
df.elapsed_time_hours = df.elapsed_time_hours.round(decimals=2)
self.df_avg = df.loc[:, cols + ['elapsed_time_hours']].groupby("elapsed_time_hours").mean().reset_index()
self.log_rows(rows=self.df_avg, subdir='avg', sep='\t', **kwargs)
self.log_rows(rows=self.df_avg, subdir='avg', **kwargs)

def cache_data(self,rows,path="./cache.csv",**kwargs):
def cache_data(self,rows,path="./cache.jsonl",**kwargs):
"""
Dumps rows into a single csv.
Dumps rows into a single jsonl.
Args:
rows (:obj:`list` of :obj:`dict`): List of dictionary-encoded rows.
path (str): Path to the csv file.
path (str): Path to the jsonl file.
"""
pd.DataFrame(rows).T.to_csv(path,**kwargs)
pd.DataFrame(rows).T.to_json(path, orient="records", lines=True, **kwargs)

def transpose(self,columns,destination,sep='\t',skip=1,**kwargs):
def transpose(self,columns,destination,skip=1,**kwargs):
"""
Maps reactor csv to column csvs with columns given by columns.
Maps reactor jsonl to column jsonls with columns given by columns.
Args:
columns (:obj:list of :obj:str): List of columns to extract.
destination (str): Destination path. Creates directories as needed and overwrites any existing files.
sep (str, optional): Column separator. Defaults to '\t'.
skip (int, optional): How many rows to jump while reading the input files. Defaults to 1.
"""
dfs = []
for file in self.paths:
df = pd.read_csv(file,index_col=False,sep=sep,**kwargs)
df = pd.read_json(file, orient="records", lines=True, **kwargs)
df['FILE'] = file
dfs.append(df.iloc[::skip,:])
df = pd.concat(dfs)

for column in columns:
Path(destination).mkdir(parents=True,exist_ok=True)
df.loc[:,['ID','FILE',column,'elapsed_time_hours']].to_csv(os.path.join(destination,f"{column}.csv"),sep=sep)
df.loc[:,['ID','FILE',column,'elapsed_time_hours']].to_json(os.path.join(destination,f"{column}.jsonl"), orient="records", lines=True)


class LogAggregator:
Expand All @@ -225,20 +219,19 @@ def __init__(self,log_paths,timestamp_col="log_timestamp",elapsed_time_col="elap
self.glob_list = log_paths
self.timestamp_col = timestamp_col
self.elapsed_time_col = elapsed_time_col
def agg(self,destination,skip=1,sep='\t',**kwargs):
def agg(self,destination,skip=1,**kwargs):
"""
Aggregator
Args:
destination (str): Destination path. Creates directories as needed and overwrites any existing files.
skip (int, optional): How many rows to jump while reading the input files. Defaults to 1.
sep (str, optional): Column separator. Defaults to '\t'.
"""
dfs = {}
for path in self.glob_list:
for file in glob(path):
basename = os.path.basename(file)
df = pd.read_csv(file,index_col=False,sep=sep,dtype={self.elapsed_time_col:float},**kwargs)
df = pd.read_json(file, orient="records", lines=True, dtype={self.elapsed_time_col:float},**kwargs)
df = df.iloc[::skip,:]
df['FILE'] = file
if dfs.get(basename,None) is not None:
@@ -256,5 +249,5 @@ def agg(self,destination,skip=1,sep='\t',**kwargs):
for filename, df in dfs.items():
Path(destination).mkdir(parents=True,exist_ok=True)
path = os.path.join(destination,filename)
df.to_csv(path,sep=sep,index=False)
df.to_json(path, orient="records", lines=True)
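
With the loggers now appending records via df.to_json(orient="records", lines=True), each log file is plain JSON Lines. A minimal read-back sketch, not part of the commit: the path follows the log/YEAR/MONTH/<name>/<reactor>.jsonl layout from the Log docstring, and it assumes each appended batch ends on its own line:

import pandas as pd

# Hypothetical path; adjust to an actual experiment folder.
df = pd.read_json(
    "log/2024/06/experiment_0/reactor_0.jsonl",
    orient="records",
    lines=True,
)
# Columns such as log_timestamp are added by log_rows when add_timestamp=True.
print(df.tail())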

26 changes: 17 additions & 9 deletions pyduino/pyduino2.py
@@ -3,7 +3,7 @@
from time import sleep, time
import pandas as pd
from collections import OrderedDict
from pyduino.log import log
from pyduino.log import Log
import pandas as pd
from multiprocessing import Pool, Process
from functools import partial
@@ -227,11 +227,10 @@ def send(self,command,await_response=True,**kwargs):
r._send(command)
return out

def send_parallel(self,command,delay,await_response=True):
out = []
def send_parallel(self,command,await_response=True):
with Pool(7) as p:
out = p.map(partial(send_wrapper,command=command,await_response=await_response),list(self.reactors.items()))
return out
return dict(out)

def set(self, data=None, **kwargs):
for k,r in self.reactors.items():
@@ -268,9 +267,18 @@ def log_init(self,**kwargs):
Args:
name (str): Name of the subdirectory in the log folder where the files will be saved.
"""
self.log = log(subdir=list(self.reactors.keys()),**kwargs)
self.log = Log(subdir=list(self.reactors.keys()),**kwargs)
print(f"Log will be saved on: {bcolors.OKGREEN}{self.log.prefix}{bcolors.ENDC}")

@property
def brilho(self):
"""
Convenience property to get brilho from all reactors.
"""
out = self.send_parallel(f"get({self.brilho_param.lower()})")
out = {k: float(v.strip()) for k,v in out.items()}
return out

def dados(self,save_cache=True):
"""
Get data from Arduinos.
@@ -284,7 +292,7 @@ def dados(self,save_cache=True):

len_empty = None
while len_empty!=0:
rows = self.send_parallel("dados",delay=20,await_response=True)
rows = self.send_parallel("dados",await_response=True).items()
#Checking if any reactor didn't respond.
empty = list(filter(lambda x: x[1] is None,rows))
len_empty = len(empty)
@@ -305,7 +313,7 @@ def dados(self,save_cache=True):

rows = dict(map(lambda x: (x[0],OrderedDict(zip(self.header,x[1].split(" ")))),rows))
if save_cache:
self.log.cache_data(rows,sep='\t',index=False) #Index set to False because ID already exists in rows.
self.log.cache_data(rows) #No separate index is written; ID already exists in rows.
return rows

def log_dados(self,save_cache=True):
@@ -322,10 +330,10 @@ def log_dados(self,save_cache=True):
rows = list(map(lambda x: (x[0],OrderedDict(zip(header,x[1].split(" ")))),rows))

for _id,row in rows:
self.log.log_rows(rows=[row],subdir=_id,sep='\t')
self.log.log_rows(rows=[row],subdir=_id)
rows = dict(rows)
if save_cache:
self.log.cache_data(rows,sep='\t',index=False) #Index set to False because ID already exists in rows.
self.log.cache_data(rows) #No separate index is written; ID already exists in rows.
return rows

def set_preset_state(self,path="preset_state.csv",sep="\t",chunksize=4, params=PATHS.REACTOR_PARAMETERS, **kwargs):
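
send_parallel no longer takes a delay argument and now returns dict(out) instead of a list, which is what the new brilho property and dados rely on for per-reactor lookup. A self-contained toy showing why the Pool.map output converts cleanly to a dict; send_wrapper is stubbed here purely for illustration, the real one lives in pyduino2.py:

from functools import partial
from multiprocessing import Pool

def send_wrapper(item, command, await_response=True):
    # Stand-in for pyduino's send_wrapper: returns (reactor_id, response).
    reactor_id, reactor = item
    return reactor_id, f"{reactor} -> {command}"

if __name__ == "__main__":
    reactors = {0: "reactor_0", 1: "reactor_1"}
    with Pool(2) as p:
        out = p.map(partial(send_wrapper, command="dados"), list(reactors.items()))
    print(dict(out))   # {0: 'reactor_0 -> dados', 1: 'reactor_1 -> dados'}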