Quick Start - Python

Get started with Importly in your Python application using popular libraries and frameworks.

Setup

Install the required dependencies:

bash
1pip install requests python-dotenv

For async support (optional):

bash
1pip install aiohttp asyncio

1. Get Your API Key

  1. Create an Account at Importly.io
  2. Find your API Key via the api key page
  3. Set it as an environment variable:
bash
1# .env
2IMPORTLY_API_KEY=your_api_key_here
3IMPORTLY_API_URL=https://api.importly.io
4WEBHOOK_URL=https://your-domain.com/webhook/importly

2. Create Importly Client

Create a synchronous client using requests:

python
1# importly_client.py
2import os
3import time
4import requests
5from typing import Optional, Dict, Any
6from dotenv import load_dotenv
7
8load_dotenv()
9
10class ImportlyClient:
11 def __init__(self, api_key: Optional[str] = None):
12 self.api_key = api_key or os.getenv('IMPORTLY_API_KEY')
13 if not self.api_key:
14 raise ValueError("API key is required")
15
16 self.base_url = os.getenv('IMPORTLY_API_URL')
17 self.session = requests.Session()
18 self.session.headers.update({
19 'Authorization': f'Bearer {self.api_key}',
20 'Content-Type': 'application/json'
21 })
22 self.session.timeout = 30
23
24 def import_media(self, url: str, **options) -> Dict[str, Any]:
25 """Import media from a URL"""
26 data = {
27 'url': url,
28 'includeVideo': options.get('include_video', True),
29 'includeAudio': options.get('include_audio', True),
30 'videoQuality': options.get('video_quality', '1080p'),
31 'audioQuality': options.get('audio_quality', 'medium'),
32 }
33
34 webhook_url = options.get('webhook_url')
35 if webhook_url:
36 data['webhookUrl'] = webhook_url
37
38 try:
39 response = self.session.post(f"{self.base_url}/import", json=data)
40 response.raise_for_status()
41 return response.json()
42 except requests.RequestException as e:
43 raise self._handle_error(e)
44
45 def get_metadata(self, url: str, webhook_url: Optional[str] = None) -> Dict[str, Any]:
46 """Get metadata for a URL"""
47 params = {'url': url}
48 if webhook_url:
49 params['webhookUrl'] = webhook_url
50
51 try:
52 response = self.session.get(f"{self.base_url}/metadata", params=params)
53 response.raise_for_status()
54 return response.json()
55 except requests.RequestException as e:
56 raise self._handle_error(e)
57
58 def check_import_status(self, import_id: str) -> Dict[str, Any]:
59 """Check the status of an import"""
60 try:
61 response = self.session.get(f"{self.base_url}/import/status",
62 params={'id': import_id})
63 response.raise_for_status()
64 return response.json()
65 except requests.RequestException as e:
66 raise self._handle_error(e)
67
68 def check_metadata_status(self, job_id: str) -> Dict[str, Any]:
69 """Check the status of a metadata job"""
70 try:
71 response = self.session.get(f"{self.base_url}/metadata/status",
72 params={'id': job_id})
73 response.raise_for_status()
74 return response.json()
75 except requests.RequestException as e:
76 raise self._handle_error(e)
77
78 def wait_for_completion(self, id: str, job_type: str = 'import',
79 max_wait_time: int = 300, poll_interval: int = 5) -> Dict[str, Any]:
80 """Wait for an import or metadata job to complete"""
81 start_time = time.time()
82 check_status = self.check_import_status if job_type == 'import' else self.check_metadata_status
83
84 while time.time() - start_time < max_wait_time:
85 try:
86 result = check_status(id)
87 status = result['data']['status']
88
89 if status == 'completed':
90 return result
91 elif status in ['failed', 'cancelled']:
92 error_msg = result['data'].get('error', 'Unknown error')
93 raise Exception(f"{job_type} {status}: {error_msg}")
94
95 time.sleep(poll_interval)
96 except requests.RequestException as e:
97 if hasattr(e, 'response') and e.response.status_code == 404:
98 raise Exception(f"{job_type} not found")
99 raise
100
101 raise Exception(f"{job_type} timed out after {max_wait_time} seconds")
102
103 def _handle_error(self, error: requests.RequestException) -> Exception:
104 """Handle and format API errors"""
105 if hasattr(error, 'response') and error.response is not None:
106 status_code = error.response.status_code
107 try:
108 error_data = error.response.json()
109 message = error_data.get('message', error_data.get('error', 'API request failed'))
110 except ValueError:
111 message = error.response.text or 'API request failed'
112
113 if status_code == 401:
114 return Exception('Invalid API key')
115 elif status_code == 402:
116 return Exception('Insufficient credits')
117 elif status_code == 429:
118 return Exception('Rate limit exceeded')
119 elif status_code == 400:
120 return Exception(f'Bad request: {message}')
121 else:
122 return Exception(f'API error ({status_code}): {message}')
123 else:
124 return Exception(f'Network error: {str(error)}')

