Skip to content

Commit

Permalink
Add support for Arizona Public Service (#95)
Browse files Browse the repository at this point in the history
* Add support for Arizona Public Service

* Moved cryptography dependency to non-optional dependencies and updated variable to lowercase

* Updated js_encrypt command to verify the key is an RSA key
  • Loading branch information
seferino-fernandez authored Sep 15, 2024
1 parent 8b2fa45 commit f8001ad
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Supported utilities (in alphabetical order):
- Kentucky Power
- Public Service Company of Oklahoma (PSO)
- Southwestern Electric Power Company (SWEPCO)
- Arizona Public Service (APS)
- City of Austin Utilities
- Consolidated Edison (ConEd)
- Orange & Rockland Utilities (ORU)
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "opower"
version = "0.7.0"
version = "0.8.0"
license = {text = "Apache-2.0"}
authors = [
{ name="tronikos", email="tronikos@gmail.com" },
Expand All @@ -12,6 +12,7 @@ dependencies = [
"aiohttp>=3.8",
"aiozoneinfo>=0.1",
"arrow>=1.2",
"cryptography>=43.0.0",
"pyotp>=2.0",
]

Expand Down
120 changes: 120 additions & 0 deletions src/opower/utilities/aps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Arizona Public Service (APS)."""

import logging
import re
from typing import Optional

import aiohttp

from ..const import USER_AGENT
from ..exceptions import CannotConnect, InvalidAuth
from .base import UtilityBase
from .helpers import async_auth_saml, js_encrypt

_LOGGER = logging.getLogger(__name__)


class Aps(UtilityBase):
"""Arizona Public Service (APS)."""

@staticmethod
def name() -> str:
"""Distinct recognizable name of the utility."""
return "Arizona Public Service (APS)"

@staticmethod
def subdomain() -> str:
"""Return the opower.com subdomain for this utility."""
return "aps"

@staticmethod
def timezone() -> str:
"""Return the timezone."""
return "America/Phoenix"

@staticmethod
async def async_login(
session: aiohttp.ClientSession,
username: str,
password: str,
optional_mfa_secret: Optional[str],
) -> None:
"""Login to the utility website."""
_LOGGER.debug("Starting login process for Arizona Public Service (APS)")
session.cookie_jar.clear(lambda cookie: cookie["domain"] == "www.aps.com")

# Get public RSA key APS uses to encrypt the password
async with session.get(
"https://www.aps.com/Assets/Js/aps-apscom.js",
headers={"User-Agent": USER_AGENT},
raise_for_status=True,
) as resp:
html = await resp.text()
rsa_key_text = extract_rsa_key(html)

# Encrypt the password
encrypted_password = js_encrypt(rsa_key_text, password)

data = {
"username": username,
"password": encrypted_password,
}

# Send the POST request
async with session.post(
"https://www.aps.com/api/sitecore/SitecoreReactApi/UserAuthentication",
json=data,
raise_for_status=True,
) as resp:
login_result = await resp.json(content_type="text/html")
if login_result["isLoginSuccess"] is False:
raise InvalidAuth("Username and password failed")

# Get All User Details to get APS Account ID and Service Address ID
async with session.get(
"https://www.aps.com/api/sitecore/sitecorereactapi/GetAllUserDetails",
raise_for_status=True,
) as resp:
user_details = await resp.json(content_type="application /json")
account_id = user_details["Details"]["AccountDetails"][
"getAccountDetailsResponse"
]["getAccountDetailsRes"]["getPersonDetails"]["accountID"]

service_address_id = user_details["Details"]["AccountDetails"][
"getAccountDetailsResponse"
]["getAccountDetailsRes"]["getSASPListByAccountID"]["premiseDetailsList"][
0
][
"sASPDetails"
][
0
][
"sAID"
]

account_service_id = f"{account_id}_{service_address_id}"

# Start SAML authentication with APS and Opower
url = f"https://www.aps.com/en/Residential/Save-Money-and-Energy/Opower?CA_SA={account_service_id}"
await async_auth_saml(session, url)


def extract_rsa_key(js_content: str) -> str:
"""Extract the RSA public key from the APS JS file, using the identifier 'APSCOMWebPasswordpublicKey'."""
pattern = r'APSCOMWebPasswordpublicKey:"(-----BEGIN PUBLIC KEY-----.*?-----END PUBLIC KEY-----)"'

# Find the RSA key associated with the regex pattern above
match = re.search(pattern, js_content, re.DOTALL)

if match:
# Get and format the RSA key
rsa_key = match.group(1)
formatted_key = re.sub(
r"(-----BEGIN PUBLIC KEY-----)(.*)(-----END PUBLIC KEY-----)",
r"\1\n\2\n\3",
rsa_key,
flags=re.DOTALL,
)
return formatted_key
else:
raise CannotConnect("The RSA public key was not found.")
20 changes: 20 additions & 0 deletions src/opower/utilities/helpers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
"""Helper functions."""

import base64
import re

import aiohttp
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa

from ..const import USER_AGENT

Expand Down Expand Up @@ -55,3 +58,20 @@ async def async_auth_saml(session: aiohttp.ClientSession, url: str) -> None:
raise_for_status=True,
) as resp:
pass


def js_encrypt(pub_key: str, text: str) -> str:
"""JSEncrypt-like encryption function using cryptography."""
# Load the public key
rsakey = serialization.load_pem_public_key(pub_key.encode())
# Check if the key is RSA before encrypting
if isinstance(rsakey, rsa.RSAPublicKey):
# Encrypt the text using RSA and PKCS1v15 padding
cipher_text = rsakey.encrypt(text.encode(), padding.PKCS1v15())

# Encode the encrypted text in base64
cipher_text_base64 = base64.b64encode(cipher_text)

return cipher_text_base64.decode()
else:
raise ConnectionError("Could not find public key to and encrypt password.")

0 comments on commit f8001ad

Please sign in to comment.