Первая публикация

main
Ivan Komarov 2024-12-17 14:40:56 +04:00
commit 722be30443
19 changed files with 2618 additions and 0 deletions

143
.gitignore vendored Normal file
View File

@ -0,0 +1,143 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# Other
src/drive
allure-results
allure-report
.history
ignore-config.yaml
gh-pages
*.pub
session_data/
session_data_original/
icloud/
last_update.txt

5
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,5 @@
{
"cSpell.words": [
"Сконвертированый"
]
}

94
README.md Normal file
View File

@ -0,0 +1,94 @@
# iCloud Drive Sync
Скрипт для синхронизации фалов из iCloud Drive в локальное хранилище. Скрипт также поддерживает конвертацию фалов `.numbers` в формат `.xlsx`
## Требования
- Python 3.9, 3.10, 3.11, 3.12 и 3.13
Протестировано на Python 3.12.3
## Установка
Для установки всех необходимых зависимостей для работы скрипта нужно выполнить следующею команду:
```sh
pip install -r requirements.txt
```
## Настройка
Все настройки хранятся в файле `config.yaml`
```yaml
app:
logger:
level: "info" # Тип логирование в консоль доступно: DEBUG, INFO, WARNING, ERROR, CRITICAL
filename: "icloud.log" # Файл где, будут записываться логи
credentials:
username: "ki@atri-energo.ru" # Имя пользователя в ICloud
retry_login_interval: 900 # Интервал повторной попытки авторизации
root: "icloud" # Корневой каталог
region: global # Регион авторизации в ICloud, доступно global, china
drive:
destination: "drive" # Каталог в котором будут сохраняться синхронизированные фалы
destination_export: "drive_export" # Каталог в котором будут сохраняться конвертированные фалы
remove_obsolete: false # Удалять ли локальные файлы, которые больше не доступны в ICloud
sync_interval: 300 # Интервал синхронизации
filters:
file_name:
# Список файлов которые необходимо синхронизировать
- "Пустой"
- "Копия Тех_2022_осн директория"
folders:
# Список каталогов которые необходимо синхронизировать
- "folder1"
- "folder2"
- "folder3"
file_extensions:
#Список расширений файлов которые необходимо синхронизировать
- "pdf"
- "png"
- "jpg"
- "jpeg"
ignore:
# Список игнорируемых каталогов и файлов
# При указании путей к папкам добавляйте /*
- "node_modules/*"
- "*.md"
# Список конвертируемых файлов из формата numbers в формат xlsx
convert:
- name: "Пустой" # Имя файла
secret: "AzkiMDEYAetglYTt75QvN+hABQhpixoT3UWVxy3ELL8=:XQnMIaEj5Ov4oBqhpxAGIA==:DVRgHa1LjxlWnwYqdlN4VA==" # Секретный ключ файла, как его получить смотрите ниже
```
## Авторизация
Для того чтобы авторизоваться необходимо в первую очередь выполнить следующую команду
```sh
icloud --username=mail@example.com --session-directory="C:\icloud\session_data"
```
- `mail@example.com` это почта (имя пользователя) для авторизации в ICloud
- `C:\icloud\session_data` это полный путь до каталога session_data в корне каталога скрипта, где будет храниться сессия авторизации
Во время выполнения вам будет предложено ввести пароль от аккаунта ICloud и сохранить ключ авторизации в хранилище `keyring` нужно согласиться.
Затем, если включена двухфакторная авторизация, то попросит ввести код из SMS или устройства.
> Внимание: В последнее время SMS на Россииские номера приходя через раз или вообще могут не прийти, поэтому стоит повторить попытку. Как вариант можно дойти до этого шага и попробовать авторизоваться в ICloud через браузер, дойти до шага ввода кода и если он пришел то ввести его не в браузер в консоль скрипта.
> Внимание: Из-за политики Apple авторизация может слететь через пару месяцев, в этом случае необходимо выполнить команду авторизации заново.
## Запуск скрипта
Если авторизация прошла успешно и файл конфигурации заполнен, то можно выполнить команду запуска скрипта:
```sh
py main.py
```
Данная команда синхронизирует необходимы каталоги и файлы, и при необходимости производит конвертацию файлов конвертацию фалов `.numbers` в формат `.xlsx` результат можно увидеть в указанных в файле конфигурации каталогов по умолчанию в /icloud/drive и /icloud/drive_export соответственно.
## Получение секретного ключа для конвертации
![](https://git.ae-work.ru/darkeum/bars-icloud/raw/commit/6813ae02265db16e71724844acf2828925cbcacd/secret.gif)

26
config.yaml Normal file
View File

@ -0,0 +1,26 @@
app:
logger:
level: "info"
filename: "icloud.log"
credentials:
username: "ki@atri-energo.ru"
retry_login_interval: 900
root: "icloud"
region: global
drive:
destination: "drive"
destination_export: "drive_export"
remove_obsolete: false
sync_interval: 300
filters:
file_name:
- "Пустой"
- "Копия Тех_2022_осн директория"
convert:
-
name: "Пустой"
secret: "AzkiMDEYAetglYTt75QvN+hABQhpixoT3UWVxy3ELL8=:XQnMIaEj5Ov4oBqhpxAGIA==:DVRgHa1LjxlWnwYqdlN4VA=="
-
name: "Копия Тех_2022_осн директория"
secret: "ksbIPXw/yn5FX3CzY3x6AkcWYkk0ooazj1PMcVo9xYw=:qVfmzdZWtRoUBtjMw3MY2g==:Wevg/VFPzNyYoLqLfBCB3Q=="

6
icloudpy/__init__.py Normal file
View File

@ -0,0 +1,6 @@
"""The iCloudPy library."""
import logging
from icloudpy.base import ICloudPyService # pylint: disable=unused-import
logging.getLogger(__name__).addHandler(logging.NullHandler())

598
icloudpy/base.py Normal file
View File

@ -0,0 +1,598 @@
"""Library base file."""
import getpass
import http.cookiejar as cookielib
import inspect
import json
import logging
from os import mkdir, path
from re import match
from tempfile import gettempdir
from uuid import uuid1
from requests import Session
from six import PY2
from icloudpy.exceptions import (
ICloudPy2SARequiredException,
ICloudPyAPIResponseException,
ICloudPyFailedLoginException,
ICloudPyServiceNotActivatedException,
)
from icloudpy.services import (
DriveService,
WorkService
)
from icloudpy.utils import get_password_from_keyring
LOGGER = logging.getLogger(__name__)
HEADER_DATA = {
"X-Apple-ID-Account-Country": "account_country",
"X-Apple-ID-Session-Id": "session_id",
"X-Apple-Session-Token": "session_token",
"X-Apple-TwoSV-Trust-Token": "trust_token",
"scnt": "scnt",
}
class ICloudPyPasswordFilter(logging.Filter):
"""Password log hider."""
def __init__(self, password):
super().__init__(password)
def filter(self, record):
message = record.getMessage()
if self.name in message:
record.msg = message.replace(self.name, "*" * 8)
record.args = []
return True
class ICloudPySession(Session):
"""iCloud session."""
def __init__(self, service):
self.service = service
Session.__init__(self)
def request(self, method, url, **kwargs): # pylint: disable=arguments-differ
# Charge logging to the right service endpoint
callee = inspect.stack()[2]
module = inspect.getmodule(callee[0])
request_logger = logging.getLogger(module.__name__).getChild("http")
if self.service.password_filter not in request_logger.filters:
request_logger.addFilter(self.service.password_filter)
request_logger.debug(f"{method} {url} {kwargs.get('data', '')}")
has_retried = kwargs.get("retried")
kwargs.pop("retried", None)
response = super().request(method, url, **kwargs)
content_type = response.headers.get("Content-Type", "").split(";")[0]
json_mimetypes = ["application/json", "text/json"]
for header, value in HEADER_DATA.items():
if response.headers.get(header):
session_arg = value
self.service.session_data.update(
{session_arg: response.headers.get(header)}
)
# Save session_data to file
with open(self.service.session_path, "w", encoding="utf-8") as outfile:
json.dump(self.service.session_data, outfile)
LOGGER.debug("Saved session data to file")
# Save cookies to file
if kwargs.get('save_cookie', True):
self.cookies.save(ignore_discard=True, ignore_expires=True)
LOGGER.debug("Cookies saved to %s", self.service.cookiejar_path)
# print(response.text)
if not response.ok and (
content_type not in json_mimetypes
or response.status_code in [421, 450, 500]
):
try:
# pylint: disable=W0212
fmip_url = self.service._get_webservice_url("findme")
if (
has_retried is None
and response.status_code == 450
and fmip_url in url
):
# Handle re-authentication for Find My iPhone
LOGGER.debug("Re-authenticating Find My iPhone service")
try:
self.service.authenticate(True, "find")
except ICloudPyAPIResponseException:
LOGGER.debug("Re-authentication failed")
kwargs["retried"] = True
return self.request(method, url, **kwargs)
except Exception:
pass
if has_retried is None and response.status_code in [421, 450, 500]:
api_error = ICloudPyAPIResponseException(
response.reason, response.status_code, retry=True
)
request_logger.debug(api_error)
kwargs["retried"] = True
return self.request(method, url, **kwargs)
self._raise_error(response.status_code, response.reason)
if content_type not in json_mimetypes:
return response
try:
data = response.json()
except: # pylint: disable=bare-except
request_logger.warning("Failed to parse response with JSON mimetype")
return response
request_logger.debug(data)
if isinstance(data, dict):
reason = data.get("errorMessage")
reason = reason or data.get("reason")
reason = reason or data.get("errorReason")
if not reason and isinstance(data.get("error"), str):
reason = data.get("error")
if not reason and data.get("error"):
reason = "Unknown reason"
code = data.get("errorCode")
if not code and data.get("serverErrorCode"):
code = data.get("serverErrorCode")
if reason:
self._raise_error(code, reason)
return response
def _raise_error(self, code, reason):
if (
self.service.requires_2sa
and reason == "Missing X-APPLE-WEBAUTH-TOKEN cookie"
):
raise ICloudPy2SARequiredException(self.service.user["apple_id"])
if code in ("ZONE_NOT_FOUND", "AUTHENTICATION_FAILED"):
reason = (
reason + ". Please log into https://icloud.com/ to manually "
"finish setting up your iCloud service"
)
api_error = ICloudPyServiceNotActivatedException(reason, code)
LOGGER.error(api_error)
raise api_error
if code == "ACCESS_DENIED":
reason = (
reason + ". Please wait a few minutes then try again."
"The remote servers might be trying to throttle requests."
)
if code in [421, 450, 500]:
reason = "Authentication required for Account."
api_error = ICloudPyAPIResponseException(reason, code)
print(api_error)
LOGGER.error(api_error)
raise api_error
# Public method to resolve linting error
def raise_error(self, code, reason):
return self._raise_error(code=code, reason=reason)
class ICloudPyService:
"""
A base authentication class for the iCloud service. Handles the
authentication required to access iCloud services.
Usage:
from src import ICloudPyService
icloudpy = ICloudPyService('username@apple.com', 'password')
icloudpy.iphone.location()
"""
def __init__(
self,
apple_id,
password=None,
cookie_directory=None,
verify=True,
client_id=None,
with_family=True,
auth_endpoint="https://idmsa.apple.com/appleauth/auth",
# For China, use "https://www.icloud.com.cn"
home_endpoint="https://www.icloud.com",
# For China, use "https://setup.icloud.com.cn/setup/ws/1"
setup_endpoint="https://setup.icloud.com/setup/ws/1",
):
if password is None:
password = get_password_from_keyring(apple_id)
self.user = {"accountName": apple_id, "password": password}
self.data = {}
self.params = {}
self.client_id = client_id or (f"auth-{str(uuid1()).lower()}")
self.with_family = with_family
self.auth_endpoint = auth_endpoint
self.home_endpoint = home_endpoint
self.setup_endpoint = setup_endpoint
self.password_filter = ICloudPyPasswordFilter(password)
LOGGER.addFilter(self.password_filter)
if cookie_directory:
self._cookie_directory = path.expanduser(path.normpath(cookie_directory))
if not path.exists(self._cookie_directory):
mkdir(self._cookie_directory, 0o700)
else:
topdir = path.join(gettempdir(), "icloudpy")
self._cookie_directory = path.join(topdir, getpass.getuser())
if not path.exists(topdir):
mkdir(topdir, 0o777)
if not path.exists(self._cookie_directory):
mkdir(self._cookie_directory, 0o700)
LOGGER.debug("Using session file %s", self.session_path)
self.session_data = {}
try:
with open(self.session_path, encoding="utf-8") as session_f:
self.session_data = json.load(session_f)
except: # pylint: disable=bare-except
LOGGER.info("Session file does not exist")
if self.session_data.get("client_id"):
self.client_id = self.session_data.get("client_id")
self.params["clientId"] = self.client_id
else:
self.session_data.update({"client_id": self.client_id})
self.params["clientId"] = self.client_id
self.session = ICloudPySession(self)
self.session.verify = verify
self.session.headers.update(
{"Origin": self.home_endpoint, "Referer": f"{self.home_endpoint}/"}
)
cookiejar_path = self.cookiejar_path
self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path)
if path.exists(cookiejar_path):
try:
self.session.cookies.load(ignore_discard=True, ignore_expires=True)
LOGGER.debug("Read cookies from %s", cookiejar_path)
except: # pylint: disable=bare-except
# Most likely a pickled cookiejar from earlier versions.
# The cookiejar will get replaced with a valid one after
# successful authentication.
LOGGER.warning("Failed to read cookiejar %s", cookiejar_path)
self.authenticate()
self._drive = None
self._work = None
self._files = None
self._photos = None
def authenticate(self, force_refresh=False, service=None):
"""
Handles authentication, and persists cookies so that
subsequent logins will not cause additional e-mails from Apple.
"""
login_successful = False
if self.session_data.get("session_token") and not force_refresh:
LOGGER.debug("Checking session token validity")
try:
self.data = self._validate_token()
login_successful = True
except ICloudPyAPIResponseException:
LOGGER.debug("Invalid authentication token, will log in from scratch.")
if not login_successful and service is not None:
app = self.data["apps"][service]
if (
"canLaunchWithOneFactor" in app
and app["canLaunchWithOneFactor"] is True
):
LOGGER.debug(
"Authenticating as %s for %s", self.user["accountName"], service
)
try:
self._authenticate_with_credentials_service(service)
login_successful = True
except Exception as error:
LOGGER.debug(
"Could not log into service. Attempting brand new login. %s",
str(error),
)
if not login_successful:
LOGGER.debug("Authenticating as %s", self.user["accountName"])
data = dict(self.user)
data["rememberMe"] = True
data["trustTokens"] = []
if self.session_data.get("trust_token"):
data["trustTokens"] = [self.session_data.get("trust_token")]
headers = self._get_auth_headers()
if self.session_data.get("scnt"):
headers["scnt"] = self.session_data.get("scnt")
if self.session_data.get("session_id"):
headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
try:
self.session.post(
f"{self.auth_endpoint}/signin",
params={"isRememberMeEnabled": "true"},
data=json.dumps(data),
headers=headers,
)
except ICloudPyAPIResponseException as error:
msg = "Invalid email/password combination."
raise ICloudPyFailedLoginException(msg, error) from error
self._authenticate_with_token()
self._webservices = self.data["webservices"]
LOGGER.debug("Authentication completed successfully")
def _authenticate_with_token(self):
"""Authenticate using session token."""
data = {
"accountCountryCode": self.session_data.get("account_country"),
"dsWebAuthToken": self.session_data.get("session_token"),
"extended_login": True,
"trustToken": self.session_data.get("trust_token", ""),
}
try:
req = self.session.post(
f"{self.setup_endpoint}/accountLogin", data=json.dumps(data)
)
self.data = req.json()
except ICloudPyAPIResponseException as error:
msg = "Invalid authentication token."
raise ICloudPyFailedLoginException(msg, error) from error
def _authenticate_with_credentials_service(self, service):
"""Authenticate to a specific service using credentials."""
data = {
"appName": service,
"apple_id": self.user["accountName"],
"password": self.user["password"],
}
try:
self.session.post(
f"{self.setup_endpoint}/accountLogin", data=json.dumps(data)
)
self.data = self._validate_token()
except ICloudPyAPIResponseException as error:
msg = "Invalid email/password combination."
raise ICloudPyFailedLoginException(msg, error) from error
def _validate_token(self):
"""Checks if the current access token is still valid."""
LOGGER.debug("Checking session token validity")
try:
req = self.session.post(f"{self.setup_endpoint}/validate", data="null")
LOGGER.debug("Session token is still valid")
return req.json()
except ICloudPyAPIResponseException as err:
LOGGER.debug("Invalid authentication token")
raise err
def _get_auth_headers(self, overrides=None):
headers = {
"Accept": "*/*",
"Content-Type": "application/json",
"X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
"X-Apple-OAuth-Client-Type": "firstPartyAuth",
"X-Apple-OAuth-Redirect-URI": "https://www.icloud.com",
"X-Apple-OAuth-Require-Grant-Code": "true",
"X-Apple-OAuth-Response-Mode": "web_message",
"X-Apple-OAuth-Response-Type": "code",
"X-Apple-OAuth-State": self.client_id,
"X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
}
if overrides:
headers.update(overrides)
return headers
@property
def cookiejar_path(self):
"""Get path for cookiejar file."""
return path.join(
self._cookie_directory,
"".join([c for c in self.user.get("accountName") if match(r"\w", c)]),
)
@property
def session_path(self):
"""Get path for session data file."""
return path.join(
self._cookie_directory,
"".join([c for c in self.user.get("accountName") if match(r"\w", c)])
+ ".session",
)
@property
def requires_2sa(self):
"""Returns True if two-step authentication is required."""
return self.data.get("dsInfo", {}).get("hsaVersion", 0) >= 1 and (
self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
)
@property
def requires_2fa(self):
"""Returns True if two-factor authentication is required."""
return self.data["dsInfo"].get("hsaVersion", 0) == 2 and (
self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
)
@property
def is_trusted_session(self):
"""Returns True if the session is trusted."""
return self.data.get("hsaTrustedBrowser", False)
@property
def trusted_devices(self):
"""Returns devices trusted for two-step authentication."""
request = self.session.get(
f"{self.setup_endpoint}/listDevices", params=self.params
)
return request.json().get("devices")
def send_verification_code(self, device):
"""Requests that a verification code is sent to the given device."""
data = json.dumps(device)
request = self.session.post(
f"{self.setup_endpoint}/sendVerificationCode",
params=self.params,
data=data,
)
return request.json().get("success", False)
def validate_verification_code(self, device, code):
"""Verifies a verification code received on a trusted device."""
device.update({"verificationCode": code, "trustBrowser": True})
data = json.dumps(device)
try:
self.session.post(
f"{self.setup_endpoint}/validateVerificationCode",
params=self.params,
data=data,
)
except ICloudPyAPIResponseException as error:
if error.code == -21669:
# Wrong verification code
return False
raise
self.trust_session()
return not self.requires_2sa
def validate_2fa_code(self, code):
"""Verifies a verification code received via Apple's 2FA system (HSA2)."""
data = {"securityCode": {"code": code}}
headers = self._get_auth_headers({"Accept": "application/json"})
if self.session_data.get("scnt"):
headers["scnt"] = self.session_data.get("scnt")
if self.session_data.get("session_id"):
headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
try:
self.session.post(
f"{self.auth_endpoint}/verify/trusteddevice/securitycode",
data=json.dumps(data),
headers=headers,
)
except ICloudPyAPIResponseException as error:
if error.code == -21669:
# Wrong verification code
LOGGER.error("Code verification failed.")
return False
raise
LOGGER.debug("Code verification successful.")
self.trust_session()
return not self.requires_2sa
def trust_session(self):
"""Request session trust to avoid user log in going forward."""
headers = self._get_auth_headers()
if self.session_data.get("scnt"):
headers["scnt"] = self.session_data.get("scnt")
if self.session_data.get("session_id"):
headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
try:
self.session.get(
f"{self.auth_endpoint}/2sv/trust",
headers=headers,
)
self._authenticate_with_token()
return True
except ICloudPyAPIResponseException:
LOGGER.error("Session trust failed.")
return False
def _get_webservice_url(self, ws_key):
"""Get webservice URL, raise an exception if not exists."""
if self._webservices.get(ws_key) is None:
raise ICloudPyServiceNotActivatedException(
"Webservice not available", ws_key
)
return self._webservices[ws_key]["url"]
@property
def devices(self):
"""Returns all devices."""
service_root = self._get_webservice_url("findme")
return FindMyiPhoneServiceManager(
service_root, self.session, self.params, self.with_family
)
@property
def iphone(self):
"""Returns the iPhone."""
return self.devices[0]
@property
def drive(self):
"""Gets the 'Drive' service."""
if not self._drive:
self._drive = DriveService(
service_root=self._get_webservice_url("drivews"),
document_root=self._get_webservice_url("docws"),
session=self.session,
params=self.params,
)
return self._drive
@property
def work(self):
"""Gets the 'Work' service."""
if not self._work:
self._work = WorkService(
document_root=self._get_webservice_url("iworkexportws"),
session=self.session,
params=self.params,
client_id=self.client_id,
dsid=self.data["dsInfo"].get("dsid"),
)
return self._work
def __unicode__(self):
return f"iCloud API: {self.user.get('accountName')}"
def __str__(self):
as_unicode = self.__unicode__()
if PY2:
return as_unicode.encode("utf-8", "ignore")
return as_unicode
def __repr__(self):
return f"<{str(self)}>"

