Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 110 additions & 112 deletions get/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ var hashMap = map[string]crypto.Hash{
"sha512": crypto.SHA512,
}

const repomdPath = "repodata/repomd.xml"
const releasePath = "Release"

type RepoType struct {
MetadataPath string
PackagesType string
Expand Down Expand Up @@ -131,22 +128,13 @@ func NewSyncer(url url.URL, archs map[string]bool, storage Storage) *Syncer {
}

// StoreRepo stores an HTTP repo in a Storage, automatically retrying in case of recoverable errors
func (r *Syncer) StoreRepo() (err error) {
func (r *Syncer) StoreRepo() error {
checksumMap := r.readChecksumMap()
for i := 0; i < 20; i++ {
err = r.storeRepo(checksumMap)
if err == nil {
return
}

uerr, unexpectedStatusCode := err.(*UnexpectedStatusCodeError)
if unexpectedStatusCode {
sc := uerr.StatusCode
if sc == 401 || sc == 403 || sc == 404 || sc == 410 || sc == 502 || sc == 503 || sc == 504 {
log.Printf("Got %v, presumably temporarily, retrying...\n", sc)
} else {
return err
}
var err error
for i := 0; i < 20; i++ {
if err = r.storeRepo(checksumMap); err == nil {
return nil
}

_, checksumError := err.(*util.ChecksumError)
Expand All @@ -160,22 +148,39 @@ func (r *Syncer) StoreRepo() (err error) {
if signatureError {
log.Println(err.Error())
log.Println("Signature not valid, presumably the repo was published while syncing, retrying...")
} else {
return err
continue
}

if err = ignoreStatusCode(err, 403, 404, 410, 502, 503, 504); err == nil {
continue
}
return err
}

log.Println("Too many temporary errors, aborting...")
return err
}

// storeRepo performs a single sync attempt: it processes the repo metadata, downloads and recycles packages, then commits the Storage
func (r *Syncer) storeRepo(checksumMap map[string]XMLChecksum) (err error) {
packagesToDownload, packagesToRecycle, err := r.processMetadata(checksumMap)
func (r *Syncer) storeRepo(checksumMap map[string]XMLChecksum) error {
packagesToDownload, packagesToRecycle, err := r.processRepoMetadata(checksumMap)
if err != nil {
return
return err
}

if err := r.downloadPackages(packagesToDownload); err != nil {
return err
}

if err := r.recyclePackages(packagesToRecycle); err != nil {
return err
}

log.Println("Committing changes...")
return r.storage.Commit()
}

func (r *Syncer) downloadPackages(packagesToDownload []XMLPackage) error {
downloadCount := len(packagesToDownload)
log.Printf("Downloading %v packages...\n", downloadCount)
for i, pack := range packagesToDownload {
Expand All @@ -186,27 +191,24 @@ func (r *Syncer) storeRepo(checksumMap map[string]XMLChecksum) (err error) {
relativeURL := strings.TrimSuffix(pack.Location.Href, name) + escapedName

description := fmt.Sprintf("(%v/%v) %v", i+1, downloadCount, name)
err = r.downloadStoreApply(relativeURL, pack.Checksum.Checksum, description, hashMap[pack.Checksum.Type], util.Nop)
err := r.downloadStoreApply(relativeURL, pack.Checksum.Checksum, description, hashMap[pack.Checksum.Type], util.Nop)
if err != nil {
return err
}
}
return nil
}

func (r *Syncer) recyclePackages(packagesToRecycle []XMLPackage) error {
recycleCount := len(packagesToRecycle)
log.Printf("Recycling %v packages...\n", recycleCount)
for _, pack := range packagesToRecycle {
err = r.storage.Recycle(pack.Location.Href)
err := r.storage.Recycle(pack.Location.Href)
if err != nil {
return
return err
}
}

log.Println("Committing changes...")
err = r.storage.Commit()
if err != nil {
return
}
return
return nil
}

// downloadStoreApply downloads a repo-relative path into a file, while applying a ReaderConsumer
Expand All @@ -231,64 +233,62 @@ func (r *Syncer) downloadStoreApply(relativePath string, checksum string, descri

// processRepoMetadata stores the repo metadata, trying each known repo type in
// turn, and returns the packages to download and the packages to recycle
func (r *Syncer) processMetadata(checksumMap map[string]XMLChecksum) (packagesToDownload []XMLPackage, packagesToRecycle []XMLPackage, err error) {
doProcessMetadata := func(reader io.ReadCloser, repoType RepoType) (err error) {
b, err := io.ReadAll(reader)
if err != nil {
return
}

err = r.checkRepomdSignature(bytes.NewReader(b), repoType)
if err != nil {
return
}

repomd, err := repoType.DecodeMetadata(bytes.NewReader(b))
if err != nil {
func (r *Syncer) processRepoMetadata(checksumMap map[string]XMLChecksum) (packagesToDownload []XMLPackage, packagesToRecycle []XMLPackage, err error) {
for name, config := range repoTypes {
log.Printf("Using repo type: %s", name)
err = r.downloadStoreApply(config.MetadataPath, "", path.Base(config.MetadataPath), 0, func(reader io.ReadCloser) error {
packagesToDownload, packagesToRecycle, err = r.processMetadata(reader, config, checksumMap)
return err
})
if err == nil {
return
}

data := repomd.Data
for i := 0; i < len(data); i++ {
log.Println(data[i].Location.Href)
metadataLocation := data[i].Location.Href
metadataChecksum := data[i].Checksum

decision := r.decide(metadataLocation, metadataChecksum, checksumMap)
switch decision {
case Download:
log.Println("...downloading")
err = r.downloadStoreApply(metadataLocation, metadataChecksum.Checksum, path.Base(metadataLocation), hashMap[metadataChecksum.Type], util.Nop)
if err != nil {
return
}
case Recycle:
log.Println("...recycling")
r.storage.Recycle(metadataLocation)
}
log.Println(err.Error())
log.Println("Fallback to next repo type")
}
return
}

if data[i].Type == repoType.PackagesType {
packagesToDownload, packagesToRecycle, err = r.processPrimary(metadataLocation, checksumMap, repoType)
}
}
func (r *Syncer) processMetadata(reader io.ReadCloser, repoType RepoType, checksumMap map[string]XMLChecksum) (packagesToDownload []XMLPackage, packagesToRecycle []XMLPackage, err error) {
b, err := io.ReadAll(reader)
if err != nil {
return
}

err = r.downloadStoreApply(repomdPath, "", path.Base(repomdPath), 0, func(reader io.ReadCloser) (err error) {
err = doProcessMetadata(reader, repoTypes["rpm"])
err = r.checkRepomdSignature(bytes.NewReader(b), repoType)
if err != nil {
return
})
}

repomd, err := repoType.DecodeMetadata(bytes.NewReader(b))
if err != nil {
log.Println(err.Error())
log.Println("Fallback to next repo type")
// attempt to download Debian's Release file
err = r.downloadStoreApply(releasePath, "", path.Base(releasePath), 0, func(reader io.ReadCloser) (err error) {
err = doProcessMetadata(reader, repoTypes["deb"])
return
})
return
}

data := repomd.Data
for i := 0; i < len(data); i++ {
log.Println(data[i].Location.Href)
metadataLocation := data[i].Location.Href
metadataChecksum := data[i].Checksum

decision := r.decide(metadataLocation, metadataChecksum, checksumMap)
switch decision {
case Download:
log.Println("...downloading")
err = r.downloadStoreApply(metadataLocation, metadataChecksum.Checksum, path.Base(metadataLocation), hashMap[metadataChecksum.Type], util.Nop)
if err != nil {
return
}
case Recycle:
log.Println("...recycling")
r.storage.Recycle(metadataLocation)
}

if data[i].Type == repoType.PackagesType {
packagesToDownload, packagesToRecycle, err = r.processPrimary(metadataLocation, checksumMap, repoType)
}
}
return
}

Expand Down Expand Up @@ -378,49 +378,47 @@ func readMetaData(reader io.Reader, compType string) (XMLMetaData, error) {
func (r *Syncer) readChecksumMap() (checksumMap map[string]XMLChecksum) {
checksumMap = make(map[string]XMLChecksum)

repoType := repoTypes["rpm"]
repomdReader, err := r.storage.NewReader(repomdPath, Permanent)
if err != nil {
if err == ErrFileNotFound {
repomdReader, err = r.storage.NewReader(releasePath, Permanent)
if err != nil {
log.Println("First-time sync started")
for _, repoType := range repoTypes {
repomdReader, err := r.storage.NewReader(repoType.MetadataPath, Permanent)
if err != nil {
if err != ErrFileNotFound {
log.Println("Error while reading previously-downloaded metadata. Starting sync from scratch")
return
}
repoType = repoTypes["deb"]
} else {
log.Println("Error while reading previously-downloaded metadata. Starting sync from scratch")
return
continue
}
}
defer repomdReader.Close()
defer repomdReader.Close()

repomd, err := repoType.DecodeMetadata(repomdReader)
if err != nil {
log.Println("Error while parsing previously-downloaded metadata. Starting sync from scratch")
return
}
repomd, err := repoType.DecodeMetadata(repomdReader)
if err != nil {
log.Println("Error while parsing previously-downloaded metadata. Starting sync from scratch")
return
}

data := repomd.Data
for i := 0; i < len(data); i++ {
dataHref := data[i].Location.Href
dataChecksum := data[i].Checksum
checksumMap[dataHref] = dataChecksum
if data[i].Type == repoType.PackagesType {
primaryReader, err := r.storage.NewReader(dataHref, Permanent)
if err != nil {
return
}
compType := strings.Trim(filepath.Ext(dataHref), ".")
primary, err := repoType.DecodePackages(primaryReader, compType)
if err != nil {
data := repomd.Data
for i := 0; i < len(data); i++ {
dataHref := data[i].Location.Href
dataChecksum := data[i].Checksum
checksumMap[dataHref] = dataChecksum
if data[i].Type == repoType.PackagesType {
primaryReader, err := r.storage.NewReader(dataHref, Permanent)
if err != nil {
return
}
compType := strings.Trim(filepath.Ext(dataHref), ".")
primary, err := repoType.DecodePackages(primaryReader, compType)
if err != nil {
return
}
for _, pack := range primary.Packages {
checksumMap[pack.Location.Href] = pack.Checksum
}
return
}
for _, pack := range primary.Packages {
checksumMap[pack.Location.Href] = pack.Checksum
}
}
}

log.Println("First-time sync started")
return
}

Expand Down