599 lines
21 KiB
Python
599 lines
21 KiB
Python
"""Library base file."""
|
|
import getpass
|
|
import http.cookiejar as cookielib
|
|
import inspect
|
|
import json
|
|
import logging
|
|
from os import mkdir, path
|
|
from re import match
|
|
from tempfile import gettempdir
|
|
from uuid import uuid1
|
|
|
|
from requests import Session
|
|
from six import PY2
|
|
|
|
from icloudpy.exceptions import (
|
|
ICloudPy2SARequiredException,
|
|
ICloudPyAPIResponseException,
|
|
ICloudPyFailedLoginException,
|
|
ICloudPyServiceNotActivatedException,
|
|
)
|
|
from icloudpy.services import (
|
|
DriveService,
|
|
WorkService
|
|
)
|
|
from icloudpy.utils import get_password_from_keyring
|
|
|
|
# Module-wide logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Maps a response header name to the key under which its value is stored
# in the service's persisted session data.
HEADER_DATA = {
    "X-Apple-ID-Account-Country": "account_country",
    "X-Apple-ID-Session-Id": "session_id",
    "X-Apple-Session-Token": "session_token",
    "X-Apple-TwoSV-Trust-Token": "trust_token",
    "scnt": "scnt",
}
|
|
|
|
|
|
class ICloudPyPasswordFilter(logging.Filter):
    """Logging filter that masks the account password in log records.

    The password is kept in the ``name`` attribute inherited from
    ``logging.Filter``. Any record whose rendered message contains it is
    rewritten with the password replaced by eight asterisks.
    """

    def __init__(self, password):
        """Remember ``password`` as the text to redact.

        ``None`` is treated as an empty password (nothing to redact) because
        the base-class initializer measures the length of its argument.
        """
        super().__init__(password or "")

    def filter(self, record):
        """Redact the password from ``record`` in place.

        Always returns ``True``: records are rewritten, never dropped.
        """
        # An empty secret is a substring of every message, and
        # str.replace("", mask) would insert the mask between all characters,
        # so there is nothing sensible to redact in that case.
        if self.name:
            message = record.getMessage()
            if self.name in message:
                record.msg = message.replace(self.name, "*" * 8)
                # The message is already fully rendered; clear the args so
                # it is not %-formatted a second time.
                record.args = []

        return True
