commit 722be30443539c0beb291a28214e5ece210f6c94 Author: Darkeum Date: Tue Dec 17 14:40:56 2024 +0400 Первая публикация diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7ce6ada --- /dev/null +++ b/.gitignore @@ -0,0 +1,143 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + + +# Pyre type checker +.pyre/ + +# Other +src/drive +allure-results +allure-report +.history +ignore-config.yaml +gh-pages +*.pub +session_data/ +session_data_original/ +icloud/ +last_update.txt \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..d6e7580 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "cSpell.words": [ + "Сконвертированый" + ] +} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..a93ef63 --- /dev/null +++ b/README.md @@ -0,0 +1,94 @@ +# iCloud Drive Sync + +Скрипт для синхронизации фалов из iCloud Drive в локальное хранилище. Скрипт также поддерживает конвертацию фалов `.numbers` в формат `.xlsx` + +## Требования + +- Python 3.9, 3.10, 3.11, 3.12 и 3.13 + +Протестировано на Python 3.12.3 + +## Установка + +Для установки всех необходимых зависимостей для работы скрипта нужно выполнить следующею команду: + +```sh +pip install -r requirements.txt +``` + +## Настройка + +Все настройки хранятся в файле `config.yaml` + +```yaml +app: + logger: + level: "info" # Тип логирование в консоль доступно: DEBUG, INFO, WARNING, ERROR, CRITICAL + filename: "icloud.log" # Файл где, будут записываться логи + credentials: + username: "ki@atri-energo.ru" # Имя пользователя в ICloud + retry_login_interval: 900 # Интервал повторной попытки авторизации + root: "icloud" # Корневой каталог + region: global # Регион авторизации в ICloud, доступно global, china +drive: + destination: "drive" # Каталог в котором будут сохраняться синхронизированные фалы + destination_export: "drive_export" # Каталог в котором будут сохраняться конвертированные фалы + remove_obsolete: false # Удалять ли локальные файлы, которые больше не доступны в ICloud + sync_interval: 300 # Интервал синхронизации + filters: + file_name: + # Список файлов которые необходимо синхронизировать + - "Пустой" + - "Копия Тех_2022_осн директория" + folders: + # Список каталогов которые необходимо синхронизировать + - "folder1" + - "folder2" + - "folder3" + file_extensions: + #Список расширений файлов которые необходимо синхронизировать + - "pdf" + - "png" + - "jpg" + - "jpeg" + ignore: + # Список игнорируемых каталогов и файлов + # При указании путей к папкам добавляйте /* + - "node_modules/*" + - "*.md" + # Список конвертируемых файлов из формата numbers в формат xlsx + convert: + - name: "Пустой" # Имя файла + secret: "AzkiMDEYAetglYTt75QvN+hABQhpixoT3UWVxy3ELL8=:XQnMIaEj5Ov4oBqhpxAGIA==:DVRgHa1LjxlWnwYqdlN4VA==" # Секретный ключ файла, как его получить смотрите ниже +``` + +## Авторизация + +Для того чтобы авторизоваться необходимо в первую очередь выполнить следующую команду + +```sh +icloud --username=mail@example.com --session-directory="C:\icloud\session_data" +``` + +- `mail@example.com` это почта (имя пользователя) для авторизации в ICloud +- `C:\icloud\session_data` это полный путь до каталога session_data в корне каталога скрипта, где будет храниться сессия авторизации + +Во время выполнения вам будет предложено ввести пароль от аккаунта ICloud и сохранить ключ авторизации в хранилище `keyring` нужно согласиться. +Затем, если включена двухфакторная авторизация, то попросит ввести код из SMS или устройства. + +> Внимание: В последнее время SMS на Россииские номера приходя через раз или вообще могут не прийти, поэтому стоит повторить попытку. Как вариант можно дойти до этого шага и попробовать авторизоваться в ICloud через браузер, дойти до шага ввода кода и если он пришел то ввести его не в браузер в консоль скрипта. + +> Внимание: Из-за политики Apple авторизация может слететь через пару месяцев, в этом случае необходимо выполнить команду авторизации заново. + +## Запуск скрипта + +Если авторизация прошла успешно и файл конфигурации заполнен, то можно выполнить команду запуска скрипта: + +```sh +py main.py +``` + +Данная команда синхронизирует необходимы каталоги и файлы, и при необходимости производит конвертацию файлов конвертацию фалов `.numbers` в формат `.xlsx` результат можно увидеть в указанных в файле конфигурации каталогов по умолчанию в /icloud/drive и /icloud/drive_export соответственно. + +## Получение секретного ключа для конвертации +![](https://git.ae-work.ru/darkeum/bars-icloud/raw/commit/6813ae02265db16e71724844acf2828925cbcacd/secret.gif) \ No newline at end of file diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..49af474 --- /dev/null +++ b/config.yaml @@ -0,0 +1,26 @@ +app: + logger: + level: "info" + filename: "icloud.log" + credentials: + username: "ki@atri-energo.ru" + retry_login_interval: 900 + root: "icloud" + region: global +drive: + destination: "drive" + destination_export: "drive_export" + remove_obsolete: false + sync_interval: 300 + filters: + file_name: + - "Пустой" + - "Копия Тех_2022_осн директория" + convert: + - + name: "Пустой" + secret: "AzkiMDEYAetglYTt75QvN+hABQhpixoT3UWVxy3ELL8=:XQnMIaEj5Ov4oBqhpxAGIA==:DVRgHa1LjxlWnwYqdlN4VA==" + - + name: "Копия Тех_2022_осн директория" + secret: "ksbIPXw/yn5FX3CzY3x6AkcWYkk0ooazj1PMcVo9xYw=:qVfmzdZWtRoUBtjMw3MY2g==:Wevg/VFPzNyYoLqLfBCB3Q==" + diff --git a/icloudpy/__init__.py b/icloudpy/__init__.py new file mode 100644 index 0000000..93b09f6 --- /dev/null +++ b/icloudpy/__init__.py @@ -0,0 +1,6 @@ +"""The iCloudPy library.""" +import logging + +from icloudpy.base import ICloudPyService # pylint: disable=unused-import + +logging.getLogger(__name__).addHandler(logging.NullHandler()) diff --git a/icloudpy/base.py b/icloudpy/base.py new file mode 100644 index 0000000..7bf47f8 --- /dev/null +++ b/icloudpy/base.py @@ -0,0 +1,598 @@ +"""Library base file.""" +import getpass +import http.cookiejar as cookielib +import inspect +import json +import logging +from os import mkdir, path +from re import match +from tempfile import gettempdir +from uuid import uuid1 + +from requests import Session +from six import PY2 + +from icloudpy.exceptions import ( + ICloudPy2SARequiredException, + ICloudPyAPIResponseException, + ICloudPyFailedLoginException, + ICloudPyServiceNotActivatedException, +) +from icloudpy.services import ( + DriveService, + WorkService +) +from icloudpy.utils import get_password_from_keyring + +LOGGER = logging.getLogger(__name__) + +HEADER_DATA = { + "X-Apple-ID-Account-Country": "account_country", + "X-Apple-ID-Session-Id": "session_id", + "X-Apple-Session-Token": "session_token", + "X-Apple-TwoSV-Trust-Token": "trust_token", + "scnt": "scnt", +} + + +class ICloudPyPasswordFilter(logging.Filter): + """Password log hider.""" + + def __init__(self, password): + super().__init__(password) + + def filter(self, record): + message = record.getMessage() + if self.name in message: + record.msg = message.replace(self.name, "*" * 8) + record.args = [] + + return True + + +class ICloudPySession(Session): + """iCloud session.""" + + def __init__(self, service): + self.service = service + Session.__init__(self) + + def request(self, method, url, **kwargs): # pylint: disable=arguments-differ + + # Charge logging to the right service endpoint + callee = inspect.stack()[2] + module = inspect.getmodule(callee[0]) + request_logger = logging.getLogger(module.__name__).getChild("http") + if self.service.password_filter not in request_logger.filters: + request_logger.addFilter(self.service.password_filter) + + request_logger.debug(f"{method} {url} {kwargs.get('data', '')}") + + has_retried = kwargs.get("retried") + kwargs.pop("retried", None) + response = super().request(method, url, **kwargs) + + content_type = response.headers.get("Content-Type", "").split(";")[0] + json_mimetypes = ["application/json", "text/json"] + + for header, value in HEADER_DATA.items(): + if response.headers.get(header): + session_arg = value + self.service.session_data.update( + {session_arg: response.headers.get(header)} + ) + + # Save session_data to file + with open(self.service.session_path, "w", encoding="utf-8") as outfile: + json.dump(self.service.session_data, outfile) + LOGGER.debug("Saved session data to file") + + # Save cookies to file + if kwargs.get('save_cookie', True): + self.cookies.save(ignore_discard=True, ignore_expires=True) + LOGGER.debug("Cookies saved to %s", self.service.cookiejar_path) + # print(response.text) + if not response.ok and ( + content_type not in json_mimetypes + or response.status_code in [421, 450, 500] + ): + try: + # pylint: disable=W0212 + fmip_url = self.service._get_webservice_url("findme") + if ( + has_retried is None + and response.status_code == 450 + and fmip_url in url + ): + # Handle re-authentication for Find My iPhone + LOGGER.debug("Re-authenticating Find My iPhone service") + try: + self.service.authenticate(True, "find") + except ICloudPyAPIResponseException: + LOGGER.debug("Re-authentication failed") + kwargs["retried"] = True + return self.request(method, url, **kwargs) + except Exception: + pass + + if has_retried is None and response.status_code in [421, 450, 500]: + api_error = ICloudPyAPIResponseException( + response.reason, response.status_code, retry=True + ) + request_logger.debug(api_error) + kwargs["retried"] = True + return self.request(method, url, **kwargs) + + self._raise_error(response.status_code, response.reason) + + if content_type not in json_mimetypes: + return response + + try: + data = response.json() + except: # pylint: disable=bare-except + request_logger.warning("Failed to parse response with JSON mimetype") + return response + + request_logger.debug(data) + + if isinstance(data, dict): + reason = data.get("errorMessage") + reason = reason or data.get("reason") + reason = reason or data.get("errorReason") + if not reason and isinstance(data.get("error"), str): + reason = data.get("error") + if not reason and data.get("error"): + reason = "Unknown reason" + + code = data.get("errorCode") + if not code and data.get("serverErrorCode"): + code = data.get("serverErrorCode") + + if reason: + self._raise_error(code, reason) + + return response + + def _raise_error(self, code, reason): + if ( + self.service.requires_2sa + and reason == "Missing X-APPLE-WEBAUTH-TOKEN cookie" + ): + raise ICloudPy2SARequiredException(self.service.user["apple_id"]) + if code in ("ZONE_NOT_FOUND", "AUTHENTICATION_FAILED"): + reason = ( + reason + ". Please log into https://icloud.com/ to manually " + "finish setting up your iCloud service" + ) + api_error = ICloudPyServiceNotActivatedException(reason, code) + LOGGER.error(api_error) + + raise api_error + if code == "ACCESS_DENIED": + reason = ( + reason + ". Please wait a few minutes then try again." + "The remote servers might be trying to throttle requests." + ) + if code in [421, 450, 500]: + reason = "Authentication required for Account." + + + api_error = ICloudPyAPIResponseException(reason, code) + print(api_error) + LOGGER.error(api_error) + raise api_error + + # Public method to resolve linting error + def raise_error(self, code, reason): + return self._raise_error(code=code, reason=reason) + + +class ICloudPyService: + """ + A base authentication class for the iCloud service. Handles the + authentication required to access iCloud services. + + Usage: + from src import ICloudPyService + icloudpy = ICloudPyService('username@apple.com', 'password') + icloudpy.iphone.location() + """ + + def __init__( + self, + apple_id, + password=None, + cookie_directory=None, + verify=True, + client_id=None, + with_family=True, + auth_endpoint="https://idmsa.apple.com/appleauth/auth", + # For China, use "https://www.icloud.com.cn" + home_endpoint="https://www.icloud.com", + # For China, use "https://setup.icloud.com.cn/setup/ws/1" + setup_endpoint="https://setup.icloud.com/setup/ws/1", + ): + if password is None: + password = get_password_from_keyring(apple_id) + + self.user = {"accountName": apple_id, "password": password} + self.data = {} + self.params = {} + self.client_id = client_id or (f"auth-{str(uuid1()).lower()}") + self.with_family = with_family + self.auth_endpoint = auth_endpoint + self.home_endpoint = home_endpoint + self.setup_endpoint = setup_endpoint + + self.password_filter = ICloudPyPasswordFilter(password) + LOGGER.addFilter(self.password_filter) + + if cookie_directory: + self._cookie_directory = path.expanduser(path.normpath(cookie_directory)) + if not path.exists(self._cookie_directory): + mkdir(self._cookie_directory, 0o700) + else: + topdir = path.join(gettempdir(), "icloudpy") + self._cookie_directory = path.join(topdir, getpass.getuser()) + if not path.exists(topdir): + mkdir(topdir, 0o777) + if not path.exists(self._cookie_directory): + mkdir(self._cookie_directory, 0o700) + + LOGGER.debug("Using session file %s", self.session_path) + + self.session_data = {} + try: + with open(self.session_path, encoding="utf-8") as session_f: + self.session_data = json.load(session_f) + except: # pylint: disable=bare-except + LOGGER.info("Session file does not exist") + if self.session_data.get("client_id"): + self.client_id = self.session_data.get("client_id") + self.params["clientId"] = self.client_id + else: + self.session_data.update({"client_id": self.client_id}) + self.params["clientId"] = self.client_id + + self.session = ICloudPySession(self) + self.session.verify = verify + self.session.headers.update( + {"Origin": self.home_endpoint, "Referer": f"{self.home_endpoint}/"} + ) + + cookiejar_path = self.cookiejar_path + self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path) + if path.exists(cookiejar_path): + try: + self.session.cookies.load(ignore_discard=True, ignore_expires=True) + LOGGER.debug("Read cookies from %s", cookiejar_path) + except: # pylint: disable=bare-except + # Most likely a pickled cookiejar from earlier versions. + # The cookiejar will get replaced with a valid one after + # successful authentication. + LOGGER.warning("Failed to read cookiejar %s", cookiejar_path) + + self.authenticate() + + self._drive = None + self._work = None + self._files = None + self._photos = None + + def authenticate(self, force_refresh=False, service=None): + """ + Handles authentication, and persists cookies so that + subsequent logins will not cause additional e-mails from Apple. + """ + + login_successful = False + if self.session_data.get("session_token") and not force_refresh: + LOGGER.debug("Checking session token validity") + try: + self.data = self._validate_token() + login_successful = True + except ICloudPyAPIResponseException: + LOGGER.debug("Invalid authentication token, will log in from scratch.") + + if not login_successful and service is not None: + app = self.data["apps"][service] + if ( + "canLaunchWithOneFactor" in app + and app["canLaunchWithOneFactor"] is True + ): + LOGGER.debug( + "Authenticating as %s for %s", self.user["accountName"], service + ) + + try: + self._authenticate_with_credentials_service(service) + login_successful = True + except Exception as error: + LOGGER.debug( + "Could not log into service. Attempting brand new login. %s", + str(error), + ) + + if not login_successful: + LOGGER.debug("Authenticating as %s", self.user["accountName"]) + + data = dict(self.user) + + data["rememberMe"] = True + data["trustTokens"] = [] + if self.session_data.get("trust_token"): + data["trustTokens"] = [self.session_data.get("trust_token")] + + headers = self._get_auth_headers() + + if self.session_data.get("scnt"): + headers["scnt"] = self.session_data.get("scnt") + + if self.session_data.get("session_id"): + headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") + + try: + self.session.post( + f"{self.auth_endpoint}/signin", + params={"isRememberMeEnabled": "true"}, + data=json.dumps(data), + headers=headers, + ) + except ICloudPyAPIResponseException as error: + msg = "Invalid email/password combination." + raise ICloudPyFailedLoginException(msg, error) from error + + self._authenticate_with_token() + + self._webservices = self.data["webservices"] + + LOGGER.debug("Authentication completed successfully") + + def _authenticate_with_token(self): + """Authenticate using session token.""" + data = { + "accountCountryCode": self.session_data.get("account_country"), + "dsWebAuthToken": self.session_data.get("session_token"), + "extended_login": True, + "trustToken": self.session_data.get("trust_token", ""), + } + + try: + req = self.session.post( + f"{self.setup_endpoint}/accountLogin", data=json.dumps(data) + ) + self.data = req.json() + except ICloudPyAPIResponseException as error: + msg = "Invalid authentication token." + raise ICloudPyFailedLoginException(msg, error) from error + + def _authenticate_with_credentials_service(self, service): + """Authenticate to a specific service using credentials.""" + data = { + "appName": service, + "apple_id": self.user["accountName"], + "password": self.user["password"], + } + + try: + self.session.post( + f"{self.setup_endpoint}/accountLogin", data=json.dumps(data) + ) + + self.data = self._validate_token() + except ICloudPyAPIResponseException as error: + msg = "Invalid email/password combination." + raise ICloudPyFailedLoginException(msg, error) from error + + def _validate_token(self): + """Checks if the current access token is still valid.""" + LOGGER.debug("Checking session token validity") + try: + req = self.session.post(f"{self.setup_endpoint}/validate", data="null") + LOGGER.debug("Session token is still valid") + return req.json() + except ICloudPyAPIResponseException as err: + LOGGER.debug("Invalid authentication token") + raise err + + def _get_auth_headers(self, overrides=None): + headers = { + "Accept": "*/*", + "Content-Type": "application/json", + "X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", + "X-Apple-OAuth-Client-Type": "firstPartyAuth", + "X-Apple-OAuth-Redirect-URI": "https://www.icloud.com", + "X-Apple-OAuth-Require-Grant-Code": "true", + "X-Apple-OAuth-Response-Mode": "web_message", + "X-Apple-OAuth-Response-Type": "code", + "X-Apple-OAuth-State": self.client_id, + "X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", + } + if overrides: + headers.update(overrides) + return headers + + @property + def cookiejar_path(self): + """Get path for cookiejar file.""" + return path.join( + self._cookie_directory, + "".join([c for c in self.user.get("accountName") if match(r"\w", c)]), + ) + + @property + def session_path(self): + """Get path for session data file.""" + return path.join( + self._cookie_directory, + "".join([c for c in self.user.get("accountName") if match(r"\w", c)]) + + ".session", + ) + + @property + def requires_2sa(self): + """Returns True if two-step authentication is required.""" + return self.data.get("dsInfo", {}).get("hsaVersion", 0) >= 1 and ( + self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session + ) + + @property + def requires_2fa(self): + """Returns True if two-factor authentication is required.""" + return self.data["dsInfo"].get("hsaVersion", 0) == 2 and ( + self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session + ) + + @property + def is_trusted_session(self): + """Returns True if the session is trusted.""" + return self.data.get("hsaTrustedBrowser", False) + + @property + def trusted_devices(self): + """Returns devices trusted for two-step authentication.""" + request = self.session.get( + f"{self.setup_endpoint}/listDevices", params=self.params + ) + return request.json().get("devices") + + def send_verification_code(self, device): + """Requests that a verification code is sent to the given device.""" + data = json.dumps(device) + request = self.session.post( + f"{self.setup_endpoint}/sendVerificationCode", + params=self.params, + data=data, + ) + return request.json().get("success", False) + + def validate_verification_code(self, device, code): + """Verifies a verification code received on a trusted device.""" + device.update({"verificationCode": code, "trustBrowser": True}) + data = json.dumps(device) + + try: + self.session.post( + f"{self.setup_endpoint}/validateVerificationCode", + params=self.params, + data=data, + ) + except ICloudPyAPIResponseException as error: + if error.code == -21669: + # Wrong verification code + return False + raise + + self.trust_session() + + return not self.requires_2sa + + def validate_2fa_code(self, code): + """Verifies a verification code received via Apple's 2FA system (HSA2).""" + data = {"securityCode": {"code": code}} + + headers = self._get_auth_headers({"Accept": "application/json"}) + + if self.session_data.get("scnt"): + headers["scnt"] = self.session_data.get("scnt") + + if self.session_data.get("session_id"): + headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") + + try: + self.session.post( + f"{self.auth_endpoint}/verify/trusteddevice/securitycode", + data=json.dumps(data), + headers=headers, + ) + except ICloudPyAPIResponseException as error: + if error.code == -21669: + # Wrong verification code + LOGGER.error("Code verification failed.") + return False + raise + + LOGGER.debug("Code verification successful.") + + self.trust_session() + return not self.requires_2sa + + def trust_session(self): + """Request session trust to avoid user log in going forward.""" + headers = self._get_auth_headers() + + if self.session_data.get("scnt"): + headers["scnt"] = self.session_data.get("scnt") + + if self.session_data.get("session_id"): + headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") + + try: + self.session.get( + f"{self.auth_endpoint}/2sv/trust", + headers=headers, + ) + self._authenticate_with_token() + return True + except ICloudPyAPIResponseException: + LOGGER.error("Session trust failed.") + return False + + def _get_webservice_url(self, ws_key): + """Get webservice URL, raise an exception if not exists.""" + if self._webservices.get(ws_key) is None: + raise ICloudPyServiceNotActivatedException( + "Webservice not available", ws_key + ) + return self._webservices[ws_key]["url"] + + @property + def devices(self): + """Returns all devices.""" + service_root = self._get_webservice_url("findme") + return FindMyiPhoneServiceManager( + service_root, self.session, self.params, self.with_family + ) + + @property + def iphone(self): + """Returns the iPhone.""" + return self.devices[0] + + @property + def drive(self): + """Gets the 'Drive' service.""" + if not self._drive: + self._drive = DriveService( + service_root=self._get_webservice_url("drivews"), + document_root=self._get_webservice_url("docws"), + session=self.session, + params=self.params, + ) + return self._drive + + @property + def work(self): + """Gets the 'Work' service.""" + if not self._work: + self._work = WorkService( + document_root=self._get_webservice_url("iworkexportws"), + session=self.session, + params=self.params, + client_id=self.client_id, + dsid=self.data["dsInfo"].get("dsid"), + ) + return self._work + + def __unicode__(self): + return f"iCloud API: {self.user.get('accountName')}" + + def __str__(self): + as_unicode = self.__unicode__() + if PY2: + return as_unicode.encode("utf-8", "ignore") + return as_unicode + + def __repr__(self): + return f"<{str(self)}>" diff --git a/icloudpy/cmdline.py b/icloudpy/cmdline.py new file mode 100644 index 0000000..230b84c --- /dev/null +++ b/icloudpy/cmdline.py @@ -0,0 +1,374 @@ +#! /usr/bin/env python +""" +A Command Line Wrapper to allow easy use of iCloudPy for +command line scripts, and related. +""" + +# from builtins import input +import argparse +import pickle +import sys + +from click import confirm + +from icloudpy import ICloudPyService, utils +from icloudpy.exceptions import ICloudPyFailedLoginException + +DEVICE_ERROR = "Please use the --device switch to indicate which device to use." + + +def create_pickled_data(idevice, filename): + """ + This helper will output the idevice to a pickled file named + after the passed filename. + + This allows the data to be used without resorting to screen / pipe + scrapping. + """ + pickle_file = open(filename, "wb") + pickle.dump(idevice.content, pickle_file, protocol=pickle.HIGHEST_PROTOCOL) + pickle_file.close() + + +def main(args=None): + """Main commandline entrypoint.""" + if args is None: + args = sys.argv[1:] + + parser = argparse.ArgumentParser(description="Find My iPhone CommandLine Tool") + + parser.add_argument( + "--username", + action="store", + dest="username", + default="", + help="Apple ID to Use", + ) + parser.add_argument( + "--password", + action="store", + dest="password", + default="", + help=( + "Apple ID Password to Use; if unspecified, password will be " + "fetched from the system keyring." + ), + ) + parser.add_argument( + "-n", + "--non-interactive", + action="store_false", + dest="interactive", + default=True, + help="Disable interactive prompts.", + ) + parser.add_argument( + "--delete-from-keyring", + action="store_true", + dest="delete_from_keyring", + default=False, + help="Delete stored password in system keyring for this username.", + ) + parser.add_argument( + "--list", + action="store_true", + dest="list", + default=False, + help="Short Listings for Device(s) associated with account", + ) + parser.add_argument( + "--llist", + action="store_true", + dest="longlist", + default=False, + help="Detailed Listings for Device(s) associated with account", + ) + parser.add_argument( + "--locate", + action="store_true", + dest="locate", + default=False, + help="Retrieve Location for the iDevice (non-exclusive).", + ) + + # Restrict actions to a specific devices UID / DID + parser.add_argument( + "--device", + action="store", + dest="device_id", + default=False, + help="Only effect this device", + ) + + # Trigger Sound Alert + parser.add_argument( + "--sound", + action="store_true", + dest="sound", + default=False, + help="Play a sound on the device", + ) + + # Trigger Message w/Sound Alert + parser.add_argument( + "--message", + action="store", + dest="message", + default=False, + help="Optional Text Message to display with a sound", + ) + + # Trigger Message (without Sound) Alert + parser.add_argument( + "--silentmessage", + action="store", + dest="silentmessage", + default=False, + help="Optional Text Message to display with no sounds", + ) + + # Lost Mode + parser.add_argument( + "--lostmode", + action="store_true", + dest="lostmode", + default=False, + help="Enable Lost mode for the device", + ) + parser.add_argument( + "--lostphone", + action="store", + dest="lost_phone", + default=False, + help="Phone Number allowed to call when lost mode is enabled", + ) + parser.add_argument( + "--lostpassword", + action="store", + dest="lost_password", + default=False, + help="Forcibly active this passcode on the idevice", + ) + parser.add_argument( + "--lostmessage", + action="store", + dest="lost_message", + default="", + help="Forcibly display this message when activating lost mode.", + ) + + # Output device data to an pickle file + parser.add_argument( + "--outputfile", + action="store_true", + dest="output_to_file", + default="", + help="Save device data to a file in the current directory.", + ) + + # Path to session directory + parser.add_argument( + "--session-directory", + action="store", + dest="session_directory", + default=None, + help="Path to save session information", + ) + + # Server region - global or china + parser.add_argument( + "--region", + action="store", + dest="region", + default="global", + help="Server region - global or china", + ) + + command_line = parser.parse_args(args) + + username = command_line.username + password = command_line.password + session_directory = command_line.session_directory + server_region = command_line.region + + if username and command_line.delete_from_keyring: + utils.delete_password_in_keyring(username) + + failure_count = 0 + while True: + # Which password we use is determined by your username, so we + # do need to check for this first and separately. + if not username: + parser.error("No username supplied") + + if not password: + password = utils.get_password( + username, interactive=command_line.interactive + ) + + if not password: + parser.error("No password supplied") + + try: + api = ( + ICloudPyService( + apple_id=username.strip(), + password=password.strip(), + cookie_directory=session_directory, + home_endpoint="https://www.icloud.com.cn", + setup_endpoint="https://setup.icloud.com.cn/setup/ws/1", + ) + if server_region == "china" + else ICloudPyService( + apple_id=username.strip(), + password=password.strip(), + cookie_directory=session_directory, + ) + ) + + if ( + not utils.password_exists_in_keyring(username) + and command_line.interactive + and confirm("Save password in keyring?") + ): + utils.store_password_in_keyring(username, password) + + if api.requires_2fa: + # fmt: off + print( + "\nTwo-step authentication required.", + "\nPlease enter validation code" + ) + # fmt: on + + code = input("(string) --> ") + if not api.validate_2fa_code(code): + print("Failed to verify verification code") + sys.exit(1) + + print("") + + elif api.requires_2sa: + # fmt: off + print( + "\nTwo-step authentication required.", + "\nYour trusted devices are:" + ) + # fmt: on + + devices = api.trusted_devices + for i, device in enumerate(devices): + print( + f' {i}: {device.get("deviceName", "SMS to " + device.get("phoneNumber"))}' + ) + + print("\nWhich device would you like to use?") + device = int(input("(number) --> ")) + device = devices[device] + if not api.send_verification_code(device): + print("Failed to send verification code") + sys.exit(1) + + print("\nPlease enter validation code") + code = input("(string) --> ") + if not api.validate_verification_code(device, code): + print("Failed to verify verification code") + sys.exit(1) + + print("") + break + except ICloudPyFailedLoginException as error: + # If they have a stored password; we just used it and + # it did not work; let's delete it if there is one. + if utils.password_exists_in_keyring(username): + utils.delete_password_in_keyring(username) + + message = f"Bad username or password for {username}" + password = None + + failure_count += 1 + if failure_count >= 3: + raise RuntimeError(message) from error + + print(message, file=sys.stderr) + + for dev in api.devices: + if not command_line.device_id or ( + command_line.device_id.strip().lower() == dev.content["id"].strip().lower() + ): + # List device(s) + if command_line.locate: + dev.location() + + if command_line.output_to_file: + create_pickled_data( + dev, + filename=(dev.content["name"].strip().lower() + ".fmip_snapshot"), + ) + + contents = dev.content + if command_line.longlist: + print("-" * 30) + print(contents["name"]) + for key in contents: + print(f"{key} - {contents[key]}") + elif command_line.list: + print("-" * 30) + print(f"Name - {contents['name']}") + print(f"Display Name - {contents['deviceDisplayName']}") + print(f"Location - {contents['location']}") + print(f"Battery Level - {contents['batteryLevel']}") + print(f"Battery Status- {contents['batteryStatus']}") + print(f"Device Class - {contents['deviceClass']}") + print(f"Device Model - {contents['deviceModel']}") + + # Play a Sound on a device + if command_line.sound: + if command_line.device_id: + dev.play_sound() + else: + raise RuntimeError( + f"\n\n\t\tSounds can only be played on a singular device. {DEVICE_ERROR}\n\n" + ) + + # Display a Message on the device + if command_line.message: + if command_line.device_id: + dev.display_message( + subject="A Message", message=command_line.message, sounds=True + ) + else: + raise RuntimeError( + f"Messages can only be played on a singular device. {DEVICE_ERROR}" + ) + + # Display a Silent Message on the device + if command_line.silentmessage: + if command_line.device_id: + dev.display_message( + subject="A Silent Message", + message=command_line.silentmessage, + sounds=False, + ) + else: + raise RuntimeError( + f"Silent Messages can only be played on a singular device. {DEVICE_ERROR}" + ) + + # Enable Lost mode + if command_line.lostmode: + if command_line.device_id: + dev.lost_device( + number=command_line.lost_phone.strip(), + text=command_line.lost_message.strip(), + newpasscode=command_line.lost_password.strip(), + ) + else: + raise RuntimeError( + f"Lost Mode can only be activated on a singular device. {DEVICE_ERROR}" + ) + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/icloudpy/exceptions.py b/icloudpy/exceptions.py new file mode 100644 index 0000000..81f8124 --- /dev/null +++ b/icloudpy/exceptions.py @@ -0,0 +1,47 @@ +"""Library exceptions.""" + + +class ICloudPyException(Exception): + """Generic iCloud exception.""" + + +# API +class ICloudPyAPIResponseException(ICloudPyException): + """iCloud response exception.""" + + def __init__(self, reason, code=None, retry=False): + self.reason = reason + self.code = code + message = reason or "" + if code: + message += f" ({code})" + if retry: + message += ". Retrying ..." + + super().__init__(message) + + +class ICloudPyServiceNotActivatedException(ICloudPyAPIResponseException): + """iCloud service not activated exception.""" + + +# Login +class ICloudPyFailedLoginException(ICloudPyException): + """iCloud failed login exception.""" + + +class ICloudPy2SARequiredException(ICloudPyException): + """iCloud 2SA required exception.""" + + def __init__(self, apple_id): + message = f"Two-step authentication required for account:{apple_id}" + super().__init__(message) + + +class ICloudPyNoStoredPasswordAvailableException(ICloudPyException): + """iCloud no stored password exception.""" + + +# Webservice specific +class ICloudPyNoDevicesException(ICloudPyException): + """iCloud no device exception.""" diff --git a/icloudpy/services/__init__.py b/icloudpy/services/__init__.py new file mode 100644 index 0000000..a712e8a --- /dev/null +++ b/icloudpy/services/__init__.py @@ -0,0 +1,3 @@ +from icloudpy.services.drive import DriveService # pylint: disable=unused-import +from icloudpy.services.work import WorkService # pylint: disable=unused-import + diff --git a/icloudpy/services/drive.py b/icloudpy/services/drive.py new file mode 100644 index 0000000..e31f3b1 --- /dev/null +++ b/icloudpy/services/drive.py @@ -0,0 +1,385 @@ +"""Drive service.""" +import io +import json +import mimetypes +import os +import time +from datetime import datetime, timedelta +from re import search + +from requests import Response +from six import PY2 + + +class DriveService: + """The 'Drive' iCloud service.""" + + def __init__(self, service_root, document_root, session, params): + self._service_root = service_root + self._document_root = document_root + self.session = session + self.params = dict(params) + self._root = None + + def _get_token_from_cookie(self): + for cookie in self.session.cookies: + if cookie.name == "X-APPLE-WEBAUTH-VALIDATE": + match = search(r"\bt=([^:]+)", cookie.value) + if match is None: + raise Exception(f"Can't extract token from {cookie.value}") + return {"token": match.group(1)} + raise Exception("Token cookie not found") + + def get_node_data(self, drivewsid): + """Returns the node data.""" + request = self.session.post( + self._service_root + "/retrieveItemDetailsInFolders", + params=self.params, + data=json.dumps( + [ + { + "drivewsid": drivewsid, + "partialData": False, + } + ] + ), + ) + if not request.ok: + self.session.raise_error(request.status_code, request.reason) + return request.json()[0] + + def get_file(self, file_id, zone="com.apple.CloudDocs", **kwargs): + """Returns iCloud Drive file.""" + file_params = dict(self.params) + file_params.update({"document_id": file_id}) + response = self.session.get( + self._document_root + f"/ws/{zone}/download/by_id", + params=file_params, + ) + if not response.ok: + self.session.raise_error(response.status_code, response.reason) + package_token = response.json().get("package_token") + data_token = response.json().get("data_token") + if data_token and data_token.get("url"): + return self.session.get(data_token["url"], params=self.params, **kwargs) + elif package_token and package_token.get("url"): + return self.session.get(package_token["url"], params=self.params, **kwargs) + else: + raise KeyError("'data_token' nor 'package_token' found in response.") + + def get_app_data(self): + """Returns the app library (previously ubiquity).""" + request = self.session.get( + self._service_root + "/retrieveAppLibraries", params=self.params + ) + if not request.ok: + self.session.raise_error(request.status_code, request.reason) + return request.json()["items"] + + def get_app_node(self, app_id, folder="documents"): + """Returns the node of the app (ubiquity)""" + return DriveNode(self, self.get_node_data("FOLDER::" + app_id + "::" + folder)) + + def _get_upload_contentws_url(self, file_object, zone="com.apple.CloudDocs"): + """Get the contentWS endpoint URL to add a new file.""" + content_type = mimetypes.guess_type(file_object.name)[0] + if content_type is None: + content_type = "" + + # Get filesize from file object + orig_pos = file_object.tell() + file_object.seek(0, os.SEEK_END) + file_size = file_object.tell() + file_object.seek(orig_pos, os.SEEK_SET) + + file_params = self.params + file_params.update(self._get_token_from_cookie()) + + request = self.session.post( + self._document_root + f"/ws/{zone}/upload/web", + params=file_params, + headers={"Content-Type": "text/plain"}, + data=json.dumps( + { + "filename": file_object.name, + "type": "FILE", + "content_type": content_type, + "size": file_size, + } + ), + ) + if not request.ok: + self.session.raise_error(request.status_code, request.reason) + return (request.json()[0]["document_id"], request.json()[0]["url"]) + + def _update_contentws( + self, folder_id, sf_info, document_id, file_object, zone="com.apple.CloudDocs" + ): + data = { + "data": { + "signature": sf_info["fileChecksum"], + "wrapping_key": sf_info["wrappingKey"], + "reference_signature": sf_info["referenceChecksum"], + "size": sf_info["size"], + }, + "command": "add_file", + "create_short_guid": True, + "document_id": document_id, + "path": { + "starting_document_id": folder_id, + "path": os.path.basename(file_object.name), + }, + "allow_conflict": True, + "file_flags": { + "is_writable": True, + "is_executable": False, + "is_hidden": False, + }, + "mtime": int(time.time() * 1000), + "btime": int(time.time() * 1000), + } + + # Add the receipt if we have one. Will be absent for 0-sized files + if sf_info.get("receipt"): + data["data"].update({"receipt": sf_info["receipt"]}) + + request = self.session.post( + self._document_root + f"/ws/{zone}/update/documents", + params=self.params, + headers={"Content-Type": "text/plain"}, + data=json.dumps(data), + ) + if not request.ok: + self.session.raise_error(request.status_code, request.reason) + return request.json() + + def send_file(self, folder_id, file_object, zone="com.apple.CloudDocs"): + """Send new file to iCloud Drive.""" + document_id, content_url = self._get_upload_contentws_url(file_object, zone) + + request = self.session.post(content_url, files={file_object.name: file_object}) + if not request.ok: + self.session.raise_error(request.status_code, request.reason) + content_response = request.json()["singleFile"] + + self._update_contentws( + folder_id, content_response, document_id, file_object, zone + ) + + def create_folders(self, parent, name): + """Creates a new iCloud Drive folder""" + request = self.session.post( + self._service_root + "/createFolders", + params=self.params, + headers={"Content-Type": "text/plain"}, + data=json.dumps( + { + "destinationDrivewsId": parent, + "folders": [ + { + "clientId": self.params["clientId"], + "name": name, + } + ], + } + ), + ) + return request.json() + + def rename_items(self, node_id, etag, name): + """Renames an iCloud Drive node""" + request = self.session.post( + self._service_root + "/renameItems", + params=self.params, + data=json.dumps( + { + "items": [ + { + "drivewsid": node_id, + "etag": etag, + "name": name, + } + ], + } + ), + ) + return request.json() + + def move_items_to_trash(self, node_id, etag): + """Moves an iCloud Drive node to the trash bin""" + request = self.session.post( + self._service_root + "/moveItemsToTrash", + params=self.params, + data=json.dumps( + { + "items": [ + { + "drivewsid": node_id, + "etag": etag, + "clientId": self.params["clientId"], + } + ], + } + ), + ) + if not request.ok: + self.session.raise_error(request.status_code, request.reason) + return request.json() + + @property + def root(self): + """Returns the root node.""" + if not self._root: + self._root = DriveNode( + self, self.get_node_data("FOLDER::com.apple.CloudDocs::root") + ) + return self._root + + def __getattr__(self, attr): + return getattr(self.root, attr) + + def __getitem__(self, key): + return self.root[key] + + +class DriveNode: + """Drive node.""" + + def __init__(self, conn, data): + self.data = data + self.connection = conn + self._children = None + + @property + def name(self): + """Gets the node name.""" + if "extension" in self.data: + return f'{self.data["name"]}.{self.data["extension"]}' + return self.data["name"] + + @property + def type(self): + """Gets the node type.""" + node_type = self.data.get("type") + return node_type and node_type.lower() + + def get_children(self): + """Gets the node children.""" + if not self._children: + if "items" not in self.data: + self.data.update(self.connection.get_node_data(self.data["drivewsid"])) + if "items" not in self.data: + raise KeyError(f'No items in folder, status: {self.data["status"]}') + self._children = [ + DriveNode(self.connection, item_data) + for item_data in self.data["items"] + ] + return self._children + + @property + def size(self): + """Gets the node size.""" + size = self.data.get("size") # Folder does not have size + if not size: + return None + return int(size) + + @property + def date_created(self): + """Gets the node created date (in UTC).""" + return _date_to_utc(self.data.get("dateCreated")) + + @property + def date_changed(self): + """Gets the node changed date (in UTC).""" + return _date_to_utc(self.data.get("dateChanged")) # Folder does not have date + + @property + def date_modified(self): + """Gets the node modified date (in UTC).""" + return _date_to_utc(self.data.get("dateModified")) # Folder does not have date + + @property + def date_last_open(self): + """Gets the node last open date (in UTC).""" + return _date_to_utc(self.data.get("lastOpenTime")) # Folder does not have date + + def open(self, **kwargs): + """Gets the node file.""" + # iCloud returns 400 Bad Request for 0-byte files + if self.data["size"] == 0: + response = Response() + response.raw = io.BytesIO() + return response + return self.connection.get_file( + self.data["docwsid"], zone=self.data["zone"], **kwargs + ) + + def upload(self, file_object, **kwargs): + """ "Upload a new file.""" + return self.connection.send_file( + self.data["docwsid"], file_object, zone=self.data["zone"], **kwargs + ) + + def dir(self): + """Gets the node list of directories.""" + if self.type == "file": + return None + return [child.name for child in self.get_children()] + + def mkdir(self, folder): + """Create a new directory directory.""" + # remove cached entries information first so that it will be re-read on next get_children() + self._children = None + if "items" in self.data: + self.data.pop("items") + return self.connection.create_folders(self.data["drivewsid"], folder) + + def rename(self, name): + """Rename an iCloud Drive item.""" + return self.connection.rename_items( + self.data["drivewsid"], self.data["etag"], name + ) + + def delete(self): + """Delete an iCloud Drive item.""" + return self.connection.move_items_to_trash( + self.data["drivewsid"], self.data["etag"] + ) + + def get(self, name): + """Gets the node child.""" + if self.type == "file": + return None + return [child for child in self.get_children() if child.name == name][0] + + def __getitem__(self, key): + try: + return self.get(key) + except IndexError as error: + raise KeyError(f"No child named '{key}' exists") from error + + def __unicode__(self): + return f"{{type: {self.type}, name: {self.name}}}" + + def __str__(self): + as_unicode = self.__unicode__() + if PY2: + return as_unicode.encode("utf-8", "ignore") + return as_unicode + + def __repr__(self): + return f"<{type(self).__name__}: {str(self)}>" + + +def _date_to_utc(date): + if not date: + return None + # jump through hoops to return time in UTC rather than California time + match = search(r"^(.+?)([\+\-]\d+):(\d\d)$", date) + if not match: + # Already in UTC + return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ") + base = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S") + diff = timedelta(hours=int(match.group(2)), minutes=int(match.group(3))) + return base - diff diff --git a/icloudpy/services/work.py b/icloudpy/services/work.py new file mode 100644 index 0000000..4e9e828 --- /dev/null +++ b/icloudpy/services/work.py @@ -0,0 +1,92 @@ +"""Work service.""" +import io +import json +import mimetypes +import os +import time +import shutil + +from datetime import datetime, timedelta +from re import search + +from requests import Response +from six import PY2 +from src import LOGGER, config_parser + +class WorkService: + + def __init__(self, document_root, session, params, client_id, dsid): + + self._document_root = document_root + self.session = session + self.params = dict(params) + self.client_id = client_id + self.dsid = dsid + self._root = None + + def export_response(self, document_id, secret, zone): + file_params = dict(self.params) + file_params.update({"clientBuildNumber": "current"}) + file_params.update({"clientMasteringNumber": "Mcurrent"}) + file_params.update({"dsid": self.dsid}) + request = self.session.post( + self._document_root + f"/iw/export-ws/{self.dsid}/export_document", + params=file_params, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data={ + "primary": "primary", + "document_type": "numbers", + "format": "org.openxmlformats.spreadsheetml.sheet", + "locale": "en", + "encrypt_result": "N", + "secret": secret, + "document_id": document_id, + "zone": zone, + } + , + ) + if not request.ok: + self.session.raise_error(request.status_code, request.reason) + + return request.json()["job_id"] + + def check_job(self, job_id): + url = self._document_root + f"/iw/export-ws/{self.dsid}/check_export_status" + file_params = dict(self.params) + file_params.update({"build": "primary"}) + file_params.update({"clientBuildNumber": "current"}) + file_params.update({"clientMasteringNumber": "Mcurrent"}) + file_params.update({"job_id": job_id}) + file_params.update({"dsid": self.dsid}) + request = self.session.post( + url, + params=file_params, + headers={"Content-Type": "text/plain"}, + data={}, + ) + if not request.ok: + self.session.raise_error(request.status_code, request.reason) + if request.json()["job_status"] == "failure": + raise Exception(request.json()["job_status"]) + if request.json()["job_status"] == "success": + return True + else: + return False + + def download_file(self, job_id, destination_path, name): + url = self._document_root + f"/iw/export-ws/{self.dsid}/download_exported_document" + file_params = dict(self.params) + file_params.update({"build": "primary"}) + file_params.update({"file_name": name + f".xlsx"}) + file_params.update({"job_id": job_id}) + local_filename = os.path.join(destination_path, name + f".xlsx") + try: + response = self.session.get(url, params=file_params, stream=True) + with open(local_filename, 'wb') as out_file: + shutil.copyfileobj(response.raw, out_file) + except self.session.exceptions.RequestException as e: + raise Exception(f"Ошибка скачивания сконцентрированного файла {local_filename}: {str(e)}") + return local_filename + + + diff --git a/icloudpy/utils.py b/icloudpy/utils.py new file mode 100644 index 0000000..044cbb4 --- /dev/null +++ b/icloudpy/utils.py @@ -0,0 +1,63 @@ +"""Utils.""" +import getpass +from sys import stdout + +import keyring + +from .exceptions import ICloudPyNoStoredPasswordAvailableException + +KEYRING_SYSTEM = "icloudpy://icloud-password" + + +def get_password(username, interactive=stdout.isatty() if stdout else False): + """Get the password from a username.""" + try: + return get_password_from_keyring(username) + except ICloudPyNoStoredPasswordAvailableException: + if not interactive: + raise + + return getpass.getpass(f"Enter iCloud password for {username}: ") + + +def password_exists_in_keyring(username): + """Return true if the password of a username exists in the keyring.""" + try: + get_password_from_keyring(username) + except ICloudPyNoStoredPasswordAvailableException: + return False + + return True + + +def get_password_from_keyring(username): + """Get the password from a username.""" + result = keyring.get_password(KEYRING_SYSTEM, username) + if result is None: + raise ICloudPyNoStoredPasswordAvailableException( + f"No iCloudPy password for {username} could be found " + "in the system keychain. Use the `--store-in-keyring` " + "command-line option for storing a password for this " + "username." + ) + + return result + + +def store_password_in_keyring(username, password): + """Store the password of a username.""" + return keyring.set_password(KEYRING_SYSTEM, username, password) + + +def delete_password_in_keyring(username): + """Delete the password of a username.""" + return keyring.delete_password(KEYRING_SYSTEM, username) + + +def underscore_to_camelcase(word, initial_capital=False): + """Transform a word to camelCase.""" + words = [x.capitalize() or "_" for x in word.split("_")] + if not initial_capital: + words[0] = words[0].lower() + + return "".join(words) diff --git a/main.py b/main.py new file mode 100644 index 0000000..81db07c --- /dev/null +++ b/main.py @@ -0,0 +1,4 @@ +from src import sync + +if __name__ == "__main__": + sync.sync() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..eb21938 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,13 @@ +requests==2.32.3 +keyring==23.11.0 +keyrings.alt==4.2.0 +click==8.1.7 +six==1.16.0 +tzlocal==5.2 +pytz==2024.2 +certifi==2024.8.30 +future==1.0.0 +icloudpy==0.7.0 +ruamel.yaml==0.16.12 +python-magic==0.4.27 +libmagic==1.0 \ No newline at end of file diff --git a/secret.gif b/secret.gif new file mode 100644 index 0000000..b6f9565 Binary files /dev/null and b/secret.gif differ diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..be0593c --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,127 @@ +import logging +import os +import sys +import warnings + +from ruamel.yaml import YAML + +DEFAULT_ROOT_DESTINATION = "./icloud" +DEFAULT_DRIVE_DESTINATION = "drive" +DEFAULT_DRIVE_DESTINATION_EXPORT = "drive_export" +DEFAULT_RETRY_LOGIN_INTERVAL_SEC = 600 # 10 minutes +DEFAULT_SYNC_INTERVAL_SEC = 1800 # 30 minutes +DEFAULT_CONFIG_FILE_NAME = "config.yaml" +ENV_ICLOUD_PASSWORD_KEY = "ENV_ICLOUD_PASSWORD" +ENV_CONFIG_FILE_PATH_KEY = "ENV_CONFIG_FILE_PATH" +DEFAULT_LOGGER_LEVEL = "info" +DEFAULT_LOG_FILE_NAME = "icloud.log" +DEFAULT_CONFIG_FILE_PATH = os.path.join( + os.path.dirname(os.path.dirname(__file__)), DEFAULT_CONFIG_FILE_NAME +) +DEFAULT_COOKIE_DIRECTORY = "session_data" + +warnings.filterwarnings("ignore", category=DeprecationWarning) + + +def read_config(config_path=DEFAULT_CONFIG_FILE_PATH): + if not (config_path and os.path.exists(config_path)): + print(f"Файл конфигурации не найден по адресу {config_path}.") + return None + with open(file=config_path, encoding="utf-8") as config_file: + config = YAML().load(config_file) + config["app"]["credentials"]["username"] = ( + config["app"]["credentials"]["username"].strip() + if config["app"]["credentials"]["username"] is not None + else "" + ) + return config + + +def get_logger_config(config): + logger_config = {} + if "logger" not in config["app"]: + return None + config_app_logger = config["app"]["logger"] + logger_config["level"] = ( + config_app_logger["level"].strip().lower() + if "level" in config_app_logger + else DEFAULT_LOGGER_LEVEL + ) + logger_config["filename"] = ( + config_app_logger["filename"].strip().lower() + if "filename" in config_app_logger + else DEFAULT_LOG_FILE_NAME + ) + return logger_config + + +def log_handler_exists(logger, handler_type, **kwargs): + for handler in logger.handlers: + if isinstance(handler, handler_type): + if handler_type is logging.FileHandler: + if handler.baseFilename.endswith(kwargs["filename"]): + return True + elif handler_type is logging.StreamHandler: + if handler.stream is kwargs["stream"]: + return True + return False + + +class ColorfulConsoleFormatter(logging.Formatter): + + grey = "\x1b[38;21m" + blue = "\x1b[38;5;39m" + yellow = "\x1b[38;5;226m" + red = "\x1b[38;5;196m" + bold_red = "\x1b[31;1m" + reset = "\x1b[0m" + + def __init__(self, fmt): + super().__init__() + self.fmt = fmt + self.formats = { + logging.DEBUG: self.grey + self.fmt + self.reset, + logging.INFO: self.blue + self.fmt + self.reset, + logging.WARNING: self.yellow + self.fmt + self.reset, + logging.ERROR: self.red + self.fmt + self.reset, + logging.CRITICAL: self.bold_red + self.fmt + self.reset, + } + + def format(self, record): + log_fmt = self.formats.get(record.levelno) + formatter = logging.Formatter(log_fmt) + return formatter.format(record) + + +def get_logger(): + logger = logging.getLogger() + logger_config = get_logger_config(config=read_config()) + if logger_config: + level_name = logging.getLevelName(level=logger_config["level"].upper()) + logger.setLevel(level=level_name) + if not log_handler_exists( + logger=logger, + handler_type=logging.FileHandler, + filename=logger_config["filename"], + ): + file_handler = logging.FileHandler(logger_config["filename"]) + file_handler.setFormatter( + logging.Formatter( + "%(asctime)s :: %(levelname)s :: %(name)s :: %(filename)s :: %(lineno)d :: %(message)s" + ) + ) + logger.addHandler(file_handler) + if not log_handler_exists( + logger=logger, handler_type=logging.StreamHandler, stream=sys.stdout + ): + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter( + ColorfulConsoleFormatter( + "%(asctime)s :: %(levelname)s :: %(name)s :: %(filename)s :: %(lineno)d :: %(message)s" + ) + ) + logger.addHandler(console_handler) + return logger + + +LOGGER = get_logger() diff --git a/src/config_parser.py b/src/config_parser.py new file mode 100644 index 0000000..4fa583d --- /dev/null +++ b/src/config_parser.py @@ -0,0 +1,159 @@ +import os + +from src import ( + DEFAULT_DRIVE_DESTINATION, + DEFAULT_DRIVE_DESTINATION_EXPORT, + DEFAULT_RETRY_LOGIN_INTERVAL_SEC, + DEFAULT_ROOT_DESTINATION, + DEFAULT_SYNC_INTERVAL_SEC, + LOGGER, +) + +def config_path_to_string(config_path): + return " > ".join(config_path) + +def traverse_config_path(config, config_path: list[str]) -> bool: + if len(config_path) == 0: + return True + if not (config and config_path[0] in config): + return False + return traverse_config_path(config[config_path[0]], config_path=config_path[1:]) + +def get_config_value(config, config_path): + if len(config_path) == 1: + return config[config_path[0]] + return get_config_value(config=config[config_path[0]], config_path=config_path[1:]) + +def get_username(config): + username = None + config_path = ["app", "credentials", "username"] + if not traverse_config_path(config=config, config_path=config_path): + LOGGER.error( + f"username отсутствует в {config_path_to_string(config_path)}. Пожалуйста, установите имя пользователя." + ) + else: + username = get_config_value(config=config, config_path=config_path) + username = username.strip() + if len(username) == 0: + username = None + LOGGER.error(f"username пустое в {config_path_to_string(config_path)}.") + return username + +def get_retry_login_interval(config): + retry_login_interval = DEFAULT_RETRY_LOGIN_INTERVAL_SEC + config_path = ["app", "credentials", "retry_login_interval"] + if not traverse_config_path(config=config, config_path=config_path): + LOGGER.warning( + f"retry_login_interval не найден в {config_path_to_string(config_path=config_path)}." + + f" Использование по умолчанию {retry_login_interval} секунд ..." + ) + else: + retry_login_interval = get_config_value(config=config, config_path=config_path) + LOGGER.info(f"Повторная попытка входа каждые {retry_login_interval} секунд.") + return retry_login_interval + +def get_drive_sync_interval(config): + sync_interval = DEFAULT_SYNC_INTERVAL_SEC + config_path = ["drive", "sync_interval"] + if not traverse_config_path(config=config, config_path=config_path): + LOGGER.warning( + f"sync_interval не найден в {config_path_to_string(config_path=config_path)}." + + f" Использование sync_interval по умолчанию: {sync_interval} секунд ..." + ) + else: + sync_interval = get_config_value(config=config, config_path=config_path) + return sync_interval + +def prepare_root_destination(config): + LOGGER.debug("Проверка root ...") + root_destination = DEFAULT_ROOT_DESTINATION + config_path = ["app", "root"] + if not traverse_config_path(config=config, config_path=config_path): + LOGGER.warning( + f"Предупреждение: root отсутствует в {config_path_to_string(config_path)}." + + f" Использование root по умолчанию: {root_destination}", + ) + else: + root_destination = get_config_value(config=config, config_path=config_path) + root_destination_path = os.path.abspath(root_destination) + os.makedirs(root_destination_path, exist_ok=True) + return root_destination_path + +def prepare_drive_destination(config): + LOGGER.debug("Проверка пути сохранения фалов...") + config_path = ["drive", "destination"] + drive_destination = DEFAULT_DRIVE_DESTINATION + if not traverse_config_path(config=config, config_path=config_path): + LOGGER.warning( + f"Внимание: путь сохранения фалов отсутствует в {config_path_to_string(config_path)}." + + f" Использование путь сохранения фалов по умолчанию: {drive_destination}." + ) + else: + drive_destination = get_config_value(config=config, config_path=config_path) + drive_destination_path = os.path.abspath( + os.path.join(prepare_root_destination(config=config), drive_destination) + ) + os.makedirs(drive_destination_path, exist_ok=True) + return drive_destination_path + +def prepare_drive_destination_export(config): + LOGGER.debug("Проверка пути конвертации фалов ...") + config_path = ["drive", "destination_export"] + drive_destination_export = DEFAULT_DRIVE_DESTINATION_EXPORT + if not traverse_config_path(config=config, config_path=config_path): + LOGGER.warning( + f"Внимание: путь конвертации фалов отсутствует в {config_path_to_string(config_path)}." + + f" Использование путь конвертации фалов по умолчанию: {drive_destination_export}." + ) + else: + drive_destination_export = get_config_value(config=config, config_path=config_path) + drive_destination_export_path = os.path.abspath( + os.path.join(prepare_root_destination(config=config), drive_destination_export) + ) + os.makedirs(drive_destination_export_path, exist_ok=True) + return drive_destination_export_path + + +def get_drive_remove_obsolete(config): + drive_remove_obsolete = False + config_path = ["drive", "remove_obsolete"] + if not traverse_config_path(config=config, config_path=config_path): + LOGGER.warning( + f"Предупреждение: remove_obsolete не найден в {config_path_to_string(config_path)}." + + " Используется параметр не удалять устаревшие файлы и папки." + ) + else: + drive_remove_obsolete = get_config_value(config=config, config_path=config_path) + return drive_remove_obsolete + +def get_region(config): + region = "global" + config_path = ["app", "region"] + if not traverse_config_path(config=config, config_path=config_path): + LOGGER.warning( + f"{config_path_to_string(config_path=config_path)} не найдено. Использует значение по умолчанию - global ..." + ) + else: + region = get_config_value(config=config, config_path=config_path) + if region not in ["global", "china"]: + LOGGER.error( + f"{config_path_to_string(config_path=config_path)} недействительно. \ + Допустимые значения - global или china. Использование значения по умолчанию - global ..." + ) + region = "global" + + return region + + + + + + + + + + + + + + diff --git a/src/sync.py b/src/sync.py new file mode 100644 index 0000000..43dcc7d --- /dev/null +++ b/src/sync.py @@ -0,0 +1,126 @@ +import datetime +import os +from time import sleep + +from icloudpy import ICloudPyService, exceptions, utils + +from src import ( + DEFAULT_CONFIG_FILE_PATH, + DEFAULT_COOKIE_DIRECTORY, + ENV_CONFIG_FILE_PATH_KEY, + ENV_ICLOUD_PASSWORD_KEY, + LOGGER, + config_parser, + read_config, + sync_drive +) + + +def get_api_instance( + username, + password, + cookie_directory=DEFAULT_COOKIE_DIRECTORY, + server_region="global", +): + return ( + ICloudPyService( + apple_id=username, + password=password, + cookie_directory=cookie_directory, + home_endpoint="https://www.icloud.com.cn", + setup_endpoint="https://setup.icloud.com.cn/setup/ws/1", + ) + if server_region == "china" + else ICloudPyService( + apple_id=username, + password=password, + cookie_directory=cookie_directory, + ) + ) + +def sync(): + last_send = None + enable_sync_drive = True + drive_sync_interval = 0 + sleep_for = 10 + while True: + config = read_config( + config_path=os.environ.get( + ENV_CONFIG_FILE_PATH_KEY, DEFAULT_CONFIG_FILE_PATH + ) + ) + username = config_parser.get_username(config=config) + if username: + try: + if ENV_ICLOUD_PASSWORD_KEY in os.environ: + password = os.environ.get(ENV_ICLOUD_PASSWORD_KEY) + utils.store_password_in_keyring( + username=username, password=password + ) + else: + password = utils.get_password_from_keyring(username=username) + server_region = config_parser.get_region(config=config) + api = get_api_instance( + username=username, password=password, server_region=server_region + ) + + + if not api.requires_2sa: + + if "drive" in config and enable_sync_drive: + LOGGER.info("Синхронизация drive...") + vvvv = sync_drive.sync_drive(config=config, drive=api.drive, work=api.work) + LOGGER.info("Drive синхронизирован") + my_file = open("last_update.txt", "w") + my_file.write(datetime.datetime.now().isoformat()) + my_file.close() + drive_sync_interval = config_parser.get_drive_sync_interval( + config=config + ) + + if "drive" not in config: + LOGGER.warning( + "Нечего синхронизировать. Добавьте раздел drive в файл config.yaml." + ) + else: + LOGGER.error("Ошибка: требуется 2FA. Пожалуйста, войдите в систему.") + + sleep_for = config_parser.get_retry_login_interval(config=config) + next_sync = ( + datetime.datetime.now() + datetime.timedelta(seconds=sleep_for) + ).strftime("%c") + if sleep_for < 0: + LOGGER.info("retry_login_interval is < 0, exiting ...") + break + LOGGER.info(f"Повторная попытка входа в {next_sync} ...") + sleep(sleep_for) + continue + except exceptions.ICloudPyNoStoredPasswordAvailableException: + LOGGER.error( + "Пароль не сохранен в связке ключей. Пожалуйста, сохраните пароль в связке ключей." + ) + sleep_for = config_parser.get_retry_login_interval(config=config) + next_sync = ( + datetime.datetime.now() + datetime.timedelta(seconds=sleep_for) + ).strftime("%c") + LOGGER.info(f"Повторная попытка входа в систему {next_sync} ...") + + sleep(sleep_for) + continue + + if "drive" in config: + sleep_for = drive_sync_interval + enable_sync_drive = True + enable_sync_photos = False + + next_sync = ( + datetime.datetime.now() + datetime.timedelta(seconds=sleep_for) + ).strftime("%c") + LOGGER.info(f"Повторная синхронизация в {next_sync} ...") + if ( + config_parser.get_drive_sync_interval(config=config) < 0 + if "drive" in config + else True and config_parser.get_photos_sync_interval(config=config) < 0 + ): + break + sleep(sleep_for) diff --git a/src/sync_drive.py b/src/sync_drive.py new file mode 100644 index 0000000..961d0b5 --- /dev/null +++ b/src/sync_drive.py @@ -0,0 +1,353 @@ +import gzip +import os +import re +import time +import unicodedata +import zipfile +from pathlib import Path, PurePath +from shutil import copyfileobj, rmtree +from time import sleep +import json +from icloudpy import exceptions + +from src import LOGGER, config_parser + + +def wanted_file(filters, ignore, file_path): + if not file_path: + return False + if ignore: + if ignored_path(ignore, file_path): + LOGGER.debug(f"Пропуск ненужного файла {file_path}") + return False + if not filters or len(filters) == 0: + return True + for file_extension in filters: + if re.search(f"{file_extension}$", file_path, re.IGNORECASE): + return True + LOGGER.debug(f"Пропуск ненужного файла {file_path}") + return False + +def wanted_file_name(filters, item): + if not filters or len(filters) == 0: + return True + for file_name in filters: + if item.data['name'] == file_name: + return True + LOGGER.debug(f"Пропуск ненужного файла {file_path}") + return False + + +def wanted_folder(filters, ignore, root, folder_path): + if ignore: + if ignored_path(ignore, folder_path): + return False + + if not filters or not folder_path or not root or len(filters) == 0: + return True + folder_path = Path(folder_path) + for folder in filters: + child_path = Path( + os.path.join( + os.path.abspath(root), str(folder).removeprefix("/").removesuffix("/") + ) + ) + if ( + folder_path in child_path.parents + or child_path in folder_path.parents + or folder_path == child_path + ): + return True + return False + + +def ignored_path(ignore_list, path): + for ignore in ignore_list: + if PurePath(path).match(ignore + "*" if ignore.endswith("/") else ignore): + return True + return False + +def wanted_parent_folder(filters, ignore, root, folder_path): + if not filters or not folder_path or not root or len(filters) == 0: + return True + folder_path = Path(folder_path) + for folder in filters: + child_path = Path( + os.path.join( + os.path.abspath(root), folder.removeprefix("/").removesuffix("/") + ) + ) + if child_path in folder_path.parents or folder_path == child_path: + return True + return False + +def process_folder(item, destination_path, filters, ignore, root): + if not (item and destination_path and root): + return None + new_directory = os.path.join(destination_path, item.name) + new_directory_norm = unicodedata.normalize("NFC", new_directory) + if not wanted_folder( + filters=filters, ignore=ignore, folder_path=new_directory_norm, root=root + ): + LOGGER.debug(f"Пропуск ненужной папки {new_directory} ...") + return None + os.makedirs(new_directory_norm, exist_ok=True) + return new_directory + + +def package_exists(item, local_package_path): + if item and local_package_path and os.path.isdir(local_package_path): + local_package_modified_time = int(os.path.getmtime(local_package_path)) + remote_package_modified_time = int(item.date_modified.timestamp()) + local_package_size = sum( + f.stat().st_size + for f in Path(local_package_path).glob("**/*") + if f.is_file() + ) + remote_package_size = item.size + if ( + local_package_modified_time == remote_package_modified_time + and local_package_size == remote_package_size + ): + LOGGER.debug( + f"Изменений не обнаружено. Пропуск пакета {local_package_path} ..." + ) + return True + else: + LOGGER.info( + f"Обнаружены изменения: local_modified_time равно {local_package_modified_time}, " + + f"remote_modified_time равно {remote_package_modified_time}, " + + f"local_package_size равен {local_package_size} и remote_package_size равен {remote_package_size}." + ) + rmtree(local_package_path) + else: + LOGGER.debug(f"Пакет {local_package_path} локально не существует.") + return False + + +def file_exists(item, local_file): + if item and local_file and os.path.isfile(local_file): + local_file_modified_time = int(os.path.getmtime(local_file)) + remote_file_modified_time = int(item.date_modified.timestamp()) + local_file_size = os.path.getsize(local_file) + remote_file_size = item.size + + if local_file_modified_time == remote_file_modified_time: + LOGGER.debug(f"Изменений не обнаружено. Файл {local_file} пропущен...") + return True + else: + LOGGER.debug( + f"Обнаружены изменения: local_modified_time равно {local_file_modified_time}, " + + f"remote_modified_time равно {remote_file_modified_time}, " + + f"local_file_size равен {local_file_size} и remote_file_size равен {remote_file_size}." + ) + else: + LOGGER.debug(f"Файл {local_file} локально не существует.") + return False + + +def process_package(local_file): + return local_file + + +def is_package(item): + file_is_a_package = False + with item.open(stream=True) as response: + file_is_a_package = response.url and "/packageDownload?" in response.url + return file_is_a_package + + +def download_file(item, local_file): + if not (item and local_file): + return False + LOGGER.info(f"Загрузка {local_file} ...") + try: + with item.open(stream=True) as response: + with open(local_file, "wb") as file_out: + for chunk in response.iter_content(4 * 1024 * 1024): + file_out.write(chunk) + if response.url and "/packageDownload?" in response.url: + local_file = process_package(local_file=local_file) + item_modified_time = time.mktime(item.date_modified.timetuple()) + os.utime(local_file, (item_modified_time, item_modified_time)) + except (exceptions.ICloudPyAPIResponseException, FileNotFoundError, Exception) as e: + LOGGER.error(f"Ошибка скачивания {local_file}: {str(e)}") + return False + return local_file + + +def process_file(item, destination_path, destination_path_export, filters, filters_name, ignore, files, work, convert): + if not (item and destination_path and files is not None): + return False + local_file = os.path.join(destination_path, item.name) + local_file = unicodedata.normalize("NFC", local_file) + if not wanted_file(filters=filters, ignore=ignore, file_path=local_file): + return False + if not wanted_file_name(filters=filters_name, item=item): + return False + files.add(local_file) + item_is_package = is_package(item=item) + if not item_is_package: + if package_exists(item=item, local_package_path=local_file): + for f in Path(local_file).glob("**/*"): + files.add(str(f)) + return False + elif file_exists(item=item, local_file=local_file): + return False + local_file = download_file(item=item, local_file=local_file) + if item_is_package: + for f in Path(local_file).glob("**/*"): + f = str(f) + f_normalized = unicodedata.normalize("NFD", f) + if os.path.exists(f): + os.rename(f, f_normalized) + files.add(f_normalized) + for convert_file in convert: + if item.data['name'] == convert_file['name']: + if item.data['extension'] == "numbers": + LOGGER.info(f"Конвертация в xlsx {local_file} ...") + secret = json.dumps({"Type":"wp","Data":convert_file['secret']}) + job_id = work.export_response(item.data['docwsid'], secret, item.data['zone']) + try: + while not work.check_job(job_id): + sleep(5) + work.download_file(job_id, destination_path_export, item.data['name']) + local_export_filename = os.path.join(destination_path_export, item.data['name'] + f".xlsx") + LOGGER.info(f"Сконвертированый файл успешно загружен {local_export_filename} ...") + except Exception as e: + LOGGER.error(f"Ошибка конвертации файла {local_file}: {str(e)}") + return True + + +def remove_obsolete(destination_path, files): + removed_paths = set() + if not (destination_path and files is not None): + return removed_paths + for path in Path(destination_path).rglob("*"): + local_file = str(path.absolute()) + if local_file not in files: + LOGGER.info(f"Удаление {local_file} ...") + if path.is_file(): + path.unlink(missing_ok=True) + removed_paths.add(local_file) + elif path.is_dir(): + rmtree(local_file) + removed_paths.add(local_file) + return removed_paths + + +def sync_directory( + drive, + work, + destination_path, + destination_path_export, + items, + root, + top=True, + filters=None, + convert=None, + ignore=None, + remove=False, + +): + files = set() + if drive and destination_path and items and root: + for i in items: + item = drive[i] + if item.type in ("folder", "app_library"): + new_folder = process_folder( + item=item, + destination_path=destination_path, + filters=filters["folders"] + if filters and "folders" in filters + else None, + ignore=ignore, + root=root, + ) + + new_folder_export = process_folder( + item=item, + destination_path=destination_path_export, + filters=filters["folders"] + if filters and "folders" in filters + else None, + ignore=ignore, + root=root, + ) + + if not new_folder: + continue + try: + files.add(unicodedata.normalize("NFC", new_folder)) + files.update( + sync_directory( + drive=item, + work=work, + destination_path=new_folder, + destination_path_export=new_folder_export, + items=item.dir(), + root=root, + top=False, + filters=filters, + convert=convert, + ignore=ignore, + + ) + ) + except Exception: + pass + elif item.type == "file": + if wanted_parent_folder( + filters=filters["folders"] + if filters and "folders" in filters + else None, + ignore=ignore, + root=root, + folder_path=destination_path, + ): + try: + process_file( + item=item, + destination_path=destination_path, + destination_path_export=destination_path_export, + filters=filters["file_extensions"] + if filters and "file_extensions" in filters + else None, + filters_name=filters["file_name"] + if filters and "file_name" in filters + else None, + ignore=ignore, + files=files, + work=work, + convert=convert, + ) + except Exception: + pass + if top and remove: + remove_obsolete(destination_path=destination_path, files=files) + return files + + +def sync_drive(config, drive, work): + destination_path = config_parser.prepare_drive_destination(config=config) + destination_path_export = config_parser.prepare_drive_destination_export(config=config) + return sync_directory( + drive=drive, + work=work, + destination_path=destination_path, + destination_path_export=destination_path_export, + root=destination_path, + items=drive.dir(), + top=True, + filters=config["drive"]["filters"] + if "drive" in config and "filters" in config["drive"] + else None, + convert=config["drive"]["convert"] + if "drive" in config and "convert" in config["drive"] + else None, + ignore=config["drive"]["ignore"] + if "drive" in config and "ignore" in config["drive"] + else None, + remove=config_parser.get_drive_remove_obsolete(config=config), + + )