Skip to content
This repository was archived by the owner on Feb 19, 2019. It is now read-only.

Commit

Permalink
provide task persistent storage by postgres db and close the respecti…
Browse files Browse the repository at this point in the history
…ve db conn at the stop of scheduler
  • Loading branch information
Jnana Sagar committed Sep 25, 2018
1 parent be46358 commit fc5a235
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 1 deletion.
52 changes: 52 additions & 0 deletions _example/postgres/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"github.com/rakanalh/scheduler"
"github.com/rakanalh/scheduler/storage"
"io"
"log"
"time"
)

func TaskWithoutArgs() {
log.Println("TaskWithoutArgs is executed")
}

func TaskWithArgs(message string) {
log.Println("TaskWithArgs is executed. message:", message)
}

func main() {
storage,err := storage.NewPostgresStorage(
storage.PostgresConfig{
DbURL: "postgresql://<db-username>:<db-password>@localhost:5432/scheduler?sslmode=disable",
},
)
if err != nil{
log.Fatalf("Couldn't create scheduler storage : %v",err)
}

s := scheduler.New(storage)

go func(s scheduler.Scheduler,store io.Closer){
time.Sleep(time.Second *10)
// store.Close()
s.Stop()
}(s,storage)
// Start a task without arguments
if _, err := s.RunAfter(60*time.Second, TaskWithoutArgs); err != nil {
log.Fatal(err)
}

// Start a task with arguments
if _, err := s.RunEvery(5*time.Second, TaskWithArgs, "Hello from recurring task 1"); err != nil {
log.Fatal(err)
}

// Start the same task as above with a different argument
if _, err := s.RunEvery(10*time.Second, TaskWithArgs, "Hello from recurring task 2"); err != nil {
log.Fatal(err)
}
s.Start()
s.Wait()
}
1 change: 1 addition & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (scheduler *Scheduler) Start() error {

// Stop will put the scheduler to halt
func (scheduler *Scheduler) Stop() {
scheduler.taskStore.store.Close()
scheduler.stopChan <- true
}

Expand Down
4 changes: 4 additions & 0 deletions storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@ func (memStore *MemoryStorage) Remove(task TaskAttributes) error {
memStore.tasks = newTasks
return nil
}

func (memStore *MemoryStorage) Close()error{
return nil
}
4 changes: 4 additions & 0 deletions storage/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,7 @@ func (noop NoOpStorage) Fetch() ([]TaskAttributes, error) {
func (noop NoOpStorage) Remove(task TaskAttributes) error {
return nil
}

func (noop NoOpStorage) Close()error{
return nil
}
165 changes: 165 additions & 0 deletions storage/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package storage

import (
"database/sql"
"fmt"
"log"

_ "github.com/lib/pq"
)

type PostgresConfig struct {
DbURL string
}

type postgresStorage struct {
config PostgresConfig
db *sql.DB
}

// creates new instance of postgres DB
func NewPostgresStorage(config PostgresConfig) (postgres *postgresStorage,err error){
// TODO should connect and initialize as well.
postgres = &postgresStorage{config: config}
// tyr to connect to givenDB.
err = postgres.connect()
if err != nil{
log.Printf("Unable to connect to DB : %s, error : %v",config.DbURL,err)
return nil,err
}
// lets initialize the DB as needed.
err = postgres.initialize()
if err != nil{
log.Printf("Couldn't initialize the DB, error : %v",err)
return nil, err
}
return postgres,nil
}

// Connect creates a database connection to the given config URL, and assigns to the Storage fields `db`.
func (postgres *postgresStorage) connect() (err error) {
db, err := sql.Open("postgres", postgres.config.DbURL)
if err != nil {
return err
}
postgres.db = db
return nil
}

func (postgres *postgresStorage) initialize() (err error) {
stmt := `
CREATE TABLE IF NOT EXISTS task_store (
id SERIAL NOT NULL PRIMARY KEY,
name text,
params text,
duration text,
last_run text,
next_run text,
is_recurring text,
hash text
);
`
_, err = postgres.db.Exec(stmt)
if err != nil {
log.Printf("Error while initializing: %q - %+v", stmt, err)
return
}
return
}

func (postgres *postgresStorage) Close() error {
return postgres.db.Close()
}

func (postgres *postgresStorage) Add(task TaskAttributes) error {
// should add a task to the database `task_store` table
var count int
rows, err := postgres.db.Query("SELECT count(*) FROM task_store WHERE hash=($1) ;", task.Hash)
defer rows.Close()
if err == nil {
rows.Next()
_ = rows.Scan(&count)
}

if count == 0 {
return postgres.insert(task)
}
return nil
}

func (postgres *postgresStorage) Fetch() ([]TaskAttributes, error) {
// read all the rows task_store table.
rows, err := postgres.db.Query(`
SELECT name, params, duration, last_run, next_run, is_recurring
FROM task_store ;`)

if err != nil {
log.Fatal(err)
}

defer rows.Close()

var tasks []TaskAttributes

for rows.Next() {
// var task TaskAttributes
task := TaskAttributes{}
err = rows.Scan(&task.Name, &task.Params, &task.Duration, &task.LastRun, &task.NextRun, &task.IsRecurring)
if err != nil {
return []TaskAttributes{}, err
}
tasks = append(tasks, task)
}
err = rows.Err()
if err != nil {
log.Fatal(err)
}
return tasks, nil
}

func (postgres *postgresStorage) Remove(task TaskAttributes) error {
// should delete the entry from `task_stor` table.
stmt, err := postgres.db.Prepare(`DELETE FROM task_store WHERE hash=($1) ;`)

if err != nil {
return fmt.Errorf("Error while pareparing delete task statement: %s+v", err)
}

defer stmt.Close()

_, err = stmt.Exec(
task.Hash,
)
if err != nil {
return fmt.Errorf("Error while deleting task: %+v", err)
}

return nil
}

func (postgres *postgresStorage) insert(task TaskAttributes) (err error) {
stmt, err := postgres.db.Prepare(`
INSERT INTO task_store(name, params, duration, last_run, next_run, is_recurring, hash)
VALUES(($1), ($2), ($3), ($4), ($5), ($6), ($7));`)

if err != nil {
return fmt.Errorf("Error while pareparing insert task statement: %s", err)
}

defer stmt.Close()

_, err = stmt.Exec(
task.Name,
task.Params,
task.Duration,
task.LastRun,
task.NextRun,
task.IsRecurring,
task.Hash,
)
if err != nil {
return fmt.Errorf("Error while inserting task: %s", err)
}

return nil
}
2 changes: 1 addition & 1 deletion storage/sqlite3.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (sqlite *Sqlite3Storage) Connect() error {
}

// Close will close the open DB file.
func (sqlite *Sqlite3Storage) Close() error {
func (sqlite Sqlite3Storage) Close() error {
return sqlite.db.Close()
}

Expand Down
1 change: 1 addition & 0 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ type TaskStore interface {
Add(TaskAttributes) error
Fetch() ([]TaskAttributes, error)
Remove(TaskAttributes) error
Close()error
}

0 comments on commit fc5a235

Please sign in to comment.