3. Async Client (Optional)

For async applications, create an async client:

python
1# async_importly_client.py
2import os
3import asyncio
4import aiohttp
5from typing import Optional, Dict, Any
6from dotenv import load_dotenv
7
8load_dotenv()
9
10class AsyncImportlyClient:
11 def __init__(self, api_key: Optional[str] = None):
12 self.api_key = api_key or os.getenv('IMPORTLY_API_KEY')
13 if not self.api_key:
14 raise ValueError("API key is required")
15
16 self.base_url = "https://api.importly.io"
17 self.headers = {
18 'Authorization': f'Bearer {self.api_key}',
19 'Content-Type': 'application/json'
20 }
21
22 async def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
23 """Make an HTTP request"""
24 url = f"{self.base_url}{endpoint}"
25
26 async with aiohttp.ClientSession(headers=self.headers, timeout=aiohttp.ClientTimeout(30)) as session:
27 async with session.request(method, url, **kwargs) as response:
28 if response.status >= 400:
29 error_text = await response.text()
30 raise aiohttp.ClientError(f"HTTP {response.status}: {error_text}")
31
32 return await response.json()
33
34 async def import_media(self, url: str, **options) -> Dict[str, Any]:
35 """Import media from a URL"""
36 data = {
37 'url': url,
38 'includeVideo': options.get('include_video', True),
39 'includeAudio': options.get('include_audio', True),
40 'videoQuality': options.get('video_quality', '1080p'),
41 'audioQuality': options.get('audio_quality', 'medium'),
42 }
43
44 webhook_url = options.get('webhook_url')
45 if webhook_url:
46 data['webhookUrl'] = webhook_url
47
48 return await self._make_request('POST', '/import', json=data)
49
50 async def get_metadata(self, url: str, webhook_url: Optional[str] = None) -> Dict[str, Any]:
51 """Get metadata for a URL"""
52 params = {'url': url}
53 if webhook_url:
54 params['webhookUrl'] = webhook_url
55
56 return await self._make_request('GET', '/metadata', params=params)
57
58 async def check_import_status(self, import_id: str) -> Dict[str, Any]:
59 """Check the status of an import"""
60 return await self._make_request('GET', '/import/status', params={'id': import_id})
61
62 async def check_metadata_status(self, job_id: str) -> Dict[str, Any]:
63 """Check the status of a metadata job"""
64 return await self._make_request('GET', '/metadata/status', params={'id': job_id})
65
66 async def wait_for_completion(self, id: str, job_type: str = 'import',
67 max_wait_time: int = 300, poll_interval: int = 5) -> Dict[str, Any]:
68 """Wait for an import or metadata job to complete"""
69 start_time = asyncio.get_event_loop().time()
70 check_status = self.check_import_status if job_type == 'import' else self.check_metadata_status
71
72 while asyncio.get_event_loop().time() - start_time < max_wait_time:
73 try:
74 result = await check_status(id)
75 status = result['data']['status']
76
77 if status == 'completed':
78 return result
79 elif status in ['failed', 'cancelled']:
80 error_msg = result['data'].get('error', 'Unknown error')
81 raise Exception(f"{job_type} {status}: {error_msg}")
82
83 await asyncio.sleep(poll_interval)
84 except aiohttp.ClientError as e:
85 if '404' in str(e):
86 raise Exception(f"{job_type} not found")
87 raise
88
89 raise Exception(f"{job_type} timed out after {max_wait_time} seconds")

