# bars-icloud-drive/src/sync_drive.py
# (354 lines, 13 KiB, Python — viewer metadata from the original listing,
# commented out so the module parses)
import gzip
import os
import re
import time
import unicodedata
import zipfile
from pathlib import Path, PurePath
from shutil import copyfileobj, rmtree
from time import sleep
import json
from icloudpy import exceptions
from src import LOGGER, config_parser
def wanted_file(filters, ignore, file_path):
    """Decide whether a file path should be synced.

    A path is rejected when it is empty, matches an ignore pattern, or
    matches none of the configured extension filters.  With no filters
    configured, every non-ignored path is accepted.
    """
    if not file_path:
        return False
    if ignore and ignored_path(ignore, file_path):
        LOGGER.debug(f"Пропуск ненужного файла {file_path}")
        return False
    if not filters or len(filters) == 0:
        # No extension filters configured: accept everything.
        return True
    extension_matches = any(
        re.search(f"{extension}$", file_path, re.IGNORECASE)
        for extension in filters
    )
    if extension_matches:
        return True
    LOGGER.debug(f"Пропуск ненужного файла {file_path}")
    return False
def wanted_file_name(filters, item):
    """Return True when *item*'s remote name is listed in *filters*.

    An empty or missing filter list accepts every file.

    Bug fix: the skip log line referenced ``file_path``, a name that does
    not exist in this function, so filtering a file out raised
    ``NameError`` instead of returning False.  It now logs the item's own
    remote name.
    """
    if not filters or len(filters) == 0:
        return True
    for file_name in filters:
        if item.data['name'] == file_name:
            return True
    # Use the item's remote name; the original interpolated an undefined
    # variable here.
    LOGGER.debug(f"Пропуск ненужного файла {item.data['name']}")
    return False
def wanted_folder(filters, ignore, root, folder_path):
    """Decide whether a folder should be recursed into during sync.

    A folder is accepted when it lies on the path of (is an ancestor of,
    descendant of, or equal to) any configured filter folder.  Ignore
    patterns are checked first and win over filters; with no folder
    filters configured everything is accepted.
    """
    if ignore and ignored_path(ignore, folder_path):
        return False
    if not filters or not folder_path or not root or len(filters) == 0:
        # Nothing to compare against: accept everything.
        return True
    candidate = Path(folder_path)
    root_abs = os.path.abspath(root)
    for entry in filters:
        wanted = Path(
            os.path.join(root_abs, str(entry).removeprefix("/").removesuffix("/"))
        )
        is_ancestor = candidate in wanted.parents
        is_descendant = wanted in candidate.parents
        if is_ancestor or is_descendant or candidate == wanted:
            return True
    return False
def ignored_path(ignore_list, path):
    """Return True when *path* matches any glob pattern in *ignore_list*.

    A pattern ending in "/" is treated as a directory prefix: "dir/" is
    expanded to "dir/*" so everything directly beneath the directory
    matches.
    """
    target = PurePath(path)
    return any(
        target.match(pattern + "*" if pattern.endswith("/") else pattern)
        for pattern in ignore_list
    )
def wanted_parent_folder(filters, ignore, root, folder_path):
    """Return True when *folder_path* sits at or below a filtered folder.

    Unlike ``wanted_folder`` this accepts only descendants of (or exact
    matches with) a filter entry, not its ancestors.

    NOTE(review): the ``ignore`` parameter is accepted but never consulted
    here — confirm whether that is intentional.
    """
    if not filters or not folder_path or not root or len(filters) == 0:
        return True
    candidate = Path(folder_path)
    base = os.path.abspath(root)
    for entry in filters:
        wanted = Path(
            os.path.join(base, entry.removeprefix("/").removesuffix("/"))
        )
        if wanted in candidate.parents or candidate == wanted:
            return True
    return False
