Skip to content

Commit 28b2b91

Browse files
committed
Updates.
1 parent 4d8edb7 commit 28b2b91

File tree

7 files changed

+692
-454
lines changed

7 files changed

+692
-454
lines changed

ac-etl/cloud-functions/email-nfe-warehouse/README.md

+6-6
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ Example Cloud deployment
4242
# From ac-etl/cloud-functions/email-nfe-warehouse
4343
4444
gcloud functions deploy ac-email-processing \
45-
--entry-point processEmails
46-
--runtime nodejs18
47-
--gen2
48-
--trigger-topic=your-topic-id
49-
--env-vars-file .env.yaml
50-
--max-instances=1
45+
--entry-point processEmails \
46+
--runtime nodejs18 \
47+
--gen2 \
48+
--trigger-topic=your-topic-id \
49+
--env-vars-file .env.yaml \
50+
--max-instances=1 \
5151
--concurrency=1
5252
```
5353

ac-etl/cloud-functions/email-nfe-warehouse/package-lock.json

+497-254
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ac-etl/cloud-functions/email-nfe-warehouse/src/bigquery.ts

+30-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { BigQuery } from '@google-cloud/bigquery';
1+
import { BigQuery, RowsResponse } from '@google-cloud/bigquery';
22
import assert from 'assert';
33
import * as Ac from './util';
44

@@ -49,6 +49,34 @@ export async function getBigQueryClient(): Promise<BigQuery> {
4949
return bigQuery;
5050
}
5151

52+
export async function coundExistingByNfeId(nfeId: string): Promise<number> {
53+
const bigQuery = await getBigQueryClient();
54+
const datasetId: string = 'ac_ops_data';
55+
const invoiceTableId = 'base-nfe-supplier-invoice';
56+
57+
try {
58+
// Build the query to filter rows based on the column and value
59+
const query = `
60+
SELECT *
61+
FROM \`${datasetId}.${invoiceTableId}\`
62+
WHERE ${nfeId} = @filterValue
63+
`;
64+
const options = {
65+
query,
66+
params: { nfeId },
67+
};
68+
const [rows] = await bigQuery.query(options);
69+
const rowCount = rows.length;
70+
71+
console.log(`Number of rows retrieved: ${rowCount}`);
72+
return rowCount;
73+
}
74+
catch (error) {
75+
console.error('Error retrieving filtered rows from BigQuery:', error);
76+
throw error;
77+
}
78+
}
79+
5280
export async function insertInvoiceRecords(invoices: InvoiceRecord[], invoiceLines: InvoiceRecordLine[], logId: string) {
5381
bigQuery = await getBigQueryClient();
5482
const datasetId: string = 'ac_ops_data';
@@ -69,6 +97,7 @@ export async function insertInvoiceRecords(invoices: InvoiceRecord[], invoiceLin
6997
console.log(`${logId}/Inserted ${invoices.length} invoice headers`);
7098
}
7199
catch (err: any) {
100+
Ac.socialiseIt(`:grimacing: There was a BigQuery error, probably not good. (${logId})`);
72101
if (err.name === 'PartialFailureError') {
73102
console.error(`${logId}/BigQuery PartialFailureError` + JSON.stringify(err.errors));
74103
} else {

ac-etl/cloud-functions/email-nfe-warehouse/src/gmail.ts

+5-5
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ async function getAuthClient(keyFile: string, gmailUser: string, authScopes: str
4646
return await auth.getClient();
4747
}
4848
catch (error) {
49-
console.log([`Error getting Google Auth client for ${gmailUser}:`, error]);
49+
console.error([`Error getting Google Auth client for ${gmailUser}:`, error]);
5050
}
5151
}
5252

@@ -65,7 +65,7 @@ export async function getGmailClient(): Promise<gmail_v1.Gmail> {
6565
return gmail;
6666
}
6767
catch(error) {
68-
console.log(['Error getting Gmail client', error]);
68+
console.error(['Error getting Gmail client', error]);
6969
throw new Error('Error getting Gmail client');
7070
}
7171
}
@@ -108,7 +108,7 @@ export async function getMessages(query: string = '', label: string): Promise<Gm
108108
}
109109
catch (error) {
110110
if (res.data.resultSizeEstimate == undefined ) {
111-
console.log("Not iterable?", res.data);
111+
console.error("Not iterable?", res.data);
112112
throw new Error("Messages from Gmail were not iterable gmail messages. See log.");
113113
}
114114
return [];
@@ -147,14 +147,14 @@ export async function getMessages(query: string = '', label: string): Promise<Gm
147147
});
148148
// Not quite prepared to assert on this yet.
149149
if (!isGmailMessage(res.data)) {
150-
console.log(["Probably fatal.. there a gmail message in this response?", res]);
150+
console.error(["Probably fatal.. there a gmail message in this response?", res]);
151151
}
152152
return res.data;
153153
}) || [];
154154
return await Promise.all(messagePromises);
155155
}
156156
catch (error) {
157-
console.log(error);
157+
console.error(error);
158158
throw new Error("Failed to load all gmail messages.")
159159
}
160160
}

