bars-icloud-drive/icloudpy/base.py

599 lines
21 KiB
Python

"""Library base file."""
import getpass
import http.cookiejar as cookielib
import inspect
import json
import logging
from os import mkdir, path
from re import match
from tempfile import gettempdir
from uuid import uuid1
from requests import Session
from six import PY2
from icloudpy.exceptions import (
ICloudPy2SARequiredException,
ICloudPyAPIResponseException,
ICloudPyFailedLoginException,
ICloudPyServiceNotActivatedException,
)
from icloudpy.services import (
DriveService,
WorkService
)
from icloudpy.utils import get_password_from_keyring
# Module-level logger shared by the session and service classes below.
LOGGER = logging.getLogger(__name__)

# Response header name -> key under which its value is stored in the
# service's ``session_data`` dictionary after every request.
HEADER_DATA = {
    "X-Apple-ID-Account-Country": "account_country",
    "X-Apple-ID-Session-Id": "session_id",
    "X-Apple-Session-Token": "session_token",
    "X-Apple-TwoSV-Trust-Token": "trust_token",
    "scnt": "scnt",
}
class ICloudPyPasswordFilter(logging.Filter):
    """Password log hider.

    Replaces every occurrence of the password in a log record's rendered
    message with eight asterisks. The password is kept in the ``name``
    attribute inherited from ``logging.Filter``.
    """

    def __init__(self, password):
        """Store ``password`` as the text to mask.

        ``None`` is normalised to the empty string because
        ``logging.Filter.__init__`` calls ``len()`` on its argument.
        """
        super().__init__(password or "")

    def filter(self, record):
        """Mask the password in ``record`` and always let the record through.

        Returns:
            True, so the record is never dropped.
        """
        # An empty password must be skipped: "" is a substring of every
        # message and str.replace("", x) would insert x between all characters.
        if self.name:
            message = record.getMessage()
            if self.name in message:
                record.msg = message.replace(self.name, "*" * 8)
                # The message is already fully rendered; drop the args so it
                # is not %-formatted a second time.
                record.args = ()
        return True
