bars-icloud-drive/icloudpy/services/drive.py

386 lines
13 KiB
Python
Raw Permalink Normal View History

2024-12-17 10:40:56 +00:00
"""Drive service."""
import io
import json
import mimetypes
import os
import time
from datetime import datetime, timedelta
from re import search
from requests import Response
from six import PY2
class DriveService:
"""The 'Drive' iCloud service."""
def __init__(self, service_root, document_root, session, params):
self._service_root = service_root
self._document_root = document_root
self.session = session
self.params = dict(params)
self._root = None
def _get_token_from_cookie(self):
for cookie in self.session.cookies:
if cookie.name == "X-APPLE-WEBAUTH-VALIDATE":
match = search(r"\bt=([^:]+)", cookie.value)
if match is None:
raise Exception(f"Can't extract token from {cookie.value}")
return {"token": match.group(1)}
raise Exception("Token cookie not found")
def get_node_data(self, drivewsid):
"""Returns the node data."""
request = self.session.post(
self._service_root + "/retrieveItemDetailsInFolders",
params=self.params,
data=json.dumps(
[
{
"drivewsid": drivewsid,
"partialData": False,
}
]
),
)
if not request.ok:
self.session.raise_error(request.status_code, request.reason)
return request.json()[0]
def get_file(self, file_id, zone="com.apple.CloudDocs", **kwargs):
"""Returns iCloud Drive file."""
file_params = dict(self.params)
file_params.update({"document_id": file_id})
response = self.session.get(
self._document_root + f"/ws/{zone}/download/by_id",
params=file_params,
)
if not response.ok:
self.session.raise_error(response.status_code, response.reason)
package_token = response.json().get("package_token")
data_token = response.json().get("data_token")
if data_token and data_token.get("url"):
return self.session.get(data_token["url"], params=self.params, **kwargs)
elif package_token and package_token.get("url"):
return self.session.get(package_token["url"], params=self.params, **kwargs)
else:
raise KeyError("'data_token' nor 'package_token' found in response.")
def get_app_data(self):
"""Returns the app library (previously ubiquity)."""
request = self.session.get(
self._service_root + "/retrieveAppLibraries", params=self.params
)
if not request.ok:
self.session.raise_error(request.status_code, request.reason)
return request.json()["items"]
def get_app_node(self, app_id, folder="documents"):
"""Returns the node of the app (ubiquity)"""
return DriveNode(self, self.get_node_data("FOLDER::" + app_id + "::" + folder))
def _get_upload_contentws_url(self, file_object, zone="com.apple.CloudDocs"):
"""Get the contentWS endpoint URL to add a new file."""
content_type = mimetypes.guess_type(file_object.name)[0]
if content_type is None:
content_type = ""
# Get filesize from file object
orig_pos = file_object.tell()
file_object.seek(0, os.SEEK_END)
file_size = file_object.tell()
file_object.seek(orig_pos, os.SEEK_SET)
file_params = self.params
file_params.update(self._get_token_from_cookie())
request = self.session.post(
self._document_root + f"/ws/{zone}/upload/web",
params=file_params,
headers={"Content-Type": "text/plain"},
data=json.dumps(
{
"filename": file_object.name,
"type": "FILE",
"content_type": content_type,
"size": file_size,
}
),
)
if not request.ok:
self.session.raise_error(request.status_code, request.reason)
return (request.json()[0]["document_id"], request.json()[0]["url"])
def _update_contentws(
self, folder_id, sf_info, document_id, file_object, zone="com.apple.CloudDocs"
):
data = {
"data": {
"signature": sf_info["fileChecksum"],
"wrapping_key": sf_info["wrappingKey"],
"reference_signature": sf_info["referenceChecksum"],
"size": sf_info["size"],
},
"command": "add_file",
"create_short_guid": True,
"document_id": document_id,
"path": {
"starting_document_id": folder_id,
"path": os.path.basename(file_object.name),
},
"allow_conflict": True,
"file_flags": {
"is_writable": True,
"is_executable": False,
"is_hidden": False,
},
"mtime": int(time.time() * 1000),
"btime": int(time.time() * 1000),
}
# Add the receipt if we have one. Will be absent for 0-sized files
if sf_info.get("receipt"):
data["data"].update({"receipt": sf_info["receipt"]})
request = self.session.post(
self._document_root + f"/ws/{zone}/update/documents",
params=self.params,
headers={"Content-Type": "text/plain"},
data=json.dumps(data),
)
if not request.ok:
self.session.raise_error(request.status_code, request.reason)
return request.json()
def send_file(self, folder_id, file_object, zone="com.apple.CloudDocs"):
"""Send new file to iCloud Drive."""
document_id, content_url = self._get_upload_contentws_url(file_object, zone)
request = self.session.post(content_url, files={file_object.name: file_object})
if not request.ok:
self.session.raise_error(request.status_code, request.reason)
content_response = request.json()["singleFile"]
self._update_contentws(
folder_id, content_response, document_id, file_object, zone
)
def create_folders(self, parent, name):
"""Creates a new iCloud Drive folder"""
request = self.session.post(
self._service_root + "/createFolders",
params=self.params,
headers={"Content-Type": "text/plain"},
data=json.dumps(
{
"destinationDrivewsId": parent,
"folders": [
{
"clientId": self.params["clientId"],
"name": name,
}
],
}
),
)
return request.json()
def rename_items(self, node_id, etag, name):
"""Renames an iCloud Drive node"""
request = self.session.post(
self._service_root + "/renameItems",
params=self.params,
data=json.dumps(
{
"items": [
{
"drivewsid": node_id,
"etag": etag,
"name": name,
}
],
}
),
)
return request.json()
def move_items_to_trash(self, node_id, etag):
"""Moves an iCloud Drive node to the trash bin"""
request = self.session.post(
self._service_root + "/moveItemsToTrash",
params=self.params,
data=json.dumps(
{
"items": [
{
"drivewsid": node_id,
"etag": etag,
"clientId": self.params["clientId"],
}
],
}
),
)
if not request.ok:
self.session.raise_error(request.status_code, request.reason)
return request.json()
@property
def root(self):
"""Returns the root node."""
if not self._root:
self._root = DriveNode(
self, self.get_node_data("FOLDER::com.apple.CloudDocs::root")
)
return self._root
def __getattr__(self, attr):
return getattr(self.root, attr)
def __getitem__(self, key):
return self.root[key]
class DriveNode:
"""Drive node."""
def __init__(self, conn, data):
self.data = data
self.connection = conn
self._children = None
@property
def name(self):
"""Gets the node name."""
if "extension" in self.data:
return f'{self.data["name"]}.{self.data["extension"]}'
return self.data["name"]
@property
def type(self):
"""Gets the node type."""
node_type = self.data.get("type")
return node_type and node_type.lower()
def get_children(self):
"""Gets the node children."""
if not self._children:
if "items" not in self.data:
self.data.update(self.connection.get_node_data(self.data["drivewsid"]))
if "items" not in self.data:
raise KeyError(f'No items in folder, status: {self.data["status"]}')
self._children = [
DriveNode(self.connection, item_data)
for item_data in self.data["items"]
]
return self._children
@property
def size(self):
"""Gets the node size."""
size = self.data.get("size") # Folder does not have size
if not size:
return None
return int(size)
@property
def date_created(self):
"""Gets the node created date (in UTC)."""
return _date_to_utc(self.data.get("dateCreated"))
@property
def date_changed(self):
"""Gets the node changed date (in UTC)."""
return _date_to_utc(self.data.get("dateChanged")) # Folder does not have date
@property
def date_modified(self):
"""Gets the node modified date (in UTC)."""
return _date_to_utc(self.data.get("dateModified")) # Folder does not have date
@property
def date_last_open(self):
"""Gets the node last open date (in UTC)."""
return _date_to_utc(self.data.get("lastOpenTime")) # Folder does not have date
def open(self, **kwargs):
"""Gets the node file."""
# iCloud returns 400 Bad Request for 0-byte files
if self.data["size"] == 0:
response = Response()
response.raw = io.BytesIO()
return response
return self.connection.get_file(
self.data["docwsid"], zone=self.data["zone"], **kwargs
)
def upload(self, file_object, **kwargs):
""" "Upload a new file."""
return self.connection.send_file(
self.data["docwsid"], file_object, zone=self.data["zone"], **kwargs
)
def dir(self):
"""Gets the node list of directories."""
if self.type == "file":
return None
return [child.name for child in self.get_children()]
def mkdir(self, folder):
"""Create a new directory directory."""
# remove cached entries information first so that it will be re-read on next get_children()
self._children = None
if "items" in self.data:
self.data.pop("items")
return self.connection.create_folders(self.data["drivewsid"], folder)
def rename(self, name):
"""Rename an iCloud Drive item."""
return self.connection.rename_items(
self.data["drivewsid"], self.data["etag"], name
)
def delete(self):
"""Delete an iCloud Drive item."""
return self.connection.move_items_to_trash(
self.data["drivewsid"], self.data["etag"]
)
def get(self, name):
"""Gets the node child."""
if self.type == "file":
return None
return [child for child in self.get_children() if child.name == name][0]
def __getitem__(self, key):
try:
return self.get(key)
except IndexError as error:
raise KeyError(f"No child named '{key}' exists") from error
def __unicode__(self):
return f"{{type: {self.type}, name: {self.name}}}"
def __str__(self):
as_unicode = self.__unicode__()
if PY2:
return as_unicode.encode("utf-8", "ignore")
return as_unicode
def __repr__(self):
return f"<{type(self).__name__}: {str(self)}>"
def _date_to_utc(date):
if not date:
return None
# jump through hoops to return time in UTC rather than California time
match = search(r"^(.+?)([\+\-]\d+):(\d\d)$", date)
if not match:
# Already in UTC
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
base = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S")
diff = timedelta(hours=int(match.group(2)), minutes=int(match.group(3)))
return base - diff