386 lines
13 KiB
Python
386 lines
13 KiB
Python
|
"""Drive service."""
|
||
|
import io
|
||
|
import json
|
||
|
import mimetypes
|
||
|
import os
|
||
|
import time
|
||
|
from datetime import datetime, timedelta
|
||
|
from re import search
|
||
|
|
||
|
from requests import Response
|
||
|
from six import PY2
|
||
|
|
||
|
|
||
|
class DriveService:
    """The 'Drive' iCloud service.

    Issues requests against two endpoint roots through the supplied session:
    ``service_root`` (folder/metadata operations) and ``document_root``
    (download/upload operations). Attribute and item lookups the service does
    not define itself are forwarded to the lazily created root node.
    """

    # Names that belong to the service itself. They must never be forwarded to
    # the root node by __getattr__: resolving the root needs these very
    # attributes, so forwarding them would recurse without end.
    _OWN_ATTRIBUTES = frozenset(
        {"_service_root", "_document_root", "session", "params", "_root", "root"}
    )

    def __init__(self, service_root, document_root, session, params):
        """Store the endpoint roots, the session and a copy of ``params``."""
        self._service_root = service_root
        self._document_root = document_root
        self.session = session
        # Copy so later changes to the caller's mapping do not leak in here.
        self.params = dict(params)
        self._root = None

    def _get_token_from_cookie(self):
        """Extract the ``t=`` token from the X-APPLE-WEBAUTH-VALIDATE cookie.

        Returns:
            A dict ``{"token": <value>}``.

        Raises:
            Exception: if the cookie is missing or holds no ``t=`` field.
        """
        for cookie in self.session.cookies:
            if cookie.name == "X-APPLE-WEBAUTH-VALIDATE":
                match = search(r"\bt=([^:]+)", cookie.value)
                if match is None:
                    raise Exception(f"Can't extract token from {cookie.value}")
                return {"token": match.group(1)}
        raise Exception("Token cookie not found")

    def get_node_data(self, drivewsid):
        """Returns the node data.

        Posts a one-element request for ``drivewsid`` and returns the first
        element of the JSON response.
        """
        request = self.session.post(
            self._service_root + "/retrieveItemDetailsInFolders",
            params=self.params,
            data=json.dumps(
                [
                    {
                        "drivewsid": drivewsid,
                        "partialData": False,
                    }
                ]
            ),
        )
        if not request.ok:
            self.session.raise_error(request.status_code, request.reason)
        return request.json()[0]

    def get_file(self, file_id, zone="com.apple.CloudDocs", **kwargs):
        """Returns iCloud Drive file.

        First asks the download endpoint for the document's tokens, then
        fetches the URL carried by ``data_token`` or, failing that,
        ``package_token``. Extra ``kwargs`` are passed to that second
        ``session.get`` call.

        Raises:
            KeyError: if neither token with a URL is present in the response.
        """
        file_params = dict(self.params)
        file_params.update({"document_id": file_id})
        response = self.session.get(
            self._document_root + f"/ws/{zone}/download/by_id",
            params=file_params,
        )
        if not response.ok:
            self.session.raise_error(response.status_code, response.reason)

        # Decode the body once; data_token takes precedence over package_token.
        payload = response.json()
        for token_key in ("data_token", "package_token"):
            token = payload.get(token_key)
            if token and token.get("url"):
                return self.session.get(token["url"], params=self.params, **kwargs)
        raise KeyError("'data_token' nor 'package_token' found in response.")

    def get_app_data(self):
        """Returns the app library (previously ubiquity)."""
        request = self.session.get(
            self._service_root + "/retrieveAppLibraries", params=self.params
        )
        if not request.ok:
            self.session.raise_error(request.status_code, request.reason)
        return request.json()["items"]

    def get_app_node(self, app_id, folder="documents"):
        """Returns the node of the app (ubiquity)"""
        return DriveNode(self, self.get_node_data("FOLDER::" + app_id + "::" + folder))

    def _get_upload_contentws_url(self, file_object, zone="com.apple.CloudDocs"):
        """Get the contentWS endpoint URL to add a new file.

        Returns:
            A ``(document_id, url)`` tuple taken from the first element of the
            JSON response.
        """
        content_type = mimetypes.guess_type(file_object.name)[0]
        if content_type is None:
            content_type = ""

        # Get filesize from file object, restoring the read position afterwards
        orig_pos = file_object.tell()
        file_object.seek(0, os.SEEK_END)
        file_size = file_object.tell()
        file_object.seek(orig_pos, os.SEEK_SET)

        # NOTE(review): this aliases self.params rather than copying it (compare
        # get_file), so the token stays in self.params for every later request.
        # Kept as is because _update_contentws sends self.params right after
        # this call and may depend on the token being present - confirm before
        # switching to dict(self.params).
        file_params = self.params
        file_params.update(self._get_token_from_cookie())

        request = self.session.post(
            self._document_root + f"/ws/{zone}/upload/web",
            params=file_params,
            headers={"Content-Type": "text/plain"},
            data=json.dumps(
                {
                    "filename": file_object.name,
                    "type": "FILE",
                    "content_type": content_type,
                    "size": file_size,
                }
            ),
        )
        if not request.ok:
            self.session.raise_error(request.status_code, request.reason)
        upload_info = request.json()[0]
        return (upload_info["document_id"], upload_info["url"])

    def _update_contentws(
        self, folder_id, sf_info, document_id, file_object, zone="com.apple.CloudDocs"
    ):
        """Register uploaded content as a new document inside ``folder_id``.

        ``sf_info`` supplies the checksum, wrapping key, reference checksum,
        size and (optionally) receipt of the uploaded content. Returns the
        decoded JSON response.
        """
        # One timestamp for both fields so mtime and btime cannot differ.
        now_ms = int(time.time() * 1000)
        data = {
            "data": {
                "signature": sf_info["fileChecksum"],
                "wrapping_key": sf_info["wrappingKey"],
                "reference_signature": sf_info["referenceChecksum"],
                "size": sf_info["size"],
            },
            "command": "add_file",
            "create_short_guid": True,
            "document_id": document_id,
            "path": {
                "starting_document_id": folder_id,
                "path": os.path.basename(file_object.name),
            },
            "allow_conflict": True,
            "file_flags": {
                "is_writable": True,
                "is_executable": False,
                "is_hidden": False,
            },
            "mtime": now_ms,
            "btime": now_ms,
        }

        # Add the receipt if we have one. Will be absent for 0-sized files
        if sf_info.get("receipt"):
            data["data"]["receipt"] = sf_info["receipt"]

        request = self.session.post(
            self._document_root + f"/ws/{zone}/update/documents",
            params=self.params,
            headers={"Content-Type": "text/plain"},
            data=json.dumps(data),
        )
        if not request.ok:
            self.session.raise_error(request.status_code, request.reason)
        return request.json()

    def send_file(self, folder_id, file_object, zone="com.apple.CloudDocs"):
        """Send new file to iCloud Drive.

        Requests an upload URL, posts the file content to it, then registers
        the uploaded content under ``folder_id``.

        Returns:
            The decoded JSON response of the final registration request.
        """
        document_id, content_url = self._get_upload_contentws_url(file_object, zone)

        request = self.session.post(content_url, files={file_object.name: file_object})
        if not request.ok:
            self.session.raise_error(request.status_code, request.reason)
        content_response = request.json()["singleFile"]

        return self._update_contentws(
            folder_id, content_response, document_id, file_object, zone
        )

    def create_folders(self, parent, name):
        """Creates a new iCloud Drive folder"""
        request = self.session.post(
            self._service_root + "/createFolders",
            params=self.params,
            headers={"Content-Type": "text/plain"},
            data=json.dumps(
                {
                    "destinationDrivewsId": parent,
                    "folders": [
                        {
                            "clientId": self.params["clientId"],
                            "name": name,
                        }
                    ],
                }
            ),
        )
        # Same failure handling as the other request methods of this class.
        if not request.ok:
            self.session.raise_error(request.status_code, request.reason)
        return request.json()

    def rename_items(self, node_id, etag, name):
        """Renames an iCloud Drive node"""
        request = self.session.post(
            self._service_root + "/renameItems",
            params=self.params,
            data=json.dumps(
                {
                    "items": [
                        {
                            "drivewsid": node_id,
                            "etag": etag,
                            "name": name,
                        }
                    ],
                }
            ),
        )
        # Same failure handling as the other request methods of this class.
        if not request.ok:
            self.session.raise_error(request.status_code, request.reason)
        return request.json()

    def move_items_to_trash(self, node_id, etag):
        """Moves an iCloud Drive node to the trash bin"""
        request = self.session.post(
            self._service_root + "/moveItemsToTrash",
            params=self.params,
            data=json.dumps(
                {
                    "items": [
                        {
                            "drivewsid": node_id,
                            "etag": etag,
                            "clientId": self.params["clientId"],
                        }
                    ],
                }
            ),
        )
        if not request.ok:
            self.session.raise_error(request.status_code, request.reason)
        return request.json()

    @property
    def root(self):
        """Returns the root node, fetching and caching it on first access."""
        if self._root is None:
            self._root = DriveNode(
                self, self.get_node_data("FOLDER::com.apple.CloudDocs::root")
            )
        return self._root

    def __getattr__(self, attr):
        """Forward unknown attributes to the root node.

        Only called when normal lookup fails. The service's own names are
        rejected here: if one of them is missing (instance not initialised
        yet) or the ``root`` getter itself raised ``AttributeError``,
        forwarding would call ``self.root`` again and recurse until
        ``RecursionError``.
        """
        if attr in self._OWN_ATTRIBUTES:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {attr!r}"
            )
        return getattr(self.root, attr)

    def __getitem__(self, key):
        """Forward item access to the root node."""
        return self.root[key]