|
|
|
|
|
|
class ICloudPySession(Session):
    """iCloud session.

    A ``requests`` session that, for every request:

    * logs the call on a ``<calling module>.http`` logger with the service's
      password filter attached,
    * copies the headers listed in ``HEADER_DATA`` into the service's
      ``session_data`` and writes that data to ``service.session_path``,
    * saves the cookie jar (unless ``save_cookie=False`` is passed),
    * retries once on HTTP 421/450/500, and
    * turns error responses into the package's exceptions.
    """

    # Status codes that trigger a single retry and are reported as
    # "Authentication required for Account." when the retry also fails.
    _AUTH_STATUS_CODES = (421, 450, 500)
    _JSON_MIMETYPES = ("application/json", "text/json")

    def __init__(self, service):
        """Bind the session to ``service``, the owning service object."""
        self.service = service
        super().__init__()

    def request(self, method, url, **kwargs):  # pylint: disable=arguments-differ
        """Send a request, persist session state and raise on API errors.

        Besides the keyword arguments accepted by ``Session.request`` two
        private options are understood and removed before the request is
        forwarded:

        * ``retried`` -- set internally on the retry so that a request is
          retried at most once.
        * ``save_cookie`` -- when false, the cookie jar is not written to
          disk after the request (default: ``True``).

        Returns the response object. Raises via ``_raise_error`` when the
        response is an error or its JSON body carries an error reason.
        """
        # Charge logging to the right service endpoint: frame 2 is the caller
        # of the Session.get/post/... convenience method that invoked us.
        try:
            module = inspect.getmodule(inspect.stack()[2][0])
        except IndexError:
            module = None
        # getmodule() may return None (e.g. code run without a module);
        # fall back to this module's logger name instead of failing.
        logger_name = module.__name__ if module is not None else __name__
        request_logger = logging.getLogger(logger_name).getChild("http")
        if self.service.password_filter not in request_logger.filters:
            request_logger.addFilter(self.service.password_filter)

        request_logger.debug("%s %s %s", method, url, kwargs.get("data", ""))

        # These options are ours; requests would reject them as unexpected
        # keyword arguments, so strip both before delegating.
        has_retried = kwargs.pop("retried", None)
        save_cookie = kwargs.pop("save_cookie", True)

        response = super().request(method, url, **kwargs)

        content_type = response.headers.get("Content-Type", "").split(";")[0]
        is_json = content_type in self._JSON_MIMETYPES
        status_code = response.status_code

        for header, session_key in HEADER_DATA.items():
            header_value = response.headers.get(header)
            if header_value:
                self.service.session_data[session_key] = header_value

        # Save session_data to file
        with open(self.service.session_path, "w", encoding="utf-8") as outfile:
            json.dump(self.service.session_data, outfile)
            LOGGER.debug("Saved session data to file")

        # Save cookies to file
        if save_cookie:
            self.cookies.save(ignore_discard=True, ignore_expires=True)
            LOGGER.debug("Cookies saved to %s", self.service.cookiejar_path)

        if not response.ok and (
            not is_json or status_code in self._AUTH_STATUS_CODES
        ):
            try:
                # pylint: disable=W0212
                fmip_url = self.service._get_webservice_url("findme")
                if has_retried is None and status_code == 450 and fmip_url in url:
                    # Handle re-authentication for Find My iPhone
                    LOGGER.debug("Re-authenticating Find My iPhone service")
                    try:
                        self.service.authenticate(True, "find")
                    except ICloudPyAPIResponseException:
                        LOGGER.debug("Re-authentication failed")
                    return self.request(
                        method, url, retried=True, save_cookie=save_cookie, **kwargs
                    )
            except Exception as error:  # pylint: disable=broad-except
                # Best effort: any failure here (for instance the "findme"
                # webservice being unavailable) falls through to the generic
                # retry / error handling below.
                LOGGER.debug("Find My iPhone re-authentication skipped: %s", error)

            if has_retried is None and status_code in self._AUTH_STATUS_CODES:
                api_error = ICloudPyAPIResponseException(
                    response.reason, status_code, retry=True
                )
                request_logger.debug(api_error)
                return self.request(
                    method, url, retried=True, save_cookie=save_cookie, **kwargs
                )

            self._raise_error(status_code, response.reason)

        if not is_json:
            return response

        try:
            data = response.json()
        except Exception:  # pylint: disable=broad-except
            # Deliberately lenient: an unparsable body is handed back to the
            # caller untouched rather than treated as an error.
            request_logger.warning("Failed to parse response with JSON mimetype")
            return response

        request_logger.debug(data)

        if isinstance(data, dict):
            reason = (
                data.get("errorMessage")
                or data.get("reason")
                or data.get("errorReason")
            )
            error_field = data.get("error")
            if not reason and isinstance(error_field, str):
                reason = error_field
            if not reason and error_field:
                reason = "Unknown reason"

            code = data.get("errorCode")
            if not code and data.get("serverErrorCode"):
                code = data["serverErrorCode"]

            if reason:
                self._raise_error(code, reason)

        return response

    def _raise_error(self, code, reason):
        """Raise the exception matching ``code`` and ``reason``.

        Raises:
            ICloudPy2SARequiredException: the service requires two-step
                authentication and the web-auth token cookie is missing.
            ICloudPyServiceNotActivatedException: ``code`` is
                ``ZONE_NOT_FOUND`` or ``AUTHENTICATION_FAILED``.
            ICloudPyAPIResponseException: every other case.
        """
        if (
            self.service.requires_2sa
            and reason == "Missing X-APPLE-WEBAUTH-TOKEN cookie"
        ):
            # The service stores the Apple ID under "accountName".
            raise ICloudPy2SARequiredException(self.service.user["accountName"])

        if code in ("ZONE_NOT_FOUND", "AUTHENTICATION_FAILED"):
            reason = (
                f"{reason}. Please log into https://icloud.com/ to manually "
                "finish setting up your iCloud service"
            )
            api_error = ICloudPyServiceNotActivatedException(reason, code)
            LOGGER.error(api_error)
            raise api_error

        if code == "ACCESS_DENIED":
            reason = (
                f"{reason}. Please wait a few minutes then try again. "
                "The remote servers might be trying to throttle requests."
            )
        if code in self._AUTH_STATUS_CODES:
            reason = "Authentication required for Account."

        api_error = ICloudPyAPIResponseException(reason, code)
        LOGGER.error(api_error)
        raise api_error

    # Public method to resolve linting error
    def raise_error(self, code, reason):
        """Public alias of ``_raise_error``; always raises."""
        return self._raise_error(code=code, reason=reason)
