From fc5a235333bb9b7303a7436505e666b5189ad0ba Mon Sep 17 00:00:00 2001 From: Jnana Sagar Date: Tue, 11 Sep 2018 13:22:03 +0530 Subject: [PATCH] provide task persistent storage by postgres db and close the respective db conn at the stop of scheduler --- _example/postgres/main.go | 52 ++++++++++++ scheduler.go | 1 + storage/memory.go | 4 + storage/noop.go | 4 + storage/postgres.go | 165 ++++++++++++++++++++++++++++++++++++++ storage/sqlite3.go | 2 +- storage/store.go | 1 + 7 files changed, 228 insertions(+), 1 deletion(-) create mode 100644 _example/postgres/main.go create mode 100644 storage/postgres.go diff --git a/_example/postgres/main.go b/_example/postgres/main.go new file mode 100644 index 0000000..a3ea94a --- /dev/null +++ b/_example/postgres/main.go @@ -0,0 +1,52 @@ +package main + +import ( +"github.com/rakanalh/scheduler" +"github.com/rakanalh/scheduler/storage" +"io" +"log" +"time" +) + +func TaskWithoutArgs() { + log.Println("TaskWithoutArgs is executed") +} + +func TaskWithArgs(message string) { + log.Println("TaskWithArgs is executed. message:", message) +} + +func main() { + storage,err := storage.NewPostgresStorage( + storage.PostgresConfig{ + DbURL: "postgresql://:@localhost:5432/scheduler?sslmode=disable", + }, + ) + if err != nil{ + log.Fatalf("Couldn't create scheduler storage : %v",err) + } + + s := scheduler.New(storage) + + go func(s scheduler.Scheduler,store io.Closer){ + time.Sleep(time.Second *10) + // store.Close() + s.Stop() + }(s,storage) + // Start a task without arguments + if _, err := s.RunAfter(60*time.Second, TaskWithoutArgs); err != nil { + log.Fatal(err) + } + + // Start a task with arguments + if _, err := s.RunEvery(5*time.Second, TaskWithArgs, "Hello from recurring task 1"); err != nil { + log.Fatal(err) + } + + // Start the same task as above with a different argument + if _, err := s.RunEvery(10*time.Second, TaskWithArgs, "Hello from recurring task 2"); err != nil { + log.Fatal(err) + } + s.Start() + s.Wait() +} diff --git a/scheduler.go b/scheduler.go index 60b6554..0412fab 100644 --- a/scheduler.go +++ b/scheduler.go @@ -107,6 +107,7 @@ func (scheduler *Scheduler) Start() error { // Stop will put the scheduler to halt func (scheduler *Scheduler) Stop() { + scheduler.taskStore.store.Close() scheduler.stopChan <- true } diff --git a/storage/memory.go b/storage/memory.go index 2e0accc..d9df928 100644 --- a/storage/memory.go +++ b/storage/memory.go @@ -33,3 +33,7 @@ func (memStore *MemoryStorage) Remove(task TaskAttributes) error { memStore.tasks = newTasks return nil } + +func (memStore *MemoryStorage) Close()error{ + return nil +} diff --git a/storage/noop.go b/storage/noop.go index d999e6b..13b4cf4 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -23,3 +23,7 @@ func (noop NoOpStorage) Fetch() ([]TaskAttributes, error) { func (noop NoOpStorage) Remove(task TaskAttributes) error { return nil } + +func (noop NoOpStorage) Close()error{ + return nil +} diff --git a/storage/postgres.go b/storage/postgres.go new file mode 100644 index 0000000..d795e65 --- /dev/null +++ b/storage/postgres.go @@ -0,0 +1,165 @@ +package storage + +import ( + "database/sql" + "fmt" + "log" + + _ "github.com/lib/pq" +) + +type PostgresConfig struct { + DbURL string +} + +type postgresStorage struct { + config PostgresConfig + db *sql.DB +} + +// creates new instance of postgres DB +func NewPostgresStorage(config PostgresConfig) (postgres *postgresStorage,err error){ + // TODO should connect and initialize as well. + postgres = &postgresStorage{config: config} + // tyr to connect to givenDB. + err = postgres.connect() + if err != nil{ + log.Printf("Unable to connect to DB : %s, error : %v",config.DbURL,err) + return nil,err + } + // lets initialize the DB as needed. + err = postgres.initialize() + if err != nil{ + log.Printf("Couldn't initialize the DB, error : %v",err) + return nil, err + } + return postgres,nil +} + +// Connect creates a database connection to the given config URL, and assigns to the Storage fields `db`. +func (postgres *postgresStorage) connect() (err error) { + db, err := sql.Open("postgres", postgres.config.DbURL) + if err != nil { + return err + } + postgres.db = db + return nil +} + +func (postgres *postgresStorage) initialize() (err error) { + stmt := ` + CREATE TABLE IF NOT EXISTS task_store ( + id SERIAL NOT NULL PRIMARY KEY, + name text, + params text, + duration text, + last_run text, + next_run text, + is_recurring text, + hash text + ); + ` + _, err = postgres.db.Exec(stmt) + if err != nil { + log.Printf("Error while initializing: %q - %+v", stmt, err) + return + } + return +} + +func (postgres *postgresStorage) Close() error { + return postgres.db.Close() +} + +func (postgres *postgresStorage) Add(task TaskAttributes) error { + // should add a task to the database `task_store` table + var count int + rows, err := postgres.db.Query("SELECT count(*) FROM task_store WHERE hash=($1) ;", task.Hash) + defer rows.Close() + if err == nil { + rows.Next() + _ = rows.Scan(&count) + } + + if count == 0 { + return postgres.insert(task) + } + return nil +} + +func (postgres *postgresStorage) Fetch() ([]TaskAttributes, error) { + // read all the rows task_store table. + rows, err := postgres.db.Query(` + SELECT name, params, duration, last_run, next_run, is_recurring + FROM task_store ;`) + + if err != nil { + log.Fatal(err) + } + + defer rows.Close() + + var tasks []TaskAttributes + + for rows.Next() { + // var task TaskAttributes + task := TaskAttributes{} + err = rows.Scan(&task.Name, &task.Params, &task.Duration, &task.LastRun, &task.NextRun, &task.IsRecurring) + if err != nil { + return []TaskAttributes{}, err + } + tasks = append(tasks, task) + } + err = rows.Err() + if err != nil { + log.Fatal(err) + } + return tasks, nil +} + +func (postgres *postgresStorage) Remove(task TaskAttributes) error { + // should delete the entry from `task_stor` table. + stmt, err := postgres.db.Prepare(`DELETE FROM task_store WHERE hash=($1) ;`) + + if err != nil { + return fmt.Errorf("Error while pareparing delete task statement: %s+v", err) + } + + defer stmt.Close() + + _, err = stmt.Exec( + task.Hash, + ) + if err != nil { + return fmt.Errorf("Error while deleting task: %+v", err) + } + + return nil +} + +func (postgres *postgresStorage) insert(task TaskAttributes) (err error) { + stmt, err := postgres.db.Prepare(` + INSERT INTO task_store(name, params, duration, last_run, next_run, is_recurring, hash) + VALUES(($1), ($2), ($3), ($4), ($5), ($6), ($7));`) + + if err != nil { + return fmt.Errorf("Error while pareparing insert task statement: %s", err) + } + + defer stmt.Close() + + _, err = stmt.Exec( + task.Name, + task.Params, + task.Duration, + task.LastRun, + task.NextRun, + task.IsRecurring, + task.Hash, + ) + if err != nil { + return fmt.Errorf("Error while inserting task: %s", err) + } + + return nil +} diff --git a/storage/sqlite3.go b/storage/sqlite3.go index 3bab59e..08a4b84 100644 --- a/storage/sqlite3.go +++ b/storage/sqlite3.go @@ -40,7 +40,7 @@ func (sqlite *Sqlite3Storage) Connect() error { } // Close will close the open DB file. -func (sqlite *Sqlite3Storage) Close() error { +func (sqlite Sqlite3Storage) Close() error { return sqlite.db.Close() } diff --git a/storage/store.go b/storage/store.go index 089cc76..5d743f1 100644 --- a/storage/store.go +++ b/storage/store.go @@ -18,4 +18,5 @@ type TaskStore interface { Add(TaskAttributes) error Fetch() ([]TaskAttributes, error) Remove(TaskAttributes) error + Close()error }