"""Work service.""" import io import json import mimetypes import os import time import shutil from datetime import datetime, timedelta from re import search from requests import Response from six import PY2 from src import LOGGER, config_parser class WorkService: def __init__(self, document_root, session, params, client_id, dsid): self._document_root = document_root self.session = session self.params = dict(params) self.client_id = client_id self.dsid = dsid self._root = None def export_response(self, document_id, secret, zone): file_params = dict(self.params) file_params.update({"clientBuildNumber": "current"}) file_params.update({"clientMasteringNumber": "Mcurrent"}) file_params.update({"dsid": self.dsid}) request = self.session.post( self._document_root + f"/iw/export-ws/{self.dsid}/export_document", params=file_params, headers={"Content-Type": "application/x-www-form-urlencoded"}, data={ "primary": "primary", "document_type": "numbers", "format": "org.openxmlformats.spreadsheetml.sheet", "locale": "en", "encrypt_result": "N", "secret": secret, "document_id": document_id, "zone": zone, } , ) if not request.ok: self.session.raise_error(request.status_code, request.reason) return request.json()["job_id"] def check_job(self, job_id): url = self._document_root + f"/iw/export-ws/{self.dsid}/check_export_status" file_params = dict(self.params) file_params.update({"build": "primary"}) file_params.update({"clientBuildNumber": "current"}) file_params.update({"clientMasteringNumber": "Mcurrent"}) file_params.update({"job_id": job_id}) file_params.update({"dsid": self.dsid}) request = self.session.post( url, params=file_params, headers={"Content-Type": "text/plain"}, data={}, ) if not request.ok: self.session.raise_error(request.status_code, request.reason) if request.json()["job_status"] == "failure": raise Exception(request.json()["job_status"]) if request.json()["job_status"] == "success": return True else: return False def download_file(self, job_id, destination_path, name): url = self._document_root + f"/iw/export-ws/{self.dsid}/download_exported_document" file_params = dict(self.params) file_params.update({"build": "primary"}) file_params.update({"file_name": name + f".xlsx"}) file_params.update({"job_id": job_id}) local_filename = os.path.join(destination_path, name + f".xlsx") try: response = self.session.get(url, params=file_params, stream=True) with open(local_filename, 'wb') as out_file: shutil.copyfileobj(response.raw, out_file) except self.session.exceptions.RequestException as e: raise Exception(f"Ошибка скачивания сконцентрированного файла {local_filename}: {str(e)}") return local_filename