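"""Sync iCloud Drive to a local destination.

Applies folder, file-extension, and file-name filters plus ignore globs,
downloads files and packages that changed remotely, optionally converts
Numbers documents to xlsx through an export job, and can remove obsolete
local files afterwards.
"""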
import json
import os
import re
import time
import unicodedata
from pathlib import Path, PurePath
from shutil import rmtree
from time import sleep

from icloudpy import exceptions

from src import LOGGER, config_parser


def wanted_file(filters, ignore, file_path):
    """Check whether the file passes the ignore globs and extension filters."""
    if not file_path:
        return False
    if ignore and ignored_path(ignore, file_path):
        LOGGER.debug(f"Skipping the unwanted file {file_path}")
        return False
    if not filters:
        return True
    for file_extension in filters:
        if re.search(f"{file_extension}$", file_path, re.IGNORECASE):
            return True
    LOGGER.debug(f"Skipping the unwanted file {file_path}")
    return False
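
# Illustrative examples (hypothetical paths): extension filters are
# case-insensitive regexes anchored at the end of the path, so
#   wanted_file(["pdf"], None, "/data/Report.PDF")           -> True
#   wanted_file(["pdf"], ["Drafts/*"], "/data/Drafts/a.pdf") -> False (ignored)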


def wanted_file_name(filters, item):
    """Check whether the item's name matches one of the exact file-name filters."""
    if not filters:
        return True
    for file_name in filters:
        if item.data['name'] == file_name:
            return True
    LOGGER.debug(f"Skipping the unwanted file {item.data['name']}")
    return False


def wanted_folder(filters, ignore, root, folder_path):
    """Check whether the folder is ignored, or an ancestor/descendant of a filter entry."""
    if ignore and ignored_path(ignore, folder_path):
        return False
    if not filters or not folder_path or not root:
        return True
    folder_path = Path(folder_path)
    for folder in filters:
        child_path = Path(
            os.path.join(
                os.path.abspath(root), str(folder).removeprefix("/").removesuffix("/")
            )
        )
        if (
            folder_path in child_path.parents
            or child_path in folder_path.parents
            or folder_path == child_path
        ):
            return True
    return False
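
# A folder is wanted when it lies on the path to a filter entry or inside one.
# With root="/root" and filters=["Docs/Work"] (hypothetical values),
# "/root/Docs" (ancestor), "/root/Docs/Work" (exact match) and
# "/root/Docs/Work/2021" (descendant) are all wanted; "/root/Music" is not.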


def ignored_path(ignore_list, path):
    """Check whether the path matches any glob in the ignore list."""
    for ignore in ignore_list:
        if PurePath(path).match(ignore + "*" if ignore.endswith("/") else ignore):
            return True
    return False
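
# PurePath.match applies relative glob patterns from the right, and a trailing
# "/" in an ignore entry is expanded to "<entry>*", e.g. (hypothetical values):
#   ignored_path(["Backups/"], "/root/Backups/2021.zip") -> True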


def wanted_parent_folder(filters, ignore, root, folder_path):
    """Check whether the folder is one of the folder filters or inside one."""
    if not filters or not folder_path or not root:
        return True
    folder_path = Path(folder_path)
    for folder in filters:
        child_path = Path(
            os.path.join(
                os.path.abspath(root), folder.removeprefix("/").removesuffix("/")
            )
        )
        if child_path in folder_path.parents or folder_path == child_path:
            return True
    return False
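
# Unlike wanted_folder, this accepts only an exact filter match or a descendant
# of one; ancestors of a filter entry are rejected here.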


def process_folder(item, destination_path, filters, ignore, root):
    """Create the local directory for a remote folder if it is wanted."""
    if not (item and destination_path and root):
        return None
    new_directory = os.path.join(destination_path, item.name)
    new_directory_norm = unicodedata.normalize("NFC", new_directory)
    if not wanted_folder(
        filters=filters, ignore=ignore, folder_path=new_directory_norm, root=root
    ):
        LOGGER.debug(f"Skipping the unwanted folder {new_directory} ...")
        return None
    os.makedirs(new_directory_norm, exist_ok=True)
    return new_directory


def package_exists(item, local_package_path):
    """Check whether a local copy of the package is up to date; drop it if stale."""
    if item and local_package_path and os.path.isdir(local_package_path):
        local_package_modified_time = int(os.path.getmtime(local_package_path))
        remote_package_modified_time = int(item.date_modified.timestamp())
        local_package_size = sum(
            f.stat().st_size
            for f in Path(local_package_path).glob("**/*")
            if f.is_file()
        )
        remote_package_size = item.size
        if (
            local_package_modified_time == remote_package_modified_time
            and local_package_size == remote_package_size
        ):
            LOGGER.debug(
                f"No changes detected. Skipping the package {local_package_path} ..."
            )
            return True
        LOGGER.info(
            f"Changes detected: local_modified_time is {local_package_modified_time}, "
            + f"remote_modified_time is {remote_package_modified_time}, "
            + f"local_package_size is {local_package_size} and remote_package_size is {remote_package_size}."
        )
        rmtree(local_package_path)
    else:
        LOGGER.debug(f"The package {local_package_path} does not exist locally.")
    return False


def file_exists(item, local_file):
    """Check whether the local file is unchanged, judged by its modification time."""
    if item and local_file and os.path.isfile(local_file):
        local_file_modified_time = int(os.path.getmtime(local_file))
        remote_file_modified_time = int(item.date_modified.timestamp())
        local_file_size = os.path.getsize(local_file)
        remote_file_size = item.size
        if local_file_modified_time == remote_file_modified_time:
            LOGGER.debug(f"No changes detected. Skipping the file {local_file} ...")
            return True
        LOGGER.debug(
            f"Changes detected: local_modified_time is {local_file_modified_time}, "
            + f"remote_modified_time is {remote_file_modified_time}, "
            + f"local_file_size is {local_file_size} and remote_file_size is {remote_file_size}."
        )
    else:
        LOGGER.debug(f"The file {local_file} does not exist locally.")
    return False
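
# Note: only the modification time decides whether the file is unchanged; the
# sizes are computed for the diagnostic log line but are not compared.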


def process_package(local_file):
    """Post-process a downloaded package; a pass-through in this variant."""
    return local_file


def is_package(item):
    """Detect a package by whether its download URL goes through /packageDownload."""
    file_is_a_package = False
    with item.open(stream=True) as response:
        file_is_a_package = response.url and "/packageDownload?" in response.url
    return file_is_a_package


def download_file(item, local_file):
    """Stream the remote item into local_file and mirror its modification time."""
    if not (item and local_file):
        return False
    LOGGER.info(f"Downloading {local_file} ...")
    try:
        with item.open(stream=True) as response:
            with open(local_file, "wb") as file_out:
                # Stream in 4 MiB chunks to keep memory usage flat for large files.
                for chunk in response.iter_content(4 * 1024 * 1024):
                    file_out.write(chunk)
            if response.url and "/packageDownload?" in response.url:
                local_file = process_package(local_file=local_file)
        item_modified_time = time.mktime(item.date_modified.timetuple())
        os.utime(local_file, (item_modified_time, item_modified_time))
    except (exceptions.ICloudPyAPIResponseException, FileNotFoundError, Exception) as e:
        LOGGER.error(f"Failed to download {local_file}: {str(e)}")
        return False
    return local_file


def process_file(
    item,
    destination_path,
    destination_path_export,
    filters,
    filters_name,
    ignore,
    files,
    work,
    convert,
):
    """Download a single file or package if it passes the filters and changed remotely."""
    if not (item and destination_path and files is not None):
        return False
    local_file = os.path.join(destination_path, item.name)
    local_file = unicodedata.normalize("NFC", local_file)
    if not wanted_file(filters=filters, ignore=ignore, file_path=local_file):
        return False
    if not wanted_file_name(filters=filters_name, item=item):
        return False
    files.add(local_file)
    item_is_package = is_package(item=item)
    if item_is_package:
        if package_exists(item=item, local_package_path=local_file):
            for f in Path(local_file).glob("**/*"):
                files.add(str(f))
            return False
    elif file_exists(item=item, local_file=local_file):
        return False
    local_file = download_file(item=item, local_file=local_file)
    if not local_file:
        # Download failed; leave the local state untouched.
        return False
    if item_is_package:
        # Package contents arrive NFC-normalized; rename them to NFD on disk.
        for f in Path(local_file).glob("**/*"):
            f = str(f)
            f_normalized = unicodedata.normalize("NFD", f)
            if os.path.exists(f):
                os.rename(f, f_normalized)
            files.add(f_normalized)
    for convert_file in convert or []:
        if item.data['name'] == convert_file['name']:
            if item.data['extension'] == "numbers":
                LOGGER.info(f"Converting {local_file} to xlsx ...")
                secret = json.dumps({"Type": "wp", "Data": convert_file['secret']})
                job_id = work.export_response(item.data['docwsid'], secret, item.data['zone'])
                try:
                    # Poll the export job until it completes, then fetch the result.
                    while not work.check_job(job_id):
                        sleep(5)
                    work.download_file(job_id, destination_path_export, item.data['name'])
                    local_export_filename = os.path.join(
                        destination_path_export, item.data['name'] + ".xlsx"
                    )
                    LOGGER.info(f"Converted file downloaded successfully to {local_export_filename} ...")
                except Exception as e:
                    LOGGER.error(f"Failed to convert {local_file}: {str(e)}")
    return True
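
# A `convert` entry is expected to carry the document name and the "wp" export
# secret; the shape is inferred from the usage above, e.g.:
#   {"name": "Budget", "secret": "<export secret>"}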


def remove_obsolete(destination_path, files):
    """Delete local paths under destination_path that are not in the kept set."""
    removed_paths = set()
    if not (destination_path and files is not None):
        return removed_paths
    for path in Path(destination_path).rglob("*"):
        local_file = str(path.absolute())
        if local_file not in files:
            LOGGER.info(f"Removing {local_file} ...")
            if path.is_file():
                path.unlink(missing_ok=True)
                removed_paths.add(local_file)
            elif path.is_dir():
                rmtree(local_file)
                removed_paths.add(local_file)
    return removed_paths


def sync_directory(
    drive,
    work,
    destination_path,
    destination_path_export,
    items,
    root,
    top=True,
    filters=None,
    convert=None,
    ignore=None,
    remove=False,
):
    """Recursively sync a drive directory; return the set of local paths that were kept."""
    files = set()
    if drive and destination_path and items and root:
        for i in items:
            item = drive[i]
            if item.type in ("folder", "app_library"):
                new_folder = process_folder(
                    item=item,
                    destination_path=destination_path,
                    filters=filters["folders"]
                    if filters and "folders" in filters
                    else None,
                    ignore=ignore,
                    root=root,
                )
                new_folder_export = process_folder(
                    item=item,
                    destination_path=destination_path_export,
                    filters=filters["folders"]
                    if filters and "folders" in filters
                    else None,
                    ignore=ignore,
                    root=root,
                )
                if not new_folder:
                    continue
                try:
                    files.add(unicodedata.normalize("NFC", new_folder))
                    files.update(
                        sync_directory(
                            drive=item,
                            work=work,
                            destination_path=new_folder,
                            destination_path_export=new_folder_export,
                            items=item.dir(),
                            root=root,
                            top=False,
                            filters=filters,
                            convert=convert,
                            ignore=ignore,
                        )
                    )
                except Exception:
                    # Skip subtrees that fail to list or sync; keep the rest going.
                    pass
            elif item.type == "file":
                if wanted_parent_folder(
                    filters=filters["folders"]
                    if filters and "folders" in filters
                    else None,
                    ignore=ignore,
                    root=root,
                    folder_path=destination_path,
                ):
                    try:
                        process_file(
                            item=item,
                            destination_path=destination_path,
                            destination_path_export=destination_path_export,
                            filters=filters["file_extensions"]
                            if filters and "file_extensions" in filters
                            else None,
                            filters_name=filters["file_name"]
                            if filters and "file_name" in filters
                            else None,
                            ignore=ignore,
                            files=files,
                            work=work,
                            convert=convert,
                        )
                    except Exception:
                        # Skip files that fail to download or convert; keep the rest going.
                        pass
    if top and remove:
        remove_obsolete(destination_path=destination_path, files=files)
    return files


def sync_drive(config, drive, work):
    """Entry point: prepare destination paths from the config and sync the whole drive."""
    destination_path = config_parser.prepare_drive_destination(config=config)
    destination_path_export = config_parser.prepare_drive_destination_export(config=config)
    return sync_directory(
        drive=drive,
        work=work,
        destination_path=destination_path,
        destination_path_export=destination_path_export,
        root=destination_path,
        items=drive.dir(),
        top=True,
        filters=config["drive"]["filters"]
        if "drive" in config and "filters" in config["drive"]
        else None,
        convert=config["drive"]["convert"]
        if "drive" in config and "convert" in config["drive"]
        else None,
        ignore=config["drive"]["ignore"]
        if "drive" in config and "ignore" in config["drive"]
        else None,
        remove=config_parser.get_drive_remove_obsolete(config=config),
    )
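
# Minimal usage sketch (assumes an authenticated icloudpy drive node, a parsed
# config mapping, and a `work` helper exposing export_response/check_job/
# download_file as used in process_file above):
#
#     kept_files = sync_drive(config=config, drive=api.drive, work=work)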