From 8dc676f70ae13c166e5e9eb06fcf0f4137a5c572 Mon Sep 17 00:00:00 2001 From: MasloMaslane Date: Mon, 1 Jul 2024 13:35:22 +0200 Subject: [PATCH 1/6] Put route, bug fixes --- proxy/handlers/files.go | 13 +++++-- proxy/handlers/get_files.go | 33 +++++++++++++++++ proxy/handlers/get_list.go | 10 ++++++ storage/main.go | 70 +++++++++++++++++++++++++++++++------ storage/structs.go | 12 +++++++ 5 files changed, 126 insertions(+), 12 deletions(-) create mode 100644 proxy/handlers/get_files.go create mode 100644 proxy/handlers/get_list.go create mode 100644 storage/structs.go diff --git a/proxy/handlers/files.go b/proxy/handlers/files.go index 9a45abf..89b0ed4 100644 --- a/proxy/handlers/files.go +++ b/proxy/handlers/files.go @@ -1,14 +1,23 @@ package handlers import ( - "github.com/sio2project/ft-to-s3/v1/utils" - "net/http" + "strings" + + "github.com/sio2project/ft-to-s3/v1/utils" ) func Files(w http.ResponseWriter, r *http.Request, logger *utils.LoggerObject, bucketName string) { if r.Method == http.MethodPut { Put(w, r, logger, bucketName) + } else if r.Method == http.MethodGet { + if strings.HasPrefix(r.URL.Path, "/files/") { + GetFiles(w, r, logger, bucketName) + } else if strings.HasPrefix(r.URL.Path, "/list/") { + GetList(w, r, logger, bucketName) + } else { + w.WriteHeader(http.StatusBadRequest) + } } else { w.WriteHeader(http.StatusMethodNotAllowed) } diff --git a/proxy/handlers/get_files.go b/proxy/handlers/get_files.go new file mode 100644 index 0000000..cb0d15a --- /dev/null +++ b/proxy/handlers/get_files.go @@ -0,0 +1,33 @@ +package handlers + +import ( + "github.com/sio2project/ft-to-s3/v1/storage" + "github.com/sio2project/ft-to-s3/v1/utils" + "io" + "net/http" + "strconv" +) + +func GetFiles(w http.ResponseWriter, r *http.Request, logger *utils.LoggerObject, bucketName string) { + path := r.URL.Path[len("/files/"):] + result := storage.Get(bucketName, logger, path) + if result.Err != nil { + logger.Error("Error", result.Err) + w.WriteHeader(http.StatusInternalServerError) + return + } + if !result.Found { + w.WriteHeader(http.StatusNotFound) + return + } + if result.Gziped { + w.Header().Set("Content-Encoding", "gzip") + } + w.Header().Set("Logical-Size", strconv.FormatInt(result.LogicalSize, 10)) + w.Header().Set("Last-Modified", toRFC2822(result.LastModified)) + w.WriteHeader(http.StatusOK) + _, err := io.Copy(w, result.File) + if err != nil { + logger.Error("Error", err) + } +} diff --git a/proxy/handlers/get_list.go b/proxy/handlers/get_list.go new file mode 100644 index 0000000..229777a --- /dev/null +++ b/proxy/handlers/get_list.go @@ -0,0 +1,10 @@ +package handlers + +import ( + "github.com/sio2project/ft-to-s3/v1/utils" + "net/http" +) + +func GetList(w http.ResponseWriter, r *http.Request, logger *utils.LoggerObject, bucketName string) { + //path := r.URL.Path[len("/list/"):] +} diff --git a/storage/main.go b/storage/main.go index 3fd0a63..5423c4a 100644 --- a/storage/main.go +++ b/storage/main.go @@ -3,15 +3,16 @@ package storage import ( "bytes" "context" - "io" - "github.com/minio/minio-go/v7" "github.com/sio2project/ft-to-s3/v1/db" "github.com/sio2project/ft-to-s3/v1/utils" + "io" ) func Store(bucketName string, logger *utils.LoggerObject, path string, reader io.Reader, version int64, size int64, compressed bool, sha256Digest string, logicalSize int64) (int64, error) { + logger.Debug("storage.Store called on", bucketName+":"+path) + session := db.GetSession() defer session.Close() fileMutex := db.GetMutex(session, bucketName+":"+path) @@ -25,6 +26,7 @@ func Store(bucketName string, logger *utils.LoggerObject, path string, reader io if version <= dbModified { return dbModified, nil } + logger.Debug("Version is greater than dbModified") oldFile, err := db.GetHashForPath(bucketName, path) if err != nil { @@ -32,26 +34,29 @@ func Store(bucketName string, logger *utils.LoggerObject, path string, reader io } options := minio.PutObjectOptions{} + var data bytes.Buffer if sha256Digest == "" || logicalSize == -1 { - var data []byte + var tempData []byte + teeReader := io.TeeReader(reader, &data) if compressed { - data, err = utils.ReadGzip(reader) + tempData, err = utils.ReadGzip(teeReader) if err != nil { return 0, err } } else { - data, err = io.ReadAll(reader) + tempData, err = io.ReadAll(teeReader) if err != nil { return 0, err } } - sha256Digest = utils.Sha256Checksum(data) - logicalSize = int64(len(data)) - reader = bytes.NewReader(data) + sha256Digest = utils.Sha256Checksum(tempData) + logicalSize = int64(len(tempData)) } if compressed { + logger.Debug("Setting ContentEncoding to gzip") options.ContentEncoding = "gzip" + options.ContentType = "application/gzip" } refCount, err := db.GetRefCount(bucketName, sha256Digest) @@ -60,10 +65,10 @@ func Store(bucketName string, logger *utils.LoggerObject, path string, reader io } if refCount == 0 { - logger.Debug("Storing with options ", options) + logger.Debug("Storing with options", options) minioClient := GetClient() - _, err = minioClient.PutObject(context.Background(), bucketName, sha256Digest, reader, size, options) + _, err = minioClient.PutObject(context.Background(), bucketName, sha256Digest, &data, size, options) if err != nil { return 0, err } @@ -98,3 +103,48 @@ func deleteByHash(bucketName string, logger *utils.LoggerObject, path string, lo logger.Debug("DeleteByHash called on ", path) return nil } + +func Get(bucketName string, logger *utils.LoggerObject, path string) *GetResult { + logger.Debug("storage.Get called on", bucketName+":"+path) + + fileHash, err := db.GetHashForPath(bucketName, path) + if err != nil { + return &GetResult{Err: err} + } + if fileHash == "" { + return &GetResult{Found: false} + } + + lastModified, err := db.GetModified(bucketName, path) + if err != nil { + return &GetResult{Err: err} + } + + minioClient := GetClient() + info, err := minioClient.StatObject(context.Background(), bucketName, fileHash, minio.StatObjectOptions{}) + if err != nil { + minioErr := minio.ToErrorResponse(err) + if minioErr.Code == "NoSuchKey" { + return &GetResult{Found: false} + } + return &GetResult{Err: err} + } + gziped := info.ContentType == "application/gzip" + + reader, err := minioClient.GetObject(context.Background(), bucketName, fileHash, minio.GetObjectOptions{}) + if err != nil { + minioErr := minio.ToErrorResponse(err) + if minioErr.Code == "NoSuchKey" { + return &GetResult{Found: false} + } + return &GetResult{Err: err} + } + + return &GetResult{ + Found: true, + File: reader, + Gziped: gziped, + LastModified: lastModified, + LogicalSize: info.Size, + } +} diff --git a/storage/structs.go b/storage/structs.go new file mode 100644 index 0000000..ae6017b --- /dev/null +++ b/storage/structs.go @@ -0,0 +1,12 @@ +package storage + +import "io" + +type GetResult struct { + Found bool + File io.Reader + Gziped bool + LastModified int64 + LogicalSize int64 + Err error +} From 7c93bfcae7087327f212755ed1bc7069892ef233 Mon Sep 17 00:00:00 2001 From: Mateusz Masiarz Date: Mon, 5 Aug 2024 14:13:37 +0200 Subject: [PATCH 2/6] Get list --- db/etcd.go | 12 ++++++++++++ proxy/handlers/get_list.go | 28 +++++++++++++++++++++++++++- storage/main.go | 23 +++++++++++++++++++++++ 3 files changed, 62 insertions(+), 1 deletion(-) diff --git a/db/etcd.go b/db/etcd.go index 0c79c81..c996fad 100644 --- a/db/etcd.go +++ b/db/etcd.go @@ -54,6 +54,18 @@ func GetModified(bucketName string, path string) (int64, error) { return strconv.ParseInt(string(resp.Kvs[0].Value), 10, 64) } +func GetKeys(prefix string) ([]string, error) { + resp, err := etcdClient.Get(etcdClient.Ctx(), prefix, clientv3.WithPrefix()) + if err != nil { + return nil, err + } + keys := make([]string, 0) + for _, kv := range resp.Kvs { + keys = append(keys, string(kv.Key)) + } + return keys, nil +} + func SetModified(bucketName string, path string, modified int64) error { _, err := etcdClient.Put(etcdClient.Ctx(), GetModifiedName(bucketName, path), strconv.FormatInt(modified, 10)) return err diff --git a/proxy/handlers/get_list.go b/proxy/handlers/get_list.go index 229777a..1eab44a 100644 --- a/proxy/handlers/get_list.go +++ b/proxy/handlers/get_list.go @@ -1,10 +1,36 @@ package handlers import ( + "github.com/sio2project/ft-to-s3/v1/storage" "github.com/sio2project/ft-to-s3/v1/utils" "net/http" ) func GetList(w http.ResponseWriter, r *http.Request, logger *utils.LoggerObject, bucketName string) { - //path := r.URL.Path[len("/list/"):] + path := r.URL.Path[len("/list/"):] + lastModifiedRFC := r.URL.Query().Get("last_modified") + if lastModifiedRFC == "" { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("\"?last-modified=\" is required")) + return + } + lastModified, err := FromRFC2822(lastModifiedRFC) + if err != nil { + logger.Error("Error", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + files, err := storage.GetList(bucketName, logger, path, lastModified) + if err != nil { + logger.Error("Error", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/text") + w.WriteHeader(http.StatusOK) + for _, file := range files { + w.Write([]byte(file + "\n")) + } } diff --git a/storage/main.go b/storage/main.go index 5423c4a..e7633b9 100644 --- a/storage/main.go +++ b/storage/main.go @@ -148,3 +148,26 @@ func Get(bucketName string, logger *utils.LoggerObject, path string) *GetResult LogicalSize: info.Size, } } + +func GetList(bucketName string, logger *utils.LoggerObject, path string, last_modified int64) ([]string, error) { + logger.Debug("storage.GetList called on", bucketName+":"+path) + + prefix := db.GetModifiedName(bucketName, path) + keys, err := db.GetKeys(prefix) + if err != nil { + return nil, err + } + + files := make([]string, 0) + for _, key := range keys { + modified, err := db.GetModified(bucketName, key) + if err != nil { + return nil, err + } + if modified > last_modified { + files = append(files, key) + } + } + + return files, nil +} From d73b8a3980b7ff199fe491f04adae3244d599c3e Mon Sep 17 00:00:00 2001 From: Mateusz Masiarz Date: Mon, 5 Aug 2024 14:15:47 +0200 Subject: [PATCH 3/6] Install docker compose in workflow --- .github/workflows/test.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2f6f857..9b6b782 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,6 +18,12 @@ jobs: with: go-version: '1.22' + - name: Set up Docker Compose + run: | + sudo apt-get update + sudo apt-get install -y docker.io + sudo apt-get install -y docker-compose + - name: Start containers run: docker-compose -f docker-compose-github.yml up -d From 12c645030b2a019e724734d3ec7df0224ee90cad Mon Sep 17 00:00:00 2001 From: Mateusz Masiarz Date: Mon, 5 Aug 2024 14:22:09 +0200 Subject: [PATCH 4/6] Change to action for docker compose setup --- .github/workflows/test.yml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9b6b782..d9f0c7d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -19,10 +19,9 @@ jobs: go-version: '1.22' - name: Set up Docker Compose - run: | - sudo apt-get update - sudo apt-get install -y docker.io - sudo apt-get install -y docker-compose + uses: KengoTODA/actions-setup-docker-compose@v1 + with: + version: 'latest' - name: Start containers run: docker-compose -f docker-compose-github.yml up -d From dad5041d2ead8a111356771789fbf294baa1c5e6 Mon Sep 17 00:00:00 2001 From: Mateusz Masiarz Date: Mon, 5 Aug 2024 14:26:09 +0200 Subject: [PATCH 5/6] Change action --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d9f0c7d..b22a6b0 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -19,7 +19,7 @@ jobs: go-version: '1.22' - name: Set up Docker Compose - uses: KengoTODA/actions-setup-docker-compose@v1 + uses: ndeloof/install-compose-action@v0.0.1 with: version: 'latest' From e080cfb48ef71094c414c4ee5090449be9bde747 Mon Sep 17 00:00:00 2001 From: Mateusz Masiarz Date: Mon, 5 Aug 2024 14:45:10 +0200 Subject: [PATCH 6/6] Better error logging, fix tests --- proxy/handlers/put.go | 2 +- storage/main.go | 32 +++++++++++++++++++++----------- utils/error.go | 14 ++++++++++++++ 3 files changed, 36 insertions(+), 12 deletions(-) create mode 100644 utils/error.go diff --git a/proxy/handlers/put.go b/proxy/handlers/put.go index 5961ca0..2e26cb1 100644 --- a/proxy/handlers/put.go +++ b/proxy/handlers/put.go @@ -51,7 +51,7 @@ func Put(w http.ResponseWriter, r *http.Request, logger *utils.LoggerObject, buc compressed, digest, logicalSize) if err != nil { - logger.Error("Error", err) + logger.Error("Error while storing the file:", err) w.WriteHeader(http.StatusInternalServerError) return } diff --git a/storage/main.go b/storage/main.go index e7633b9..32515a6 100644 --- a/storage/main.go +++ b/storage/main.go @@ -21,7 +21,7 @@ func Store(bucketName string, logger *utils.LoggerObject, path string, reader io dbModified, err := db.GetModified(bucketName, path) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while getting modified time", err) } if version <= dbModified { return dbModified, nil @@ -30,28 +30,34 @@ func Store(bucketName string, logger *utils.LoggerObject, path string, reader io oldFile, err := db.GetHashForPath(bucketName, path) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while getting hash for path", err) } options := minio.PutObjectOptions{} var data bytes.Buffer + var headersCalculated bool if sha256Digest == "" || logicalSize == -1 { + logger.Debug("Calculating sha256Digest and logicalSize") + headersCalculated = true var tempData []byte teeReader := io.TeeReader(reader, &data) if compressed { + logger.Debug("File is compressed, reading gzip") tempData, err = utils.ReadGzip(teeReader) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while reading gzip", err) } } else { + logger.Debug("File is not compressed, reading data") tempData, err = io.ReadAll(teeReader) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while reading data", err) } } sha256Digest = utils.Sha256Checksum(tempData) logicalSize = int64(len(tempData)) + logger.Debug("Calculated sha256Digest and logicalSize") } if compressed { logger.Debug("Setting ContentEncoding to gzip") @@ -61,39 +67,43 @@ func Store(bucketName string, logger *utils.LoggerObject, path string, reader io refCount, err := db.GetRefCount(bucketName, sha256Digest) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while getting refCount", err) } if refCount == 0 { logger.Debug("Storing with options", options) minioClient := GetClient() - _, err = minioClient.PutObject(context.Background(), bucketName, sha256Digest, &data, size, options) + if headersCalculated { + _, err = minioClient.PutObject(context.Background(), bucketName, sha256Digest, &data, size, options) + } else { + _, err = minioClient.PutObject(context.Background(), bucketName, sha256Digest, reader, size, options) + } if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while putting object", err) } } logger.Info("Putting refFile") err = db.SetHashForPath(bucketName, path, sha256Digest) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while setting hash for path", err) } logger.Info("Putting refCount") err = db.SetRefCount(bucketName, sha256Digest, refCount+1) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while setting refCount", err) } err = db.SetModified(bucketName, path, version) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while setting modified", err) } err = deleteByHash(bucketName, logger, oldFile, false) if err != nil { - return 0, err + return 0, utils.ErrorWrapper("Error while deleting old file", err) } return version, nil diff --git a/utils/error.go b/utils/error.go new file mode 100644 index 0000000..00c5aff --- /dev/null +++ b/utils/error.go @@ -0,0 +1,14 @@ +package utils + +type Error struct { + Message string + Err error +} + +func (e *Error) Error() string { + return e.Message + ": " + e.Err.Error() +} + +func ErrorWrapper(msg string, err error) *Error { + return &Error{msg, err} +}