374
icloudpy/cmdline.py Normal file
View File

@ -0,0 +1,374 @@
#! /usr/bin/env python
"""
A Command Line Wrapper to allow easy use of iCloudPy for
command line scripts, and related.
"""
# from builtins import input
import argparse
import pickle
import sys
from click import confirm
from icloudpy import ICloudPyService, utils
from icloudpy.exceptions import ICloudPyFailedLoginException
DEVICE_ERROR = "Please use the --device switch to indicate which device to use."
def create_pickled_data(idevice, filename):
"""
This helper will output the idevice to a pickled file named
after the passed filename.
This allows the data to be used without resorting to screen / pipe
scrapping.
"""
pickle_file = open(filename, "wb")
pickle.dump(idevice.content, pickle_file, protocol=pickle.HIGHEST_PROTOCOL)
pickle_file.close()
def main(args=None):
"""Main commandline entrypoint."""
if args is None:
args = sys.argv[1:]
parser = argparse.ArgumentParser(description="Find My iPhone CommandLine Tool")
parser.add_argument(
"--username",
action="store",
dest="username",
default="",
help="Apple ID to Use",
)
parser.add_argument(
"--password",
action="store",
dest="password",
default="",
help=(
"Apple ID Password to Use; if unspecified, password will be "
"fetched from the system keyring."
),
)
parser.add_argument(
"-n",
"--non-interactive",
action="store_false",
dest="interactive",
default=True,
help="Disable interactive prompts.",
)
parser.add_argument(
"--delete-from-keyring",
action="store_true",
dest="delete_from_keyring",
default=False,
help="Delete stored password in system keyring for this username.",
)
parser.add_argument(
"--list",
action="store_true",
dest="list",
default=False,
help="Short Listings for Device(s) associated with account",
)
parser.add_argument(
"--llist",
action="store_true",
dest="longlist",
default=False,
help="Detailed Listings for Device(s) associated with account",
)
parser.add_argument(
"--locate",
action="store_true",
dest="locate",
default=False,
help="Retrieve Location for the iDevice (non-exclusive).",
)
# Restrict actions to a specific devices UID / DID
parser.add_argument(
"--device",
action="store",
dest="device_id",
default=False,
help="Only effect this device",
)
# Trigger Sound Alert
parser.add_argument(
"--sound",
action="store_true",
dest="sound",
default=False,
help="Play a sound on the device",
)
# Trigger Message w/Sound Alert
parser.add_argument(
"--message",
action="store",
dest="message",
default=False,
help="Optional Text Message to display with a sound",
)
# Trigger Message (without Sound) Alert
parser.add_argument(
"--silentmessage",
action="store",
dest="silentmessage",
default=False,
help="Optional Text Message to display with no sounds",
)
# Lost Mode
parser.add_argument(
"--lostmode",
action="store_true",
dest="lostmode",
default=False,
help="Enable Lost mode for the device",
)
parser.add_argument(
"--lostphone",
action="store",
dest="lost_phone",
default=False,
help="Phone Number allowed to call when lost mode is enabled",
)
parser.add_argument(
"--lostpassword",
action="store",
dest="lost_password",
default=False,
help="Forcibly active this passcode on the idevice",
)
parser.add_argument(
"--lostmessage",
action="store",
dest="lost_message",
default="",
help="Forcibly display this message when activating lost mode.",
)
# Output device data to an pickle file
parser.add_argument(
"--outputfile",
action="store_true",
dest="output_to_file",
default="",
help="Save device data to a file in the current directory.",
)
# Path to session directory
parser.add_argument(
"--session-directory",
action="store",
dest="session_directory",
default=None,
help="Path to save session information",
)
# Server region - global or china
parser.add_argument(
"--region",
action="store",
dest="region",
default="global",
help="Server region - global or china",
)
command_line = parser.parse_args(args)
username = command_line.username
password = command_line.password
session_directory = command_line.session_directory
server_region = command_line.region
if username and command_line.delete_from_keyring:
utils.delete_password_in_keyring(username)
failure_count = 0
while True:
# Which password we use is determined by your username, so we
# do need to check for this first and separately.
if not username:
parser.error("No username supplied")
if not password:
password = utils.get_password(
username, interactive=command_line.interactive
)
if not password:
parser.error("No password supplied")
try:
api = (
ICloudPyService(
apple_id=username.strip(),
password=password.strip(),
cookie_directory=session_directory,
home_endpoint="https://www.icloud.com.cn",
setup_endpoint="https://setup.icloud.com.cn/setup/ws/1",
)
if server_region == "china"
else ICloudPyService(
apple_id=username.strip(),
password=password.strip(),
cookie_directory=session_directory,
)
)
if (
not utils.password_exists_in_keyring(username)
and command_line.interactive
and confirm("Save password in keyring?")
):
utils.store_password_in_keyring(username, password)
if api.requires_2fa:
# fmt: off
print(
"\nTwo-step authentication required.",
"\nPlease enter validation code"
)
# fmt: on
code = input("(string) --> ")
if not api.validate_2fa_code(code):
print("Failed to verify verification code")
sys.exit(1)
print("")
elif api.requires_2sa:
# fmt: off
print(
"\nTwo-step authentication required.",
"\nYour trusted devices are:"
)
# fmt: on
devices = api.trusted_devices
for i, device in enumerate(devices):
print(
f' {i}: {device.get("deviceName", "SMS to " + device.get("phoneNumber"))}'
)
print("\nWhich device would you like to use?")
device = int(input("(number) --> "))
device = devices[device]
if not api.send_verification_code(device):
print("Failed to send verification code")
sys.exit(1)
print("\nPlease enter validation code")
code = input("(string) --> ")
if not api.validate_verification_code(device, code):
print("Failed to verify verification code")
sys.exit(1)
print("")
break
except ICloudPyFailedLoginException as error:
# If they have a stored password; we just used it and
# it did not work; let's delete it if there is one.
if utils.password_exists_in_keyring(username):
utils.delete_password_in_keyring(username)
message = f"Bad username or password for {username}"
password = None
failure_count += 1
if failure_count >= 3:
raise RuntimeError(message) from error
print(message, file=sys.stderr)
for dev in api.devices:
if not command_line.device_id or (
command_line.device_id.strip().lower() == dev.content["id"].strip().lower()
):
# List device(s)
if command_line.locate:
dev.location()
if command_line.output_to_file:
create_pickled_data(
dev,
filename=(dev.content["name"].strip().lower() + ".fmip_snapshot"),
)
contents = dev.content
if command_line.longlist:
print("-" * 30)
print(contents["name"])
for key in contents:
print(f"{key} - {contents[key]}")
elif command_line.list:
print("-" * 30)
print(f"Name - {contents['name']}")
print(f"Display Name - {contents['deviceDisplayName']}")
print(f"Location - {contents['location']}")
print(f"Battery Level - {contents['batteryLevel']}")
print(f"Battery Status- {contents['batteryStatus']}")
print(f"Device Class - {contents['deviceClass']}")
print(f"Device Model - {contents['deviceModel']}")
# Play a Sound on a device
if command_line.sound:
if command_line.device_id:
dev.play_sound()
else:
raise RuntimeError(
f"\n\n\t\tSounds can only be played on a singular device. {DEVICE_ERROR}\n\n"
)
# Display a Message on the device
if command_line.message:
if command_line.device_id:
dev.display_message(
subject="A Message", message=command_line.message, sounds=True
)
else:
raise RuntimeError(
f"Messages can only be played on a singular device. {DEVICE_ERROR}"
)
# Display a Silent Message on the device
if command_line.silentmessage:
if command_line.device_id:
dev.display_message(
subject="A Silent Message",
message=command_line.silentmessage,
sounds=False,
)
else:
raise RuntimeError(
f"Silent Messages can only be played on a singular device. {DEVICE_ERROR}"
)
# Enable Lost mode
if command_line.lostmode:
if command_line.device_id:
dev.lost_device(
number=command_line.lost_phone.strip(),
text=command_line.lost_message.strip(),
newpasscode=command_line.lost_password.strip(),
)
else:
raise RuntimeError(
f"Lost Mode can only be activated on a singular device. {DEVICE_ERROR}"
)
sys.exit(0)
if __name__ == "__main__":
main()

