-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
9 changed files
with
284 additions
and
45 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
"""OAuth2 authentication to access protected resources.""" | ||
from typing import Optional | ||
|
||
import json | ||
from pathlib import Path | ||
|
||
|
||
def get_hosts_file() -> Path: | ||
"""Return the path to the hosts file.""" | ||
config_dir = Path("~/.config/cookiecomposer").expanduser().resolve() | ||
config_dir.mkdir(mode=0o755, parents=True, exist_ok=True) | ||
return config_dir / "hosts.json" | ||
|
||
|
||
def login_to_svc( | ||
service: Optional[str] = None, | ||
protocol: Optional[str] = None, | ||
scopes: Optional[str] = None, | ||
token: Optional[str] = None, | ||
) -> str: | ||
""" | ||
Log in and cache token. | ||
Args: | ||
service: The name of the service to authenticate with | ||
protocol: The protocol to use for git operations | ||
scopes: Additional authentication scopes to request | ||
token: A specific token to use instead of logging in | ||
Returns: | ||
The token for the service | ||
""" | ||
import questionary | ||
|
||
hosts_file = get_hosts_file() | ||
hosts = json.loads(hosts_file.read_text()) if hosts_file.exists() else {} | ||
|
||
if not service: | ||
title = "What account do you want to log into?" | ||
options = [ | ||
"github.com", | ||
] | ||
service = questionary.select(title, options).ask() | ||
|
||
if not protocol: | ||
title = "What is your preferred protocol for Git operations?" | ||
options = ["ssh", "https"] | ||
protocol = questionary.select(title, options).ask() | ||
|
||
if not token: | ||
token = github_auth_device() if service == "github.com" else "" | ||
hosts[service] = {"git_protocol": protocol, "oauth_token": token} | ||
|
||
hosts_file.write_text(json.dumps(hosts)) | ||
|
||
return token | ||
|
||
|
||
def get_cached_token(account_name: str) -> Optional[str]: | ||
"""Return the cached token for the account.""" | ||
hosts_file = get_hosts_file() | ||
hosts = json.loads(hosts_file.read_text()) if hosts_file.exists() else {} | ||
return hosts.get(account_name, {}).get("oauth_token") | ||
|
||
|
||
def add_auth_to_url(url: str) -> str: | ||
""" | ||
Add authentication information to a URL. | ||
Args: | ||
url: The URL to add authentication information to. | ||
Returns: | ||
The URL with authentication information added, or the original URL if no token is cached | ||
""" | ||
from urllib.parse import urlparse, urlunparse | ||
|
||
parsed = urlparse(url) | ||
if parsed.netloc == "github.com": | ||
token = get_cached_token("github.com") | ||
if token: | ||
parsed = parsed._replace(netloc=f"cookiecomposer:{token}@{parsed.netloc}") | ||
return urlunparse(parsed) | ||
|
||
|
||
def github_auth_device(n_polls=9999): | ||
""" | ||
Authenticate with GitHub, polling up to ``n_polls`` times to wait for completion. | ||
""" | ||
from ghapi.auth import GhDeviceAuth | ||
|
||
auth = GhDeviceAuth(client_id="de4e3ca9028661a80b50") | ||
print(f"First copy your one-time code: \x1b[33m{auth.user_code}\x1b[m") | ||
print(f"Then visit {auth.verification_uri} in your browser, and paste the code when prompted.") | ||
input("Press Enter to open github.com in your browser...") | ||
auth.open_browser() | ||
|
||
print("Waiting for authorization...", end="") | ||
token = auth.wait(lambda: print(".", end=""), n_polls=n_polls) | ||
if not token: | ||
return print("Authentication not complete!") | ||
print("Authenticated to GitHub") | ||
return token |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
"""Authentication subcommands.""" | ||
import sys | ||
|
||
import rich_click as click | ||
|
||
from cookie_composer.authentication import get_cached_token, login_to_svc | ||
|
||
|
||
@click.group() | ||
def auth_cli(): | ||
"""Authenticate cookie-composer to a service.""" | ||
pass | ||
|
||
|
||
@auth_cli.command() | ||
@click.option( | ||
"-p", | ||
"--git-protocol", | ||
type=click.Choice(["https", "ssh"]), | ||
default="https", | ||
help="The protocol to use for git operations", | ||
) | ||
@click.option( | ||
"-h", | ||
"--service", | ||
type=str, | ||
help="The host name of the service to authenticate with", | ||
default="github.com", | ||
) | ||
@click.option("-s", "--scopes", type=str, help="Additional authentication scopes to request") | ||
@click.option( | ||
"--with-token", type=click.File("r"), is_flag=False, flag_value=sys.stdin, help="Read token from standard input" | ||
) | ||
def login(git_protocol: str, service: str, scopes: str, with_token: click.File): | ||
"""Authenticate to a service.""" | ||
w_token = with_token.read() if with_token else None | ||
if not w_token and get_cached_token(service): | ||
click.echo("Already logged in.") | ||
else: | ||
login_to_svc(service, git_protocol, scopes, w_token) | ||
|
||
|
||
# TODO: Implement logout command | ||
# @auth_cli.command() | ||
# def logout(): | ||
# """Log out of a host.""" | ||
# pass | ||
|
||
|
||
# TODO: Implement refresh command | ||
# @auth_cli.command() | ||
# def refresh(): | ||
# """Refresh stored authentication credentials.""" | ||
# pass | ||
|
||
|
||
# TODO: Implement status command | ||
# @auth_cli.command() | ||
# def status(): | ||
# """View authentication status.""" | ||
# pass | ||
|
||
|
||
@auth_cli.command() | ||
@click.option( | ||
"-h", | ||
"--service", | ||
type=str, | ||
help="The host name of the service to authenticate with", | ||
default="github.com", | ||
) | ||
def token(service: str): | ||
"""Print the auth token cookie-composer is configured to use.""" | ||
oauth_token = get_cached_token(service) | ||
if oauth_token: | ||
click.echo(oauth_token) | ||
else: | ||
raise click.UsageError(f"No cookie-composer auth token configured for {service}.") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.