def process_folder(item, destination_path, filters, ignore, root):
    """Create the local directory that mirrors a remote folder item.

    Returns the local path of the directory, or None when any required
    argument is missing or the folder is filtered out.  The directory is
    created under its NFC-normalized name, but the un-normalized path is
    returned — callers normalize again before recording it.
    """
    if not (item and destination_path and root):
        return None
    new_directory = os.path.join(destination_path, item.name)
    normalized_directory = unicodedata.normalize("NFC", new_directory)
    wanted = wanted_folder(
        filters=filters, ignore=ignore, folder_path=normalized_directory, root=root
    )
    if not wanted:
        LOGGER.debug(f"Пропуск ненужной папки {new_directory} ...")
        return None
    os.makedirs(normalized_directory, exist_ok=True)
    return new_directory
def package_exists(item, local_package_path):
    """Check whether a remote package already exists locally and is current.

    A package is considered up to date only when both its modification
    time and its total on-disk size match the remote item.  On mismatch
    the stale local copy is deleted so it can be re-downloaded, and False
    is returned.
    """
    if not (item and local_package_path and os.path.isdir(local_package_path)):
        LOGGER.debug(f"Пакет {local_package_path} локально не существует.")
        return False
    local_package_modified_time = int(os.path.getmtime(local_package_path))
    remote_package_modified_time = int(item.date_modified.timestamp())
    # Sum the sizes of every regular file inside the package directory.
    local_package_size = sum(
        entry.stat().st_size
        for entry in Path(local_package_path).glob("**/*")
        if entry.is_file()
    )
    remote_package_size = item.size
    same_time = local_package_modified_time == remote_package_modified_time
    same_size = local_package_size == remote_package_size
    if same_time and same_size:
        LOGGER.debug(
            f"Изменений не обнаружено. Пропуск пакета {local_package_path} ..."
        )
        return True
    LOGGER.info(
        f"Обнаружены изменения: local_modified_time равно {local_package_modified_time}, "
        + f"remote_modified_time равно {remote_package_modified_time}, "
        + f"local_package_size равен {local_package_size} и remote_package_size равен {remote_package_size}."
    )
    # Stale copy: remove it so the caller re-downloads the package.
    rmtree(local_package_path)
    return False
def file_exists(item, local_file):
    """Check whether a remote file already exists locally and is unchanged.

    Only the (whole-second) modification timestamp decides the outcome;
    the file sizes are computed purely for the change log message.
    """
    if not (item and local_file and os.path.isfile(local_file)):
        LOGGER.debug(f"Файл {local_file} локально не существует.")
        return False
    local_file_modified_time = int(os.path.getmtime(local_file))
    remote_file_modified_time = int(item.date_modified.timestamp())
    local_file_size = os.path.getsize(local_file)
    remote_file_size = item.size
    if local_file_modified_time == remote_file_modified_time:
        LOGGER.debug(f"Изменений не обнаружено. Файл {local_file} пропущен...")
        return True
    LOGGER.debug(
        f"Обнаружены изменения: local_modified_time равно {local_file_modified_time}, "
        + f"remote_modified_time равно {remote_file_modified_time}, "
        + f"local_file_size равен {local_file_size} и remote_file_size равен {remote_file_size}."
    )
    return False
def process_package(local_file):
    """Post-process a downloaded package; currently a pass-through stub.

    NOTE(review): upstream variants of this function extract zip/gzip
    package archives here — confirm that the no-op behavior is
    intentional (the ``gzip``/``zipfile``/``copyfileobj`` imports at the
    top of the file are otherwise unused).
    """
    return local_file
def is_package(item):
    """Detect whether a drive item is a package (bundle) download.

    Packages are served via a ``/packageDownload?`` endpoint; the
    response URL is the only signal available without reading the body.
    """
    with item.open(stream=True) as response:
        return response.url and "/packageDownload?" in response.url