47
icloudpy/exceptions.py Normal file
View File

@ -0,0 +1,47 @@
"""Library exceptions."""
class ICloudPyException(Exception):
"""Generic iCloud exception."""
# API
class ICloudPyAPIResponseException(ICloudPyException):
"""iCloud response exception."""
def __init__(self, reason, code=None, retry=False):
self.reason = reason
self.code = code
message = reason or ""
if code:
message += f" ({code})"
if retry:
message += ". Retrying ..."
super().__init__(message)
class ICloudPyServiceNotActivatedException(ICloudPyAPIResponseException):
"""iCloud service not activated exception."""
# Login
class ICloudPyFailedLoginException(ICloudPyException):
"""iCloud failed login exception."""
class ICloudPy2SARequiredException(ICloudPyException):
"""iCloud 2SA required exception."""
def __init__(self, apple_id):
message = f"Two-step authentication required for account:{apple_id}"
super().__init__(message)
class ICloudPyNoStoredPasswordAvailableException(ICloudPyException):
"""iCloud no stored password exception."""
# Webservice specific
class ICloudPyNoDevicesException(ICloudPyException):
"""iCloud no device exception."""

View File

@ -0,0 +1,3 @@
from icloudpy.services.drive import DriveService # pylint: disable=unused-import
from icloudpy.services.work import WorkService # pylint: disable=unused-import

