Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Duke committed Sep 24, 2016
1 parent 7d7be17 commit 932fc93
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 349 deletions.
16 changes: 0 additions & 16 deletions setup/delete_episode_worker.go

This file was deleted.

33 changes: 0 additions & 33 deletions setup/duplicates_worker.go

This file was deleted.

30 changes: 0 additions & 30 deletions setup/language.go

This file was deleted.

7 changes: 1 addition & 6 deletions setup/orphan_channels.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package setup

import (
"github.com/jrallison/go-workers"
"github.com/uhuraapp/uhura-api/database"
"github.com/uhuraapp/uhura-api/models"
"github.com/jrallison/go-workers"
)

func orphanChannel(message *workers.Msg) {
Expand All @@ -20,11 +20,6 @@ func orphanChannel(message *workers.Msg) {
var users []models.Subscription
p.Table(models.Subscription{}.TableName()).Where("channel_id = ?", channel.Id).Find(&users)
if len(users) < 1 {
var episodes []models.Episode
p.Table(models.Episode{}.TableName()).Where("channel_id = ?", channel.Id).Find(&episodes)
for _, e := range episodes {
workers.Enqueue("delete-episode", "deleteEpisode", e.Id)
}
p.Table(models.Channel{}.TableName()).Where("id = ?", channel.Id).Delete(models.Channel{})
}
}
Expand Down
47 changes: 0 additions & 47 deletions setup/recommendation_worker.go

This file was deleted.

32 changes: 9 additions & 23 deletions setup/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package setup
import (
"time"

"github.com/uhuraapp/uhura-api/database"
runner "github.com/uhuraapp/uhura-worker/sync"
"github.com/jinzhu/gorm"
"github.com/jrallison/go-workers"
"github.com/uhuraapp/uhura-api/database"
runner "github.com/uhuraapp/uhura-worker/sync"
)

func sync(message *workers.Msg) {
Expand All @@ -15,23 +15,8 @@ func sync(message *workers.Msg) {
id, err := message.Args().Int64()
checkError(err)

syncer(id, true)
}

func syncLow(message *workers.Msg) {
defer reporter(message)

id, err := message.Args().Int64()
checkError(err)

syncer(id, false)

workers.EnqueueAt("sync-low", "syncLow", time.Now().Add(1*time.Hour), id)
}

func syncer(id int64, scheduleNext bool) {
p := database.NewPostgresql()
defer func(p gorm.DB) {
defer func(p *gorm.DB) {
if r := recover(); r != nil {
p.Close()
}
Expand All @@ -40,10 +25,11 @@ func syncer(id int64, scheduleNext bool) {
_, model := runner.Sync(id, p)
p.Close()

workers.Enqueue("duplicate-episodes", "duplicateEpisodes", nil)
if scheduleNext {
nextRunAt, err := runner.GetNextRun(model)
checkError(err)
workers.EnqueueAt("sync", "sync", nextRunAt, id)
nextRunAt, err := runner.GetNextRun(model)

if err != nil {
nextRunAt = time.Now().Add(5 * time.Hour)
}

workers.EnqueueAt("sync", "sync", nextRunAt, id)
}
71 changes: 29 additions & 42 deletions setup/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,13 @@ import (

"gopkg.in/redis.v3"

"github.com/uhuraapp/uhura-api/database"
"github.com/uhuraapp/uhura-api/models"
"github.com/jrallison/go-workers"
"github.com/stvp/rollbar"
"github.com/uhuraapp/uhura-api/database"
"github.com/uhuraapp/uhura-api/models"
)

func Worker(_redisURL string, runner bool) {
pool := "1"
if runner {
pool = "15"
}
func Worker(_redisURL string) {
redisURL, err := url.Parse(_redisURL)

if err != nil {
Expand All @@ -37,54 +33,45 @@ func Worker(_redisURL string, runner bool) {
"server": redisURL.Host,
"password": password,
"database": "0",
"pool": pool,
"pool": "15",
"process": "1",
})

if runner {
client := redis.NewClient(&redis.Options{
Addr: redisURL.Host,
Password: password, // no password set
DB: 0, // use default DB
})
client := redis.NewClient(&redis.Options{
Addr: redisURL.Host,
Password: password, // no password set
DB: 0, // use default DB
})

pong, err := client.Ping().Result()
fmt.Println(pong, err)
pong, err := client.Ping().Result()
fmt.Println(pong, err)

client.FlushDb()
client.Close()
client.FlushDb()
client.Close()

workers.Process("duplicate-episodes", duplicateEpisodes, 1)
workers.Process("delete-episode", deleteEpisode, 2)
workers.Process("sync-low", syncLow, 7)
workers.Process("sync", sync, 7)
workers.Process("language", language, 1)
// workers.Process("orphan-channel", orphanChannel(p), 2)
workers.Process("recommendations", recommendations, 1)
workers.Process("sync", sync, 7)

port, _ := strconv.Atoi(os.Getenv("PORT"))
port, _ := strconv.Atoi(os.Getenv("PORT"))

go workers.StatsServer(port)
go workers.StatsServer(port)

go func() {
workers.Enqueue("orphan-channel", "orphanChannel", 0)
go func() {
workers.Enqueue("orphan-channel", "orphanChannel", 0)

var c []int64
var c []int64

p := database.NewPostgresql()
p.Table(models.Channel{}.TableName()).Pluck("id", &c)
for _, id := range c {
workers.Enqueue("sync", "sync", id)
workers.Enqueue("language", "language", id)
}
p := database.NewPostgresql()
p.Table(models.Channel{}.TableName()).Pluck("id", &c)
for _, id := range c {
workers.Enqueue("sync", "sync", id)
}

p.Close()
}()
p.Close()
}()

workers.Enqueue("duplicate-episodes", "duplicateEpisodes", nil)
workers.Enqueue("recommendations", "recommendations", nil)
workers.Run()
}
workers.Enqueue("duplicate-episodes", "duplicateEpisodes", nil)
workers.Enqueue("recommendations", "recommendations", nil)
workers.Run()
}

func reporter(message *workers.Msg) {
Expand Down
Loading

0 comments on commit 932fc93

Please sign in to comment.