import gzip
import json
import os
import re
import time
import unicodedata
import zipfile
from pathlib import Path, PurePath
from shutil import copyfileobj, rmtree
from time import sleep

from icloudpy import exceptions

from src import LOGGER, config_parser


def wanted_file(filters, ignore, file_path):
    """Return True if *file_path* should be synced.

    A path is rejected when it matches the ignore list, or when extension
    filters are given and none of them match. An empty/None path is rejected.
    """
    if not file_path:
        return False
    if ignore:
        if ignored_path(ignore, file_path):
            LOGGER.debug(f"Пропуск ненужного файла {file_path}")
            return False
    if not filters or len(filters) == 0:
        return True
    for file_extension in filters:
        # Filter entries are treated as regex suffixes, case-insensitively.
        if re.search(f"{file_extension}$", file_path, re.IGNORECASE):
            return True
    LOGGER.debug(f"Пропуск ненужного файла {file_path}")
    return False


def wanted_file_name(filters, item):
    """Return True if the item's remote name is in the file-name filter list.

    No filters (None or empty) means every name is wanted.
    """
    if not filters or len(filters) == 0:
        return True
    for file_name in filters:
        if item.data['name'] == file_name:
            return True
    # BUG FIX: the original logged the undefined name ``file_path`` here,
    # raising NameError whenever no filter matched.
    LOGGER.debug(f"Пропуск ненужного файла {item.data['name']}")
    return False


def wanted_folder(filters, ignore, root, folder_path):
    """Return True if *folder_path* is inside (or an ancestor of) a filtered folder.

    Ancestors are kept so that the tree down to a wanted folder is created.
    With no filters, everything not ignored is wanted.
    """
    if ignore:
        if ignored_path(ignore, folder_path):
            return False
    if not filters or not folder_path or not root or len(filters) == 0:
        return True
    folder_path = Path(folder_path)
    for folder in filters:
        child_path = Path(
            os.path.join(
                os.path.abspath(root),
                str(folder).removeprefix("/").removesuffix("/"),
            )
        )
        if (
            folder_path in child_path.parents
            or child_path in folder_path.parents
            or folder_path == child_path
        ):
            return True
    return False


def ignored_path(ignore_list, path):
    """Return True if *path* matches any glob pattern in *ignore_list*.

    A pattern ending with "/" is treated as a directory prefix (pattern + "*").
    """
    for ignore in ignore_list:
        if PurePath(path).match(ignore + "*" if ignore.endswith("/") else ignore):
            return True
    return False


def wanted_parent_folder(filters, ignore, root, folder_path):
    """Return True if *folder_path* is at or below one of the filtered folders.

    Unlike :func:`wanted_folder`, ancestors of a filter entry do NOT qualify.
    NOTE(review): the ``ignore`` parameter is accepted but not consulted here;
    preserved as-is to keep behavior identical.
    """
    if not filters or not folder_path or not root or len(filters) == 0:
        return True
    folder_path = Path(folder_path)
    for folder in filters:
        child_path = Path(
            os.path.join(
                os.path.abspath(root),
                folder.removeprefix("/").removesuffix("/"),
            )
        )
        if child_path in folder_path.parents or folder_path == child_path:
            return True
    return False


def process_folder(item, destination_path, filters, ignore, root):
    """Create the local directory for a remote folder item.

    Returns the (non-normalized) local path, or None when the folder is
    filtered out or arguments are missing.
    """
    if not (item and destination_path and root):
        return None
    new_directory = os.path.join(destination_path, item.name)
    # The on-disk directory uses the NFC form; the returned path keeps the
    # original composition (callers normalize again where needed).
    new_directory_norm = unicodedata.normalize("NFC", new_directory)
    if not wanted_folder(
        filters=filters, ignore=ignore, folder_path=new_directory_norm, root=root
    ):
        LOGGER.debug(f"Пропуск ненужной папки {new_directory} ...")
        return None
    os.makedirs(new_directory_norm, exist_ok=True)
    return new_directory


def package_exists(item, local_package_path):
    """Return True if the local copy of a package is up to date.

    Compares mtime and total on-disk size against the remote item. When the
    package is stale, the local copy is removed so it can be re-downloaded.
    """
    if item and local_package_path and os.path.isdir(local_package_path):
        local_package_modified_time = int(os.path.getmtime(local_package_path))
        remote_package_modified_time = int(item.date_modified.timestamp())
        local_package_size = sum(
            f.stat().st_size
            for f in Path(local_package_path).glob("**/*")
            if f.is_file()
        )
        remote_package_size = item.size
        if (
            local_package_modified_time == remote_package_modified_time
            and local_package_size == remote_package_size
        ):
            LOGGER.debug(
                f"Изменений не обнаружено. Пропуск пакета {local_package_path} ..."
            )
            return True
        else:
            LOGGER.info(
                f"Обнаружены изменения: local_modified_time равно {local_package_modified_time}, "
                + f"remote_modified_time равно {remote_package_modified_time}, "
                + f"local_package_size равен {local_package_size} и remote_package_size равен {remote_package_size}."
            )
            # Stale package: wipe it so the caller downloads a fresh copy.
            rmtree(local_package_path)
    else:
        LOGGER.debug(f"Пакет {local_package_path} локально не существует.")
    return False