385
icloudpy/services/drive.py Normal file
View File

@ -0,0 +1,385 @@
"""Drive service."""
import io
import json
import mimetypes
import os
import time
from datetime import datetime, timedelta
from re import search
from requests import Response
from six import PY2
class DriveService:
"""The 'Drive' iCloud service."""
def __init__(self, service_root, document_root, session, params):
self._service_root = service_root
self._document_root = document_root
self.session = session
self.params = dict(params)
self._root = None
def _get_token_from_cookie(self):
for cookie in self.session.cookies:
if cookie.name == "X-APPLE-WEBAUTH-VALIDATE":
match = search(r"\bt=([^:]+)", cookie.value)
if match is None:
raise Exception(f"Can't extract token from {cookie.value}")
return {"token": match.group(1)}
raise Exception("Token cookie not found")
def get_node_data(self, drivewsid):
"""Returns the node data."""
request = self.session.post(
self._service_root + "/retrieveItemDetailsInFolders",
params=self.params,
data=json.dumps(
[
{
"drivewsid": drivewsid,
"partialData": False,
}
]
),
)
if not request.ok:
self.session.raise_error(request.status_code, request.reason)
return request.json()[0]
def get_file(self, file_id, zone="com.apple.CloudDocs", **kwargs):
"""Returns iCloud Drive file."""
file_params = dict(self.params)
file_params.update({"document_id": file_id})
response = self.session.get(
self._document_root + f"/ws/{zone}/download/by_id",
params=file_params,
)
if not response.ok:
self.session.raise_error(response.status_code, response.reason)
package_token = response.json().get("package_token")
data_token = response.json().get("data_token")
if data_token and data_token.get("url"):
return self.session.get(data_token["url"], params=self.params, **kwargs)
elif package_token and package_token.get("url"):
return self.session.get(package_token["url"], params=self.params, **kwargs)
else:
raise KeyError("'data_token' nor 'package_token' found in response.")
def get_app_data(self):
"""Returns the app library (previously ubiquity)."""
request = self.session.get(
self._service_root + "/retrieveAppLibraries", params=self.params
)
if not request.ok:
self.session.raise_error(request.status_code, request.reason)
return request.json()["items"]
def get_app_node(self, app_id, folder="documents"):
"""Returns the node of the app (ubiquity)"""
return DriveNode(self, self.get_node_data("FOLDER::" + app_id + "::" + folder))
def _get_upload_contentws_url(self, file_object, zone="com.apple.CloudDocs"):
"""Get the contentWS endpoint URL to add a new file."""
content_type = mimetypes.guess_type(file_object.name)[0]
if content_type is None:
content_type = ""
# Get filesize from file object
orig_pos = file_object.tell()
file_object.seek(0, os.SEEK_END)
file_size = file_object.tell()
file_object.seek(orig_pos, os.SEEK_SET)
file_params = self.params
file_params.update(self._get_token_from_cookie())
request = self.session.post(
self._document_root + f"/ws/{zone}/upload/web",
params=file_params,
headers={"Content-Type": "text/plain"},
data=json.dumps(
{
"filename": file_object.name,
"type": "FILE",
"content_type": content_type,
"size": file_size,
}
),
)
if not request.ok:
self.session.raise_error(request.status_code, request.reason)
return (request.json()[0]["document_id"], request.json()[0]["url"])
def _update_contentws(
self, folder_id, sf_info, document_id, file_object, zone="com.apple.CloudDocs"
):
data = {
"data": {
"signature": sf_info["fileChecksum"],
"wrapping_key": sf_info["wrappingKey"],
"reference_signature": sf_info["referenceChecksum"],
"size": sf_info["size"],
},
"command": "add_file",
"create_short_guid": True,
"document_id": document_id,
"path": {
"starting_document_id": folder_id,
"path": os.path.basename(file_object.name),
},
"allow_conflict": True,
"file_flags": {
"is_writable": True,
"is_executable": False,
"is_hidden": False,
},
"mtime": int(time.time() * 1000),
"btime": int(time.time() * 1000),
}
# Add the receipt if we have one. Will be absent for 0-sized files
if sf_info.get("receipt"):
data["data"].update({"receipt": sf_info["receipt"]})
request = self.session.post(
self._document_root + f"/ws/{zone}/update/documents",
params=self.params,
headers={"Content-Type": "text/plain"},
data=json.dumps(data),
)
if not request.ok:
self.session.raise_error(request.status_code, request.reason)
return request.json()
def send_file(self, folder_id, file_object, zone="com.apple.CloudDocs"):
"""Send new file to iCloud Drive."""
document_id, content_url = self._get_upload_contentws_url(file_object, zone)
request = self.session.post(content_url, files={file_object.name: file_object})
if not request.ok:
self.session.raise_error(request.status_code, request.reason)
content_response = request.json()["singleFile"]
self._update_contentws(
folder_id, content_response, document_id, file_object, zone
)
def create_folders(self, parent, name):
"""Creates a new iCloud Drive folder"""
request = self.session.post(
self._service_root + "/createFolders",
params=self.params,
headers={"Content-Type": "text/plain"},
data=json.dumps(
{
"destinationDrivewsId": parent,
"folders": [
{
"clientId": self.params["clientId"],
"name": name,
}
],
}
),
)
return request.json()
def rename_items(self, node_id, etag, name):
"""Renames an iCloud Drive node"""
request = self.session.post(
self._service_root + "/renameItems",
params=self.params,
data=json.dumps(
{
"items": [
{
"drivewsid": node_id,
"etag": etag,
"name": name,
}
],
}
),
)
return request.json()
def move_items_to_trash(self, node_id, etag):
"""Moves an iCloud Drive node to the trash bin"""
request = self.session.post(
self._service_root + "/moveItemsToTrash",
params=self.params,
data=json.dumps(
{
"items": [
{
"drivewsid": node_id,
"etag": etag,
"clientId": self.params["clientId"],
}
],
}
),
)
if not request.ok:
self.session.raise_error(request.status_code, request.reason)
return request.json()
@property
def root(self):
"""Returns the root node."""
if not self._root:
self._root = DriveNode(
self, self.get_node_data("FOLDER::com.apple.CloudDocs::root")
)
return self._root
def __getattr__(self, attr):
return getattr(self.root, attr)
def __getitem__(self, key):
return self.root[key]
class DriveNode:
"""Drive node."""
def __init__(self, conn, data):
self.data = data
self.connection = conn
self._children = None
@property
def name(self):
"""Gets the node name."""
if "extension" in self.data:
return f'{self.data["name"]}.{self.data["extension"]}'
return self.data["name"]
@property
def type(self):
"""Gets the node type."""
node_type = self.data.get("type")
return node_type and node_type.lower()
def get_children(self):
"""Gets the node children."""
if not self._children:
if "items" not in self.data:
self.data.update(self.connection.get_node_data(self.data["drivewsid"]))
if "items" not in self.data:
raise KeyError(f'No items in folder, status: {self.data["status"]}')
self._children = [
DriveNode(self.connection, item_data)
for item_data in self.data["items"]
]
return self._children
@property
def size(self):
"""Gets the node size."""
size = self.data.get("size") # Folder does not have size
if not size:
return None
return int(size)
@property
def date_created(self):
"""Gets the node created date (in UTC)."""
return _date_to_utc(self.data.get("dateCreated"))
@property
def date_changed(self):
"""Gets the node changed date (in UTC)."""
return _date_to_utc(self.data.get("dateChanged")) # Folder does not have date
@property
def date_modified(self):
"""Gets the node modified date (in UTC)."""
return _date_to_utc(self.data.get("dateModified")) # Folder does not have date
@property
def date_last_open(self):
"""Gets the node last open date (in UTC)."""
return _date_to_utc(self.data.get("lastOpenTime")) # Folder does not have date
def open(self, **kwargs):
"""Gets the node file."""
# iCloud returns 400 Bad Request for 0-byte files
if self.data["size"] == 0:
response = Response()
response.raw = io.BytesIO()
return response
return self.connection.get_file(
self.data["docwsid"], zone=self.data["zone"], **kwargs
)
def upload(self, file_object, **kwargs):
""" "Upload a new file."""
return self.connection.send_file(
self.data["docwsid"], file_object, zone=self.data["zone"], **kwargs
)
def dir(self):
"""Gets the node list of directories."""
if self.type == "file":
return None
return [child.name for child in self.get_children()]
def mkdir(self, folder):
"""Create a new directory directory."""
# remove cached entries information first so that it will be re-read on next get_children()
self._children = None
if "items" in self.data:
self.data.pop("items")
return self.connection.create_folders(self.data["drivewsid"], folder)
def rename(self, name):
"""Rename an iCloud Drive item."""
return self.connection.rename_items(
self.data["drivewsid"], self.data["etag"], name
)
def delete(self):
"""Delete an iCloud Drive item."""
return self.connection.move_items_to_trash(
self.data["drivewsid"], self.data["etag"]
)
def get(self, name):
"""Gets the node child."""
if self.type == "file":
return None
return [child for child in self.get_children() if child.name == name][0]
def __getitem__(self, key):
try:
return self.get(key)
except IndexError as error:
raise KeyError(f"No child named '{key}' exists") from error
def __unicode__(self):
return f"{{type: {self.type}, name: {self.name}}}"
def __str__(self):
as_unicode = self.__unicode__()
if PY2:
return as_unicode.encode("utf-8", "ignore")
return as_unicode
def __repr__(self):
return f"<{type(self).__name__}: {str(self)}>"
def _date_to_utc(date):
if not date:
return None
# jump through hoops to return time in UTC rather than California time
match = search(r"^(.+?)([\+\-]\d+):(\d\d)$", date)
if not match:
# Already in UTC
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
base = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S")
diff = timedelta(hours=int(match.group(2)), minutes=int(match.group(3)))
return base - diff

