# This file was auto-generated by Fern from our API Definition.

import typing

from ..core.client_wrapper import AsyncClientWrapper, SyncClientWrapper
from ..core.request_options import RequestOptions
from .raw_client import AsyncRawDataExportClient, RawDataExportClient
from .types.data_export import DataExport
from .types.data_export_export_reporting_data_response import DataExportExportReportingDataResponse

# this is used as the default value for optional parameters
OMIT = typing.cast(typing.Any, ...)


class DataExportClient:
    """Synchronous client for the data-export resource; delegates to the raw client."""

    def __init__(self, *, client_wrapper: SyncClientWrapper):
        self._raw_client = RawDataExportClient(client_wrapper=client_wrapper)

    @property
    def with_raw_response(self) -> RawDataExportClient:
        """
        Retrieves a raw implementation of this client that returns raw responses.

        Returns
        -------
        RawDataExportClient
        """
        return self._raw_client

    def export_reporting_data(
        self,
        job_identifier: str,
        *,
        app_id: str,
        client_id: str,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> DataExportExportReportingDataResponse:
        """
        Parameters
        ----------
        job_identifier : str
            Unique identifier of the job.

        app_id : str
            The Intercom defined code of the workspace the company is associated to.

        client_id : str

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        DataExportExportReportingDataResponse
            Job status returned successfully

        Examples
        --------
        from intercom import Intercom

        client = Intercom(
            token="YOUR_TOKEN",
        )
        client.data_export.export_reporting_data(
            job_identifier="job_identifier",
            app_id="app_id",
            client_id="client_id",
        )
        """
        _response = self._raw_client.export_reporting_data(
            job_identifier, app_id=app_id, client_id=client_id, request_options=request_options
        )
        return _response.data

    def download_reporting_data_export(
        self, job_identifier: str, *, app_id: str, request_options: typing.Optional[RequestOptions] = None
    ) -> None:
        """
        Download the data from a completed reporting data export job.

        > 📘 Octet header required
        >
        > You will have to specify the header Accept: `application/octet-stream` when hitting this endpoint.

        Parameters
        ----------
        job_identifier : str

        app_id : str

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        None

        Examples
        --------
        from intercom import Intercom

        client = Intercom(
            token="YOUR_TOKEN",
        )
        client.data_export.download_reporting_data_export(
            job_identifier="job_identifier",
            app_id="app_id",
        )
        """
        _response = self._raw_client.download_reporting_data_export(
            job_identifier, app_id=app_id, request_options=request_options
        )
        return _response.data

    def create(
        self, *, created_at_after: int, created_at_before: int, request_options: typing.Optional[RequestOptions] = None
    ) -> DataExport:
        """
        To create your export job, you need to send a `POST` request to the export endpoint `https://api.intercom.io/export/content/data`.

        The only parameters you need to provide are the range of dates that you want exported.

        >🚧 Limit of one active job
        >
        > You can only have one active job per workspace. You will receive a HTTP status code of 429 with the message Exceeded rate limit of 1 pending message data export jobs if you attempt to create a second concurrent job.

        >❗️ Updated_at not included
        >
        > It should be noted that the timeframe only includes messages sent during the time period and not messages that were only updated during this period. For example, if a message was updated yesterday but sent two days ago, you would need to set the created_at_after date before the message was sent to include that in your retrieval job.

        >📘 Date ranges are inclusive
        >
        > Requesting data for 2018-06-01 until 2018-06-30 will get all data for those days including those specified - e.g. 2018-06-01 00:00:00 until 2018-06-30 23:59:99.

        Parameters
        ----------
        created_at_after : int
            The start date that you request data for. It must be formatted as a unix timestamp.

        created_at_before : int
            The end date that you request data for. It must be formatted as a unix timestamp.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        DataExport
            successful

        Examples
        --------
        from intercom import Intercom

        client = Intercom(
            token="YOUR_TOKEN",
        )
        client.data_export.create(
            created_at_after=1734519776,
            created_at_before=1734537776,
        )
        """
        _response = self._raw_client.create(
            created_at_after=created_at_after, created_at_before=created_at_before, request_options=request_options
        )
        return _response.data

    def find(self, job_identifier: str, *, request_options: typing.Optional[RequestOptions] = None) -> DataExport:
        """
        You can view the status of your job by sending a `GET` request to the URL
        `https://api.intercom.io/export/content/data/{job_identifier}` - the `{job_identifier}` is the value returned in the response when you first created the export job. More on it can be seen in the Export Job Model.

        > 🚧 Jobs expire after two days
        > All jobs that have completed processing (and are thus available to download from the provided URL) will have an expiry limit of two days from when the export job completed. After this, the data will no longer be available.

        Parameters
        ----------
        job_identifier : str
            job_identifier

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        DataExport
            successful

        Examples
        --------
        from intercom import Intercom

        client = Intercom(
            token="YOUR_TOKEN",
        )
        client.data_export.find(
            job_identifier="job_identifier",
        )
        """
        _response = self._raw_client.find(job_identifier, request_options=request_options)
        return _response.data

    def cancel(self, job_identifier: str, *, request_options: typing.Optional[RequestOptions] = None) -> DataExport:
        """
        You can cancel your job

        Parameters
        ----------
        job_identifier : str
            job_identifier

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        DataExport
            successful

        Examples
        --------
        from intercom import Intercom

        client = Intercom(
            token="YOUR_TOKEN",
        )
        client.data_export.cancel(
            job_identifier="job_identifier",
        )
        """
        _response = self._raw_client.cancel(job_identifier, request_options=request_options)
        return _response.data

    def download(self, job_identifier: str, *, request_options: typing.Optional[RequestOptions] = None) -> None:
        """
        When a job has a status of complete, and thus a filled download_url, you can download your data by hitting that provided URL, formatted like so: https://api.intercom.io/download/content/data/xyz1234.

        Your exported message data will be streamed continuously back down to you in a gzipped CSV format.

        > 📘 Octet header required
        >
        > You will have to specify the header Accept: `application/octet-stream` when hitting this endpoint.

        Parameters
        ----------
        job_identifier : str
            job_identifier

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        None

        Examples
        --------
        from intercom import Intercom

        client = Intercom(
            token="YOUR_TOKEN",
        )
        client.data_export.download(
            job_identifier="job_identifier",
        )
        """
        _response = self._raw_client.download(job_identifier, request_options=request_options)
        return _response.data


class AsyncDataExportClient:
    """Asynchronous client for the data-export resource; delegates to the async raw client."""

    def __init__(self, *, client_wrapper: AsyncClientWrapper):
        self._raw_client = AsyncRawDataExportClient(client_wrapper=client_wrapper)

    @property
    def with_raw_response(self) -> AsyncRawDataExportClient:
        """
        Retrieves a raw implementation of this client that returns raw responses.

        Returns
        -------
        AsyncRawDataExportClient
        """
        return self._raw_client

    async def export_reporting_data(
        self,
        job_identifier: str,
        *,
        app_id: str,
        client_id: str,
        request_options: typing.Optional[RequestOptions] = None,
    ) -> DataExportExportReportingDataResponse:
        """
        Parameters
        ----------
        job_identifier : str
            Unique identifier of the job.

        app_id : str
            The Intercom defined code of the workspace the company is associated to.

        client_id : str

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        DataExportExportReportingDataResponse
            Job status returned successfully

        Examples
        --------
        import asyncio

        from intercom import AsyncIntercom

        client = AsyncIntercom(
            token="YOUR_TOKEN",
        )


        async def main() -> None:
            await client.data_export.export_reporting_data(
                job_identifier="job_identifier",
                app_id="app_id",
                client_id="client_id",
            )


        asyncio.run(main())
        """
        _response = await self._raw_client.export_reporting_data(
            job_identifier, app_id=app_id, client_id=client_id, request_options=request_options
        )
        return _response.data

    async def download_reporting_data_export(
        self, job_identifier: str, *, app_id: str, request_options: typing.Optional[RequestOptions] = None
    ) -> None:
        """
        Download the data from a completed reporting data export job.

        > 📘 Octet header required
        >
        > You will have to specify the header Accept: `application/octet-stream` when hitting this endpoint.

        Parameters
        ----------
        job_identifier : str

        app_id : str

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        None

        Examples
        --------
        import asyncio

        from intercom import AsyncIntercom

        client = AsyncIntercom(
            token="YOUR_TOKEN",
        )


        async def main() -> None:
            await client.data_export.download_reporting_data_export(
                job_identifier="job_identifier",
                app_id="app_id",
            )


        asyncio.run(main())
        """
        _response = await self._raw_client.download_reporting_data_export(
            job_identifier, app_id=app_id, request_options=request_options
        )
        return _response.data

    async def create(
        self, *, created_at_after: int, created_at_before: int, request_options: typing.Optional[RequestOptions] = None
    ) -> DataExport:
        """
        To create your export job, you need to send a `POST` request to the export endpoint `https://api.intercom.io/export/content/data`.

        The only parameters you need to provide are the range of dates that you want exported.

        >🚧 Limit of one active job
        >
        > You can only have one active job per workspace. You will receive a HTTP status code of 429 with the message Exceeded rate limit of 1 pending message data export jobs if you attempt to create a second concurrent job.

        >❗️ Updated_at not included
        >
        > It should be noted that the timeframe only includes messages sent during the time period and not messages that were only updated during this period. For example, if a message was updated yesterday but sent two days ago, you would need to set the created_at_after date before the message was sent to include that in your retrieval job.

        >📘 Date ranges are inclusive
        >
        > Requesting data for 2018-06-01 until 2018-06-30 will get all data for those days including those specified - e.g. 2018-06-01 00:00:00 until 2018-06-30 23:59:99.

        Parameters
        ----------
        created_at_after : int
            The start date that you request data for. It must be formatted as a unix timestamp.

        created_at_before : int
            The end date that you request data for. It must be formatted as a unix timestamp.

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        DataExport
            successful

        Examples
        --------
        import asyncio

        from intercom import AsyncIntercom

        client = AsyncIntercom(
            token="YOUR_TOKEN",
        )


        async def main() -> None:
            await client.data_export.create(
                created_at_after=1734519776,
                created_at_before=1734537776,
            )


        asyncio.run(main())
        """
        _response = await self._raw_client.create(
            created_at_after=created_at_after, created_at_before=created_at_before, request_options=request_options
        )
        return _response.data

    async def find(self, job_identifier: str, *, request_options: typing.Optional[RequestOptions] = None) -> DataExport:
        """
        You can view the status of your job by sending a `GET` request to the URL
        `https://api.intercom.io/export/content/data/{job_identifier}` - the `{job_identifier}` is the value returned in the response when you first created the export job. More on it can be seen in the Export Job Model.

        > 🚧 Jobs expire after two days
        > All jobs that have completed processing (and are thus available to download from the provided URL) will have an expiry limit of two days from when the export job completed. After this, the data will no longer be available.

        Parameters
        ----------
        job_identifier : str
            job_identifier

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        DataExport
            successful

        Examples
        --------
        import asyncio

        from intercom import AsyncIntercom

        client = AsyncIntercom(
            token="YOUR_TOKEN",
        )


        async def main() -> None:
            await client.data_export.find(
                job_identifier="job_identifier",
            )


        asyncio.run(main())
        """
        _response = await self._raw_client.find(job_identifier, request_options=request_options)
        return _response.data

    async def cancel(
        self, job_identifier: str, *, request_options: typing.Optional[RequestOptions] = None
    ) -> DataExport:
        """
        You can cancel your job

        Parameters
        ----------
        job_identifier : str
            job_identifier

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        DataExport
            successful

        Examples
        --------
        import asyncio

        from intercom import AsyncIntercom

        client = AsyncIntercom(
            token="YOUR_TOKEN",
        )


        async def main() -> None:
            await client.data_export.cancel(
                job_identifier="job_identifier",
            )


        asyncio.run(main())
        """
        _response = await self._raw_client.cancel(job_identifier, request_options=request_options)
        return _response.data

    async def download(self, job_identifier: str, *, request_options: typing.Optional[RequestOptions] = None) -> None:
        """
        When a job has a status of complete, and thus a filled download_url, you can download your data by hitting that provided URL, formatted like so: https://api.intercom.io/download/content/data/xyz1234.

        Your exported message data will be streamed continuously back down to you in a gzipped CSV format.

        > 📘 Octet header required
        >
        > You will have to specify the header Accept: `application/octet-stream` when hitting this endpoint.

        Parameters
        ----------
        job_identifier : str
            job_identifier

        request_options : typing.Optional[RequestOptions]
            Request-specific configuration.

        Returns
        -------
        None

        Examples
        --------
        import asyncio

        from intercom import AsyncIntercom

        client = AsyncIntercom(
            token="YOUR_TOKEN",
        )


        async def main() -> None:
            await client.data_export.download(
                job_identifier="job_identifier",
            )


        asyncio.run(main())
        """
        _response = await self._raw_client.download(job_identifier, request_options=request_options)
        return _response.data