4. Flask Web Application

Create a Flask app with webhook support:

python
1# app.py
2from flask import Flask, request, jsonify
3from importly_client import ImportlyClient
4import os
5from datetime import datetime
6
7app = Flask(__name__)
8
9# Initialize Importly client
10importly = ImportlyClient()
11
12# In-memory storage (use database in production)
13imports_db = {}
14metadata_db = {}
15
16@app.route('/import', methods=['POST'])
17def import_media():
18 """Start a media import"""
19 try:
20 data = request.get_json()
21 url = data.get('url')
22
23 if not url:
24 return jsonify({'error': 'URL is required'}), 400
25
26 # Start import with webhook
27 result = importly.import_media(
28 url,
29 video_quality=data.get('videoQuality', '1080p'),
30 audio_quality=data.get('audioQuality', 'medium'),
31 webhook_url=os.getenv('WEBHOOK_URL')
32 )
33
34 import_id = result['data']['jobId']
35
36 # Store import info
37 imports_db[import_id] = {
38 'id': import_id,
39 'url': url,
40 'status': 'queued',
41 'created_at': datetime.utcnow().isoformat(),
42 'video_quality': data.get('videoQuality', '1080p'),
43 'audio_quality': data.get('audioQuality', 'medium')
44 }
45
46 return jsonify({
47 'success': True,
48 'import_id': import_id,
49 'status': 'queued',
50 'message': 'Import started successfully'
51 })
52
53 except Exception as e:
54 return jsonify({'error': str(e)}), 500
55
56@app.route('/metadata', methods=['POST'])
57def get_metadata():
58 """Get metadata for a URL"""
59 try:
60 data = request.get_json()
61 url = data.get('url')
62
63 if not url:
64 return jsonify({'error': 'URL is required'}), 400
65
66 result = importly.get_metadata(url, os.getenv('WEBHOOK_URL'))
67 job_id = result['data']['jobId']
68
69 # Store metadata info
70 metadata_db[job_id] = {
71 'id': job_id,
72 'url': url,
73 'status': 'queued',
74 'created_at': datetime.utcnow().isoformat()
75 }
76
77 return jsonify({
78 'success': True,
79 'job_id': job_id,
80 'status': 'queued',
81 'message': 'Metadata request started successfully'
82 })
83
84 except Exception as e:
85 return jsonify({'error': str(e)}), 500
86
87@app.route('/import/<import_id>/status', methods=['GET'])
88def get_import_status(import_id):
89 """Get import status"""
90 try:
91 # Check local storage first
92 local_import = imports_db.get(import_id)
93 if local_import and local_import['status'] == 'completed':
94 return jsonify({'success': True, 'data': local_import})
95
96 # Check with Importly API
97 result = importly.check_import_status(import_id)
98
99 # Update local storage
100 if import_id in imports_db:
101 imports_db[import_id].update(result['data'])
102
103 return jsonify(result)
104
105 except Exception as e:
106 return jsonify({'error': str(e)}), 500
107
108@app.route('/metadata/<job_id>/status', methods=['GET'])
109def get_metadata_status(job_id):
110 """Get metadata status"""
111 try:
112 # Check local storage first
113 local_metadata = metadata_db.get(job_id)
114 if local_metadata and local_metadata['status'] == 'completed':
115 return jsonify({'success': True, 'data': local_metadata})
116
117 # Check with Importly API
118 result = importly.check_metadata_status(job_id)
119
120 # Update local storage
121 if job_id in metadata_db:
122 metadata_db[job_id].update(result['data'])
123
124 return jsonify(result)
125
126 except Exception as e:
127 return jsonify({'error': str(e)}), 500
128
129@app.route('/webhook/importly', methods=['POST'])
130def handle_webhook():
131 """Handle Importly webhooks"""
132 try:
133 data = request.get_json()
134 webhook_type = data.get('type')
135 webhook_data = data.get('data')
136
137 print(f"Received webhook: {webhook_type}")
138
139 if webhook_type == 'import.completed':
140 handle_import_completed(webhook_data)
141 elif webhook_type == 'import.failed':
142 handle_import_failed(webhook_data)
143 elif webhook_type == 'metadata.completed':
144 handle_metadata_completed(webhook_data)
145 elif webhook_type == 'metadata.failed':
146 handle_metadata_failed(webhook_data)
147 else:
148 print(f"Unknown webhook type: {webhook_type}")
149
150 return jsonify({'received': True})
151
152 except Exception as e:
153 print(f"Webhook error: {e}")
154 return jsonify({'error': 'Webhook processing failed'}), 500
155
156def handle_import_completed(data):
157 """Handle completed import webhook"""
158 import_id = data['jobId']
159 result = data['result']
160
161 if import_id in imports_db:
162 imports_db[import_id].update({
163 'status': 'completed',
164 'result': result,
165 'completed_at': datetime.utcnow().isoformat()
166 })
167
168 print(f"Import completed: {import_id}")
169
170 # Add your custom logic here:
171 # - Send notifications
172 # - Update database
173 # - Process the media file
174
175def handle_import_failed(data):
176 """Handle failed import webhook"""
177 import_id = data['jobId']
178 error = data.get('error', 'Unknown error')
179
180 if import_id in imports_db:
181 imports_db[import_id].update({
182 'status': 'failed',
183 'error': error,
184 'failed_at': datetime.utcnow().isoformat()
185 })
186
187 print(f"Import failed: {import_id} - {error}")
188
189def handle_metadata_completed(data):
190 """Handle completed metadata webhook"""
191 job_id = data['jobId']
192 result = data['result']
193
194 if job_id in metadata_db:
195 metadata_db[job_id].update({
196 'status': 'completed',
197 'result': result,
198 'completed_at': datetime.utcnow().isoformat()
199 })
200
201 print(f"Metadata completed: {job_id}")
202
203def handle_metadata_failed(data):
204 """Handle failed metadata webhook"""
205 job_id = data['jobId']
206 error = data.get('error', 'Unknown error')
207
208 if job_id in metadata_db:
209 metadata_db[job_id].update({
210 'status': 'failed',
211 'error': error,
212 'failed_at': datetime.utcnow().isoformat()
213 })
214
215 print(f"Metadata failed: {job_id} - {error}")
216
217@app.route('/imports', methods=['GET'])
218def list_imports():
219 """List all imports"""
220 imports_list = sorted(
221 imports_db.values(),
222 key=lambda x: x['created_at'],
223 reverse=True
224 )
225 return jsonify({'imports': imports_list})
226
227@app.route('/metadata', methods=['GET'])
228def list_metadata():
229 """List all metadata requests"""
230 metadata_list = sorted(
231 metadata_db.values(),
232 key=lambda x: x['created_at'],
233 reverse=True
234 )
235 return jsonify({'metadata': metadata_list})
236
237@app.route('/health', methods=['GET'])
238def health_check():
239 """Health check endpoint"""
240 return jsonify({
241 'status': 'ok',
242 'timestamp': datetime.utcnow().isoformat(),
243 'imports': len(imports_db),
244 'metadata': len(metadata_db)
245 })
246
247if __name__ == '__main__':
248 app.run(debug=True, port=5000)