92
icloudpy/services/work.py Normal file
View File

@ -0,0 +1,92 @@
"""Work service."""
import io
import json
import mimetypes
import os
import time
import shutil
from datetime import datetime, timedelta
from re import search
from requests import Response
from six import PY2
from src import LOGGER, config_parser
class WorkService:
def __init__(self, document_root, session, params, client_id, dsid):
self._document_root = document_root
self.session = session
self.params = dict(params)
self.client_id = client_id
self.dsid = dsid
self._root = None
def export_response(self, document_id, secret, zone):
file_params = dict(self.params)
file_params.update({"clientBuildNumber": "current"})
file_params.update({"clientMasteringNumber": "Mcurrent"})
file_params.update({"dsid": self.dsid})
request = self.session.post(
self._document_root + f"/iw/export-ws/{self.dsid}/export_document",
params=file_params,
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={
"primary": "primary",
"document_type": "numbers",
"format": "org.openxmlformats.spreadsheetml.sheet",
"locale": "en",
"encrypt_result": "N",
"secret": secret,
"document_id": document_id,
"zone": zone,
}
,
)
if not request.ok:
self.session.raise_error(request.status_code, request.reason)
return request.json()["job_id"]
def check_job(self, job_id):
url = self._document_root + f"/iw/export-ws/{self.dsid}/check_export_status"
file_params = dict(self.params)
file_params.update({"build": "primary"})
file_params.update({"clientBuildNumber": "current"})
file_params.update({"clientMasteringNumber": "Mcurrent"})
file_params.update({"job_id": job_id})
file_params.update({"dsid": self.dsid})
request = self.session.post(
url,
params=file_params,
headers={"Content-Type": "text/plain"},
data={},
)
if not request.ok:
self.session.raise_error(request.status_code, request.reason)
if request.json()["job_status"] == "failure":
raise Exception(request.json()["job_status"])
if request.json()["job_status"] == "success":
return True
else:
return False
def download_file(self, job_id, destination_path, name):
url = self._document_root + f"/iw/export-ws/{self.dsid}/download_exported_document"
file_params = dict(self.params)
file_params.update({"build": "primary"})
file_params.update({"file_name": name + f".xlsx"})
file_params.update({"job_id": job_id})
local_filename = os.path.join(destination_path, name + f".xlsx")
try:
response = self.session.get(url, params=file_params, stream=True)
with open(local_filename, 'wb') as out_file:
shutil.copyfileobj(response.raw, out_file)
except self.session.exceptions.RequestException as e:
raise Exception(f"Ошибка скачивания сконцентрированного файла {local_filename}: {str(e)}")
return local_filename