def file_exists(item, local_file):
    """Return True if the local file's mtime matches the remote item's.

    Only the modification time decides the result; sizes are gathered for
    the log message.
    """
    if item and local_file and os.path.isfile(local_file):
        local_file_modified_time = int(os.path.getmtime(local_file))
        remote_file_modified_time = int(item.date_modified.timestamp())
        local_file_size = os.path.getsize(local_file)
        remote_file_size = item.size
        if local_file_modified_time == remote_file_modified_time:
            LOGGER.debug(f"Изменений не обнаружено. Файл {local_file} пропущен...")
            return True
        else:
            LOGGER.debug(
                f"Обнаружены изменения: local_modified_time равно {local_file_modified_time}, "
                + f"remote_modified_time равно {remote_file_modified_time}, "
                + f"local_file_size равен {local_file_size} и remote_file_size равен {remote_file_size}."
            )
    else:
        LOGGER.debug(f"Файл {local_file} локально не существует.")
    return False


def process_package(local_file):
    """Post-process a downloaded package (currently a no-op passthrough)."""
    return local_file


def is_package(item):
    """Return True if the drive item is a package (bundle) download."""
    file_is_a_package = False
    with item.open(stream=True) as response:
        # Packages are served through the /packageDownload endpoint.
        file_is_a_package = response.url and "/packageDownload?" in response.url
    return file_is_a_package


def download_file(item, local_file):
    """Stream the remote item to *local_file* and mirror its mtime.

    Returns the local path on success, False on failure.
    """
    if not (item and local_file):
        return False
    LOGGER.info(f"Загрузка {local_file} ...")
    try:
        with item.open(stream=True) as response:
            with open(local_file, "wb") as file_out:
                # 4 MiB chunks keep memory bounded for large files.
                for chunk in response.iter_content(4 * 1024 * 1024):
                    file_out.write(chunk)
            if response.url and "/packageDownload?" in response.url:
                local_file = process_package(local_file=local_file)
        # Mirror the remote modification time so file_exists() can compare.
        item_modified_time = time.mktime(item.date_modified.timetuple())
        os.utime(local_file, (item_modified_time, item_modified_time))
    except Exception as e:
        # Best-effort: log and report failure; the sync continues with other
        # items. (The original listed ICloudPyAPIResponseException and
        # FileNotFoundError explicitly, but Exception already covers both.)
        LOGGER.error(f"Ошибка скачивания {local_file}: {str(e)}")
        return False
    return local_file


def process_file(item, destination_path, destination_path_export, filters,
                 filters_name, ignore, files, work, convert):
    """Download one remote file if it passes filters and has changed.

    Records every synced local path in *files* (used later to prune
    obsolete paths). Also converts matching ``.numbers`` documents to xlsx
    via *work*. Returns True when a download happened, False otherwise.
    """
    if not (item and destination_path and files is not None):
        return False
    local_file = os.path.join(destination_path, item.name)
    local_file = unicodedata.normalize("NFC", local_file)
    if not wanted_file(filters=filters, ignore=ignore, file_path=local_file):
        return False
    if not wanted_file_name(filters=filters_name, item=item):
        return False
    files.add(local_file)
    item_is_package = is_package(item=item)
    # BUG FIX: the original inverted this check (`if not item_is_package:`),
    # so an up-to-date package was never detected and packages were
    # re-downloaded on every run.
    if item_is_package:
        if package_exists(item=item, local_package_path=local_file):
            # Keep the package's contents out of the obsolete-removal set.
            for f in Path(local_file).glob("**/*"):
                files.add(str(f))
            return False
    elif file_exists(item=item, local_file=local_file):
        return False
    local_file = download_file(item=item, local_file=local_file)
    if item_is_package:
        for f in Path(local_file).glob("**/*"):
            f = str(f)
            # Package members are stored NFD on disk (macOS convention).
            f_normalized = unicodedata.normalize("NFD", f)
            if os.path.exists(f):
                os.rename(f, f_normalized)
            files.add(f_normalized)
    # BUG FIX: ``convert`` may be None (no "convert" section in config);
    # iterating None raised TypeError, silently swallowed by the caller.
    for convert_file in convert or []:
        if item.data['name'] == convert_file['name']:
            if item.data['extension'] == "numbers":
                LOGGER.info(f"Конвертация в xlsx {local_file} ...")
                secret = json.dumps({"Type": "wp", "Data": convert_file['secret']})
                job_id = work.export_response(
                    item.data['docwsid'], secret, item.data['zone']
                )
                try:
                    # Poll the export job until iCloud reports it finished.
                    while not work.check_job(job_id):
                        sleep(5)
                    work.download_file(
                        job_id, destination_path_export, item.data['name']
                    )
                    local_export_filename = os.path.join(
                        destination_path_export, item.data['name'] + f".xlsx"
                    )
                    LOGGER.info(
                        f"Сконвертированый файл успешно загружен {local_export_filename} ..."
                    )
                except Exception as e:
                    LOGGER.error(f"Ошибка конвертации файла {local_file}: {str(e)}")
    return True