5. Command Line Tool

Create a CLI tool for easy imports:

python
1# cli.py
2#!/usr/bin/env python3
3
4import sys
5import argparse
6from importly_client import ImportlyClient
7
8def main():
9 parser = argparse.ArgumentParser(description='Importly CLI tool')
10 parser.add_argument('url', help='URL to import')
11 parser.add_argument('--quality', '-q', default='1080p',
12 choices=['480p', '720p', '1080p', '1440p', '2160p'],
13 help='Video quality (default: 1080p)')
14 parser.add_argument('--audio-quality', '-a', default='medium',
15 choices=['low', 'medium', 'high'],
16 help='Audio quality (default: medium)')
17 parser.add_argument('--metadata-only', '-m', action='store_true',
18 help='Only get metadata, don\'t import')
19 parser.add_argument('--no-wait', action='store_true',
20 help='Don\'t wait for completion')
21
22 args = parser.parse_args()
23
24 try:
25 client = ImportlyClient()
26
27 if args.metadata_only:
28 print(f"Getting metadata for: {args.url}")
29 result = client.get_metadata(args.url)
30 job_id = result['data']['jobId']
31 print(f"Metadata job started with ID: {job_id}")
32
33 if not args.no_wait:
34 print("Waiting for completion...")
35 completed = client.wait_for_completion(job_id, 'metadata')
36 print("Metadata completed!")
37 print(f"Title: {completed['data']['result'].get('title', 'N/A')}")
38 print(f"Duration: {completed['data']['result'].get('duration', 'N/A')}s")
39 else:
40 print(f"Starting import for: {args.url}")
41 print(f"Video quality: {args.quality}")
42 print(f"Audio quality: {args.audio_quality}")
43
44 result = client.import_media(
45 args.url,
46 video_quality=args.quality,
47 audio_quality=args.audio_quality
48 )
49
50 import_id = result['data']['jobId']
51 print(f"Import started with ID: {import_id}")
52
53 if not args.no_wait:
54 print("Waiting for completion...")
55 completed = client.wait_for_completion(import_id, 'import')
56
57 print("Import completed!")
58 result_data = completed['data']['result']
59 print(f"Media URL: {result_data['mediaUrl']}")
60 print(f"Credits used: {result_data['creditsUsed']}")
61 print(f"Duration: {result_data['duration']}s")
62 print(f"File size: {result_data['fileSizeBytes']} bytes")
63
64 except Exception as e:
65 print(f"Error: {e}", file=sys.stderr)
66 sys.exit(1)
67
68if __name__ == '__main__':
69 main()