def download_file(item, local_file):
    """Stream a remote drive item into *local_file*.

    Writes the body in 4 MiB chunks, routes package downloads through
    ``process_package``, and mirrors the remote modification time onto
    the local file.  Returns the final local path on success, or False on
    missing arguments or any error (best-effort: errors are logged, not
    raised).

    Fix: the except clause previously listed
    ``(exceptions.ICloudPyAPIResponseException, FileNotFoundError,
    Exception)`` — a redundant tuple, since ``Exception`` already covers
    both; behavior is unchanged.
    """
    if not (item and local_file):
        return False
    LOGGER.info(f"Загрузка {local_file} ...")
    try:
        with item.open(stream=True) as response:
            with open(local_file, "wb") as file_out:
                # 4 MiB chunks keep memory bounded for large files.
                for chunk in response.iter_content(4 * 1024 * 1024):
                    file_out.write(chunk)
            if response.url and "/packageDownload?" in response.url:
                local_file = process_package(local_file=local_file)
            # Mirror the remote mtime so file_exists() can skip unchanged
            # files on the next run.
            item_modified_time = time.mktime(item.date_modified.timetuple())
            os.utime(local_file, (item_modified_time, item_modified_time))
    except Exception as e:
        LOGGER.error(f"Ошибка скачивания {local_file}: {str(e)}")
        return False
    return local_file
def process_file(item, destination_path, destination_path_export, filters, filters_name, ignore, files, work, convert):
    """Sync a single remote file item into *destination_path*.

    Applies extension/name/ignore filters, skips items that are already
    current locally, downloads changed ones, and exports Numbers
    documents to xlsx via *work* when a matching ``convert`` entry
    exists.  Every local path belonging to the item is recorded in
    *files* so the obsolete-file sweep spares it.  Returns True when the
    item was downloaded, False when it was filtered out or skipped.

    Fixes:
    - The package/file skip check was inverted (``if not
      item_is_package``): packages were compared as plain files and plain
      files as packages, so unchanged files were re-downloaded every run.
    - ``convert`` is now guarded against None (previously a TypeError,
      silently swallowed by the caller).
    """
    if not (item and destination_path and files is not None):
        return False
    local_file = os.path.join(destination_path, item.name)
    local_file = unicodedata.normalize("NFC", local_file)
    if not wanted_file(filters=filters, ignore=ignore, file_path=local_file):
        return False
    if not wanted_file_name(filters=filters_name, item=item):
        return False
    files.add(local_file)
    item_is_package = is_package(item=item)
    if item_is_package:
        if package_exists(item=item, local_package_path=local_file):
            # Keep the package's contents out of the obsolete sweep.
            for f in Path(local_file).glob("**/*"):
                files.add(str(f))
            return False
    elif file_exists(item=item, local_file=local_file):
        return False
    local_file = download_file(item=item, local_file=local_file)
    if item_is_package:
        # Rename package entries to NFD so their byte form matches what
        # iCloud reports, then record them as wanted files.
        for f in Path(local_file).glob("**/*"):
            f = str(f)
            f_normalized = unicodedata.normalize("NFD", f)
            if os.path.exists(f):
                os.rename(f, f_normalized)
            files.add(f_normalized)
    for convert_file in convert or []:
        if item.data['name'] == convert_file['name']:
            if item.data['extension'] == "numbers":
                LOGGER.info(f"Конвертация в xlsx {local_file} ...")
                secret = json.dumps({"Type": "wp", "Data": convert_file['secret']})
                job_id = work.export_response(item.data['docwsid'], secret, item.data['zone'])
                try:
                    # Poll until the export job completes, then fetch it.
                    while not work.check_job(job_id):
                        sleep(5)
                    work.download_file(job_id, destination_path_export, item.data['name'])
                    local_export_filename = os.path.join(destination_path_export, item.data['name'] + ".xlsx")
                    LOGGER.info(f"Сконвертированый файл успешно загружен {local_export_filename} ...")
                except Exception as e:
                    LOGGER.error(f"Ошибка конвертации файла {local_file}: {str(e)}")
    return True
def remove_obsolete(destination_path, files):
    """Delete local paths under *destination_path* that are not in *files*.

    Returns the set of absolute paths that were removed.  With a falsy
    destination or ``files is None``, nothing is touched and an empty set
    is returned.
    """
    removed_paths = set()
    if not (destination_path and files is not None):
        return removed_paths
    for path in Path(destination_path).rglob("*"):
        local_file = str(path.absolute())
        if local_file in files:
            continue
        LOGGER.info(f"Удаление {local_file} ...")
        if path.is_file():
            path.unlink(missing_ok=True)
            removed_paths.add(local_file)
        elif path.is_dir():
            # Entries inside a directory removed here may still be
            # yielded by rglob, but they fail both is_file/is_dir checks
            # and are skipped.
            rmtree(local_file)
            removed_paths.add(local_file)
    return removed_paths
