-
Notifications
You must be signed in to change notification settings - Fork 654
/
Copy paths3.py
112 lines (89 loc) · 3.37 KB
/
s3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""
S3 utility module for storing and retrieving images from AWS S3.
"""
import asyncio
import imghdr
import logging
from functools import partial
from io import BytesIO
from typing import Optional

import boto3
import httpx
from botocore.exceptions import ClientError
from mypy_boto3_s3.client import S3Client

logger = logging.getLogger(__name__)

# Global variables for S3 configuration.
# All four start as None and are assigned together by init_s3(). store_image()
# checks them and returns the source URL unchanged while any of them is falsy.
_bucket: Optional[str] = None  # name of the target S3 bucket
_client: Optional[S3Client] = None  # boto3 S3 client created by init_s3()
_prefix: Optional[str] = None  # key prefix, built as "<env>/intentkit/"
_cdn_url: Optional[str] = None  # CDN base URL used to build returned links
def init_s3(bucket: str, cdn_url: str, env: str) -> None:
    """
    Configure the module-level S3 state used by the rest of this module.

    Validates the inputs, records the bucket and CDN URL, derives the key
    prefix from the environment name and creates the boto3 S3 client.

    Args:
        bucket: S3 bucket name
        cdn_url: CDN URL for the S3 bucket
        env: Environment name; becomes the first segment of the key prefix

    Raises:
        ValueError: If bucket or cdn_url is empty
    """
    global _bucket, _client, _prefix, _cdn_url

    # Checked in order, so an empty bucket is reported before an empty CDN URL.
    required = (
        (bucket, "S3 bucket name cannot be empty"),
        (cdn_url, "S3 CDN URL cannot be empty"),
    )
    for value, message in required:
        if not value:
            raise ValueError(message)

    _bucket, _cdn_url = bucket, cdn_url
    _prefix = f"{env}/intentkit/"
    # The client is created last, after the plain settings have been stored.
    _client = boto3.client("s3")

    logger.info(f"S3 initialized with bucket: {bucket}, prefix: {_prefix}")
# Content types that carry no real information about the payload.
# "binary/octet-stream" is what the original code special-cased;
# "application/octet-stream" is the standard generic type.
_GENERIC_CONTENT_TYPES = frozenset(
    {"binary/octet-stream", "application/octet-stream"}
)


def _resolve_content_type(header_value: str, payload: bytes) -> str:
    """
    Choose the Content-Type to store with an uploaded image.

    Args:
        header_value: Content-Type header of the download response ("" if absent)
        payload: Raw bytes of the downloaded image

    Returns:
        str: The header value when it names a specific type; otherwise
        "image/<detected type>" sniffed from the payload, or "image/jpeg"
        when detection fails.
    """
    # Strip parameters (e.g. "; charset=...") and normalise case so that the
    # generic-type comparison does not depend on how the server spelled it.
    media_type = header_value.split(";", 1)[0].strip().lower()
    if media_type and media_type not in _GENERIC_CONTENT_TYPES:
        return header_value

    # NOTE(review): imghdr is deprecated since Python 3.11 and removed in 3.13
    # (PEP 594); this module will need a replacement before upgrading.
    img_type = imghdr.what(None, h=payload)
    if img_type:
        return f"image/{img_type}"

    # Default to JPEG if detection fails
    return "image/jpeg"


async def store_image(url: str, key: str) -> str:
    """
    Store an image from a URL to S3 asynchronously.

    The image is downloaded with httpx, its content type is resolved (falling
    back to sniffing the bytes when the server sends none or a generic one),
    and the bytes are uploaded under "<prefix><key>". The blocking boto3
    upload runs in the default executor so the event loop is not stalled.

    Args:
        url: Source URL of the image
        key: Key to store the image under (without prefix)

    Returns:
        str: The CDN URL of the stored image, or the original URL if S3 is not initialized

    Raises:
        ClientError: If the upload fails
        httpx.HTTPError: If the download fails (including non-success status codes)
    """
    if not _client or not _bucket or not _prefix or not _cdn_url:
        # If S3 is not initialized, log and return the original URL
        logger.info("S3 not initialized. Returning original URL.")
        return url

    # Download the image. The HTTP client is closed before the upload starts;
    # the response body has already been read into memory at that point.
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, follow_redirects=True)
            response.raise_for_status()
    except httpx.HTTPError as e:
        logger.error("Failed to download image from URL %s: %s", url, e)
        raise

    payload = response.content
    prefixed_key = f"{_prefix}{key}"
    content_type = _resolve_content_type(
        response.headers.get("Content-Type", ""), payload
    )

    # upload_fileobj is synchronous; calling it directly would block the event
    # loop for the whole transfer, so hand it to a worker thread instead.
    upload = partial(
        _client.upload_fileobj,
        BytesIO(payload),
        _bucket,
        prefixed_key,
        ExtraArgs={"ContentType": content_type, "ContentDisposition": "inline"},
    )
    try:
        await asyncio.get_running_loop().run_in_executor(None, upload)
    except ClientError as e:
        logger.error("Failed to upload image to S3: %s", e)
        raise

    # Tolerate a trailing slash on the configured CDN URL so the returned
    # link never contains "//" between the host and the key.
    cdn_base = _cdn_url.rstrip("/")
    cdn_url = f"{cdn_base}/{prefixed_key}"
    logger.info("Image uploaded successfully to %s", cdn_url)
    return cdn_url