Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add bookworm support for Hotspot using NetworkManager and create_ap #3018

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 57 additions & 148 deletions core/services/wifi/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
import argparse
import asyncio
import logging
import os
import stat
import sys
from pathlib import Path
from typing import Any, List, Optional

Expand All @@ -29,7 +26,9 @@
ScannedWifiNetwork,
WifiCredentials,
)
from WifiManager import WifiManager
from wifi_handlers.AbstractWifiHandler import AbstractWifiManager
from wifi_handlers.networkmanager.networkmanager import NetworkManagerWifi
from wifi_handlers.wpa_supplicant.WifiManager import WifiManager

FRONTEND_FOLDER = Path.joinpath(Path(__file__).parent.absolute(), "frontend")
SERVICE_NAME = "wifi-manager"
Expand All @@ -38,7 +37,9 @@
init_logger(SERVICE_NAME)

logger.info("Starting Wifi Manager.")
wifi_manager = WifiManager()
wpa_manager = WifiManager()
network_manager = NetworkManagerWifi()
wifi_manager: Optional[AbstractWifiManager] = None


app = FastAPI(
Expand All @@ -52,22 +53,19 @@
@app.get("/status", summary="Retrieve status of wifi manager.")
@version(1, 0)
async def network_status() -> Any:
assert wifi_manager is not None
wifi_status = await wifi_manager.status()
logger.info("Status:")
for line in tabulate(list(wifi_status.items())).splitlines():
for line in tabulate(list(vars(wifi_status).items())).splitlines():
logger.info(line)
return wifi_status


@app.get("/scan", response_model=List[ScannedWifiNetwork], summary="Retrieve available wifi networks.")
@version(1, 0)
async def scan() -> Any:
logger.info("Trying to perform network scan.")
assert wifi_manager is not None
try:
available_networks = await wifi_manager.get_wifi_available()
logger.info("Available networks:")
for line in tabulate([network.dict() for network in available_networks], headers="keys").splitlines():
logger.info(line)
return available_networks
except BusyError as error:
raise StackedHTTPException(status_code=status.HTTP_425_TOO_EARLY, error=error) from error
Expand All @@ -76,86 +74,25 @@ async def scan() -> Any:
@app.get("/saved", response_model=List[SavedWifiNetwork], summary="Retrieve saved wifi networks.")
@version(1, 0)
async def saved() -> Any:
logger.info("Trying to fetch saved networks.")
assert wifi_manager is not None
saved_networks = await wifi_manager.get_saved_wifi_network()
logger.info("Saved networks:")
for line in tabulate([network.dict() for network in saved_networks], headers="keys").splitlines():
logger.info(line)
return saved_networks


@app.post("/connect", summary="Connect to wifi network.")
@version(1, 0)
async def connect(credentials: WifiCredentials, hidden: bool = False) -> Any:
logger.info(f"Trying to connect to '{credentials.ssid}'.")

network_id: Optional[int] = None
is_new_network = False
try:
saved_networks = await wifi_manager.get_saved_wifi_network()
match_network = next(filter(lambda network: network.ssid == credentials.ssid, saved_networks))
network_id = match_network.networkid
logger.info(f"Network is already known, id={network_id}.")
except StopIteration:
logger.info("Network is not known.")
is_new_network = True

is_secure = False
try:
available_networks = await wifi_manager.get_wifi_available()
scanned_network = next(filter(lambda network: network.ssid == credentials.ssid, available_networks))
flags_for_passwords = ["WPA", "WEP", "WSN"]
for candidate in flags_for_passwords:
if candidate in scanned_network.flags:
is_secure = True
break
except StopIteration:
logger.info("Could not find wifi network around.")

if credentials.password == "" and network_id is None and is_secure:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No password received and network not found among saved ones.",
)

try:
# Update known network if password is not necessary anymore
if network_id is not None and not is_secure and credentials.password == "":
logger.info(f"Removing old entry for known network, id={network_id}.")
await wifi_manager.remove_network(network_id)
network_id = await wifi_manager.add_network(credentials, hidden)
logger.info(f"Network entry updated, id={network_id}.")

if network_id is None:
network_id = await wifi_manager.add_network(credentials, hidden)
logger.info(f"Saving new network entry, id={network_id}.")

logger.info("Performing network connection.")
if network_id is None:
raise ValueError("Missing 'network_id' for network connection.")
await wifi_manager.connect_to_network(network_id, timeout=40)
except ConnectionError as error:
if is_new_network and network_id is not None:
logger.info("Removing new network entry since connection failed.")
await wifi_manager.remove_network(network_id)
raise error
logger.info(f"Successfully connected to '{credentials.ssid}'.")
assert wifi_manager is not None
await wifi_manager.try_connect_to_network(credentials, hidden)