63
icloudpy/utils.py Normal file
View File

@ -0,0 +1,63 @@
"""Utils."""
import getpass
from sys import stdout
import keyring
from .exceptions import ICloudPyNoStoredPasswordAvailableException
KEYRING_SYSTEM = "icloudpy://icloud-password"
def get_password(username, interactive=stdout.isatty() if stdout else False):
"""Get the password from a username."""
try:
return get_password_from_keyring(username)
except ICloudPyNoStoredPasswordAvailableException:
if not interactive:
raise
return getpass.getpass(f"Enter iCloud password for {username}: ")
def password_exists_in_keyring(username):
"""Return true if the password of a username exists in the keyring."""
try:
get_password_from_keyring(username)
except ICloudPyNoStoredPasswordAvailableException:
return False
return True
def get_password_from_keyring(username):
"""Get the password from a username."""
result = keyring.get_password(KEYRING_SYSTEM, username)
if result is None:
raise ICloudPyNoStoredPasswordAvailableException(
f"No iCloudPy password for {username} could be found "
"in the system keychain. Use the `--store-in-keyring` "
"command-line option for storing a password for this "
"username."
)
return result
def store_password_in_keyring(username, password):
"""Store the password of a username."""
return keyring.set_password(KEYRING_SYSTEM, username, password)
def delete_password_in_keyring(username):
"""Delete the password of a username."""
return keyring.delete_password(KEYRING_SYSTEM, username)
def underscore_to_camelcase(word, initial_capital=False):
"""Transform a word to camelCase."""
words = [x.capitalize() or "_" for x in word.split("_")]
if not initial_capital:
words[0] = words[0].lower()
return "".join(words)

4
main.py Normal file
View File

@ -0,0 +1,4 @@
from src import sync
if __name__ == "__main__":
sync.sync()

13
requirements.txt Normal file
View File

@ -0,0 +1,13 @@
requests==2.32.3
keyring==23.11.0
keyrings.alt==4.2.0
click==8.1.7
six==1.16.0
tzlocal==5.2
pytz==2024.2
certifi==2024.8.30
future==1.0.0
icloudpy==0.7.0
ruamel.yaml==0.16.12
python-magic==0.4.27
libmagic==1.0

BIN
secret.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.1 MiB

127
src/__init__.py Normal file
View File

@ -0,0 +1,127 @@
import logging
import os
import sys
import warnings
from ruamel.yaml import YAML
DEFAULT_ROOT_DESTINATION = "./icloud"
DEFAULT_DRIVE_DESTINATION = "drive"
DEFAULT_DRIVE_DESTINATION_EXPORT = "drive_export"
DEFAULT_RETRY_LOGIN_INTERVAL_SEC = 600 # 10 minutes
DEFAULT_SYNC_INTERVAL_SEC = 1800 # 30 minutes
DEFAULT_CONFIG_FILE_NAME = "config.yaml"
ENV_ICLOUD_PASSWORD_KEY = "ENV_ICLOUD_PASSWORD"
ENV_CONFIG_FILE_PATH_KEY = "ENV_CONFIG_FILE_PATH"
DEFAULT_LOGGER_LEVEL = "info"
DEFAULT_LOG_FILE_NAME = "icloud.log"
DEFAULT_CONFIG_FILE_PATH = os.path.join(
os.path.dirname(os.path.dirname(__file__)), DEFAULT_CONFIG_FILE_NAME
)
DEFAULT_COOKIE_DIRECTORY = "session_data"
warnings.filterwarnings("ignore", category=DeprecationWarning)
def read_config(config_path=DEFAULT_CONFIG_FILE_PATH):
if not (config_path and os.path.exists(config_path)):
print(f"Файл конфигурации не найден по адресу {config_path}.")
return None
with open(file=config_path, encoding="utf-8") as config_file:
config = YAML().load(config_file)
config["app"]["credentials"]["username"] = (
config["app"]["credentials"]["username"].strip()
if config["app"]["credentials"]["username"] is not None
else ""
)
return config
def get_logger_config(config):
logger_config = {}
if "logger" not in config["app"]:
return None
config_app_logger = config["app"]["logger"]
logger_config["level"] = (
config_app_logger["level"].strip().lower()
if "level" in config_app_logger
else DEFAULT_LOGGER_LEVEL
)
logger_config["filename"] = (
config_app_logger["filename"].strip().lower()
if "filename" in config_app_logger
else DEFAULT_LOG_FILE_NAME
)
return logger_config
def log_handler_exists(logger, handler_type, **kwargs):
for handler in logger.handlers:
if isinstance(handler, handler_type):
if handler_type is logging.FileHandler:
if handler.baseFilename.endswith(kwargs["filename"]):
return True
elif handler_type is logging.StreamHandler:
if handler.stream is kwargs["stream"]:
return True
return False
class ColorfulConsoleFormatter(logging.Formatter):
grey = "\x1b[38;21m"
blue = "\x1b[38;5;39m"
yellow = "\x1b[38;5;226m"
red = "\x1b[38;5;196m"
bold_red = "\x1b[31;1m"
reset = "\x1b[0m"
def __init__(self, fmt):
super().__init__()
self.fmt = fmt
self.formats = {
logging.DEBUG: self.grey + self.fmt + self.reset,
logging.INFO: self.blue + self.fmt + self.reset,
logging.WARNING: self.yellow + self.fmt + self.reset,
logging.ERROR: self.red + self.fmt + self.reset,
logging.CRITICAL: self.bold_red + self.fmt + self.reset,
}
def format(self, record):
log_fmt = self.formats.get(record.levelno)
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
def get_logger():
logger = logging.getLogger()
logger_config = get_logger_config(config=read_config())
if logger_config:
level_name = logging.getLevelName(level=logger_config["level"].upper())
logger.setLevel(level=level_name)
if not log_handler_exists(
logger=logger,
handler_type=logging.FileHandler,
filename=logger_config["filename"],
):
file_handler = logging.FileHandler(logger_config["filename"])
file_handler.setFormatter(
logging.Formatter(
"%(asctime)s :: %(levelname)s :: %(name)s :: %(filename)s :: %(lineno)d :: %(message)s"
)
)
logger.addHandler(file_handler)
if not log_handler_exists(
logger=logger, handler_type=logging.StreamHandler, stream=sys.stdout
):
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(
ColorfulConsoleFormatter(
"%(asctime)s :: %(levelname)s :: %(name)s :: %(filename)s :: %(lineno)d :: %(message)s"
)
)
logger.addHandler(console_handler)
return logger
LOGGER = get_logger()

159
src/config_parser.py Normal file
View File