Make it executable:

bash
1chmod +x cli.py

Use it:

bash
1python cli.py "https://example.com/video" --quality 1080p
2python cli.py "https://example.com/video" --metadata-only

6. Running the Application

Start your Flask app:

bash
1python app.py

Test the endpoints:

bash
1# Start an import
2curl -X POST http://localhost:5000/import \
3 -H "Content-Type: application/json" \
4 -d '{"url": "https://example.com/video", "videoQuality": "1080p"}'
5
6# Check status
7curl http://localhost:5000/import/YOUR_IMPORT_ID/status

Django Integration (Optional)

For Django projects, you can integrate Importly into your models and views:

python
1# models.py
2from django.db import models
3import uuid
4
5class MediaImport(models.Model):
6 STATUS_CHOICES = [
7 ('queued', 'Queued'),
8 ('processing', 'Processing'),
9 ('completed', 'Completed'),
10 ('failed', 'Failed'),
11 ]
12
13 id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
14 import_id = models.CharField(max_length=255, unique=True)
15 url = models.URLField()
16 status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='queued')
17 video_quality = models.CharField(max_length=10, default='1080p')
18 audio_quality = models.CharField(max_length=10, default='medium')
19 media_url = models.URLField(blank=True, null=True)
20 credits_used = models.IntegerField(blank=True, null=True)
21 error_message = models.TextField(blank=True, null=True)
22 created_at = models.DateTimeField(auto_now_add=True)
23 completed_at = models.DateTimeField(blank=True, null=True)
24
25 def __str__(self):
26 return f"Import {self.import_id} - {self.status}"

Why Python?

Python is excellent for Importly because:

  • Rich ecosystem - Great libraries for web development (Flask, Django, FastAPI)
  • Data processing - Perfect for handling media metadata and files
  • AI/ML integration - Easy to integrate with ML pipelines
  • Async support - Handle multiple imports concurrently
  • Simple syntax - Quick to implement and maintain

Best Practices

  1. Use virtual environments to manage dependencies
  2. Implement proper logging with Python's logging module
  3. Use databases instead of in-memory storage for production
  4. Add input validation with libraries like Pydantic
  5. Implement caching with Redis for frequently accessed data
  6. Use async/await for high-concurrency applications
  7. Add comprehensive error handling and retries

Production Considerations

  • Database: Use SQLAlchemy with PostgreSQL or similar
  • Task Queue: Implement Celery with Redis/RabbitMQ for background jobs
  • Web Framework: Consider FastAPI for async APIs or Django for full applications
  • Monitoring: Add logging, metrics, and health checks
  • Security: Implement authentication, input validation, and rate limiting

Complete Example

Check out our complete Python example on GitHub for a full implementation with Django/Flask integrations and advanced features.