import json
import os
import sys
import time
import urllib.parse

import boto3
6+
class ProcessType:
    """Enumeration of the two Textract job flavours supported here."""

    # Plain text detection (handled via start_document_text_detection).
    DETECTION = 1
    # Full document analysis, e.g. tables (handled via start_document_analysis).
    ANALYSIS = 2
10+
11+
class DocumentProcessor:
    """Runs an asynchronous Amazon Textract job against a document in S3.

    Workflow (see ``main``):
      1. Create a throw-away SNS topic + SQS queue pair for job-completion
         notifications (``CreateTopicandQueue``).
      2. Start the Textract job and poll the queue until the matching
         completion message arrives (``ProcessDocument``).
      3. Page through the results (``GetResults``) and write them back to
         the source bucket as ``<document>.json`` (``StoreInS3``).
      4. Delete the temporary topic and queue (``DeleteTopicandQueue``).
    """

    def __init__(self):
        # Clients are created per instance instead of at class-definition
        # time, so importing this module no longer requires AWS
        # credentials/region to be configured.
        self.textract = boto3.client('textract')
        self.sqs = boto3.client('sqs')
        self.sns = boto3.client('sns')

        # Per-job state. Previously these were class attributes, shared
        # (and leaked) across all instances.
        self.jobId = ''
        self.roleArn = ''
        self.bucket = ''
        self.document = ''
        self.sqsQueueUrl = ''
        self.snsTopicArn = ''
        self.processType = ''

    def main(self, bucketName, documentName):
        """Analyze ``s3://bucketName/documentName`` and store the result JSON.

        The SNS/SQS plumbing is always torn down, even if processing fails.
        """
        # The role ARN can be overridden via environment; the historical
        # hard-coded value is kept as the backward-compatible default.
        self.roleArn = os.environ.get(
            'TEXTRACT_ROLE_ARN',
            'arn:aws:iam::357171621133:role/ETLlambdaAccessRole')

        self.bucket = bucketName
        self.document = documentName

        self.CreateTopicandQueue()
        try:
            self.ProcessDocument(ProcessType.ANALYSIS)
        finally:
            # Guarantee cleanup so failed runs don't leak topics/queues.
            self.DeleteTopicandQueue()

    def ProcessDocument(self, type):
        """Start a Textract job of the given ProcessType and wait for it.

        Polls the SQS queue until the completion notification for this job
        arrives, then fetches the results and stores them in S3.
        NOTE: the parameter name ``type`` shadows the builtin; it is kept
        for backward compatibility with existing callers.
        """
        self.processType = type

        # Kick off the asynchronous Textract job for the requested type.
        if self.processType == ProcessType.DETECTION:
            response = self.textract.start_document_text_detection(
                DocumentLocation={'S3Object': {'Bucket': self.bucket, 'Name': self.document}},
                NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn})
            print('Processing type: Detection')
        elif self.processType == ProcessType.ANALYSIS:
            response = self.textract.start_document_analysis(
                DocumentLocation={'S3Object': {'Bucket': self.bucket, 'Name': self.document}},
                FeatureTypes=["TABLES"],
                NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn})
            print('Processing type: Analysis')
        else:
            print("Invalid processing type. Choose Detection or Analysis.")
            return

        print('Start Job Id: ' + response['JobId'])
        dotLine = 0
        jobFound = False
        while not jobFound:
            sqsResponse = self.sqs.receive_message(
                QueueUrl=self.sqsQueueUrl,
                MessageAttributeNames=['ALL'],
                MaxNumberOfMessages=10)

            if not sqsResponse:
                continue

            if 'Messages' not in sqsResponse:
                # No notification yet: print a progress dot (wrapping the
                # line every 40 dots) and wait before polling again.
                if dotLine < 40:
                    print('.', end='')
                    dotLine += 1
                else:
                    print()
                    dotLine = 0
                sys.stdout.flush()
                time.sleep(5)
                continue

            for message in sqsResponse['Messages']:
                notification = json.loads(message['Body'])
                textMessage = json.loads(notification['Message'])
                print(textMessage['JobId'])
                print(textMessage['Status'])
                if str(textMessage['JobId']) == response['JobId']:
                    print('Matching Job Found:' + textMessage['JobId'])
                    jobFound = True
                    # NOTE(review): results are fetched regardless of the
                    # reported Status — confirm whether FAILED jobs should
                    # be skipped instead.
                    results = self.GetResults(textMessage['JobId'])
                    self.StoreInS3(results)
                else:
                    print("Job didn't match:" +
                          str(textMessage['JobId']) + ' : ' + str(response['JobId']))
                # Delete the message either way. Unknown messages could
                # alternatively be routed to a dead-letter queue.
                self.sqs.delete_message(QueueUrl=self.sqsQueueUrl,
                                        ReceiptHandle=message['ReceiptHandle'])

        print('Done!')

    def StoreInS3(self, response):
        """Store the (already JSON-encoded) results in the source bucket.

        The output key is the document name with a '.pdf' suffix removed
        and '.json' appended.
        """
        print('registering in s3 bucket...')
        outputInJsonText = str(response)
        outputFileName = self.document.replace('.pdf', '') + '.json'

        s3 = boto3.client('s3')
        s3.put_object(Body=outputInJsonText,
                      Bucket=self.bucket, Key=outputFileName)
        print('file ' + outputFileName + ' registered successfully!')

    def CreateTopicandQueue(self):
        """Create a uniquely-named SNS topic + SQS queue pair and wire the
        queue to receive the topic's Textract completion notifications."""
        # Millisecond timestamp keeps concurrent invocations from colliding.
        millis = str(int(time.time() * 1000))

        # Create SNS topic.
        snsTopicName = "AmazonTextractTopic" + millis
        topicResponse = self.sns.create_topic(Name=snsTopicName)
        self.snsTopicArn = topicResponse['TopicArn']

        # Create SQS queue.
        sqsQueueName = "AmazonTextractQueue" + millis
        self.sqs.create_queue(QueueName=sqsQueueName)
        self.sqsQueueUrl = self.sqs.get_queue_url(
            QueueName=sqsQueueName)['QueueUrl']

        attribs = self.sqs.get_queue_attributes(
            QueueUrl=self.sqsQueueUrl,
            AttributeNames=['QueueArn'])['Attributes']
        sqsQueueArn = attribs['QueueArn']

        # Subscribe SQS queue to SNS topic.
        self.sns.subscribe(
            TopicArn=self.snsTopicArn,
            Protocol='sqs',
            Endpoint=sqsQueueArn)

        # Authorize SNS to write to the SQS queue. Built with json.dumps
        # instead of a brace-escaped format string to avoid quoting bugs.
        policy = json.dumps({
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "MyPolicy",
                    "Effect": "Allow",
                    "Principal": {"AWS": "*"},
                    "Action": "SQS:SendMessage",
                    "Resource": sqsQueueArn,
                    "Condition": {
                        "ArnEquals": {
                            "aws:SourceArn": self.snsTopicArn
                        }
                    }
                }
            ]
        })

        self.sqs.set_queue_attributes(
            QueueUrl=self.sqsQueueUrl,
            Attributes={
                'Policy': policy
            })

    def DeleteTopicandQueue(self):
        """Delete the temporary SQS queue and SNS topic created by
        CreateTopicandQueue."""
        self.sqs.delete_queue(QueueUrl=self.sqsQueueUrl)
        self.sns.delete_topic(TopicArn=self.snsTopicArn)

    def GetResults(self, jobId):
        """Fetch every page of results for ``jobId``.

        Returns the list of raw Get* API responses serialized as a JSON
        string. Raises ValueError if ``self.processType`` was never set to
        a known type (previously this failed later with a TypeError).
        """
        maxResults = 1000
        paginationToken = None
        pages = []

        while True:
            kwargs = {'JobId': jobId, 'MaxResults': maxResults}
            if paginationToken is not None:
                kwargs['NextToken'] = paginationToken

            if self.processType == ProcessType.ANALYSIS:
                response = self.textract.get_document_analysis(**kwargs)
            elif self.processType == ProcessType.DETECTION:
                response = self.textract.get_document_text_detection(**kwargs)
            else:
                raise ValueError(
                    'Unknown process type: {}'.format(self.processType))

            pages.append(response)
            print('Document Detected.')

            paginationToken = response.get('NextToken')
            if paginationToken is None:
                break

        # Serialize the accumulated pages as JSON.
        return json.dumps(pages)
211+
212+
def lambda_handler(event, context):
    """AWS Lambda entry point: process the S3 object named in the event.

    Extracts bucket/key from the first S3 event record, runs the Textract
    pipeline on it, and re-raises any failure after logging it.
    """
    processor = DocumentProcessor()

    # Pull the bucket name and (URL-decoded) object key out of the event.
    s3_record = event['Records'][0]['s3']
    bucket = s3_record['bucket']['name']
    key = urllib.parse.unquote_plus(s3_record['object']['key'], encoding='utf-8')

    try:
        processor.main(bucket, key)
        return 'Processing Done!'
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e