Skip to content

Commit

Permalink
Merge pull request #1042 from marticliment/msstore-testing
Browse files Browse the repository at this point in the history
Implement Microsoft Store packages
  • Loading branch information
marticliment committed May 20, 2023
2 parents 438e5e6 + 8693323 commit fa63161
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 29 deletions.
105 changes: 92 additions & 13 deletions wingetui/PackageManagers/winget.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from .PackageClasses import *
from .sampleHelper import *

class WingetPackageManager(SamplePackageManager):
class WingetPackageManager(DynamicPackageManager):

if getSettings("UseSystemWinget"):
EXECUTABLE = "winget.exe"
Expand Down Expand Up @@ -91,7 +91,7 @@ def cacheAvailablePackages(self) -> None:
"""
print(f"🔵 Starting {self.NAME} package caching")
try:
p = subprocess.Popen([self.EXECUTABLE, "search", "", "--source", "winget", "--accept-source-agreements"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, shell=True)
p = subprocess.Popen([self.EXECUTABLE, "search", "", "--accept-source-agreements"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, shell=True)
ContentsToCache = ""
hasShownId: bool = False
idPosition: int = 0
Expand Down Expand Up @@ -162,15 +162,90 @@ def cacheAvailablePackages(self) -> None:
print(f"🟢 {self.NAME} packages cached successfuly")
except Exception as e:
report(e)

def getPackagesForQuery(self, query: str) -> list[Package]:
if getSettings("DisableMicrosoftStore"):
print("🟡 Microsoft Store source is disabled")
return []
print(f"🔵 Starting {self.NAME} search for dynamic packages (msstore source)")
try:
packages: list[Package] = []
p = subprocess.Popen([self.EXECUTABLE, "search", query, "--source", "msstore", "--accept-source-agreements"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, shell=True)
ContentsToCache = ""
hasShownId: bool = False
idPosition: int = 0
versionPosition: int = 0
while p.poll() is None:
line: str = str(p.stdout.readline().strip(), "utf-8", errors="ignore")
print(line)
if line:
if not hasShownId:
if "Id" in line:
line = line.replace("\x08-\x08\\\x08|\x08 \r","")
for char in ("\r", "/", "|", "\\", "-"):
line = line.split(char)[-1].strip()
hasShownId = True
idPosition = len(line.split("Id")[0])
versionPosition = len(line.split("Version")[0])
elif "---" in line:
pass
else:
try:
name = line[0:idPosition].strip()
idVersionSubstr = line[idPosition:].strip()
if " " in name:
oName = name
while " " in oName:
oName = oName.replace(" ", " ")
idVersionSubstr = oName.split(" ")[-1]+idVersionSubstr
name = " ".join(oName.split(" ")[:-1])
idVersionSubstr.replace("\t", " ")
while " " in idVersionSubstr:
idVersionSubstr = idVersionSubstr.replace(" ", " ")
iOffset = 0
id = idVersionSubstr.split(" ")[iOffset]
ver = idVersionSubstr.split(" ")[iOffset+1]
if len(id) == 1:
iOffset + 1
id = idVersionSubstr.split(" ")[iOffset]
ver = idVersionSubstr.split(" ")[iOffset+1]
if ver.strip() in ("<", "-"):
iOffset += 1
ver = idVersionSubstr.split(" ")[iOffset+1]
if not " " in name:
if not name in self.BLACKLISTED_PACKAGE_NAMES and not id in self.BLACKLISTED_PACKAGE_IDS and not version in self.BLACKLISTED_PACKAGE_VERSIONS:
ContentsToCache += f"{name},{id},{ver}\n"
else:
if not name in self.BLACKLISTED_PACKAGE_NAMES and not id in self.BLACKLISTED_PACKAGE_IDS and not version in self.BLACKLISTED_PACKAGE_VERSIONS:
name = name.replace(" ", "#").replace("# ", "#").replace(" #", "#")
while "##" in name:
name = name.replace("##", "#")
print(f"🟡 package {name} failed parsing, going for method 2...")
ContentsToCache += f"{name},{id},{ver}\n"
except Exception as e:
ContentsToCache += f"{line[0:idPosition].strip()},{line[idPosition:versionPosition].strip()},{line[versionPosition:].strip()}\n"
if type(e) != IndexError:
report(e)

for line in ContentsToCache.split("\n"):
package = line.split(",")
if len(package) >= 2:
packages.append(Package(package[0], package[1], package[2], self.NAME, Winget))

print(f"🟢 {self.NAME} search for updates finished with {len(packages)} result(s) (msstore)")
return packages

except Exception as e:
report(e)

def getAvailableUpdates(self) -> list[UpgradablePackage]:
f"""
Will retieve the upgradable packages by {self.NAME} in the format of a list[UpgradablePackage] object.
"""
print(f"🔵 Starting {self.NAME} search for updates")
try:
packages: list[UpgradablePackage] = []
p = subprocess.Popen(["mode", "400,30&", self.EXECUTABLE, "upgrade", "--include-unknown", "--source", "winget", "--accept-source-agreements"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ.copy(), shell=True)
p = subprocess.Popen(["mode", "400,30&", self.EXECUTABLE, "upgrade", "--include-unknown", "--accept-source-agreements"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ.copy(), shell=True)
hasShownId: bool = False
idPosition: int = 0
versionPosition: int = 0
Expand Down Expand Up @@ -272,16 +347,18 @@ def getSource(id: str) -> str:
if id.count("GOG") == 1:
s = "GOG"
if s == "Winget":
if len(id.split("_")[-1]) == 13 and len(id.split("_"))==2:
if len(id.split("_")[-1]) in (13, 14) and (len(id.split("_"))==2 or id == id.upper()):
s = "Microsoft Store"
elif len(id.split("_")[-1]) <= 13 and len(id.split("_"))==2 and "…" == id.split("_")[-1][-1]: # Delect microsoft store ellipsed packages
s = "Microsoft Store"
if len(id) in (13, 14) and (id.upper() == id):
s = "Winget"
return s

print(f"🔵 Starting {self.NAME} search for installed packages")
try:
packages: list[Package] = []
p = subprocess.Popen(["mode", "400,30&", self.EXECUTABLE, "list", "--source", "winget", "--accept-source-agreements"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ.copy(), shell=True)
p = subprocess.Popen(["mode", "400,30&", self.EXECUTABLE, "list", "--accept-source-agreements"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ.copy(), shell=True)
hasShownId: bool = False
idPosition: int = 0
versionPosition: int = 0
Expand Down Expand Up @@ -346,7 +423,7 @@ def getPackageDetails(self, package: Package) -> PackageDetails:
"""
Will return a PackageDetails object containing the information of the given Package object
"""
print(f"🔵 Starting get info for {package.Name} on {self.NAME}")
print(f"🔵 Starting get info for {package.Id} on {self.NAME}")
if "…" in package.Id:
newId = self.getFullPackageId(package.Id)
if newId:
Expand All @@ -355,7 +432,7 @@ def getPackageDetails(self, package: Package) -> PackageDetails:
details = PackageDetails(package)
try:
details.Scopes = [_("Current user"), _("Local machine")]
details.ManifestUrl = f"https://github.com/microsoft/winget-pkgs/tree/master/manifests/{package.Id[0].lower()}/{'/'.join(package.Id.split('.'))}"
details.ManifestUrl = f"https://github.com/microsoft/winget-pkgs/tree/master/manifests/{package.Id[0].lower()}/{'/'.join(package.Id.split('.'))}" if not (len(package.Id) == 14 and package.Id == package.Id.upper()) else f"https://apps.microsoft.com/store/detail/{package.Id}"
details.Architectures = ["x64", "x86", "arm64"]
loadedInformationPieces = 0
currentIteration = 0
Expand All @@ -364,16 +441,18 @@ def getPackageDetails(self, package: Package) -> PackageDetails:
outputIsDescribing = False
outputIsShowingNotes = False
outputIsShowingTags = False
p = subprocess.Popen([self.EXECUTABLE, "show", "--id", f"{package.Id}", "--exact", "--source", "winget", "--accept-source-agreements"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ.copy(), shell=True)
p = subprocess.Popen([self.EXECUTABLE, "show", "--id", f"{package.Id}", "--exact", "--accept-source-agreements"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ.copy(), shell=True)
output: list[str] = []
while p.poll() is None:
line = p.stdout.readline()
if line:
if b"No package found matching input criteria." in line:
return details
output.append(str(line, encoding='utf-8', errors="ignore"))

for line in output:
if line[0] == " " and outputIsDescribing:
details.Description += "\n"+line
details.Description += "<br>"+line
else:
outputIsDescribing = False
if line[0] == " " and outputIsShowingNotes:
Expand All @@ -383,8 +462,7 @@ def getPackageDetails(self, package: Package) -> PackageDetails:
if line[0] == " " and outputIsShowingTags:
details.Tags.append(line.strip())
else:
outputIsShowingTags = False

outputIsShowingTags = False
if "Publisher:" in line:
details.Publisher = line.replace("Publisher:", "").strip()
loadedInformationPieces += 1
Expand Down Expand Up @@ -436,7 +514,7 @@ def getPackageDetails(self, package: Package) -> PackageDetails:
versions = []
while versions == [] and currentIteration < 50:
currentIteration += 1
p = subprocess.Popen([self.EXECUTABLE, "show", "--id", f"{package.Id}", "-e", "--versions", "--source", "winget", "--accept-source-agreements"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ.copy(), shell=True)
p = subprocess.Popen([self.EXECUTABLE, "show", "--id", f"{package.Id}", "-e", "--versions", "--accept-source-agreements"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ.copy(), shell=True)
foundDashes = False
while p.poll() is None:
line = p.stdout.readline().strip()
Expand Down Expand Up @@ -495,6 +573,7 @@ def getParameters(self, options: InstallationOptions) -> list[str]:
Parameters.append("--ignore-security-hash")
if options.Version:
Parameters += ["--version", options.Version, "--force"]
Parameters += ["--accept-package-agreements"] # TODO: --disable-interactivity
return Parameters

def startInstallation(self, package: Package, options: InstallationOptions, widget: InstallationWidgetType) -> subprocess.Popen:
Expand Down Expand Up @@ -566,7 +645,7 @@ def uninstallationThread(self, p: subprocess.Popen, options: InstallationOptions
widget.finishInstallation.emit(outputCode, output)

def getFullPackageId(self, id: str) -> tuple[str, str]:
p = subprocess.Popen(["mode", "400,30&", self.EXECUTABLE, "search", "--id", id.replace("…", ""), "--source", "winget", "--accept-source-agreements"] ,stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ.copy(), shell=True)
p = subprocess.Popen(["mode", "400,30&", self.EXECUTABLE, "search", "--id", id.replace("…", ""), "--accept-source-agreements"] ,stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ.copy(), shell=True)
idSeparator = -1
print(f"🔵 Finding Id for {id}")
while p.poll() is None:
Expand Down
10 changes: 6 additions & 4 deletions wingetui/customWidgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
Scoop,
Choco,
Pip,
Npm
Npm,
]

PackagesLoadedDict: dict[PackageClasses.PackageManagerModule:bool] = {
Winget: False,
Scoop: False,
Choco: False,
Pip: False,
Npm: False
Npm: False,
}

StaticPackageManagersList: list[PackageClasses.PackageManagerModule] = [
Expand All @@ -46,12 +46,14 @@

DynaimcPackageManagersList: list[PackageClasses.DynamicPackageManager] = [
Pip,
Npm
Npm,
Winget # Microsoft Store source only
]

DynamicPackagesLoadedDict: dict[PackageClasses.PackageManagerModule:bool] = {
Pip: False,
Npm: False
Npm: False,
Winget: False # Microsoft Store source only
}

class QLinkLabel(QLabel):
Expand Down
29 changes: 17 additions & 12 deletions wingetui/uiSections.py
Original file line number Diff line number Diff line change
Expand Up @@ -890,15 +890,15 @@ def showContextMenu(self, pos: QPoint) -> None:
if self.packageList.currentItem().isHidden():
return
ApplyMenuBlur(self.contextMenu.winId().__int__(), self.contextMenu)

try:
Capabilities: PackageManagerCapabilities = self.ItemPackageReference[self.packageList.currentItem()].PackageManager.Capabilities
self.AdminAction.setVisible(Capabilities.CanRunAsAdmin)
self.RemoveDataAction.setVisible(Capabilities.CanRemoveDataOnUninstall)
self.InteractiveAction.setVisible(Capabilities.CanRunInteractively)
except Exception as e:
report(e)

if self.ItemPackageReference[self.packageList.currentItem()].Source not in ((_("Local PC"), "Microsoft Store", "Steam", "GOG", "Ubisoft Connect", _("Android Subsystem"))):
self.IgnoreUpdatesAction.setVisible(True)
self.ShareAction.setVisible(True)
Expand All @@ -909,7 +909,7 @@ def showContextMenu(self, pos: QPoint) -> None:
self.DetailsAction.setVisible(False)

pos.setY(pos.y()+35)

self.contextMenu.exec(self.packageList.mapToGlobal(pos))

def getToolbar(self) -> QToolBar:
Expand Down Expand Up @@ -1372,11 +1372,11 @@ def changeLang():
self.language.toggleRestartButton(selectedLang != langName)
setSettingsValue("PreferredLanguage", selectedLang)

def restartElevenClockByLangChange():
def restartWingetUIByLangChange():
subprocess.run(str("start /B \"\" \""+sys.executable)+"\"", shell=True)
globals.app.quit()

self.language.restartButton.clicked.connect(restartElevenClockByLangChange)
self.language.restartButton.clicked.connect(restartWingetUIByLangChange)
self.language.combobox.currentTextChanged.connect(changeLang)

updateCheckBox = SectionCheckBox(_("Update WingetUI automatically"))
Expand Down Expand Up @@ -1466,7 +1466,7 @@ def restartElevenClockByLangChange():
report(e)

self.theme.combobox.currentTextChanged.connect(lambda v: (setSettingsValue("PreferredTheme", themes[v]), self.theme.restartButton.setVisible(True)))
self.theme.restartButton.clicked.connect(restartElevenClockByLangChange)
self.theme.restartButton.clicked.connect(restartWingetUIByLangChange)

self.startup = CollapsableSection(_("Startup options"), getMedia("launch"), _("WingetUI autostart behaviour, application launch settings"))
self.layout.addWidget(self.startup)
Expand Down Expand Up @@ -1556,6 +1556,10 @@ def resetAdminRightsCache():
disableShareApi.setChecked(getSettings("DisableApi"))
disableShareApi.stateChanged.connect(lambda v: setSettings("DisableApi", bool(v)))
self.advancedOptions.addWidget(disableShareApi)
parallelInstalls = SectionCheckBox(_("Allow parallel installs (NOT RECOMMENDED)"))
parallelInstalls.setChecked(getSettings("AllowParallelInstalls"))
parallelInstalls.stateChanged.connect(lambda v: setSettings("AllowParallelInstalls", bool(v)))
self.advancedOptions.addWidget(parallelInstalls)

enableSystemWinget = SectionCheckBox(_("Use system Winget (Needs a restart)"))
enableSystemWinget.setChecked(getSettings("UseSystemWinget"))
Expand All @@ -1581,7 +1585,7 @@ def resetWingetUIStore():
pass
setSettings("DisableScoop", sd)
setSettings("DisableWinget", wd)
restartElevenClockByLangChange()
restartWingetUIByLangChange()

resetWingetUI = SectionButton(_("Reset WingetUI and its preferences"), _("Reset"))
resetWingetUI.clicked.connect(lambda: resetWingetUIStore())
Expand All @@ -1599,11 +1603,12 @@ def resetWingetUIStore():
disableWinget.setChecked(not getSettings(f"Disable{Winget.NAME}"))
disableWinget.stateChanged.connect(lambda v: (setSettings(f"Disable{Winget.NAME}", not bool(v)), parallelInstalls.setEnabled(v), button.setEnabled(v), enableSystemWinget.setEnabled(v)))
self.wingetPreferences.addWidget(disableWinget)
disableWinget = SectionCheckBox(_("Enable Microsoft Store package source"))
disableWinget.setChecked(not getSettings(f"DisableMicrosoftStore"))
disableWinget.stateChanged.connect(lambda v: (setSettings(f"DisableMicrosoftStore", not bool(v))))
self.wingetPreferences.addWidget(disableWinget)

parallelInstalls = SectionCheckBox(_("Allow parallel installs (NOT RECOMMENDED)"))
parallelInstalls.setChecked(getSettings("AllowParallelInstalls"))
parallelInstalls.stateChanged.connect(lambda v: setSettings("AllowParallelInstalls", bool(v)))
self.wingetPreferences.addWidget(parallelInstalls)

button = SectionButton(_("Reset Winget sources (might help if no packages are listed)"), _("Reset"))
button.clicked.connect(lambda: (os.startfile(os.path.join(realpath, "resources/reset_winget_sources.cmd"))))
self.wingetPreferences.addWidget(button)
Expand Down Expand Up @@ -2257,7 +2262,7 @@ def loadPackageCommandLine(self):
parameters = " ".join(self.getCommandLineParameters())
if self.currentPackage.isManager(Winget):
if not "…" in self.currentPackage.Id:
self.commandWindow.setText(f"winget {'update' if self.isAnUpdate else ('uninstall' if self.isAnUninstall else 'install')} --id {self.currentPackage.Id} --exact {parameters} --source winget --accept-source-agreements --force ".strip().replace(" ", " ").replace(" ", " "))
self.commandWindow.setText(f"winget {'update' if self.isAnUpdate else ('uninstall' if self.isAnUninstall else 'install')} --id {self.currentPackage.Id} --exact {parameters} --accept-source-agreements --force ".strip().replace(" ", " ").replace(" ", " "))
else:
self.commandWindow.setText(_("Loading..."))
elif self.currentPackage.isManager(Scoop):
Expand Down

0 comments on commit fa63161

Please sign in to comment.