@@ -41,13 +41,19 @@ type AOF struct {
41
41
aofChan chan * AofCmd
42
42
Options * AOFOptions
43
43
listeners map [Listener ]struct {}
44
- buffer []CmdLine
44
+ buffer [][][] byte
45
45
segment * os.File
46
46
segments []* segment
47
47
48
48
reader * Reader
49
49
}
50
50
51
+ type AofCmd struct {
52
+ CmdLine [][]byte
53
+ DbIndex int
54
+ Wg * sync.WaitGroup
55
+ }
56
+
51
57
func NewAOF (aofOptions * AOFOptions ) (* AOF , error ) {
52
58
var err error
53
59
@@ -74,15 +80,14 @@ func NewAOF(aofOptions *AOFOptions) (*AOF, error) {
74
80
return nil , err
75
81
}
76
82
77
- // schedule fsync every 1 second to avoid data loss
78
83
go func () {
79
84
for {
80
85
select {
81
86
case <- aof .ctx .Done ():
82
87
return
83
88
default :
84
89
if FsyncEverySecond == aof .Options .Fsync {
85
- aof .fsync ()
90
+ aof .fsync () // fsync every second
86
91
}
87
92
aof .Retention ()
88
93
time .Sleep (time .Second * 1 )
@@ -95,7 +100,7 @@ func NewAOF(aofOptions *AOFOptions) (*AOF, error) {
95
100
}
96
101
97
102
// WriteAof writes a new entry to the Append-Only Log.
98
- func (aof * AOF ) WriteAof (p * AofCmd ) {
103
+ func (aof * AOF ) WriteAof (p * AofCmd ) error {
99
104
aof .sync .Lock ()
100
105
defer aof .sync .Unlock ()
101
106
aof .buffer = aof .buffer [:0 ] // clear the buffer
@@ -107,7 +112,7 @@ func (aof *AOF) WriteAof(p *AofCmd) {
107
112
108
113
// write to aof file in byte format
109
114
if _ , err := aof .segment .Write (bufferInBytes ); err != nil {
110
- panic ( err ) // TODO: use a logger
115
+ return err
111
116
}
112
117
113
118
if FsyncAlways == aof .Options .Fsync {
@@ -118,6 +123,8 @@ func (aof *AOF) WriteAof(p *AofCmd) {
118
123
for listener := range aof .listeners {
119
124
listener .Callback (aof .buffer )
120
125
}
126
+
127
+ return nil
121
128
}
122
129
123
130
// bufferToBytes converts the buffer to bytes and adds a file header.
0 commit comments