class ICloudPySession(Session):
    """iCloud session.

    A ``requests`` session bound to an owning service object. After every
    request it copies selected response headers into the service's
    ``session_data``, writes that data and the cookies to disk, retries
    once on 421/450/500 responses and converts error payloads into
    ICloudPy exceptions.
    """

    def __init__(self, service):
        """Bind the session to ``service`` (the owning service object)."""
        self.service = service
        super().__init__()

    def request(self, method, url, **kwargs):  # pylint: disable=arguments-differ
        """Send a request, persist session state and raise on API errors.

        Besides the keyword arguments accepted by ``Session.request``, two
        private flags are understood and stripped before delegating:

        * ``retried`` - set internally on the retry so a request is only
          retried once.
        * ``save_cookie`` - when falsy, the cookie jar is not written to
          disk after the request (default: True).

        Returns:
            The response object.

        Raises:
            ICloudPy2SARequiredException, ICloudPyServiceNotActivatedException
            or ICloudPyAPIResponseException via ``_raise_error``.
        """
        # Charge logging to the right service endpoint
        stack = inspect.stack(0)
        # Guard against a call stack shallower than expected.
        callee = stack[min(2, len(stack) - 1)]
        module = inspect.getmodule(callee[0])
        # getmodule() returns None when the frame has no associated module.
        logger_name = module.__name__ if module is not None else __name__
        request_logger = logging.getLogger(logger_name).getChild("http")
        if self.service.password_filter not in request_logger.filters:
            request_logger.addFilter(self.service.password_filter)

        request_logger.debug("%s %s %s", method, url, kwargs.get("data", ""))

        # Both flags are private to this class; Session.request() accepts no
        # unknown keyword arguments, so they must be removed before delegating.
        has_retried = kwargs.pop("retried", None)
        save_cookie = kwargs.pop("save_cookie", True)
        response = super().request(method, url, **kwargs)

        content_type = response.headers.get("Content-Type", "").split(";")[0]
        json_mimetypes = ["application/json", "text/json"]
        retry_codes = [421, 450, 500]

        for header, session_arg in HEADER_DATA.items():
            header_value = response.headers.get(header)
            if header_value:
                self.service.session_data.update({session_arg: header_value})

        # Save session_data to file
        with open(self.service.session_path, "w", encoding="utf-8") as outfile:
            json.dump(self.service.session_data, outfile)
            LOGGER.debug("Saved session data to file")

        # Save cookies to file
        if save_cookie:
            self.cookies.save(ignore_discard=True, ignore_expires=True)
            LOGGER.debug("Cookies saved to %s", self.service.cookiejar_path)

        if not response.ok and (
            content_type not in json_mimetypes
            or response.status_code in retry_codes
        ):
            try:
                # pylint: disable=W0212
                fmip_url = self.service._get_webservice_url("findme")
                if (
                    has_retried is None
                    and response.status_code == 450
                    and fmip_url in url
                ):
                    # Handle re-authentication for Find My iPhone
                    LOGGER.debug("Re-authenticating Find My iPhone service")
                    try:
                        self.service.authenticate(True, "find")
                    except ICloudPyAPIResponseException:
                        LOGGER.debug("Re-authentication failed")
                    kwargs["retried"] = True
                    kwargs["save_cookie"] = save_cookie
                    return self.request(method, url, **kwargs)
            except Exception:  # pylint: disable=broad-except
                # Deliberate best-effort: any failure here (including one
                # raised by the retried request above) falls through to the
                # generic retry / error handling below.
                pass

            if has_retried is None and response.status_code in retry_codes:
                api_error = ICloudPyAPIResponseException(
                    response.reason, response.status_code, retry=True
                )
                request_logger.debug(api_error)
                kwargs["retried"] = True
                kwargs["save_cookie"] = save_cookie
                return self.request(method, url, **kwargs)

            self._raise_error(response.status_code, response.reason)

        if content_type not in json_mimetypes:
            return response

        try:
            data = response.json()
        except Exception:  # pylint: disable=broad-except
            # Best-effort: a body that cannot be parsed is returned untouched.
            request_logger.warning("Failed to parse response with JSON mimetype")
            return response

        request_logger.debug(data)

        if isinstance(data, dict):
            # First non-empty value among the known error-message keys.
            reason = (
                data.get("errorMessage")
                or data.get("reason")
                or data.get("errorReason")
            )
            if not reason and isinstance(data.get("error"), str):
                reason = data.get("error")
            if not reason and data.get("error"):
                reason = "Unknown reason"

            code = data.get("errorCode")
            if not code and data.get("serverErrorCode"):
                code = data.get("serverErrorCode")

            if reason:
                self._raise_error(code, reason)

        return response

    def _raise_error(self, code, reason):
        """Raise the ICloudPy exception matching ``code`` / ``reason``.

        Raises:
            ICloudPy2SARequiredException: the service requires 2SA and the
                web-auth token cookie is reported missing.
            ICloudPyServiceNotActivatedException: ``code`` is
                ``ZONE_NOT_FOUND`` or ``AUTHENTICATION_FAILED``.
            ICloudPyAPIResponseException: every other case.
        """
        if (
            self.service.requires_2sa
            and reason == "Missing X-APPLE-WEBAUTH-TOKEN cookie"
        ):
            # The service stores the Apple ID under "accountName".
            raise ICloudPy2SARequiredException(self.service.user["accountName"])
        if code in ("ZONE_NOT_FOUND", "AUTHENTICATION_FAILED"):
            reason = (
                reason + ". Please log into https://icloud.com/ to manually "
                "finish setting up your iCloud service"
            )
            api_error = ICloudPyServiceNotActivatedException(reason, code)
            LOGGER.error(api_error)
            raise api_error
        if code == "ACCESS_DENIED":
            reason = (
                reason + ". Please wait a few minutes then try again."
                "The remote servers might be trying to throttle requests."
            )
        if code in [421, 450, 500]:
            reason = "Authentication required for Account."

        api_error = ICloudPyAPIResponseException(reason, code)
        LOGGER.error(api_error)
        raise api_error

    # Public method to resolve linting error
    def raise_error(self, code, reason):
        """Public wrapper around ``_raise_error``; always raises."""
        return self._raise_error(code=code, reason=reason)