def remove_obsolete(destination_path, files):
    """Delete local paths under *destination_path* that are not in *files*.

    Returns the set of removed absolute paths.
    """
    removed_paths = set()
    if not (destination_path and files is not None):
        return removed_paths
    for path in Path(destination_path).rglob("*"):
        local_file = str(path.absolute())
        if local_file not in files:
            LOGGER.info(f"Удаление {local_file} ...")
            if path.is_file():
                path.unlink(missing_ok=True)
                removed_paths.add(local_file)
            elif path.is_dir():
                rmtree(local_file)
                removed_paths.add(local_file)
    return removed_paths


def sync_directory(
    drive,
    work,
    destination_path,
    destination_path_export,
    items,
    root,
    top=True,
    filters=None,
    convert=None,
    ignore=None,
    remove=False,
):
    """Recursively mirror a remote drive folder into *destination_path*.

    Returns the set of local paths that correspond to remote content; at the
    top level, optionally removes everything else (``remove=True``).
    """
    files = set()
    if drive and destination_path and items and root:
        for i in items:
            item = drive[i]
            if item.type in ("folder", "app_library"):
                folder_filters = (
                    filters["folders"] if filters and "folders" in filters else None
                )
                new_folder = process_folder(
                    item=item,
                    destination_path=destination_path,
                    filters=folder_filters,
                    ignore=ignore,
                    root=root,
                )
                # Mirror the folder layout in the export tree as well (may be
                # None when the export path falls outside the filters).
                new_folder_export = process_folder(
                    item=item,
                    destination_path=destination_path_export,
                    filters=folder_filters,
                    ignore=ignore,
                    root=root,
                )
                if not new_folder:
                    continue
                try:
                    files.add(unicodedata.normalize("NFC", new_folder))
                    files.update(
                        sync_directory(
                            drive=item,
                            work=work,
                            destination_path=new_folder,
                            destination_path_export=new_folder_export,
                            items=item.dir(),
                            root=root,
                            top=False,
                            filters=filters,
                            convert=convert,
                            ignore=ignore,
                        )
                    )
                except Exception:
                    # Best-effort: a failing subtree must not abort the sync.
                    pass
            elif item.type == "file":
                if wanted_parent_folder(
                    filters=filters["folders"]
                    if filters and "folders" in filters
                    else None,
                    ignore=ignore,
                    root=root,
                    folder_path=destination_path,
                ):
                    try:
                        process_file(
                            item=item,
                            destination_path=destination_path,
                            destination_path_export=destination_path_export,
                            filters=filters["file_extensions"]
                            if filters and "file_extensions" in filters
                            else None,
                            filters_name=filters["file_name"]
                            if filters and "file_name" in filters
                            else None,
                            ignore=ignore,
                            files=files,
                            work=work,
                            convert=convert,
                        )
                    except Exception:
                        # Best-effort: skip the failing file, keep syncing.
                        pass
    if top and remove:
        remove_obsolete(destination_path=destination_path, files=files)
    return files


def sync_drive(config, drive, work):
    """Entry point: sync the whole iCloud drive per *config*."""
    destination_path = config_parser.prepare_drive_destination(config=config)
    destination_path_export = config_parser.prepare_drive_destination_export(
        config=config
    )
    return sync_directory(
        drive=drive,
        work=work,
        destination_path=destination_path,
        destination_path_export=destination_path_export,
        root=destination_path,
        items=drive.dir(),
        top=True,
        filters=config["drive"]["filters"]
        if "drive" in config and "filters" in config["drive"]
        else None,
        convert=config["drive"]["convert"]
        if "drive" in config and "convert" in config["drive"]
        else None,
        ignore=config["drive"]["ignore"]
        if "drive" in config and "ignore" in config["drive"]
        else None,
        remove=config_parser.get_drive_remove_obsolete(config=config),
    )