|
||
|
|
||
|
|
||
|
class DriveNode:
    """Drive node.

    Pairs the raw metadata dict of one Drive item (``data``) with the
    connection object (``connection``) used for any follow-up requests.
    """

    def __init__(self, conn, data):
        """Keep the connection and the item's metadata; children load lazily."""
        self.data = data
        self.connection = conn
        # None means "not loaded yet"; an empty list is a valid cached result.
        self._children = None

    @property
    def name(self):
        """Gets the node name, with ".<extension>" appended when one is present."""
        if "extension" in self.data:
            return f'{self.data["name"]}.{self.data["extension"]}'
        return self.data["name"]

    @property
    def type(self):
        """Gets the node type, lower-cased (or the falsy raw value if unset)."""
        node_type = self.data.get("type")
        return node_type and node_type.lower()

    def get_children(self):
        """Gets the node children.

        If the metadata has no ``items`` yet, it is refreshed through the
        connection first. The resulting list is cached on the node.

        Raises:
            KeyError: if there are still no ``items`` after the refresh.
        """
        # Compare against None so that an empty folder is cached as well
        # instead of being rebuilt on every call.
        if self._children is None:
            if "items" not in self.data:
                self.data.update(self.connection.get_node_data(self.data["drivewsid"]))
            if "items" not in self.data:
                raise KeyError(f'No items in folder, status: {self.data["status"]}')
            self._children = [
                DriveNode(self.connection, item_data)
                for item_data in self.data["items"]
            ]
        return self._children

    @property
    def size(self):
        """Gets the node size as an int, or None when missing or falsy."""
        size = self.data.get("size")  # Folder does not have size
        if not size:
            return None
        return int(size)

    @property
    def date_created(self):
        """Gets the node created date (in UTC)."""
        return _date_to_utc(self.data.get("dateCreated"))

    @property
    def date_changed(self):
        """Gets the node changed date (in UTC)."""
        return _date_to_utc(self.data.get("dateChanged"))  # Folder does not have date

    @property
    def date_modified(self):
        """Gets the node modified date (in UTC)."""
        return _date_to_utc(self.data.get("dateModified"))  # Folder does not have date

    @property
    def date_last_open(self):
        """Gets the node last open date (in UTC)."""
        return _date_to_utc(self.data.get("lastOpenTime"))  # Folder does not have date

    def open(self, **kwargs):
        """Gets the node file.

        Returns the response of the connection's ``get_file`` call, or a
        locally built empty response for 0-byte files. ``kwargs`` are passed
        through to ``get_file``.
        """
        # iCloud returns 400 Bad Request for 0-byte files
        if self.data["size"] == 0:
            response = Response()
            # A fresh Response has status_code None, which makes `response.ok`
            # fail on the comparison; mark the synthetic response successful.
            response.status_code = 200
            response.raw = io.BytesIO()
            return response
        return self.connection.get_file(
            self.data["docwsid"], zone=self.data["zone"], **kwargs
        )

    def upload(self, file_object, **kwargs):
        """Upload a new file into this node via the connection's send_file."""
        return self.connection.send_file(
            self.data["docwsid"], file_object, zone=self.data["zone"], **kwargs
        )

    def dir(self):
        """Gets the names of the node's children, or None for a file node."""
        if self.type == "file":
            return None
        return [child.name for child in self.get_children()]

    def mkdir(self, folder):
        """Create a new directory named ``folder`` inside this node."""
        # remove cached entries information first so that it will be re-read on next get_children()
        self._children = None
        self.data.pop("items", None)
        return self.connection.create_folders(self.data["drivewsid"], folder)

    def rename(self, name):
        """Rename an iCloud Drive item."""
        return self.connection.rename_items(
            self.data["drivewsid"], self.data["etag"], name
        )

    def delete(self):
        """Delete an iCloud Drive item."""
        return self.connection.move_items_to_trash(
            self.data["drivewsid"], self.data["etag"]
        )

    def get(self, name):
        """Gets the node child.

        Returns the first child whose name equals ``name``, or None when this
        node is a file.

        Raises:
            IndexError: if no child has that name.
        """
        if self.type == "file":
            return None
        for child in self.get_children():
            if child.name == name:
                return child
        # Callers (including __getitem__) expect IndexError for a missing child.
        raise IndexError(f"No child named '{name}' exists")

    def __getitem__(self, key):
        """Item access by child name; a missing child raises KeyError."""
        try:
            return self.get(key)
        except IndexError as error:
            raise KeyError(f"No child named '{key}' exists") from error

    def __unicode__(self):
        """Short text form showing the node's type and name."""
        return f"{{type: {self.type}, name: {self.name}}}"

    def __str__(self):
        """Same text as __unicode__.

        This module already depends on Python-3-only syntax (f-strings,
        ``raise ... from``), so no Python 2 byte-encoding branch is needed.
        """
        return self.__unicode__()

    def __repr__(self):
        """Debug form: ``<ClassName: {type: ..., name: ...}>``."""
        return f"<{type(self).__name__}: {str(self)}>"
|
||
|
|
||
|
|
||
|
def _date_to_utc(date):
|
||
|
if not date:
|
||
|
return None
|
||
|
# jump through hoops to return time in UTC rather than California time
|
||
|
match = search(r"^(.+?)([\+\-]\d+):(\d\d)$", date)
|
||
|
if not match:
|
||
|
# Already in UTC
|
||
|
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
|
||
|
base = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S")
|
||
|
diff = timedelta(hours=int(match.group(2)), minutes=int(match.group(3)))
|
||
|
return base - diff
|