class ICloudPyService:
    """
    A base authentication class for the iCloud service. Handles the
    authentication required to access iCloud services.

    Usage:
        from icloudpy.base import ICloudPyService
        icloudpy = ICloudPyService('username@apple.com', 'password')
        icloudpy.iphone.location()
    """

    def __init__(
        self,
        apple_id,
        password=None,
        cookie_directory=None,
        verify=True,
        client_id=None,
        with_family=True,
        auth_endpoint="https://idmsa.apple.com/appleauth/auth",
        # For China, use "https://www.icloud.com.cn"
        home_endpoint="https://www.icloud.com",
        # For China, use "https://setup.icloud.com.cn/setup/ws/1"
        setup_endpoint="https://setup.icloud.com/setup/ws/1",
    ):
        """Create the service, restore any saved session and authenticate.

        Args:
            apple_id: Account name; also used to derive the cookie and
                session file names.
            password: Account password. When None it is looked up with
                ``get_password_from_keyring``.
            cookie_directory: Directory for the cookie jar and session
                file. Defaults to ``<tempdir>/icloudpy/<current user>``.
            verify: Assigned to the session's ``verify`` attribute.
            client_id: Client identifier; generated when not given, and
                overridden by one found in a saved session file.
            with_family: Passed through to the Find My iPhone manager.
            auth_endpoint: Base URL for sign-in and 2FA requests.
            home_endpoint: Used for the ``Origin`` / ``Referer`` headers.
            setup_endpoint: Base URL for account setup requests.
        """
        # Local import: the module only imports ``mkdir`` and ``path`` from os.
        from os import makedirs

        if password is None:
            password = get_password_from_keyring(apple_id)

        self.user = {"accountName": apple_id, "password": password}
        self.data = {}
        self.params = {}
        self.client_id = client_id or f"auth-{str(uuid1()).lower()}"
        self.with_family = with_family
        self.auth_endpoint = auth_endpoint
        self.home_endpoint = home_endpoint
        self.setup_endpoint = setup_endpoint

        self.password_filter = ICloudPyPasswordFilter(password)
        LOGGER.addFilter(self.password_filter)

        # Lazily created service handles. Set before authenticating so the
        # attributes exist for the whole lifetime of the object.
        self._drive = None
        self._work = None
        self._files = None
        self._photos = None

        # makedirs(exist_ok=True) avoids the exists()/mkdir() race and also
        # creates missing parent directories.
        if cookie_directory:
            self._cookie_directory = path.expanduser(path.normpath(cookie_directory))
            makedirs(self._cookie_directory, 0o700, exist_ok=True)
        else:
            topdir = path.join(gettempdir(), "icloudpy")
            self._cookie_directory = path.join(topdir, getpass.getuser())
            makedirs(topdir, 0o777, exist_ok=True)
            makedirs(self._cookie_directory, 0o700, exist_ok=True)

        LOGGER.debug("Using session file %s", self.session_path)

        self.session_data = {}
        try:
            with open(self.session_path, encoding="utf-8") as session_f:
                loaded = json.load(session_f)
        except (OSError, ValueError):
            # Missing/unreadable file (OSError) or invalid JSON (ValueError).
            LOGGER.info("Session file does not exist")
        else:
            # Only a JSON object is usable; the rest of the class calls .get().
            if isinstance(loaded, dict):
                self.session_data = loaded

        if self.session_data.get("client_id"):
            self.client_id = self.session_data.get("client_id")
        else:
            self.session_data.update({"client_id": self.client_id})
        self.params["clientId"] = self.client_id

        self.session = ICloudPySession(self)
        self.session.verify = verify
        self.session.headers.update(
            {"Origin": self.home_endpoint, "Referer": f"{self.home_endpoint}/"}
        )

        cookiejar_path = self.cookiejar_path
        self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path)
        if path.exists(cookiejar_path):
            try:
                self.session.cookies.load(ignore_discard=True, ignore_expires=True)
                LOGGER.debug("Read cookies from %s", cookiejar_path)
            except Exception:  # pylint: disable=broad-except
                # Most likely a pickled cookiejar from earlier versions.
                # The cookiejar will get replaced with a valid one after
                # successful authentication.
                LOGGER.warning("Failed to read cookiejar %s", cookiejar_path)

        self.authenticate()

    def authenticate(self, force_refresh=False, service=None):
        """
        Handles authentication, and persists cookies so that
        subsequent logins will not cause additional e-mails from Apple.

        Three strategies are tried in order until one succeeds: validating
        the stored session token (skipped when ``force_refresh`` is true),
        a per-service credential login (only when ``service`` is given and
        that app allows one-factor launch), and a full sign-in.

        Raises:
            ICloudPyFailedLoginException: the full sign-in was rejected.
        """
        login_successful = False
        if self.session_data.get("session_token") and not force_refresh:
            LOGGER.debug("Checking session token validity")
            try:
                self.data = self._validate_token()
                login_successful = True
            except ICloudPyAPIResponseException:
                LOGGER.debug("Invalid authentication token, will log in from scratch.")

        if not login_successful and service is not None:
            # Missing app data simply means this shortcut is unavailable.
            app = (self.data.get("apps") or {}).get(service) or {}
            if app.get("canLaunchWithOneFactor") is True:
                LOGGER.debug(
                    "Authenticating as %s for %s", self.user["accountName"], service
                )
                try:
                    self._authenticate_with_credentials_service(service)
                    login_successful = True
                except Exception as error:  # pylint: disable=broad-except
                    LOGGER.debug(
                        "Could not log into service. Attempting brand new login. %s",
                        str(error),
                    )

        if not login_successful:
            LOGGER.debug("Authenticating as %s", self.user["accountName"])

            data = dict(self.user)
            data["rememberMe"] = True
            data["trustTokens"] = []
            if self.session_data.get("trust_token"):
                data["trustTokens"] = [self.session_data.get("trust_token")]

            headers = self._get_session_auth_headers()

            try:
                self.session.post(
                    f"{self.auth_endpoint}/signin",
                    params={"isRememberMeEnabled": "true"},
                    data=json.dumps(data),
                    headers=headers,
                )
            except ICloudPyAPIResponseException as error:
                msg = "Invalid email/password combination."
                raise ICloudPyFailedLoginException(msg, error) from error

            self._authenticate_with_token()

        self._webservices = self.data["webservices"]

        LOGGER.debug("Authentication completed successfully")

    def _authenticate_with_token(self):
        """Authenticate using session token and store the reply in ``data``.

        Raises:
            ICloudPyFailedLoginException: the token was rejected.
        """
        data = {
            "accountCountryCode": self.session_data.get("account_country"),
            "dsWebAuthToken": self.session_data.get("session_token"),
            "extended_login": True,
            "trustToken": self.session_data.get("trust_token", ""),
        }

        try:
            req = self.session.post(
                f"{self.setup_endpoint}/accountLogin", data=json.dumps(data)
            )
            self.data = req.json()
        except ICloudPyAPIResponseException as error:
            msg = "Invalid authentication token."
            raise ICloudPyFailedLoginException(msg, error) from error

    def _authenticate_with_credentials_service(self, service):
        """Authenticate to a specific service using credentials.

        Raises:
            ICloudPyFailedLoginException: the credentials were rejected.
        """
        data = {
            "appName": service,
            "apple_id": self.user["accountName"],
            "password": self.user["password"],
        }

        try:
            self.session.post(
                f"{self.setup_endpoint}/accountLogin", data=json.dumps(data)
            )
            self.data = self._validate_token()
        except ICloudPyAPIResponseException as error:
            msg = "Invalid email/password combination."
            raise ICloudPyFailedLoginException(msg, error) from error

    def _validate_token(self):
        """Checks if the current access token is still valid.

        Returns:
            The decoded JSON body of the validation response.

        Raises:
            ICloudPyAPIResponseException: the token is not valid.
        """
        LOGGER.debug("Checking session token validity")
        try:
            req = self.session.post(f"{self.setup_endpoint}/validate", data="null")
            LOGGER.debug("Session token is still valid")
            return req.json()
        except ICloudPyAPIResponseException:
            LOGGER.debug("Invalid authentication token")
            raise

    def _get_auth_headers(self, overrides=None):
        """Build the headers for auth-endpoint requests.

        Args:
            overrides: Optional mapping merged over the default headers.
        """
        headers = {
            "Accept": "*/*",
            "Content-Type": "application/json",
            "X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
            "X-Apple-OAuth-Client-Type": "firstPartyAuth",
            "X-Apple-OAuth-Redirect-URI": "https://www.icloud.com",
            "X-Apple-OAuth-Require-Grant-Code": "true",
            "X-Apple-OAuth-Response-Mode": "web_message",
            "X-Apple-OAuth-Response-Type": "code",
            "X-Apple-OAuth-State": self.client_id,
            "X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
        }
        if overrides:
            headers.update(overrides)
        return headers

    def _get_session_auth_headers(self, overrides=None):
        """Return auth headers plus the stored ``scnt`` / session id, if any."""
        headers = self._get_auth_headers(overrides)
        if self.session_data.get("scnt"):
            headers["scnt"] = self.session_data.get("scnt")
        if self.session_data.get("session_id"):
            headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
        return headers

    def _account_file_stem(self):
        """Return the account name reduced to its ``\\w`` characters."""
        return "".join(c for c in self.user.get("accountName") if match(r"\w", c))

    @property
    def cookiejar_path(self):
        """Get path for cookiejar file."""
        return path.join(self._cookie_directory, self._account_file_stem())

    @property
    def session_path(self):
        """Get path for session data file."""
        return path.join(
            self._cookie_directory, self._account_file_stem() + ".session"
        )

    @property
    def requires_2sa(self):
        """Returns True if two-step authentication is required."""
        return self.data.get("dsInfo", {}).get("hsaVersion", 0) >= 1 and (
            self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
        )

    @property
    def requires_2fa(self):
        """Returns True if two-factor authentication is required."""
        # .get() mirrors requires_2sa so a missing "dsInfo" yields False
        # instead of raising KeyError.
        return self.data.get("dsInfo", {}).get("hsaVersion", 0) == 2 and (
            self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
        )

    @property
    def is_trusted_session(self):
        """Returns True if the session is trusted."""
        return self.data.get("hsaTrustedBrowser", False)

    @property
    def trusted_devices(self):
        """Returns devices trusted for two-step authentication."""
        request = self.session.get(
            f"{self.setup_endpoint}/listDevices", params=self.params
        )
        return request.json().get("devices")

    def send_verification_code(self, device):
        """Requests that a verification code is sent to the given device."""
        data = json.dumps(device)
        request = self.session.post(
            f"{self.setup_endpoint}/sendVerificationCode",
            params=self.params,
            data=data,
        )
        return request.json().get("success", False)

    def validate_verification_code(self, device, code):
        """Verifies a verification code received on a trusted device.

        Note that ``device`` is updated in place with the code and the
        ``trustBrowser`` flag before being sent.

        Returns:
            False when the code is wrong (error code -21669), otherwise
            whether two-step authentication is no longer required.
        """
        device.update({"verificationCode": code, "trustBrowser": True})
        data = json.dumps(device)

        try:
            self.session.post(
                f"{self.setup_endpoint}/validateVerificationCode",
                params=self.params,
                data=data,
            )
        except ICloudPyAPIResponseException as error:
            if error.code == -21669:
                # Wrong verification code
                return False
            raise

        self.trust_session()

        return not self.requires_2sa

    def validate_2fa_code(self, code):
        """Verifies a verification code received via Apple's 2FA system (HSA2).

        Returns:
            False when the code is wrong (error code -21669), otherwise
            whether two-step authentication is no longer required.
        """
        data = {"securityCode": {"code": code}}
        headers = self._get_session_auth_headers({"Accept": "application/json"})

        try:
            self.session.post(
                f"{self.auth_endpoint}/verify/trusteddevice/securitycode",
                data=json.dumps(data),
                headers=headers,
            )
        except ICloudPyAPIResponseException as error:
            if error.code == -21669:
                # Wrong verification code
                LOGGER.error("Code verification failed.")
                return False
            raise

        LOGGER.debug("Code verification successful.")

        self.trust_session()
        return not self.requires_2sa

    def trust_session(self):
        """Request session trust to avoid user log in going forward.

        Returns:
            True on success, False when the API rejects the request.
        """
        headers = self._get_session_auth_headers()

        try:
            self.session.get(
                f"{self.auth_endpoint}/2sv/trust",
                headers=headers,
            )
            self._authenticate_with_token()
            return True
        except ICloudPyAPIResponseException:
            LOGGER.error("Session trust failed.")
            return False

    def _get_webservice_url(self, ws_key):
        """Get webservice URL, raise an exception if not exists."""
        if self._webservices.get(ws_key) is None:
            raise ICloudPyServiceNotActivatedException(
                "Webservice not available", ws_key
            )
        return self._webservices[ws_key]["url"]

    @property
    def devices(self):
        """Returns all devices."""
        service_root = self._get_webservice_url("findme")
        # NOTE(review): FindMyiPhoneServiceManager is not among the names this
        # module visibly imports from icloudpy.services - confirm it is
        # available, otherwise this property raises NameError.
        return FindMyiPhoneServiceManager(
            service_root, self.session, self.params, self.with_family
        )

    @property
    def iphone(self):
        """Returns the iPhone."""
        return self.devices[0]

    @property
    def drive(self):
        """Gets the 'Drive' service."""
        if not self._drive:
            self._drive = DriveService(
                service_root=self._get_webservice_url("drivews"),
                document_root=self._get_webservice_url("docws"),
                session=self.session,
                params=self.params,
            )
        return self._drive

    @property
    def work(self):
        """Gets the 'Work' service."""
        if not self._work:
            self._work = WorkService(
                document_root=self._get_webservice_url("iworkexportws"),
                session=self.session,
                params=self.params,
                client_id=self.client_id,
                dsid=self.data.get("dsInfo", {}).get("dsid"),
            )
        return self._work

    def __unicode__(self):
        return f"iCloud API: {self.user.get('accountName')}"

    def __str__(self):
        # The module uses f-strings, so it can only run on Python 3 where
        # __str__ returns text directly; no byte-encoding branch is needed.
        return self.__unicode__()

    def __repr__(self):
        return f"<{str(self)}>"