"""Library base file.""" import getpass import http.cookiejar as cookielib import inspect import json import logging from os import mkdir, path from re import match from tempfile import gettempdir from uuid import uuid1 from requests import Session from six import PY2 from icloudpy.exceptions import ( ICloudPy2SARequiredException, ICloudPyAPIResponseException, ICloudPyFailedLoginException, ICloudPyServiceNotActivatedException, ) from icloudpy.services import ( DriveService, WorkService ) from icloudpy.utils import get_password_from_keyring LOGGER = logging.getLogger(__name__) HEADER_DATA = { "X-Apple-ID-Account-Country": "account_country", "X-Apple-ID-Session-Id": "session_id", "X-Apple-Session-Token": "session_token", "X-Apple-TwoSV-Trust-Token": "trust_token", "scnt": "scnt", } class ICloudPyPasswordFilter(logging.Filter): """Password log hider.""" def __init__(self, password): super().__init__(password) def filter(self, record): message = record.getMessage() if self.name in message: record.msg = message.replace(self.name, "*" * 8) record.args = [] return True class ICloudPySession(Session): """iCloud session.""" def __init__(self, service): self.service = service Session.__init__(self) def request(self, method, url, **kwargs): # pylint: disable=arguments-differ # Charge logging to the right service endpoint callee = inspect.stack()[2] module = inspect.getmodule(callee[0]) request_logger = logging.getLogger(module.__name__).getChild("http") if self.service.password_filter not in request_logger.filters: request_logger.addFilter(self.service.password_filter) request_logger.debug(f"{method} {url} {kwargs.get('data', '')}") has_retried = kwargs.get("retried") kwargs.pop("retried", None) response = super().request(method, url, **kwargs) content_type = response.headers.get("Content-Type", "").split(";")[0] json_mimetypes = ["application/json", "text/json"] for header, value in HEADER_DATA.items(): if response.headers.get(header): session_arg = value self.service.session_data.update( {session_arg: response.headers.get(header)} ) # Save session_data to file with open(self.service.session_path, "w", encoding="utf-8") as outfile: json.dump(self.service.session_data, outfile) LOGGER.debug("Saved session data to file") # Save cookies to file if kwargs.get('save_cookie', True): self.cookies.save(ignore_discard=True, ignore_expires=True) LOGGER.debug("Cookies saved to %s", self.service.cookiejar_path) # print(response.text) if not response.ok and ( content_type not in json_mimetypes or response.status_code in [421, 450, 500] ): try: # pylint: disable=W0212 fmip_url = self.service._get_webservice_url("findme") if ( has_retried is None and response.status_code == 450 and fmip_url in url ): # Handle re-authentication for Find My iPhone LOGGER.debug("Re-authenticating Find My iPhone service") try: self.service.authenticate(True, "find") except ICloudPyAPIResponseException: LOGGER.debug("Re-authentication failed") kwargs["retried"] = True return self.request(method, url, **kwargs) except Exception: pass if has_retried is None and response.status_code in [421, 450, 500]: api_error = ICloudPyAPIResponseException( response.reason, response.status_code, retry=True ) request_logger.debug(api_error) kwargs["retried"] = True return self.request(method, url, **kwargs) self._raise_error(response.status_code, response.reason) if content_type not in json_mimetypes: return response try: data = response.json() except: # pylint: disable=bare-except request_logger.warning("Failed to parse response with JSON mimetype") return response request_logger.debug(data) if isinstance(data, dict): reason = data.get("errorMessage") reason = reason or data.get("reason") reason = reason or data.get("errorReason") if not reason and isinstance(data.get("error"), str): reason = data.get("error") if not reason and data.get("error"): reason = "Unknown reason" code = data.get("errorCode") if not code and data.get("serverErrorCode"): code = data.get("serverErrorCode") if reason: self._raise_error(code, reason) return response def _raise_error(self, code, reason): if ( self.service.requires_2sa and reason == "Missing X-APPLE-WEBAUTH-TOKEN cookie" ): raise ICloudPy2SARequiredException(self.service.user["apple_id"]) if code in ("ZONE_NOT_FOUND", "AUTHENTICATION_FAILED"): reason = ( reason + ". Please log into https://icloud.com/ to manually " "finish setting up your iCloud service" ) api_error = ICloudPyServiceNotActivatedException(reason, code) LOGGER.error(api_error) raise api_error if code == "ACCESS_DENIED": reason = ( reason + ". Please wait a few minutes then try again." "The remote servers might be trying to throttle requests." ) if code in [421, 450, 500]: reason = "Authentication required for Account." api_error = ICloudPyAPIResponseException(reason, code) print(api_error) LOGGER.error(api_error) raise api_error # Public method to resolve linting error def raise_error(self, code, reason): return self._raise_error(code=code, reason=reason) class ICloudPyService: """ A base authentication class for the iCloud service. Handles the authentication required to access iCloud services. Usage: from src import ICloudPyService icloudpy = ICloudPyService('username@apple.com', 'password') icloudpy.iphone.location() """ def __init__( self, apple_id, password=None, cookie_directory=None, verify=True, client_id=None, with_family=True, auth_endpoint="https://idmsa.apple.com/appleauth/auth", # For China, use "https://www.icloud.com.cn" home_endpoint="https://www.icloud.com", # For China, use "https://setup.icloud.com.cn/setup/ws/1" setup_endpoint="https://setup.icloud.com/setup/ws/1", ): if password is None: password = get_password_from_keyring(apple_id) self.user = {"accountName": apple_id, "password": password} self.data = {} self.params = {} self.client_id = client_id or (f"auth-{str(uuid1()).lower()}") self.with_family = with_family self.auth_endpoint = auth_endpoint self.home_endpoint = home_endpoint self.setup_endpoint = setup_endpoint self.password_filter = ICloudPyPasswordFilter(password) LOGGER.addFilter(self.password_filter) if cookie_directory: self._cookie_directory = path.expanduser(path.normpath(cookie_directory)) if not path.exists(self._cookie_directory): mkdir(self._cookie_directory, 0o700) else: topdir = path.join(gettempdir(), "icloudpy") self._cookie_directory = path.join(topdir, getpass.getuser()) if not path.exists(topdir): mkdir(topdir, 0o777) if not path.exists(self._cookie_directory): mkdir(self._cookie_directory, 0o700) LOGGER.debug("Using session file %s", self.session_path) self.session_data = {} try: with open(self.session_path, encoding="utf-8") as session_f: self.session_data = json.load(session_f) except: # pylint: disable=bare-except LOGGER.info("Session file does not exist") if self.session_data.get("client_id"): self.client_id = self.session_data.get("client_id") self.params["clientId"] = self.client_id else: self.session_data.update({"client_id": self.client_id}) self.params["clientId"] = self.client_id self.session = ICloudPySession(self) self.session.verify = verify self.session.headers.update( {"Origin": self.home_endpoint, "Referer": f"{self.home_endpoint}/"} ) cookiejar_path = self.cookiejar_path self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path) if path.exists(cookiejar_path): try: self.session.cookies.load(ignore_discard=True, ignore_expires=True) LOGGER.debug("Read cookies from %s", cookiejar_path) except: # pylint: disable=bare-except # Most likely a pickled cookiejar from earlier versions. # The cookiejar will get replaced with a valid one after # successful authentication. LOGGER.warning("Failed to read cookiejar %s", cookiejar_path) self.authenticate() self._drive = None self._work = None self._files = None self._photos = None def authenticate(self, force_refresh=False, service=None): """ Handles authentication, and persists cookies so that subsequent logins will not cause additional e-mails from Apple. """ login_successful = False if self.session_data.get("session_token") and not force_refresh: LOGGER.debug("Checking session token validity") try: self.data = self._validate_token() login_successful = True except ICloudPyAPIResponseException: LOGGER.debug("Invalid authentication token, will log in from scratch.") if not login_successful and service is not None: app = self.data["apps"][service] if ( "canLaunchWithOneFactor" in app and app["canLaunchWithOneFactor"] is True ): LOGGER.debug( "Authenticating as %s for %s", self.user["accountName"], service ) try: self._authenticate_with_credentials_service(service) login_successful = True except Exception as error: LOGGER.debug( "Could not log into service. Attempting brand new login. %s", str(error), ) if not login_successful: LOGGER.debug("Authenticating as %s", self.user["accountName"]) data = dict(self.user) data["rememberMe"] = True data["trustTokens"] = [] if self.session_data.get("trust_token"): data["trustTokens"] = [self.session_data.get("trust_token")] headers = self._get_auth_headers() if self.session_data.get("scnt"): headers["scnt"] = self.session_data.get("scnt") if self.session_data.get("session_id"): headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") try: self.session.post( f"{self.auth_endpoint}/signin", params={"isRememberMeEnabled": "true"}, data=json.dumps(data), headers=headers, ) except ICloudPyAPIResponseException as error: msg = "Invalid email/password combination." raise ICloudPyFailedLoginException(msg, error) from error self._authenticate_with_token() self._webservices = self.data["webservices"] LOGGER.debug("Authentication completed successfully") def _authenticate_with_token(self): """Authenticate using session token.""" data = { "accountCountryCode": self.session_data.get("account_country"), "dsWebAuthToken": self.session_data.get("session_token"), "extended_login": True, "trustToken": self.session_data.get("trust_token", ""), } try: req = self.session.post( f"{self.setup_endpoint}/accountLogin", data=json.dumps(data) ) self.data = req.json() except ICloudPyAPIResponseException as error: msg = "Invalid authentication token." raise ICloudPyFailedLoginException(msg, error) from error def _authenticate_with_credentials_service(self, service): """Authenticate to a specific service using credentials.""" data = { "appName": service, "apple_id": self.user["accountName"], "password": self.user["password"], } try: self.session.post( f"{self.setup_endpoint}/accountLogin", data=json.dumps(data) ) self.data = self._validate_token() except ICloudPyAPIResponseException as error: msg = "Invalid email/password combination." raise ICloudPyFailedLoginException(msg, error) from error def _validate_token(self): """Checks if the current access token is still valid.""" LOGGER.debug("Checking session token validity") try: req = self.session.post(f"{self.setup_endpoint}/validate", data="null") LOGGER.debug("Session token is still valid") return req.json() except ICloudPyAPIResponseException as err: LOGGER.debug("Invalid authentication token") raise err def _get_auth_headers(self, overrides=None): headers = { "Accept": "*/*", "Content-Type": "application/json", "X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", "X-Apple-OAuth-Client-Type": "firstPartyAuth", "X-Apple-OAuth-Redirect-URI": "https://www.icloud.com", "X-Apple-OAuth-Require-Grant-Code": "true", "X-Apple-OAuth-Response-Mode": "web_message", "X-Apple-OAuth-Response-Type": "code", "X-Apple-OAuth-State": self.client_id, "X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", } if overrides: headers.update(overrides) return headers @property def cookiejar_path(self): """Get path for cookiejar file.""" return path.join( self._cookie_directory, "".join([c for c in self.user.get("accountName") if match(r"\w", c)]), ) @property def session_path(self): """Get path for session data file.""" return path.join( self._cookie_directory, "".join([c for c in self.user.get("accountName") if match(r"\w", c)]) + ".session", ) @property def requires_2sa(self): """Returns True if two-step authentication is required.""" return self.data.get("dsInfo", {}).get("hsaVersion", 0) >= 1 and ( self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session ) @property def requires_2fa(self): """Returns True if two-factor authentication is required.""" return self.data["dsInfo"].get("hsaVersion", 0) == 2 and ( self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session ) @property def is_trusted_session(self): """Returns True if the session is trusted.""" return self.data.get("hsaTrustedBrowser", False) @property def trusted_devices(self): """Returns devices trusted for two-step authentication.""" request = self.session.get( f"{self.setup_endpoint}/listDevices", params=self.params ) return request.json().get("devices") def send_verification_code(self, device): """Requests that a verification code is sent to the given device.""" data = json.dumps(device) request = self.session.post( f"{self.setup_endpoint}/sendVerificationCode", params=self.params, data=data, ) return request.json().get("success", False) def validate_verification_code(self, device, code): """Verifies a verification code received on a trusted device.""" device.update({"verificationCode": code, "trustBrowser": True}) data = json.dumps(device) try: self.session.post( f"{self.setup_endpoint}/validateVerificationCode", params=self.params, data=data, ) except ICloudPyAPIResponseException as error: if error.code == -21669: # Wrong verification code return False raise self.trust_session() return not self.requires_2sa def validate_2fa_code(self, code): """Verifies a verification code received via Apple's 2FA system (HSA2).""" data = {"securityCode": {"code": code}} headers = self._get_auth_headers({"Accept": "application/json"}) if self.session_data.get("scnt"): headers["scnt"] = self.session_data.get("scnt") if self.session_data.get("session_id"): headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") try: self.session.post( f"{self.auth_endpoint}/verify/trusteddevice/securitycode", data=json.dumps(data), headers=headers, ) except ICloudPyAPIResponseException as error: if error.code == -21669: # Wrong verification code LOGGER.error("Code verification failed.") return False raise LOGGER.debug("Code verification successful.") self.trust_session() return not self.requires_2sa def trust_session(self): """Request session trust to avoid user log in going forward.""" headers = self._get_auth_headers() if self.session_data.get("scnt"): headers["scnt"] = self.session_data.get("scnt") if self.session_data.get("session_id"): headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") try: self.session.get( f"{self.auth_endpoint}/2sv/trust", headers=headers, ) self._authenticate_with_token() return True except ICloudPyAPIResponseException: LOGGER.error("Session trust failed.") return False def _get_webservice_url(self, ws_key): """Get webservice URL, raise an exception if not exists.""" if self._webservices.get(ws_key) is None: raise ICloudPyServiceNotActivatedException( "Webservice not available", ws_key ) return self._webservices[ws_key]["url"] @property def devices(self): """Returns all devices.""" service_root = self._get_webservice_url("findme") return FindMyiPhoneServiceManager( service_root, self.session, self.params, self.with_family ) @property def iphone(self): """Returns the iPhone.""" return self.devices[0] @property def drive(self): """Gets the 'Drive' service.""" if not self._drive: self._drive = DriveService( service_root=self._get_webservice_url("drivews"), document_root=self._get_webservice_url("docws"), session=self.session, params=self.params, ) return self._drive @property def work(self): """Gets the 'Work' service.""" if not self._work: self._work = WorkService( document_root=self._get_webservice_url("iworkexportws"), session=self.session, params=self.params, client_id=self.client_id, dsid=self.data["dsInfo"].get("dsid"), ) return self._work def __unicode__(self): return f"iCloud API: {self.user.get('accountName')}" def __str__(self): as_unicode = self.__unicode__() if PY2: return as_unicode.encode("utf-8", "ignore") return as_unicode def __repr__(self): return f"<{str(self)}>"