@ -0,0 +1,159 @@
import os
from src import (
DEFAULT_DRIVE_DESTINATION,
DEFAULT_DRIVE_DESTINATION_EXPORT,
DEFAULT_RETRY_LOGIN_INTERVAL_SEC,
DEFAULT_ROOT_DESTINATION,
DEFAULT_SYNC_INTERVAL_SEC,
LOGGER,
)
def config_path_to_string(config_path):
return " > ".join(config_path)
def traverse_config_path(config, config_path: list[str]) -> bool:
if len(config_path) == 0:
return True
if not (config and config_path[0] in config):
return False
return traverse_config_path(config[config_path[0]], config_path=config_path[1:])
def get_config_value(config, config_path):
if len(config_path) == 1:
return config[config_path[0]]
return get_config_value(config=config[config_path[0]], config_path=config_path[1:])
def get_username(config):
username = None
config_path = ["app", "credentials", "username"]
if not traverse_config_path(config=config, config_path=config_path):
LOGGER.error(
f"username отсутствует в {config_path_to_string(config_path)}. Пожалуйста, установите имя пользователя."
)
else:
username = get_config_value(config=config, config_path=config_path)
username = username.strip()
if len(username) == 0:
username = None
LOGGER.error(f"username пустое в {config_path_to_string(config_path)}.")
return username
def get_retry_login_interval(config):
retry_login_interval = DEFAULT_RETRY_LOGIN_INTERVAL_SEC
config_path = ["app", "credentials", "retry_login_interval"]
if not traverse_config_path(config=config, config_path=config_path):
LOGGER.warning(
f"retry_login_interval не найден в {config_path_to_string(config_path=config_path)}."
+ f" Использование по умолчанию {retry_login_interval} секунд ..."
)
else:
retry_login_interval = get_config_value(config=config, config_path=config_path)
LOGGER.info(f"Повторная попытка входа каждые {retry_login_interval} секунд.")
return retry_login_interval
def get_drive_sync_interval(config):
sync_interval = DEFAULT_SYNC_INTERVAL_SEC
config_path = ["drive", "sync_interval"]
if not traverse_config_path(config=config, config_path=config_path):
LOGGER.warning(
f"sync_interval не найден в {config_path_to_string(config_path=config_path)}."
+ f" Использование sync_interval по умолчанию: {sync_interval} секунд ..."
)
else:
sync_interval = get_config_value(config=config, config_path=config_path)
return sync_interval
def prepare_root_destination(config):
LOGGER.debug("Проверка root ...")
root_destination = DEFAULT_ROOT_DESTINATION
config_path = ["app", "root"]
if not traverse_config_path(config=config, config_path=config_path):
LOGGER.warning(
f"Предупреждение: root отсутствует в {config_path_to_string(config_path)}."
+ f" Использование root по умолчанию: {root_destination}",
)
else:
root_destination = get_config_value(config=config, config_path=config_path)
root_destination_path = os.path.abspath(root_destination)
os.makedirs(root_destination_path, exist_ok=True)
return root_destination_path
def prepare_drive_destination(config):
LOGGER.debug("Проверка пути сохранения фалов...")
config_path = ["drive", "destination"]
drive_destination = DEFAULT_DRIVE_DESTINATION
if not traverse_config_path(config=config, config_path=config_path):
LOGGER.warning(
f"Внимание: путь сохранения фалов отсутствует в {config_path_to_string(config_path)}."
+ f" Использование путь сохранения фалов по умолчанию: {drive_destination}."
)
else:
drive_destination = get_config_value(config=config, config_path=config_path)
drive_destination_path = os.path.abspath(
os.path.join(prepare_root_destination(config=config), drive_destination)
)
os.makedirs(drive_destination_path, exist_ok=True)
return drive_destination_path
def prepare_drive_destination_export(config):
LOGGER.debug("Проверка пути конвертации фалов ...")
config_path = ["drive", "destination_export"]
drive_destination_export = DEFAULT_DRIVE_DESTINATION_EXPORT
if not traverse_config_path(config=config, config_path=config_path):
LOGGER.warning(
f"Внимание: путь конвертации фалов отсутствует в {config_path_to_string(config_path)}."
+ f" Использование путь конвертации фалов по умолчанию: {drive_destination_export}."
)
else:
drive_destination_export = get_config_value(config=config, config_path=config_path)
drive_destination_export_path = os.path.abspath(
os.path.join(prepare_root_destination(config=config), drive_destination_export)
)
os.makedirs(drive_destination_export_path, exist_ok=True)
return drive_destination_export_path
def get_drive_remove_obsolete(config):
drive_remove_obsolete = False
config_path = ["drive", "remove_obsolete"]
if not traverse_config_path(config=config, config_path=config_path):
LOGGER.warning(
f"Предупреждение: remove_obsolete не найден в {config_path_to_string(config_path)}."
+ " Используется параметр не удалять устаревшие файлы и папки."
)
else:
drive_remove_obsolete = get_config_value(config=config, config_path=config_path)
return drive_remove_obsolete
def get_region(config):
region = "global"
config_path = ["app", "region"]
if not traverse_config_path(config=config, config_path=config_path):
LOGGER.warning(
f"{config_path_to_string(config_path=config_path)} не найдено. Использует значение по умолчанию - global ..."
)
else:
region = get_config_value(config=config, config_path=config_path)
if region not in ["global", "china"]:
LOGGER.error(
f"{config_path_to_string(config_path=config_path)} недействительно. \
Допустимые значения - global или china. Использование значения по умолчанию - global ..."
)
region = "global"
return region

126
src/sync.py Normal file
View File

@ -0,0 +1,126 @@
import datetime
import os
from time import sleep
from icloudpy import ICloudPyService, exceptions, utils
from src import (
DEFAULT_CONFIG_FILE_PATH,
DEFAULT_COOKIE_DIRECTORY,
ENV_CONFIG_FILE_PATH_KEY,
ENV_ICLOUD_PASSWORD_KEY,
LOGGER,
config_parser,
read_config,
sync_drive
)
def get_api_instance(
username,
password,
cookie_directory=DEFAULT_COOKIE_DIRECTORY,
server_region="global",
):
return (
ICloudPyService(
apple_id=username,
password=password,
cookie_directory=cookie_directory,
home_endpoint="https://www.icloud.com.cn",
setup_endpoint="https://setup.icloud.com.cn/setup/ws/1",
)
if server_region == "china"
else ICloudPyService(
apple_id=username,
password=password,
cookie_directory=cookie_directory,
)
)
def sync():
last_send = None
enable_sync_drive = True
drive_sync_interval = 0
sleep_for = 10
while True:
config = read_config(
config_path=os.environ.get(
ENV_CONFIG_FILE_PATH_KEY, DEFAULT_CONFIG_FILE_PATH
)
)
username = config_parser.get_username(config=config)
if username:
try:
if ENV_ICLOUD_PASSWORD_KEY in os.environ:
password = os.environ.get(ENV_ICLOUD_PASSWORD_KEY)
utils.store_password_in_keyring(
username=username, password=password
)
else:
password = utils.get_password_from_keyring(username=username)
server_region = config_parser.get_region(config=config)
api = get_api_instance(
username=username, password=password, server_region=server_region
)
if not api.requires_2sa:
if "drive" in config and enable_sync_drive:
LOGGER.info("Синхронизация drive...")
vvvv = sync_drive.sync_drive(config=config, drive=api.drive, work=api.work)
LOGGER.info("Drive синхронизирован")
my_file = open("last_update.txt", "w")
my_file.write(datetime.datetime.now().isoformat())
my_file.close()
drive_sync_interval = config_parser.get_drive_sync_interval(
config=config
)
if "drive" not in config:
LOGGER.warning(
"Нечего синхронизировать. Добавьте раздел drive в файл config.yaml."
)
else:
LOGGER.error("Ошибка: требуется 2FA. Пожалуйста, войдите в систему.")
sleep_for = config_parser.get_retry_login_interval(config=config)
next_sync = (
datetime.datetime.now() + datetime.timedelta(seconds=sleep_for)
).strftime("%c")
if sleep_for < 0:
LOGGER.info("retry_login_interval is < 0, exiting ...")
break
LOGGER.info(f"Повторная попытка входа в {next_sync} ...")
sleep(sleep_for)
continue
except exceptions.ICloudPyNoStoredPasswordAvailableException:
LOGGER.error(
"Пароль не сохранен в связке ключей. Пожалуйста, сохраните пароль в связке ключей."
)
sleep_for = config_parser.get_retry_login_interval(config=config)
next_sync = (
datetime.datetime.now() + datetime.timedelta(seconds=sleep_for)
).strftime("%c")
LOGGER.info(f"Повторная попытка входа в систему {next_sync} ...")
sleep(sleep_for)
continue
if "drive" in config:
sleep_for = drive_sync_interval
enable_sync_drive = True
enable_sync_photos = False
next_sync = (
datetime.datetime.now() + datetime.timedelta(seconds=sleep_for)
).strftime("%c")
LOGGER.info(f"Повторная синхронизация в {next_sync} ...")
if (
config_parser.get_drive_sync_interval(config=config) < 0
if "drive" in config
else True and config_parser.get_photos_sync_interval(config=config) < 0
):
break
sleep(sleep_for)

353
src/sync_drive.py Normal file
View File