|
|
|
|
|
|
class ICloudPyService:
    """
    A base authentication class for the iCloud service. Handles the
    authentication required to access iCloud services.

    Usage:
        icloudpy = ICloudPyService('username@apple.com', 'password')
        icloudpy.iphone.location()
    """

    # Value sent as both the OAuth client id and the widget key.
    _WIDGET_KEY = "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d"

    def __init__(
        self,
        apple_id,
        password=None,
        cookie_directory=None,
        verify=True,
        client_id=None,
        with_family=True,
        auth_endpoint="https://idmsa.apple.com/appleauth/auth",
        # For China, use "https://www.icloud.com.cn"
        home_endpoint="https://www.icloud.com",
        # For China, use "https://setup.icloud.com.cn/setup/ws/1"
        setup_endpoint="https://setup.icloud.com/setup/ws/1",
    ):
        """Set up session storage, restore saved state and authenticate.

        Args:
            apple_id: account name used to sign in.
            password: account password; looked up through
                ``get_password_from_keyring`` when ``None``.
            cookie_directory: directory for the cookie jar and session file.
                Defaults to ``<tempdir>/icloudpy/<current user>``.
            verify: assigned to the session's ``verify`` attribute.
            client_id: client identifier; a ``auth-<uuid1>`` value is
                generated when omitted, and a ``client_id`` found in the
                saved session file takes precedence over both.
            with_family: forwarded to the Find My iPhone manager.
            auth_endpoint: base URL for sign-in and verification calls.
            home_endpoint: used for the ``Origin``/``Referer`` headers.
            setup_endpoint: base URL for account setup/validation calls.
        """
        if password is None:
            password = get_password_from_keyring(apple_id)

        self.user = {"accountName": apple_id, "password": password}
        self.data = {}
        self.params = {}
        self.client_id = client_id or f"auth-{str(uuid1()).lower()}"
        self.with_family = with_family
        self.auth_endpoint = auth_endpoint
        self.home_endpoint = home_endpoint
        self.setup_endpoint = setup_endpoint

        # Defined before authenticate() runs so the object is fully formed
        # even while (or if) authentication fails part-way.
        self._webservices = {}
        self._drive = None
        self._work = None
        self._files = None
        self._photos = None

        self.password_filter = ICloudPyPasswordFilter(password)
        LOGGER.addFilter(self.password_filter)

        if cookie_directory:
            self._cookie_directory = path.expanduser(path.normpath(cookie_directory))
            self._ensure_directory(self._cookie_directory, 0o700)
        else:
            topdir = path.join(gettempdir(), "icloudpy")
            self._cookie_directory = path.join(topdir, getpass.getuser())
            self._ensure_directory(topdir, 0o777)
            self._ensure_directory(self._cookie_directory, 0o700)

        LOGGER.debug("Using session file %s", self.session_path)

        self.session_data = {}
        try:
            with open(self.session_path, encoding="utf-8") as session_f:
                loaded = json.load(session_f)
        except FileNotFoundError:
            LOGGER.info("Session file does not exist")
        except (OSError, ValueError):
            # Unreadable or not valid JSON: start from an empty session.
            LOGGER.warning("Failed to read session file %s", self.session_path)
        else:
            # Anything but a JSON object cannot be used as session data.
            if isinstance(loaded, dict):
                self.session_data = loaded

        if self.session_data.get("client_id"):
            self.client_id = self.session_data["client_id"]
        else:
            self.session_data["client_id"] = self.client_id
        self.params["clientId"] = self.client_id

        self.session = ICloudPySession(self)
        self.session.verify = verify
        self.session.headers.update(
            {"Origin": self.home_endpoint, "Referer": f"{self.home_endpoint}/"}
        )

        cookiejar_path = self.cookiejar_path
        self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path)
        if path.exists(cookiejar_path):
            try:
                self.session.cookies.load(ignore_discard=True, ignore_expires=True)
                LOGGER.debug("Read cookies from %s", cookiejar_path)
            except (OSError, ValueError):
                # Most likely a pickled cookiejar from earlier versions.
                # The cookiejar will get replaced with a valid one after
                # successful authentication.
                LOGGER.warning("Failed to read cookiejar %s", cookiejar_path)

        self.authenticate()

    @staticmethod
    def _ensure_directory(directory, mode):
        """Create ``directory`` with ``mode`` unless it already exists."""
        try:
            mkdir(directory, mode)
        except FileExistsError:
            # Already there, possibly created by a concurrent process
            # between a check and the call; nothing left to do.
            pass

    def authenticate(self, force_refresh=False, service=None):
        """
        Handles authentication, and persists cookies so that
        subsequent logins will not cause additional e-mails from Apple.

        Three strategies are tried in order until one succeeds: validating
        the saved session token (skipped when ``force_refresh`` is true),
        a per-service credential login (only when ``service`` is given and
        the account data marks it as launchable with one factor), and a
        full sign-in followed by token authentication.

        Raises:
            ICloudPyFailedLoginException: the full sign-in was rejected.
        """
        login_successful = False
        if self.session_data.get("session_token") and not force_refresh:
            LOGGER.debug("Checking session token validity")
            try:
                self.data = self._validate_token()
                login_successful = True
            except ICloudPyAPIResponseException:
                LOGGER.debug("Invalid authentication token, will log in from scratch.")

        if not login_successful and service is not None:
            # The account data may be empty or lack this app (e.g. before any
            # successful login); treat that as "cannot use one factor".
            app = (self.data.get("apps") or {}).get(service) or {}
            if app.get("canLaunchWithOneFactor") is True:
                LOGGER.debug(
                    "Authenticating as %s for %s", self.user["accountName"], service
                )
                try:
                    self._authenticate_with_credentials_service(service)
                    login_successful = True
                except Exception as error:  # pylint: disable=broad-except
                    LOGGER.debug(
                        "Could not log into service. Attempting brand new login. %s",
                        str(error),
                    )

        if not login_successful:
            LOGGER.debug("Authenticating as %s", self.user["accountName"])

            data = dict(self.user)
            data["rememberMe"] = True
            trust_token = self.session_data.get("trust_token")
            data["trustTokens"] = [trust_token] if trust_token else []

            try:
                self.session.post(
                    f"{self.auth_endpoint}/signin",
                    params={"isRememberMeEnabled": "true"},
                    data=json.dumps(data),
                    headers=self._get_session_auth_headers(),
                )
            except ICloudPyAPIResponseException as error:
                msg = "Invalid email/password combination."
                raise ICloudPyFailedLoginException(msg, error) from error

            self._authenticate_with_token()

        self._webservices = self.data["webservices"]

        LOGGER.debug("Authentication completed successfully")

    def _authenticate_with_token(self):
        """Authenticate using session token."""
        data = {
            "accountCountryCode": self.session_data.get("account_country"),
            "dsWebAuthToken": self.session_data.get("session_token"),
            "extended_login": True,
            "trustToken": self.session_data.get("trust_token", ""),
        }

        try:
            req = self.session.post(
                f"{self.setup_endpoint}/accountLogin", data=json.dumps(data)
            )
            self.data = req.json()
        except ICloudPyAPIResponseException as error:
            msg = "Invalid authentication token."
            raise ICloudPyFailedLoginException(msg, error) from error

    def _authenticate_with_credentials_service(self, service):
        """Authenticate to a specific service using credentials."""
        data = {
            "appName": service,
            "apple_id": self.user["accountName"],
            "password": self.user["password"],
        }

        try:
            self.session.post(
                f"{self.setup_endpoint}/accountLogin", data=json.dumps(data)
            )
            self.data = self._validate_token()
        except ICloudPyAPIResponseException as error:
            msg = "Invalid email/password combination."
            raise ICloudPyFailedLoginException(msg, error) from error

    def _validate_token(self):
        """Checks if the current access token is still valid.

        Returns the decoded JSON body of the validation response and
        re-raises ``ICloudPyAPIResponseException`` when the call fails.
        """
        LOGGER.debug("Checking session token validity")
        try:
            req = self.session.post(f"{self.setup_endpoint}/validate", data="null")
        except ICloudPyAPIResponseException:
            LOGGER.debug("Invalid authentication token")
            raise
        LOGGER.debug("Session token is still valid")
        return req.json()

    def _get_auth_headers(self, overrides=None):
        """Build the headers for auth-endpoint calls.

        ``overrides`` is an optional mapping merged over the defaults.
        """
        headers = {
            "Accept": "*/*",
            "Content-Type": "application/json",
            "X-Apple-OAuth-Client-Id": self._WIDGET_KEY,
            "X-Apple-OAuth-Client-Type": "firstPartyAuth",
            "X-Apple-OAuth-Redirect-URI": "https://www.icloud.com",
            "X-Apple-OAuth-Require-Grant-Code": "true",
            "X-Apple-OAuth-Response-Mode": "web_message",
            "X-Apple-OAuth-Response-Type": "code",
            "X-Apple-OAuth-State": self.client_id,
            "X-Apple-Widget-Key": self._WIDGET_KEY,
        }
        if overrides:
            headers.update(overrides)
        return headers

    def _get_session_auth_headers(self, overrides=None):
        """Auth headers plus the ``scnt``/session-id values saved in session data."""
        headers = self._get_auth_headers(overrides)

        scnt = self.session_data.get("scnt")
        if scnt:
            headers["scnt"] = scnt

        session_id = self.session_data.get("session_id")
        if session_id:
            headers["X-Apple-ID-Session-Id"] = session_id

        return headers

    def _account_file_stem(self):
        """Account name reduced to its ``\\w`` characters, used as a file name."""
        return "".join(c for c in self.user.get("accountName") if match(r"\w", c))

    @property
    def cookiejar_path(self):
        """Get path for cookiejar file."""
        return path.join(self._cookie_directory, self._account_file_stem())

    @property
    def session_path(self):
        """Get path for session data file."""
        return path.join(
            self._cookie_directory, self._account_file_stem() + ".session"
        )

    def _hsa_version(self):
        """``hsaVersion`` from the account's ``dsInfo``; 0 when not available."""
        return self.data.get("dsInfo", {}).get("hsaVersion", 0)

    def _verification_pending(self):
        """True-ish when a challenge is required or the session is untrusted."""
        return self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session

    @property
    def requires_2sa(self):
        """Returns True if two-step authentication is required."""
        return self._hsa_version() >= 1 and self._verification_pending()

    @property
    def requires_2fa(self):
        """Returns True if two-factor authentication is required."""
        return self._hsa_version() == 2 and self._verification_pending()

    @property
    def is_trusted_session(self):
        """Returns True if the session is trusted."""
        return self.data.get("hsaTrustedBrowser", False)

    @property
    def trusted_devices(self):
        """Returns devices trusted for two-step authentication."""
        request = self.session.get(
            f"{self.setup_endpoint}/listDevices", params=self.params
        )
        return request.json().get("devices")

    def send_verification_code(self, device):
        """Requests that a verification code is sent to the given device."""
        request = self.session.post(
            f"{self.setup_endpoint}/sendVerificationCode",
            params=self.params,
            data=json.dumps(device),
        )
        return request.json().get("success", False)

    def validate_verification_code(self, device, code):
        """Verifies a verification code received on a trusted device.

        Returns ``False`` for a wrong code (error code -21669); otherwise
        trusts the session and returns whether two-step authentication is
        no longer required. Other API errors are re-raised.
        """
        # Work on a copy so the caller's device mapping is left untouched.
        payload = dict(device, verificationCode=code, trustBrowser=True)

        try:
            self.session.post(
                f"{self.setup_endpoint}/validateVerificationCode",
                params=self.params,
                data=json.dumps(payload),
            )
        except ICloudPyAPIResponseException as error:
            if error.code == -21669:
                # Wrong verification code
                return False
            raise

        self.trust_session()

        return not self.requires_2sa

    def validate_2fa_code(self, code):
        """Verifies a verification code received via Apple's 2FA system (HSA2).

        Returns ``False`` for a wrong code (error code -21669); otherwise
        trusts the session and returns whether two-step authentication is
        no longer required. Other API errors are re-raised.
        """
        data = {"securityCode": {"code": code}}
        headers = self._get_session_auth_headers({"Accept": "application/json"})

        try:
            self.session.post(
                f"{self.auth_endpoint}/verify/trusteddevice/securitycode",
                data=json.dumps(data),
                headers=headers,
            )
        except ICloudPyAPIResponseException as error:
            if error.code == -21669:
                # Wrong verification code
                LOGGER.error("Code verification failed.")
                return False
            raise

        LOGGER.debug("Code verification successful.")

        self.trust_session()
        return not self.requires_2sa

    def trust_session(self):
        """Request session trust to avoid user log in going forward.

        Returns ``True`` on success and ``False`` when the API rejects the
        request.
        """
        try:
            self.session.get(
                f"{self.auth_endpoint}/2sv/trust",
                headers=self._get_session_auth_headers(),
            )
            self._authenticate_with_token()
            return True
        except ICloudPyAPIResponseException:
            LOGGER.error("Session trust failed.")
            return False

    def _get_webservice_url(self, ws_key):
        """Get webservice URL, raise an exception if not exists."""
        webservice = self._webservices.get(ws_key)
        if webservice is None:
            raise ICloudPyServiceNotActivatedException(
                "Webservice not available", ws_key
            )
        return webservice["url"]

    @property
    def devices(self):
        """Returns all devices."""
        # This name is not among the module's top-level imports, so it is
        # imported here. NOTE(review): confirm that icloudpy.services
        # exports FindMyiPhoneServiceManager in this code base.
        # pylint: disable=import-outside-toplevel
        from icloudpy.services import FindMyiPhoneServiceManager

        service_root = self._get_webservice_url("findme")
        return FindMyiPhoneServiceManager(
            service_root, self.session, self.params, self.with_family
        )

    @property
    def iphone(self):
        """Returns the iPhone."""
        return self.devices[0]

    @property
    def drive(self):
        """Gets the 'Drive' service."""
        if not self._drive:
            self._drive = DriveService(
                service_root=self._get_webservice_url("drivews"),
                document_root=self._get_webservice_url("docws"),
                session=self.session,
                params=self.params,
            )
        return self._drive

    @property
    def work(self):
        """Gets the 'Work' service."""
        if not self._work:
            self._work = WorkService(
                document_root=self._get_webservice_url("iworkexportws"),
                session=self.session,
                params=self.params,
                client_id=self.client_id,
                dsid=self.data.get("dsInfo", {}).get("dsid"),
            )
        return self._work

    def __unicode__(self):
        return f"iCloud API: {self.user.get('accountName')}"

    def __str__(self):
        # This module is Python 3 only (f-strings, zero-argument super()),
        # so the text form needs no byte encoding.
        return self.__unicode__()

    def __repr__(self):
        return f"<{str(self)}>"
|