def sync_directory(
    drive,
    work,
    destination_path,
    destination_path_export,
    items,
    root,
    top=True,
    filters=None,
    convert=None,
    ignore=None,
    remove=False,
):
    """Recursively mirror a remote drive folder into *destination_path*.

    Walks *items* of *drive*, creating folders and syncing files
    according to the configured filters, and returns the set of local
    paths that are wanted (present remotely).  When *top* and *remove*
    are both set, local paths not in that set are deleted afterwards.

    Fix: per-item failures were silently swallowed with ``except
    Exception: pass``; they are now logged (the best-effort "keep going"
    behavior is preserved).
    """
    files = set()
    if drive and destination_path and items and root:
        # Hoist the repeated filter lookup; filters may be None.
        folder_filters = filters.get("folders") if filters else None
        for i in items:
            item = drive[i]
            if item.type in ("folder", "app_library"):
                new_folder = process_folder(
                    item=item,
                    destination_path=destination_path,
                    filters=folder_filters,
                    ignore=ignore,
                    root=root,
                )
                # Mirror the folder layout in the export destination too.
                new_folder_export = process_folder(
                    item=item,
                    destination_path=destination_path_export,
                    filters=folder_filters,
                    ignore=ignore,
                    root=root,
                )
                if not new_folder:
                    continue
                try:
                    files.add(unicodedata.normalize("NFC", new_folder))
                    files.update(
                        sync_directory(
                            drive=item,
                            work=work,
                            destination_path=new_folder,
                            destination_path_export=new_folder_export,
                            items=item.dir(),
                            root=root,
                            top=False,
                            filters=filters,
                            convert=convert,
                            ignore=ignore,
                        )
                    )
                except Exception as e:
                    # Best effort: keep syncing siblings, but surface the
                    # error instead of hiding it.
                    LOGGER.error(f"Ошибка синхронизации папки {new_folder}: {str(e)}")
            elif item.type == "file":
                if wanted_parent_folder(
                    filters=folder_filters,
                    ignore=ignore,
                    root=root,
                    folder_path=destination_path,
                ):
                    try:
                        process_file(
                            item=item,
                            destination_path=destination_path,
                            destination_path_export=destination_path_export,
                            filters=filters.get("file_extensions") if filters else None,
                            filters_name=filters.get("file_name") if filters else None,
                            ignore=ignore,
                            files=files,
                            work=work,
                            convert=convert,
                        )
                    except Exception as e:
                        LOGGER.error(f"Ошибка синхронизации файла {item.name}: {str(e)}")
    # Runs even when the guard above was false, matching the original
    # structure (remove_obsolete itself guards against falsy arguments).
    if top and remove:
        remove_obsolete(destination_path=destination_path, files=files)
    return files
def sync_drive(config, drive, work):
    """Entry point: sync the whole iCloud drive tree according to *config*.

    Resolves the local destination directories via ``config_parser``,
    then delegates to ``sync_directory`` with the filter/convert/ignore
    sections of the ``drive`` config (each None when absent).
    """
    destination_path = config_parser.prepare_drive_destination(config=config)
    destination_path_export = config_parser.prepare_drive_destination_export(config=config)
    drive_config = config["drive"] if "drive" in config else {}
    return sync_directory(
        drive=drive,
        work=work,
        destination_path=destination_path,
        destination_path_export=destination_path_export,
        root=destination_path,
        items=drive.dir(),
        top=True,
        filters=drive_config["filters"] if "filters" in drive_config else None,
        convert=drive_config["convert"] if "convert" in drive_config else None,
        ignore=drive_config["ignore"] if "ignore" in drive_config else None,
        remove=config_parser.get_drive_remove_obsolete(config=config),
    )