@app.post("/remove", summary="Remove saved wifi network.")
@version(1, 0)
async def remove(ssid: str) -> Any:
logger.info(f"Trying to remove network '{ssid}'.")
assert wifi_manager is not None
logger.info(f"Processing remove request for SSID: {ssid}")
try:
saved_networks = await wifi_manager.get_saved_wifi_network()
# Here we get all networks that match the ssid
# and get a list where the biggest networkid comes first.
# If we remove the lowest numbers first, it'll change the highest values to -1
# TODO: We should move the entire wifi framestack to work with bssid
match_networks = [network for network in saved_networks if network.ssid == ssid]
match_networks = sorted(match_networks, key=lambda network: network.networkid, reverse=True)
for match_network in match_networks:
await wifi_manager.remove_network(match_network.networkid)
await wifi_manager.remove_network(ssid)
except StopIteration as error:
logger.info(f"Network '{ssid}' is unknown.")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Network '{ssid}' not saved.") from error
Expand All @@ -165,35 +102,40 @@ async def remove(ssid: str) -> Any:
@app.get("/disconnect", summary="Disconnect from wifi network.")
@version(1, 0)
async def disconnect() -> Any:
logger.info("Trying to disconnect from current network.")
assert wifi_manager is not None
await wifi_manager.disconnect()
logger.info("Successfully disconnected from network.")


@app.get("/hotspot", summary="Get hotspot state.")
@version(1, 0)
def hotspot_state() -> Any:
return wifi_manager.hotspot.is_running()
assert wifi_manager is not None
return wifi_manager.hotspot_is_running()


@app.get("/hotspot_extended_status", summary="Get extended hotspot status.")
@version(1, 0)
def hotspot_extended_state() -> HotspotStatus:
return HotspotStatus(supported=wifi_manager.hotspot.supports_hotspot, enabled=wifi_manager.hotspot.is_running())
async def hotspot_extended_state() -> HotspotStatus:
assert wifi_manager is not None
return HotspotStatus(
supported=await wifi_manager.supports_hotspot(), enabled=await wifi_manager.hotspot_is_running()
)


@app.post("/hotspot", summary="Enable/disable hotspot.")
@version(1, 0)
def toggle_hotspot(enable: bool) -> Any:
async def toggle_hotspot(enable: bool) -> Any:
assert wifi_manager is not None
if enable:
wifi_manager.enable_hotspot()
return
wifi_manager.disable_hotspot()
return await wifi_manager.enable_hotspot()
return await wifi_manager.disable_hotspot()


@app.post("/smart_hotspot", summary="Enable/disable smart-hotspot.")
@version(1, 0)
def toggle_smart_hotspot(enable: bool) -> Any:
assert wifi_manager is not None
if enable:
wifi_manager.enable_smart_hotspot()
return
Expand All @@ -203,86 +145,53 @@ def toggle_smart_hotspot(enable: bool) -> Any:
@app.get("/smart_hotspot", summary="Check if smart-hotspot is enabled.")
@version(1, 0)
def check_smart_hotspot() -> Any:
assert wifi_manager is not None
return wifi_manager.is_smart_hotspot_enabled()


@app.post("/hotspot_credentials", summary="Update hotspot credentials.")
@version(1, 0)
def set_hotspot_credentials(credentials: WifiCredentials) -> Any:
wifi_manager.set_hotspot_credentials(credentials)
async def set_hotspot_credentials(credentials: WifiCredentials) -> Any:
assert wifi_manager is not None
await wifi_manager.set_hotspot_credentials(credentials)


@app.get("/hotspot_credentials", summary="Get hotspot credentials.")
@version(1, 0)
def get_hotspot_credentials() -> Any:
assert wifi_manager is not None
return wifi_manager.hotspot_credentials()


app = VersionedFastAPI(app, version="1.0.0", prefix_format="/v{major}.{minor}", enable_latest=True)
app.mount("/", StaticFiles(directory=str(FRONTEND_FOLDER), html=True))


if __name__ == "__main__":
if os.geteuid() != 0:
logger.error("You need root privileges to run this script.\nPlease try again using **sudo**. Exiting.")
sys.exit(1)