ac-etl/cloud-functions/email-nfe-warehouse/src/index.ts

+34-13
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import * as St from './storage';
66
import * as ff from '@google-cloud/functions-framework';
77
import { MessagePublishedData } from '@google/events/cloud/pubsub/v1/MessagePublishedData';
88
import NfeDocument, * as nfe from './nfe';
9-
// import BigQuery from '@google-cloud/bigquery';
10-
// import { isMemberName } from 'typescript';
119

1210
async function processEmailXml(cloudEvent: any): Promise<void> {
1311

@@ -41,23 +39,33 @@ async function processEmailXml(cloudEvent: any): Promise<void> {
4139
Ac.getEnv('GMAIL_LABEL_IN_QUEUE'),
4240
);
4341

44-
if (message.length == 0) {
42+
if (messages.length == 0) {
4543
console.log(`${logId}: Nothing found process, perhaps it recently ran.`);
4644
}
45+
else {
46+
Ac.socialiseIt(`:man-running: Started processing ${messages.length} tagged email(s) with batch sequence ${batchSequence}.`);
47+
}
48+
49+
let messagesProcessed = 0;
50+
let timePassed = 0;
4751

4852
// Keeping this basic and procedural while working out the kinks.
4953
for (const message of messages) {
5054
console.log(`${logId}: ${message.snippet}`);
5155

5256
const currentTime = new Date();
53-
if ((startTime.getTime() - currentTime.getTime()) > 3e4) {
57+
timePassed = currentTime.getTime() - startTime.getTime();
58+
console.log(`${logId}: Passed: ${timePassed}, processed: ${messagesProcessed}`);
59+
if (timePassed > 3e4) {
5460
// Only process for 30 seconds or so.
61+
console.log(`${logId}: Passed: ${timePassed}, processed: ${messagesProcessed}`);
5562
continue;
5663
}
5764

5865
if (message?.payload?.parts) {
5966
for (const part of message.payload.parts) {
6067
if (part?.body?.attachmentId && message.id) {
68+
const messageLink = `https://mail.google.com/mail/u/1/#inbox/${message.id}`;
6169
const attachment = await gmail.users.messages.attachments.get({
6270
id: part.body.attachmentId,
6371
messageId: message.id,
@@ -67,13 +75,20 @@ async function processEmailXml(cloudEvent: any): Promise<void> {
6775
if (attachment?.data?.data && part?.filename) {
6876
try {
6977
if (!part.filename.endsWith('.xml')) {
70-
console.log(`${logId}/IGNORE: ${part.filename}`);
78+
// console.log(`${logId}/IGNORE: ${part.filename}`);
7179
continue;
7280
}
7381
const xml = Buffer.from(attachment.data.data, 'base64').toString();
7482
const nfeDocument = new NfeDocument(xml, part?.filename || 'a file');
83+
const existingCount = await Bq.coundExistingByNfeId(nfeDocument.getDocumentId());
7584

76-
if (nfeDocument.isNfeValid()) {
85+
if (existingCount > 0) {
86+
console.log(`${logId}/SKIP: stubbornly refusing to re-add ${nfeDocument.getDocumentId()} for ${nfeDocument.getSupplierDisplayName()}`);
87+
Ac.socialiseIt(`:sweat_smile: oops looks like ${nfeDocument.getDocumentId()} for ${nfeDocument.getSupplierDisplayName()} was already processed!`);
88+
continue;
89+
}
90+
91+
if (nfeDocument.isNfeValid() && existingCount < 1) {
7792
console.log(`${logId}/VALID: ${part.filename} is a/an ${nfeDocument.getType()} record for ${nfeDocument.getSupplierDisplayName()}`);
7893
if (nfeDocument.getType() == 'invoice') {
7994
invoiceRows.push({
@@ -88,6 +103,7 @@ async function processEmailXml(cloudEvent: any): Promise<void> {
88103
verboseDescription: nfeDocument.getInvoiceDescription(),
89104
batchSequence: batchSequence
90105
});
106+
91107
}
92108
else {
93109
invoiceRows.push({
@@ -116,14 +132,17 @@ async function processEmailXml(cloudEvent: any): Promise<void> {
116132
}
117133
}
118134
catch (error) {
119-
console.log([`${logId}/Error processing ${part.filename}`, error]);
135+
console.error([`${logId}/Error processing`, error]);
136+
Ac.socialiseIt(`:thinking_face: Error processing NFe data for <${messageLink}|${part.filename}> (${logId})}`);
120137
}
121138
// console.log(`${logId}/Finished: ${part.filename}`);
122139
// Ac.writeAttachmentToBucket(bucket, part.filename, 'application/xml', attachment.data.data);
123140
}
124141
}
125142
}
126143
}
144+
messagesProcessed++;
145+
console.log(`${logId}/Error processing`)
127146
}
128147

129148
if (invoiceRows.length > 0 || invoiceLinesRows.length > 0) {
@@ -135,26 +154,28 @@ async function processEmailXml(cloudEvent: any): Promise<void> {
135154
}
136155
catch (error) {
137156
console.log("There was an error inserting records.", error);
138-
Ac.socialiseIt(`There was an error inserting invoices to the data warehouse, data my be inaccurate for these suppliers: ${suppliers.join(", ")}`);
157+
Ac.socialiseIt(`:thinking_face: There was an error inserting invoices to the data warehouse, data my be inaccurate for these suppliers: ${suppliers.join(", ")}`);
139158
return;
140159
}
141160

142161
try {
143162
await Gm.updateMessageLabels(messages, Ac.getEnv('GMAIL_LABEL_IN_QUEUE'), Ac.getEnv('GMAIL_LABEL_DONE'));
144163
}
145164
catch (error) {
146-
console.log("There was an error updating labels.", error);
147-
Ac.socialiseIt(`There was an error updating \`ops/in-queue\` and :white_check_mark labels, which may cause period duplicate records for these suppliers: ${suppliers.join(", ")}`);
165+
console.log(`${logId}: There was an error updating labels.`, error);
166+
Ac.socialiseIt(`:thinking_face: There was an error updating \`ops/in-queue\` and \`ops/done\` labels, which may cause duplicate records for these suppliers: ${suppliers.join(", ")}. (${batchSequence})`);
148167
}
149168

150-
console.log(`${logId}: Completed processing ${messages.length} email(s).`);
151-
Ac.socialiseIt(`Processed ${messages.length} tagged email(s) and imported ${invoiceRows.length} NFe documents for these suppliers: ${suppliers.join(", ")}`);
169+
const currentTime = new Date();
170+
timePassed = Math.round((currentTime.getTime() - startTime.getTime()) / 1000);
171+
console.log(`${logId}: Completed processing ${messagesProcessed} email(s).`);
172+
Ac.socialiseIt(`:white_check_mark: Processed ${messagesProcessed} tagged email(s) and imported ${invoiceRows.length} NFe documents for these suppliers: ${suppliers.join(", ")}. The cloud function ran for about ${timePassed} seconds, with batch sequence ${batchSequence}.`);
152173
}
153174
else {
154175
console.log(`${logId}: Nothing found process.`);
176+
Ac.socialiseIt(`:white_check_mark: No documents found to process.`);
155177
}
156178
}
157179

158180
ff.cloudEvent('processEmailXml', processEmailXml);
159-
160181
export { processEmailXml };

ac-etl/cloud-functions/email-nfe-warehouse/src/nfe.ts

+26-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import assert from 'assert';
2-
import { XMLParser } from 'fast-xml-parser';
2+
import { XMLParser, X2jOptions } from 'fast-xml-parser';
33
import { InvoiceRecordLine } from './bigquery';
44

55
/**
@@ -13,15 +13,28 @@ class NfeDocument {
1313

1414
constructor(xmlString: string, debugContext: string) {
1515
try {
16-
const options = {
16+
const options: X2jOptions = {
1717
ignoreAttributes: false,
18-
attributeNamePrefix : "_"
18+
attributeNamePrefix: "_",
19+
parseAttributeValue: true,
20+
trimValues: true,
21+
// Disable entity expansion to prevent XXE attacks
22+
processEntities: false,
23+
// Ignore DOCTYPE declarations
24+
ignoreDeclaration: true,
25+
ignorePiTags: true,
26+
// Set a reasonable limit for string lengths
27+
tagValueProcessor: (tagName: string, tagValue: string) =>
28+
tagValue.length > 1000 ? tagValue.slice(0, 1000) : tagValue,
1929
};
30+
2031
const parser = new XMLParser(options);
2132
this.nfeJson = parser.parse(xmlString);
2233
}
23-
catch {
24-
console.log(`XML parsing error for ${debugContext}`);
34+
catch (error: any) {
35+
console.error(`XML parsing error for ${debugContext}:`, error);
36+
// Consider throwing a custom error or handling this case appropriately
37+
throw new Error(`Failed to parse XML: ${error.message}`);
2538
}
2639
}
2740

@@ -169,13 +182,16 @@ class NfeDocument {
169182
try {
170183
switch (this.getType()) {
171184
case 'invoice':
172-
dateStr = this.nfeJson?.nfeProc?.NFe?.infNFe?.cobr?.dup?.dVenc;
185+
// Sometimes no due date.
186+
dateStr = this.nfeJson?.nfeProc?.NFe?.infNFe?.cobr?.dup?.dVenc || this.getDateTime();
173187
break;
174188
case 'service':
175189
dateStr = this.nfeJson?.CompNfse?.Nfse?.InfNfse?.DataEmissao;
176190
break;
177191
}
178-
} catch (err) {}
192+
} catch (err) {
193+
console.error(`Date fail for ${dateStr}:`, err);
194+
}
179195
let date = new Date(dateStr);
180196
dateStr = date.toISOString().slice(0, 19).replace("T", " ");
181197
return dateStr;
@@ -195,7 +211,9 @@ class NfeDocument {
195211
dateStr = this.nfeJson?.CompNfse?.Nfse?.InfNfse?.DataEmissao;
196212
break;
197213
}
198-
} catch (err) {}
214+
} catch (err) {
215+
console.error(`Date fail for ${dateStr}:`, err);
216+
}
199217
let date = new Date(dateStr);
200218
dateStr = date.toISOString().slice(0, 19).replace("T", " ");
201219
return dateStr;

0 commit comments

Comments
 (0)