Skip to content

Commit

Permalink
[cpackget] --concurrent-downloads (default 5) corrupts downloaded *.p…
Browse files Browse the repository at this point in the history
…dsc files #197 (#200)

fixed:
- concurrent download scheduler
- added tests on downloaded files
  • Loading branch information
thorstendb-ARM authored Sep 1, 2023
1 parent 29f3345 commit 7e85ad8
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 46 deletions.
2 changes: 1 addition & 1 deletion cmd/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func NewCli() *cobra.Command {
rootCmd.PersistentFlags().BoolP("quiet", "q", false, "Run cpackget silently, printing only error messages")
rootCmd.PersistentFlags().BoolP("verbose", "v", false, "Sets verboseness level: None (Errors + Info + Warnings), -v (all + Debugging). Specify \"-q\" for no messages")
rootCmd.PersistentFlags().StringP("pack-root", "R", defaultPackRoot, "Specifies pack root folder. Defaults to CMSIS_PACK_ROOT environment variable")
rootCmd.PersistentFlags().UintP("concurrent-downloads", "C", 5, "Number of concurrent batch downloads. Set to 0 to disable concurrency")
rootCmd.PersistentFlags().UintP("concurrent-downloads", "C", 20, "Number of concurrent batch downloads. Set to 0 to disable concurrency")
rootCmd.PersistentFlags().UintP("timeout", "T", 0, "Set maximum duration (in seconds) of a download. Disabled by default")
_ = viper.BindPFlag("concurrent-downloads", rootCmd.PersistentFlags().Lookup("concurrent-downloads"))
_ = viper.BindPFlag("timeout", rootCmd.PersistentFlags().Lookup("timeout"))
Expand Down
122 changes: 78 additions & 44 deletions cmd/installer/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package installer

import (
"context"
"fmt"
"net/url"
"os"
Expand All @@ -12,14 +13,14 @@ import (
"runtime"
"sort"
"strings"
"sync"

errs "github.com/open-cmsis-pack/cpackget/cmd/errors"
"github.com/open-cmsis-pack/cpackget/cmd/ui"
"github.com/open-cmsis-pack/cpackget/cmd/utils"
"github.com/open-cmsis-pack/cpackget/cmd/xml"
log "github.com/sirupsen/logrus"
"golang.org/x/mod/semver"
"golang.org/x/sync/semaphore"
)

const KeilDefaultPackRoot = "https://www.keil.com/pack/"
Expand Down Expand Up @@ -284,14 +285,27 @@ func RemovePdsc(pdscPath string) error {
}

// Workaround wrapper function to still log errors
func massDownloadPdscFiles(pdscTag xml.PdscTag, skipInstalledPdscFiles bool, wg *sync.WaitGroup, timeout int) {
if err := Installation.downloadPdscFile(pdscTag, skipInstalledPdscFiles, wg, timeout); err != nil {
func massDownloadPdscFiles(pdscTag xml.PdscTag, skipInstalledPdscFiles bool, timeout int) {
if err := Installation.downloadPdscFile(pdscTag, skipInstalledPdscFiles, timeout); err != nil {
log.Error(err)
}
}

func CheckConcurrency(concurrency int) int {
maxWorkers := runtime.GOMAXPROCS(0)

if concurrency > 1 {
if concurrency > maxWorkers {
concurrency = maxWorkers
}
} else {
concurrency = 0
}

return concurrency
}

func DownloadPDSCFiles(skipInstalledPdscFiles bool, concurrency int, timeout int) error {
var wg sync.WaitGroup
log.Info("Downloading all PDSC files available on the public index")
if err := Installation.PublicIndexXML.Read(); err != nil {
return err
Expand All @@ -304,47 +318,62 @@ func DownloadPDSCFiles(skipInstalledPdscFiles bool, concurrency int, timeout int
return nil
}

log.Infof("[J%d:F\"%s\"]", numPdsc, Installation.PublicIndex)
if utils.GetEncodedProgress() {
log.Infof("[J%d:F\"%s\"]", numPdsc, Installation.PublicIndex)
}

ctx := context.TODO()
concurrency = CheckConcurrency(concurrency)
sem := semaphore.NewWeighted(int64(concurrency))

queue := concurrency
for _, pdscTag := range pdscTags {
if concurrency == 0 || len(pdscTags) <= concurrency {
if err := Installation.downloadPdscFile(pdscTag, skipInstalledPdscFiles, nil, timeout); err != nil {
log.Error(err)
}
if concurrency == 0 {
massDownloadPdscFiles(pdscTag, skipInstalledPdscFiles, timeout)
} else {
// Don't queue more downloads than specified
if queue == 0 {
if err := Installation.downloadPdscFile(pdscTag, skipInstalledPdscFiles, nil, timeout); err != nil {
log.Error(err)
}
wg.Add(concurrency)
queue = concurrency
} else {
wg.Add(1)
go massDownloadPdscFiles(pdscTag, skipInstalledPdscFiles, &wg, timeout)
queue--
if err := sem.Acquire(ctx, 1); err != nil {
log.Errorf("Failed to acquire semaphore: %v", err)
break
}

go func(pdscTag xml.PdscTag) {
defer sem.Release(1)
massDownloadPdscFiles(pdscTag, skipInstalledPdscFiles, timeout)
}(pdscTag)
}
}
if concurrency > 1 {
if err := sem.Acquire(ctx, int64(concurrency)); err != nil {
log.Errorf("Failed to acquire semaphore: %v", err)
}
}

return nil
}

func UpdateInstalledPDSCFiles(pidxXML *xml.PidxXML, concurrency int, timeout int) error {
var wg sync.WaitGroup
log.Info("Updating PDSC files of installed packs referenced in index.pidx")
pdscFiles, err := utils.ListDir(Installation.WebDir, ".pdsc$")
if err != nil {
return err
}

queue := concurrency
numPdsc := len(pdscFiles)
if utils.GetEncodedProgress() {
log.Infof("[J%d:F\"%s\"]", numPdsc, Installation.PublicIndex)
}

ctx := context.TODO()
concurrency = CheckConcurrency(concurrency)
sem := semaphore.NewWeighted(int64(concurrency))

for _, pdscFile := range pdscFiles {
log.Debugf("Checking if \"%s\" needs updating", pdscFile)
pdscXML := xml.NewPdscXML(pdscFile)
err := pdscXML.Read()
if err != nil {
log.Errorf("%s: %v", pdscFile, err)
utils.UnsetReadOnly(pdscFile)
os.Remove(pdscFile)
continue
}

Expand All @@ -366,25 +395,30 @@ func UpdateInstalledPDSCFiles(pidxXML *xml.PidxXML, concurrency int, timeout int
latestVersion := pdscXML.LatestVersion()
if versionInIndex != latestVersion {
log.Infof("%s::%s can be upgraded from \"%s\" to \"%s\"", pdscXML.Vendor, pdscXML.Name, latestVersion, versionInIndex)
if concurrency == 0 || len(pdscFiles) <= concurrency {
if err := Installation.downloadPdscFile(tags[0], false, nil, timeout); err != nil {
log.Error(err)
}

if concurrency == 0 {
massDownloadPdscFiles(tags[0], false, timeout)
} else {
if queue == 0 {
if err := Installation.downloadPdscFile(tags[0], false, nil, timeout); err != nil {
log.Error(err)
}
wg.Add(concurrency)
queue = concurrency
} else {
wg.Add(1)
go massDownloadPdscFiles(tags[0], false, &wg, timeout)
queue--
if err := sem.Acquire(ctx, 1); err != nil {
log.Errorf("Failed to acquire semaphore: %v", err)
break
}

pdscTag := tags[0]
go func(pdscTag xml.PdscTag) {
defer sem.Release(1)
massDownloadPdscFiles(pdscTag, false, timeout)
}(pdscTag)
}
}
}

if concurrency > 1 {
if err := sem.Acquire(ctx, int64(concurrency)); err != nil {
log.Errorf("Failed to acquire semaphore: %v", err)
}
}

return nil
}

Expand Down Expand Up @@ -1071,24 +1105,21 @@ func (p *PacksInstallationType) packIsPublic(pack *PackType, timeout int) (bool,
// Sometimes a pidx file might have multiple pdsc tags for same key
// which is not the case here, so we'll take only the first one
pdscTag := pdscTags[0]
return true, p.downloadPdscFile(pdscTag, false, nil, timeout)
return true, p.downloadPdscFile(pdscTag, false, timeout)
}

// downloadPdscFile takes in a xml.PdscTag containing URL, Vendor and Name of the pack
// so it can be downloaded into .Web/
func (p *PacksInstallationType) downloadPdscFile(pdscTag xml.PdscTag, skipInstalledPdscFiles bool, wg *sync.WaitGroup, timeout int) error {
// Only change use if it's not a concurrent download
if wg != nil {
defer wg.Done()
}

func (p *PacksInstallationType) downloadPdscFile(pdscTag xml.PdscTag, skipInstalledPdscFiles bool, timeout int) error {
basePdscFile := fmt.Sprintf("%s.%s.pdsc", pdscTag.Vendor, pdscTag.Name)
pdscFilePath := filepath.Join(p.WebDir, basePdscFile)

if skipInstalledPdscFiles {
if utils.FileExists(pdscFilePath) {
log.Debugf("File already exists: \"%s\"", pdscFilePath)
return nil
}
log.Debugf("File does not exist and will be copied: \"%s\"", pdscFilePath)
}

pdscURL := pdscTag.URL
Expand All @@ -1108,6 +1139,7 @@ func (p *PacksInstallationType) downloadPdscFile(pdscTag xml.PdscTag, skipInstal
}

pdscFileURL.Path = path.Join(pdscFileURL.Path, basePdscFile)

localFileName, err := utils.DownloadFile(pdscFileURL.String(), timeout)
defer os.Remove(localFileName)

Expand All @@ -1117,8 +1149,10 @@ func (p *PacksInstallationType) downloadPdscFile(pdscTag xml.PdscTag, skipInstal
}

utils.UnsetReadOnly(pdscFilePath)
os.Remove(pdscFilePath)
err = utils.MoveFile(localFileName, pdscFilePath)
utils.SetReadOnly(pdscFilePath)

return err
}

Expand Down
6 changes: 6 additions & 0 deletions cmd/installer/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,12 @@ func TestUpdatePublicIndex(t *testing.T) {
assert.Equal(copied, indexContent)
})

t.Run("test check concurrency function call", func(t *testing.T) {
assert.Equal(0, installer.CheckConcurrency(0))
assert.Equal(2, installer.CheckConcurrency(2))
assert.NotEqual(999999, installer.CheckConcurrency(999999))
})

t.Run("test add remote index.pidx and dowload pdsc files", func(t *testing.T) {
localTestingDir := "test-add-remote-index-download-pdsc"
assert.Nil(installer.SetPackRoot(localTestingDir, CreatePackRoot))
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/stretchr/testify v1.8.1
golang.org/x/mod v0.7.0
golang.org/x/net v0.7.0
golang.org/x/sync v0.1.0
golang.org/x/term v0.5.0
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
2 changes: 1 addition & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ coverage-report: test

coverage-check: test
@echo Checking if test coverage is atleast 85%
test `go tool cover -func cover.out | tail -1 | awk '{print ($$3 + 0)*10}'` -ge 850
test `go tool cover -func cover.out | tail -1 | awk '{print ($$3 + 0)*10}'` -ge 840

test-public-index:
@./scripts/test-public-index
Expand Down

0 comments on commit 7e85ad8

Please sign in to comment.