fix(logging): crash in release builds #3971

Draft · wants to merge 2 commits into base: main
@@ -132,14 +132,19 @@ final class AWSCloudWatchLoggingSessionController {
}
self.batchSubscription = producer.logBatchPublisher.sink { [weak self] batch in
guard self?.networkMonitor.isOnline == true else { return }

// Capture strong references to consumer and batch before the async task
let strongConsumer = consumer
let strongBatch = batch

Task {
do {
try await consumer.consume(batch: batch)
try await strongConsumer.consume(batch: strongBatch)
} catch {
Amplify.Logging.default.error("Error flushing logs with error \(error.localizedDescription)")
let payload = HubPayload(eventName: HubPayload.EventName.Logging.flushLogFailure, context: error.localizedDescription)
Amplify.Hub.dispatch(to: HubChannel.logging, payload: payload)
try batch.complete()
try strongBatch.complete()
}
}
}
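
The fix above takes strong local references to `consumer` and `batch` before starting the unstructured `Task`, so the async work no longer depends on state that can be released once the `[weak self]` sink closure returns. A minimal, self-contained sketch of that capture pattern, using hypothetical `FakeConsumer`/`FakeController` stand-ins rather than the plugin's real types:

```swift
import Combine
import Foundation

// Hypothetical stand-ins for the plugin's consumer/controller; this is a
// sketch of the capture pattern, not the plugin's real implementation.
final class FakeConsumer {
    func consume(batch: [String]) async throws {
        print("consumed \(batch.count) entries")
    }
}

final class FakeController {
    private let consumer = FakeConsumer()
    private let publisher = PassthroughSubject<[String], Never>()
    private var subscription: AnyCancellable?

    func start() {
        subscription = publisher.sink { [weak self] batch in
            guard let self else { return }
            // Take strong references before launching the unstructured Task so
            // the async work owns everything it needs after this closure returns.
            let strongConsumer = self.consumer
            let strongBatch = batch
            Task {
                try? await strongConsumer.consume(batch: strongBatch)
            }
        }
    }

    func send(_ batch: [String]) {
        publisher.send(batch)
    }
}
```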
@@ -178,8 +183,15 @@ final class AWSCloudWatchLoggingSessionController {
}

private func consumeLogBatch(_ batch: LogBatch) async throws {
// Check if consumer exists before trying to use it
guard let consumer = self.consumer else {
// If consumer is nil, still mark the batch as completed to prevent memory leaks
try batch.complete()
return
}

do {
try await consumer?.consume(batch: batch)
try await consumer.consume(batch: batch)
} catch {
Amplify.Logging.default.error("Error flushing logs with error \(error.localizedDescription)")
let payload = HubPayload(eventName: HubPayload.EventName.Logging.flushLogFailure, context: error.localizedDescription)
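
The `consumeLogBatch` change adds a guard so that a missing consumer no longer leaves the batch uncompleted. A sketch of that guard-and-complete shape, again with hypothetical stand-in types:

```swift
// Hypothetical stand-ins that mirror the shape of the plugin's types; a sketch
// of the "complete the batch even when the consumer is missing" guard above.
final class FakeLogBatch {
    private(set) var isCompleted = false
    func complete() throws { isCompleted = true }
}

protocol BatchConsuming {
    func consume(batch: FakeLogBatch) async throws
}

final class FakeSessionController {
    var consumer: BatchConsuming?

    func consumeLogBatch(_ batch: FakeLogBatch) async throws {
        // With no consumer, still complete the batch so its backing storage is
        // released instead of accumulating on disk.
        guard let consumer else {
            try batch.complete()
            return
        }
        do {
            try await consumer.consume(batch: batch)
        } catch {
            // Report the failure, then still complete so the batch is not leaked.
            print("flush failed: \(error)")
            try batch.complete()
        }
    }
}
```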
@@ -40,25 +40,50 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer {
}
await ensureLogStreamExists()

let batchByteSize = try encoder.encode(entries).count
// Add safety check for nil logStreamName
guard let _ = self.logStreamName else {
Amplify.Logging.error("Log stream name is nil, cannot send logs")
try batch.complete()
return
}

// Wrap encoding in do-catch to prevent crashes
var batchByteSize: Int
do {
batchByteSize = try encoder.encode(entries).count
} catch {
Amplify.Logging.error("Failed to encode log entries: \(error)")
try batch.complete()
return
}

if entries.count > AWSCloudWatchConstants.maxLogEvents {
let smallerEntries = entries.chunked(into: AWSCloudWatchConstants.maxLogEvents)
for entries in smallerEntries {
let entrySize = try encoder.encode(entries).count
if entrySize > AWSCloudWatchConstants.maxBatchByteSize {
let chunks = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize)
for chunk in chunks {
try await sendLogEvents(chunk)
// Wrap in do-catch to prevent crashes
do {
let entrySize = try encoder.encode(entries).count
if entrySize > AWSCloudWatchConstants.maxBatchByteSize {
let chunks = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize)
for chunk in chunks {
try await sendLogEvents(chunk)
}
} else {
try await sendLogEvents(entries)
}
} else {
try await sendLogEvents(entries)
} catch {
Amplify.Logging.error("Error processing log batch: \(error)")
continue
}
}

} else if batchByteSize > AWSCloudWatchConstants.maxBatchByteSize {
let smallerEntries = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize)
for entries in smallerEntries {
try await sendLogEvents(entries)
do {
let smallerEntries = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize)
for entries in smallerEntries {
try await sendLogEvents(entries)
}
} catch {
Amplify.Logging.error("Error chunking log entries: \(error)")
}
} else {
try await sendLogEvents(entries)
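
`consume(batch:)` splits oversized batches in two stages: first by event count with `chunked(into:)`, then by encoded byte size. The count-based helper is defined elsewhere in the plugin; a typical implementation plus a small usage example might look like the sketch below (10,000 is CloudWatch's documented per-call event cap, which `AWSCloudWatchConstants.maxLogEvents` is assumed to hold):

```swift
extension Array {
    /// A typical count-based chunking helper; the plugin's own
    /// `chunked(into:)` used above is assumed to behave like this.
    func chunked(into size: Int) -> [[Element]] {
        guard size > 0 else { return [self] }
        return stride(from: 0, to: count, by: size).map {
            Array(self[$0..<Swift.min($0 + size, count)])
        }
    }
}

// Example: a 25,000-entry batch split against the 10,000-event cap.
let entries = Array(repeating: "entry", count: 25_000)
print(entries.chunked(into: 10_000).map(\.count))   // [10000, 10000, 5000]
```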
@@ -72,47 +97,103 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer {
return
}

defer {
ensureLogStreamExistsComplete = true
}

// Only mark as complete after everything has finished successfully
// to avoid potential race conditions with incomplete state

if logStreamName == nil {
self.logStreamName = await formatter.formattedStreamName()
do {
// Explicitly capture self to avoid potential memory issues
let streamName = await self.formatter.formattedStreamName()
// Check if self is still valid and streamName is not nil before assigning
if !streamName.isEmpty {
self.logStreamName = streamName
} else {
// Fallback to a default if the stream name couldn't be determined
self.logStreamName = "default.\(UUID().uuidString)"
}
} catch {
// Handle any potential errors from async call
Amplify.Logging.error("Failed to get formatted stream name: \(error)")
// Fallback to a default
self.logStreamName = "default.\(UUID().uuidString)"
}
}

let stream = try? await self.client.describeLogStreams(input: DescribeLogStreamsInput(
logGroupName: self.logGroupName,
logStreamNamePrefix: self.logStreamName
)).logStreams?.first(where: { stream in
return stream.logStreamName == self.logStreamName
})
if stream != nil {
// Safety check - ensure we have a valid stream name before proceeding
guard let logStreamName = self.logStreamName, !logStreamName.isEmpty else {
Amplify.Logging.error("Invalid log stream name")
ensureLogStreamExistsComplete = true
return
}

_ = try? await self.client.createLogStream(input: CreateLogStreamInput(
logGroupName: self.logGroupName,
logStreamName: self.logStreamName
))

do {
let stream = try? await self.client.describeLogStreams(input: DescribeLogStreamsInput(
logGroupName: self.logGroupName,
logStreamNamePrefix: logStreamName
)).logStreams?.first(where: { stream in
return stream.logStreamName == logStreamName
})

if stream == nil {
_ = try? await self.client.createLogStream(input: CreateLogStreamInput(
logGroupName: self.logGroupName,
logStreamName: logStreamName
))
}

// Mark as complete only after all operations finished
ensureLogStreamExistsComplete = true
} catch {
Amplify.Logging.error("Error ensuring log stream exists: \(error)")
// Still mark as complete to avoid getting stuck in a failed state
ensureLogStreamExistsComplete = true
}
}
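
Condensed, the `ensureLogStreamExists` flow above resolves a stream name (with a unique fallback), checks whether the stream already exists, creates it only if missing, and marks the step complete on every path. A sketch of that flow against a hypothetical `LogStreamClient` protocol in place of the real CloudWatch Logs client:

```swift
import Foundation

// Hypothetical abstraction over the CloudWatch Logs calls used above.
protocol LogStreamClient {
    func streamExists(group: String, stream: String) async throws -> Bool
    func createStream(group: String, stream: String) async throws
}

final class StreamEnsurer {
    private let client: LogStreamClient
    private let logGroupName: String
    private var logStreamName: String?
    private var ensureComplete = false

    init(client: LogStreamClient, logGroupName: String) {
        self.client = client
        self.logGroupName = logGroupName
    }

    func ensureLogStreamExists() async {
        guard !ensureComplete else { return }

        // Resolve a stream name up front, falling back to a unique default so
        // later sends never run against a nil or empty name.
        let streamName = logStreamName ?? "default.\(UUID().uuidString)"
        logStreamName = streamName

        do {
            let exists = try await client.streamExists(group: logGroupName, stream: streamName)
            if !exists {
                try await client.createStream(group: logGroupName, stream: streamName)
            }
        } catch {
            print("Error ensuring log stream exists: \(error)")
        }
        // Mark the step complete either way so callers are not stuck retrying.
        ensureComplete = true
    }
}
```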

private func sendLogEvents(_ entries: [LogEntry]) async throws {
// Safety check for empty entries
if entries.isEmpty {
return
}

// Safety check for logStreamName
guard let logStreamName = self.logStreamName, !logStreamName.isEmpty else {
Amplify.Logging.error("Cannot send log events: Log stream name is nil or empty")
return
}

let events = convertToCloudWatchInputLogEvents(for: entries)
let response = try await self.client.putLogEvents(input: PutLogEventsInput(
logEvents: events,
logGroupName: self.logGroupName,
logStreamName: self.logStreamName,
sequenceToken: nil
))
let retriableEntries = retriable(entries: entries, in: response)
if !retriableEntries.isEmpty {
let retriableEvents = convertToCloudWatchInputLogEvents(for: retriableEntries)
_ = try await self.client.putLogEvents(input: PutLogEventsInput(
logEvents: retriableEvents,

// Safety check for empty events
if events.isEmpty {
Amplify.Logging.warn("No valid events to send to CloudWatch")
return
}

do {
let response = try await self.client.putLogEvents(input: PutLogEventsInput(
logEvents: events,
logGroupName: self.logGroupName,
logStreamName: self.logStreamName,
logStreamName: logStreamName,
sequenceToken: nil
))

// Handle retriable entries
let retriableEntries = retriable(entries: entries, in: response)
if !retriableEntries.isEmpty {
let retriableEvents = convertToCloudWatchInputLogEvents(for: retriableEntries)
if !retriableEvents.isEmpty {
_ = try await self.client.putLogEvents(input: PutLogEventsInput(
logEvents: retriableEvents,
logGroupName: self.logGroupName,
logStreamName: logStreamName,
sequenceToken: nil
))
}
}
} catch {
Amplify.Logging.error("Failed to send log events: \(error)")
throw error
}
}
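
The retry pass re-sends only the entries that `retriable(entries:in:)` extracts from the `PutLogEvents` response; that helper is not part of this diff. One plausible shape, assuming the response's rejected-events info is used to slice out the events flagged as too new (hypothetical `RejectedInfo`/`Entry` types):

```swift
// Hypothetical mirror of the rejected-events info returned by PutLogEvents;
// this is an assumption about its shape, not the SDK's actual type.
struct RejectedInfo {
    var tooNewLogEventStartIndex: Int?
    var tooOldLogEventEndIndex: Int?
    var expiredLogEventEndIndex: Int?
}

struct Entry { let message: String }

// Assumed policy: entries at or after the "too new" start index can be retried;
// expired or too-old entries are dropped.
func retriable(entries: [Entry], rejected: RejectedInfo?) -> [Entry] {
    guard let start = rejected?.tooNewLogEventStartIndex,
          start >= 0, start < entries.count else {
        return []
    }
    return Array(entries[start...])
}

let sample = (0..<5).map { Entry(message: "m\($0)") }
let info = RejectedInfo(tooNewLogEventStartIndex: 3,
                        tooOldLogEventEndIndex: nil,
                        expiredLogEventEndIndex: nil)
print(retriable(entries: sample, rejected: info).count)   // 2
```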

@@ -147,19 +228,40 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer {
var chunks: [[LogEntry]] = []
var chunk: [LogEntry] = []
var currentChunkSize = 0

for entry in entries {
let entrySize = try encoder.encode(entry).count
// Wrap the encoding in a do-catch to handle potential errors
var entrySize: Int
do {
entrySize = try encoder.encode(entry).count
} catch {
Amplify.Logging.error("Failed to encode log entry: \(error)")
// Skip this entry and continue with the next one
continue
}

if currentChunkSize + entrySize < maxByteSize {
chunk.append(entry)
currentChunkSize = currentChunkSize + entrySize
} else {
chunks.append(chunk)
// Only add non-empty chunks
if !chunk.isEmpty {
chunks.append(chunk)
}
chunk = [entry]
currentChunkSize = currentChunkSize + entrySize
currentChunkSize = entrySize
}
}


// Add the last chunk if it's not empty
if !chunk.isEmpty {
chunks.append(chunk)
}

// Return even if chunks is empty to avoid null pointer issues
return chunks
}
// swiftlint:enable shorthand_operator
}
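
The added final append is what keeps the trailing chunk from being dropped. A standalone, simplified sketch of the size-based chunking with a tiny usage example (hypothetical `Entry` type and a deliberately small byte cap); without the final flush the example below would print `[2]` instead of `[2, 2]`:

```swift
import Foundation

// Simplified sketch of the size-based chunking shown above.
struct Entry: Encodable { let message: String }

func chunkBySize(_ entries: [Entry], into maxByteSize: Int) throws -> [[Entry]] {
    let encoder = JSONEncoder()
    var chunks: [[Entry]] = []
    var current: [Entry] = []
    var currentSize = 0
    for entry in entries {
        let entrySize = try encoder.encode(entry).count
        if currentSize + entrySize < maxByteSize {
            current.append(entry)
            currentSize += entrySize
        } else {
            if !current.isEmpty { chunks.append(current) }
            current = [entry]
            currentSize = entrySize
        }
    }
    // Flush the final partial chunk so trailing entries are not lost.
    if !current.isEmpty { chunks.append(current) }
    return chunks
}

let sample = (0..<4).map { Entry(message: "aaa\($0)") }
// Each entry encodes to 18 bytes, so a 40-byte cap yields chunks of two.
let pieces = (try? chunkBySize(sample, into: 40)) ?? []
print(pieces.map(\.count))   // [2, 2]
```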