async def async_start() -> None:
# pylint: disable=global-statement
global wifi_manager
parser = argparse.ArgumentParser(description="Abstraction CLI for WifiManager configuration.")
parser.add_argument(
"--socket",
dest="socket_name",
type=str,
help="Name of the WPA Supplicant socket. Usually 'wlan0' or 'wlp4s0'.",
)
args = parser.parse_args()
candidates = [wpa_manager, network_manager]
for implementation in candidates:
implementation.add_arguments(parser)
# we need to configure all arguments before parsing them, hence two loops
for implementation in candidates:
implementation.configure(parser.parse_args())
async_loop = asyncio.get_event_loop()
# Running uvicorn with log disabled so loguru can handle it
config = Config(app=app, loop=async_loop, host="0.0.0.0", port=9000, log_config=None)
server = Server(config)
for implementation in candidates:
can_work = await implementation.can_work()
logger.info(f"{implementation} can work: {can_work}")
if can_work:
logger.info(f"Using {implementation} as wifi manager.")
await implementation.start()
wifi_manager = implementation
break
await server.serve()

wpa_socket_folder = "/var/run/wpa_supplicant/"
try:
if args.socket_name:
logger.info("Connecting via provided socket.")
socket_name = args.socket_name
else:
logger.info("Connecting via default socket.")

def is_socket(file_path: str) -> bool:
try:
mode = os.stat(file_path).st_mode
return stat.S_ISSOCK(mode)
except Exception as error:
logger.warning(f"Could not check if '{file_path}' is a socket: {error}")
return False

# We are going to sort and get the latest file, since this in theory will be an external interface
# added by the user
entries = os.scandir(wpa_socket_folder)
available_sockets = sorted(
[
entry.path
for entry in entries
if entry.name.startswith(("wlan", "wifi", "wlp")) and is_socket(entry.path)
]
)
if not available_sockets:
raise RuntimeError("No wifi sockets available.")
socket_name = available_sockets[-1]
logger.info(f"Going to use {socket_name} file")
WLAN_SOCKET = os.path.join(wpa_socket_folder, socket_name)
wifi_manager.connect(WLAN_SOCKET)
except Exception as socket_connection_error:
logger.warning(f"Could not connect with wifi socket. {socket_connection_error}")
logger.info("Connecting via internet wifi socket.")
try:
wifi_manager.connect(("127.0.0.1", 6664))
except Exception as udp_connection_error:
logger.error(f"Could not connect with internet socket: {udp_connection_error}. Exiting.")
sys.exit(1)

if __name__ == "__main__":
loop = asyncio.new_event_loop()

# # Running uvicorn with log disabled so loguru can handle it
config = Config(app=app, loop=loop, host="0.0.0.0", port=9000, log_config=None)
server = Server(config)

loop.create_task(wifi_manager.auto_reconnect(60))
loop.create_task(wifi_manager.start_hotspot_watchdog())
loop.run_until_complete(server.serve())
loop.run_until_complete(async_start())
16 changes: 16 additions & 0 deletions core/services/wifi/setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
#!/usr/bin/env python3

import os
import urllib.request

import setuptools


def download_script(url: str, dest: str) -> None:
urllib.request.urlretrieve(url, dest)
os.chmod(dest, 0o755)


CREATE_AP_COMMIT = "2cedd27e324ac7b9cffd1537ef0b6c9e8564e9a3"

download_script(
f"https://raw.githubusercontent.com/lakinduakash/linux-wifi-hotspot/{CREATE_AP_COMMIT}/src/scripts/create_ap",
"/usr/bin/create_ap",
)

setuptools.setup(
name="wifi_service",
version="0.1.0",
Expand Down
23 changes: 22 additions & 1 deletion core/services/wifi/typedefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,26 @@ class HotspotStatus(BaseModel):
enabled: bool


class WifiStatus(BaseModel):
bssid: Optional[str]
freq: Optional[str]
ssid: Optional[str]
id: Optional[str]
mode: Optional[str]
wifi_generation: Optional[str]
pairwise_cipher: Optional[str]
group_cipher: Optional[str]
key_mgmt: Optional[str]
wpa_state: Optional[str]
ip_address: Optional[str]
p2p_device_address: Optional[str]
address: Optional[str]
uuid: Optional[str]
ieee80211ac: Optional[str]
state: Optional[str]
disabled: Optional[str]


class ScannedWifiNetwork(BaseModel):
ssid: Optional[str]
bssid: str
Expand All @@ -20,8 +40,9 @@ class ScannedWifiNetwork(BaseModel):
class SavedWifiNetwork(BaseModel):
networkid: int
ssid: str
bssid: str
bssid: Optional[str]
flags: Optional[str]
nm_id: Optional[str]


class WifiCredentials(BaseModel):
Expand Down
Loading
Loading