@ -0,0 +1,353 @@
import gzip
import os
import re
import time
import unicodedata
import zipfile
from pathlib import Path, PurePath
from shutil import copyfileobj, rmtree
from time import sleep
import json
from icloudpy import exceptions
from src import LOGGER, config_parser
def wanted_file(filters, ignore, file_path):
if not file_path:
return False
if ignore:
if ignored_path(ignore, file_path):
LOGGER.debug(f"Пропуск ненужного файла {file_path}")
return False
if not filters or len(filters) == 0:
return True
for file_extension in filters:
if re.search(f"{file_extension}$", file_path, re.IGNORECASE):
return True
LOGGER.debug(f"Пропуск ненужного файла {file_path}")
return False
def wanted_file_name(filters, item):
if not filters or len(filters) == 0:
return True
for file_name in filters:
if item.data['name'] == file_name:
return True
LOGGER.debug(f"Пропуск ненужного файла {file_path}")
return False
def wanted_folder(filters, ignore, root, folder_path):
if ignore:
if ignored_path(ignore, folder_path):
return False
if not filters or not folder_path or not root or len(filters) == 0:
return True
folder_path = Path(folder_path)
for folder in filters:
child_path = Path(
os.path.join(
os.path.abspath(root), str(folder).removeprefix("/").removesuffix("/")
)
)
if (
folder_path in child_path.parents
or child_path in folder_path.parents
or folder_path == child_path
):
return True
return False
def ignored_path(ignore_list, path):
for ignore in ignore_list:
if PurePath(path).match(ignore + "*" if ignore.endswith("/") else ignore):
return True
return False
def wanted_parent_folder(filters, ignore, root, folder_path):
if not filters or not folder_path or not root or len(filters) == 0:
return True
folder_path = Path(folder_path)
for folder in filters:
child_path = Path(
os.path.join(
os.path.abspath(root), folder.removeprefix("/").removesuffix("/")
)
)
if child_path in folder_path.parents or folder_path == child_path:
return True
return False
def process_folder(item, destination_path, filters, ignore, root):
if not (item and destination_path and root):
return None
new_directory = os.path.join(destination_path, item.name)
new_directory_norm = unicodedata.normalize("NFC", new_directory)
if not wanted_folder(
filters=filters, ignore=ignore, folder_path=new_directory_norm, root=root
):
LOGGER.debug(f"Пропуск ненужной папки {new_directory} ...")
return None
os.makedirs(new_directory_norm, exist_ok=True)
return new_directory
def package_exists(item, local_package_path):
if item and local_package_path and os.path.isdir(local_package_path):
local_package_modified_time = int(os.path.getmtime(local_package_path))
remote_package_modified_time = int(item.date_modified.timestamp())
local_package_size = sum(
f.stat().st_size
for f in Path(local_package_path).glob("**/*")
if f.is_file()
)
remote_package_size = item.size
if (
local_package_modified_time == remote_package_modified_time
and local_package_size == remote_package_size
):
LOGGER.debug(
f"Изменений не обнаружено. Пропуск пакета {local_package_path} ..."
)
return True
else:
LOGGER.info(
f"Обнаружены изменения: local_modified_time равно {local_package_modified_time}, "
+ f"remote_modified_time равно {remote_package_modified_time}, "
+ f"local_package_size равен {local_package_size} и remote_package_size равен {remote_package_size}."
)
rmtree(local_package_path)
else:
LOGGER.debug(f"Пакет {local_package_path} локально не существует.")
return False
def file_exists(item, local_file):
if item and local_file and os.path.isfile(local_file):
local_file_modified_time = int(os.path.getmtime(local_file))
remote_file_modified_time = int(item.date_modified.timestamp())
local_file_size = os.path.getsize(local_file)
remote_file_size = item.size
if local_file_modified_time == remote_file_modified_time:
LOGGER.debug(f"Изменений не обнаружено. Файл {local_file} пропущен...")
return True
else:
LOGGER.debug(
f"Обнаружены изменения: local_modified_time равно {local_file_modified_time}, "
+ f"remote_modified_time равно {remote_file_modified_time}, "
+ f"local_file_size равен {local_file_size} и remote_file_size равен {remote_file_size}."
)
else:
LOGGER.debug(f"Файл {local_file} локально не существует.")
return False
def process_package(local_file):
return local_file
def is_package(item):
file_is_a_package = False
with item.open(stream=True) as response:
file_is_a_package = response.url and "/packageDownload?" in response.url
return file_is_a_package
def download_file(item, local_file):
if not (item and local_file):
return False
LOGGER.info(f"Загрузка {local_file} ...")
try:
with item.open(stream=True) as response:
with open(local_file, "wb") as file_out:
for chunk in response.iter_content(4 * 1024 * 1024):
file_out.write(chunk)
if response.url and "/packageDownload?" in response.url:
local_file = process_package(local_file=local_file)
item_modified_time = time.mktime(item.date_modified.timetuple())
os.utime(local_file, (item_modified_time, item_modified_time))
except (exceptions.ICloudPyAPIResponseException, FileNotFoundError, Exception) as e:
LOGGER.error(f"Ошибка скачивания {local_file}: {str(e)}")
return False
return local_file
def process_file(item, destination_path, destination_path_export, filters, filters_name, ignore, files, work, convert):
if not (item and destination_path and files is not None):
return False
local_file = os.path.join(destination_path, item.name)
local_file = unicodedata.normalize("NFC", local_file)
if not wanted_file(filters=filters, ignore=ignore, file_path=local_file):
return False
if not wanted_file_name(filters=filters_name, item=item):
return False
files.add(local_file)
item_is_package = is_package(item=item)
if not item_is_package:
if package_exists(item=item, local_package_path=local_file):
for f in Path(local_file).glob("**/*"):
files.add(str(f))
return False
elif file_exists(item=item, local_file=local_file):
return False
local_file = download_file(item=item, local_file=local_file)
if item_is_package:
for f in Path(local_file).glob("**/*"):
f = str(f)
f_normalized = unicodedata.normalize("NFD", f)
if os.path.exists(f):
os.rename(f, f_normalized)
files.add(f_normalized)
for convert_file in convert:
if item.data['name'] == convert_file['name']:
if item.data['extension'] == "numbers":
LOGGER.info(f"Конвертация в xlsx {local_file} ...")
secret = json.dumps({"Type":"wp","Data":convert_file['secret']})
job_id = work.export_response(item.data['docwsid'], secret, item.data['zone'])
try:
while not work.check_job(job_id):
sleep(5)
work.download_file(job_id, destination_path_export, item.data['name'])
local_export_filename = os.path.join(destination_path_export, item.data['name'] + f".xlsx")
LOGGER.info(f"Сконвертированый файл успешно загружен {local_export_filename} ...")
except Exception as e:
LOGGER.error(f"Ошибка конвертации файла {local_file}: {str(e)}")
return True
def remove_obsolete(destination_path, files):
removed_paths = set()
if not (destination_path and files is not None):
return removed_paths
for path in Path(destination_path).rglob("*"):
local_file = str(path.absolute())
if local_file not in files:
LOGGER.info(f"Удаление {local_file} ...")
if path.is_file():
path.unlink(missing_ok=True)
removed_paths.add(local_file)
elif path.is_dir():
rmtree(local_file)
removed_paths.add(local_file)
return removed_paths
def sync_directory(
drive,
work,
destination_path,
destination_path_export,
items,
root,
top=True,
filters=None,
convert=None,
ignore=None,
remove=False,
):
files = set()
if drive and destination_path and items and root:
for i in items:
item = drive[i]
if item.type in ("folder", "app_library"):
new_folder = process_folder(
item=item,
destination_path=destination_path,
filters=filters["folders"]
if filters and "folders" in filters
else None,
ignore=ignore,
root=root,
)
new_folder_export = process_folder(
item=item,
destination_path=destination_path_export,
filters=filters["folders"]
if filters and "folders" in filters
else None,
ignore=ignore,
root=root,
)
if not new_folder:
continue
try:
files.add(unicodedata.normalize("NFC", new_folder))
files.update(
sync_directory(
drive=item,
work=work,
destination_path=new_folder,
destination_path_export=new_folder_export,
items=item.dir(),
root=root,
top=False,
filters=filters,
convert=convert,
ignore=ignore,
)
)
except Exception:
pass
elif item.type == "file":
if wanted_parent_folder(
filters=filters["folders"]
if filters and "folders" in filters
else None,
ignore=ignore,
root=root,
folder_path=destination_path,
):
try:
process_file(
item=item,
destination_path=destination_path,
destination_path_export=destination_path_export,
filters=filters["file_extensions"]
if filters and "file_extensions" in filters
else None,
filters_name=filters["file_name"]
if filters and "file_name" in filters
else None,
ignore=ignore,
files=files,
work=work,
convert=convert,
)
except Exception:
pass
if top and remove:
remove_obsolete(destination_path=destination_path, files=files)
return files
def sync_drive(config, drive, work):
destination_path = config_parser.prepare_drive_destination(config=config)
destination_path_export = config_parser.prepare_drive_destination_export(config=config)
return sync_directory(
drive=drive,
work=work,
destination_path=destination_path,
destination_path_export=destination_path_export,
root=destination_path,
items=drive.dir(),
top=True,
filters=config["drive"]["filters"]
if "drive" in config and "filters" in config["drive"]
else None,
convert=config["drive"]["convert"]
if "drive" in config and "convert" in config["drive"]
else None,
ignore=config["drive"]["ignore"]
if "drive" in config and "ignore" in config["drive"]
else None,
remove=config_parser.get_drive_remove_obsolete(config=config),
)