Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
complete rewrite
Browse files Browse the repository at this point in the history
Signed-off-by: Krasi Georgiev <[email protected]>
  • Loading branch information
Krasi Georgiev committed Jul 5, 2018
1 parent 66b0380 commit f7f2578
Show file tree
Hide file tree
Showing 10 changed files with 417 additions and 475 deletions.
9 changes: 6 additions & 3 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,13 @@ const (
flagStd = 1
)

const indexFilename = "index"
const metaFilename = "meta.json"
const (
indexFilename = "index"
metaFilename = "meta.json"
chunksDirname = "chunks"
)

func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
func chunkDir(dir string) string { return filepath.Join(dir, chunksDirname) }
func walDir(dir string) string { return filepath.Join(dir, "wal") }

func readMetaFile(dir string) (*BlockMeta, error) {
Expand Down
231 changes: 136 additions & 95 deletions cmd/tsdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,131 +74,172 @@ func main() {
printBlocks(db.Blocks(), listCmdHumanReadable)

case scanCmd.FullCommand():
var didSomething bool
var tmpFiles []string
filepath.Walk(*scanPath, func(path string, f os.FileInfo, _ error) error {
if filepath.Ext(path) == ".tmp" {
tmpFiles = append(tmpFiles, path)
}

return nil
})
if len(tmpFiles) > 0 {
didSomething = true
fmt.Println(`
These are usually caused by a crash or incomplete compaction and
are safe to delete as long as you are sure that no other application is currently using this database.`)
promptDelete(tmpFiles, scanCmdHumanReadable)
}
scanTmps(*scanPath, scanCmdHumanReadable)

scan, err := tsdb.NewDBScanner(*scanPath, logger)

unreadableBlocks, err := scan.Unreadable()
if err != nil {
exitWithError(err)
}
if len(unreadableBlocks) > 0 {
didSomething = true
fmt.Println("Unreadable blocks!")
fmt.Println("Deleting these will remove all data in the listed time range.")
promptDelete(unreadableBlocks, scanCmdHumanReadable)
}
scanTmbst(scan, scanCmdHumanReadable)
scanIndexes(scan, scanCmdHumanReadable)
scanOverlapping(scan, scanCmdHumanReadable)

repaired, err := scan.RepairIndex()
if err != nil {
exitWithError(err)
fmt.Println("Scan complete!")
fmt.Println("Hooray! The db is clean(or the scan tool is broken):\U0001f638")
}
flag.CommandLine.Set("log.level", "debug")
}

func scanOverlapping(scan tsdb.Scanner, hformat *bool) {
overlaps, err := scan.Overlapping()
if err != nil {
exitWithError(err)
}
if len(overlaps) > 0 {
fmt.Println("Overlaping blocks.")
fmt.Println("Deleting these will remove all data in the listed time range.")
var blocksDel []*tsdb.Block
for t, overBcks := range overlaps {
fmt.Printf("overlapping blocks : %v-%v \n", time.Unix(t.Min/1000, 0).Format("06/01/02 15:04"), time.Unix(t.Max/1000, 0).Format("15:04 06/01/02"))

var largest int
for i, b := range overBcks {
if b.Meta().Stats.NumSamples > overBcks[largest].Meta().Stats.NumSamples {
largest = i
}
}
// Don't delete the largest block in the overlaps.
blocksDel = append(overBcks[:largest], overBcks[largest+1:]...)
fmt.Printf("\nBlock %v contains highest samples count and is ommited from the deletion list! \n\n", overBcks[largest])
}
if len(repaired) > 0 {
didSomething = true
fmt.Println("Corrupted indexes that were repaired.")
for path, stats := range repaired {
fmt.Printf("path:%v stats:%+v \n", path, stats)

var paths []string
for _, b := range blocksDel {
_, folder := path.Split(b.Dir())
if _, err := ulid.Parse(folder); err != nil {
fmt.Printf("\nskipping invalid block dir: %v :%v \n\n", b.Dir(), err)
continue
}
paths = append(paths, b.Dir())
}
overlappingBlocks, err := scan.Overlapping()
if err != nil {
exitWithError(err)
printBlocks(blocksDel, hformat)
if prompt() {
if err = dellAll(paths); err != nil {
exitWithError(errors.Wrap(err, "deleting overlapping blocks"))
}
}
if len(overlappingBlocks) > 0 {
didSomething = true
fmt.Println("Overlaping blocks.")
fmt.Println("Deleting these will remove all data in the listed time range.")
for t, blocks := range overlappingBlocks {
fmt.Printf("overlapping blocks : %v-%v \n", time.Unix(t.Min/1000, 0).Format("06/01/02 15:04"), time.Unix(t.Max/1000, 0).Format("15:04 06/01/02"))

var biggestIndex int
biggest := &tsdb.Block{}
for i, b := range blocks {
if b.Meta().Stats.NumSamples > biggest.Meta().Stats.NumSamples {
biggest = b
biggestIndex = i
}
}
// Don't delete the bigest block in the overlaps.
blocks = append(blocks[:biggestIndex], blocks[biggestIndex+1:]...)
fmt.Printf("\nBlock %v contains most samples and is ommited from the deletion list! \n\n", biggest)
}
}

promptDelete(blocks, scanCmdHumanReadable)
}
func scanIndexes(scan tsdb.Scanner, hformat *bool) {
unrepairable, repaired, err := scan.Indexes()
if err != nil {
exitWithError(err)
}

if len(repaired) > 0 {
fmt.Println("Corrupted indexes that were repaired.")
for _, stats := range repaired {
fmt.Printf("path:%v stats:%+v \n", stats.BlockDir, stats)
}
}

fmt.Println("Scan complete!")
if !didSomething {
fmt.Println("Hooray! The db is clean(or the scan tool is broken):\U0001f638")
return
if len(unrepairable) > 0 {
for cause, bdirs := range unrepairable {
fmt.Println("Blocks with unrepairable indexes:", cause)
printFiles(bdirs, hformat)
if prompt() {
if err = dellAll(bdirs); err != nil {
exitWithError(errors.Wrap(err, "deleting blocks with invalid indexes"))
}
}
}
}
flag.CommandLine.Set("log.level", "debug")
}

func promptDelete(i interface{}, humanReadable *bool) {
var paths []string
switch i.(type) {
case []*tsdb.Block:
blocks := i.([]*tsdb.Block)
for _, b := range blocks {
_, folder := path.Split(b.Dir())
if _, err := ulid.Parse(folder); err != nil {
exitWithError(fmt.Errorf("dir doesn't contain a valid ULID:%v", err))
func scanTmbst(scan tsdb.Scanner, hformat *bool) {
invalid, err := scan.Tombstones()
if err != nil {
exitWithError(errors.Wrap(err, "scannings Tombstones"))
}

if len(invalid) > 0 {
fmt.Println("Tombstones include data to be deleted so removing these will cancel deleting these timeseries.")
for cause, files := range invalid {
for _, p := range files {
_, file := filepath.Split(p)
if file != "tombstone" {
exitWithError(fmt.Errorf("path doesn't contain a valid tombstone filename: %v", p))
}
}
fmt.Println("invalid tombstones:", cause)
printFiles(files, hformat)
if prompt() {
if err = dellAll(files); err != nil {
exitWithError(errors.Wrap(err, "deleting Tombstones"))
}
}
paths = append(paths, b.Dir())
}
printBlocks(blocks, humanReadable)
case []string:
paths = i.([]string)
}
}

for _, p := range paths {
func scanTmps(scanPath string, hformat *bool) {
var files []string
filepath.Walk(scanPath, func(path string, f os.FileInfo, _ error) error {
if filepath.Ext(path) == ".tmp" {
files = append(files, path)
}
return nil
})
if len(files) > 0 {
fmt.Println(`
These are usually caused by a crash or incomplete compaction and
are safe to delete as long as you are sure that no other application is currently using this database.`)
for _, p := range files {
if filepath.Ext(p) != ".tmp" {
exitWithError(fmt.Errorf("path doesn't contain a valid tmp extension: %v", p))
}
}
printTmps(paths, humanReadable)
printFiles(files, hformat)
if prompt() {
if err := dellAll(files); err != nil {
exitWithError(errors.Wrap(err, "deleting Tombstones"))
}
}
}
}

fmt.Printf("DELETE (y/N)? ")
var s string
_, err := fmt.Scanln(&s)
if err != nil {
exitWithError(err)
func dellAll(paths []string) error {
for _, p := range paths {
if err := os.RemoveAll(p); err != nil {
return fmt.Errorf("error deleting: %v, %v", p, err)
}
}
return nil
}

func prompt() bool {
for x := 0; x < 3; x++ {
fmt.Println("DELETE (y/N)?")
var s string
_, err := fmt.Scanln(&s)
if err != nil {
exitWithError(err)
}

s = strings.TrimSpace(s)
s = strings.ToLower(s)
s = strings.TrimSpace(s)
s = strings.ToLower(s)

if s == "y" || s == "yes" {
for _, p := range paths {
if err := os.RemoveAll(p); err != nil {
fmt.Printf("error deleting: %v, %v", p, err)
}
if s == "y" || s == "yes" {
return true
}
return
}
if s == "n" || s == "no" {
return
if s == "n" || s == "no" {
return false
}
fmt.Println(s, "is not a valid answer")
}
fmt.Printf("%v is not a valid answer \n", s)
promptDelete(i, humanReadable)
fmt.Printf("Bailing out, too many invalid answers! \n\n")
return false
}

type writeBenchmark struct {
Expand Down Expand Up @@ -479,12 +520,12 @@ func exitWithError(err error) {
os.Exit(1)
}

func printTmps(tmps []string, humanReadable *bool) {
func printFiles(files []string, humanReadable *bool) {
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
defer tw.Flush()

fmt.Fprintln(tw, "PATH\tSIZE\tDATE\t")
for _, path := range tmps {
for _, path := range files {
f, e := os.Stat(path)
if e != nil {
exitWithError(e)
Expand Down
5 changes: 3 additions & 2 deletions repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tsdb

import (
"os"
"path/filepath"
"reflect"
"testing"

Expand Down Expand Up @@ -53,9 +54,9 @@ func TestRepairBadIndexVersion(t *testing.T) {
t.Fatal("error expected but got none")
}
// Touch chunks dir in block.
os.MkdirAll(dir+"chunks", 0777)
os.MkdirAll(chunkDir(dir), 0777)

r, err := index.NewFileReader(dir + "index")
r, err := index.NewFileReader(filepath.Join(dir, indexFilename))
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit f7f2578

Please sign in to comment.