Skip to content

Commit

Permalink
Merge pull request #57 from ardriveapp/PE-5029-suspend-data-item-vali…
Browse files Browse the repository at this point in the history
…dation-for-cli-uploads-during-download-to-investigate-root-cause

PE-5029: adds a parameter to by pass the download verification
  • Loading branch information
thiagocarvalhodev authored Jan 4, 2024
2 parents 363d4b6 + 00d28cc commit 41d5906
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 119 deletions.
254 changes: 136 additions & 118 deletions lib/src/streams/download.dart
Original file line number Diff line number Diff line change
@@ -1,39 +1,13 @@
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

import 'package:arweave/arweave.dart';
import 'package:arweave/src/utils/graph_ql_utils.dart';
import 'package:async/async.dart';
import 'package:http/http.dart';
import 'http_client/io.dart' if (dart.library.js) 'http_client/browsers.dart';

String gqlGetTxInfo(String txId) => '''
{
transaction(id: "$txId") {
owner {
key
}
data {
size
}
quantity {
winston
}
fee {
winston
}
anchor
signature
recipient
tags {
name
value
}
bundledIn {
id
}
}
}
''';
import 'http_client/io.dart' if (dart.library.js) 'http_client/browsers.dart';

Future<
(
Expand All @@ -43,47 +17,26 @@ Future<
required String txId,
String? gatewayHost = 'arweave.net',
Function(double progress, int speed)? onProgress,
bool verifyDownload = true,
}) async {
final downloadUrl = "https://$gatewayHost/$txId";
final gqlUrl = "https://$gatewayHost/graphql";

final gqlResponse = await post(
Uri.parse(gqlUrl),
headers: {'Content-Type': 'application/json'},
body: jsonEncode({'query': gqlGetTxInfo(txId)}),
);

if (gqlResponse.statusCode != 200) {
throw Exception('Failed to download $txId');
}

var txData = jsonDecode(gqlResponse.body)['data']['transaction'];

final txAnchor = txData['anchor'];
final txOwner = txData['owner']['key'];
final txTarget = txData['recipient'];
final txSignature = txData['signature'];
final txDataSize = int.parse(txData['data']['size']);
final isDataItem = txData['bundledIn'] != null;
final txQuantity = int.parse(txData['quantity']['winston']);
final txReward = int.parse(txData['fee']['winston']);
final downloadedTags = txData['tags'];
final List<Tag> txTags = [];
for (var tag in downloadedTags) {
txTags.add(createTag(tag['name'], tag['value']));
}

int bytesDownloaded = 0;
StreamSubscription<List<int>>? subscription;
final controller = StreamController<List<int>>();

final txData = await _getTransactionData(
txId: txId,
gatewayHost: gatewayHost!,
);

// keep track of progress and download speed
int lastBytes = 0;
setProgressTimer(onProgress) => Timer.periodic(
Duration(milliseconds: 500),
(Timer timer) {
double progress =
double.parse((bytesDownloaded / txDataSize).toStringAsFixed(2));
double progress = double.parse(
(bytesDownloaded / txData.dataSize).toStringAsFixed(2));
int speed = bytesDownloaded - lastBytes;

onProgress(
Expand All @@ -106,46 +59,62 @@ Future<
final client = getClient();
final streamResponse = await client.send(request);

final splitStream = StreamSplitter(streamResponse.stream);
final downloadStream = splitStream.split();
final verificationStream = splitStream.split();

// Calling `close()` indicates that no further streams will be created,
// causing splitStream to function without an internal buffer.
// The future will be completed when both streams are consumed, so we
// shouldn't await it here.
unawaited(splitStream.close());

_verify(
isDataItem: isDataItem,
id: txId,
owner: txOwner,
signature: txSignature,
target: txTarget,
anchor: txAnchor,
tags: txTags,
dataStream: verificationStream.map((list) => Uint8List.fromList(list)),
reward: txReward,
quantity: txQuantity,
dataSize: txDataSize,
).then((isVerified) {
if (!isVerified) {
controller.addError('failed to verify transaction');
}

subscription?.cancel();
controller.close();
if (onProgress != null) {
progressTimer.cancel();
}
});
Stream<List<int>> downloadStream;

if (verifyDownload) {
final splitStream = StreamSplitter(streamResponse.stream);

downloadStream = splitStream.split();
final verificationStream = splitStream.split();

// Calling `close()` indicates that no further streams will be created,
// causing splitStream to function without an internal buffer.
// The future will be completed when both streams are consumed, so we
// shouldn't await it here.
unawaited(splitStream.close());

_verify(
dataStream: verificationStream,
txData: txData,
txId: txId,
).then((isVerified) {
if (!isVerified) {
// TODO: maybe using a custom Exception here would be better? e.g. ValidationException
controller.addError('failed to verify transaction');
}

controller.close();
subscription?.cancel();
});
} else {
/// If we don't need to verify the download, we can just return the
/// stream directly.
downloadStream = streamResponse.stream;
}

subscription = downloadStream.listen(
(List<int> chunk) {
bytesDownloaded += chunk.length;
controller.sink.add(chunk);
},
cancelOnError: true,
onError: (e) {
print('[arweave]: Error downloading $txId');

controller.addError(e);
controller.close();

if (onProgress != null) {
progressTimer.cancel();
}
},
onDone: () {
if (onProgress != null) {
progressTimer.cancel();
}

controller.close();
},
);
}

Expand Down Expand Up @@ -175,40 +144,89 @@ Future<
return (controller.stream, cancelDownload);
}

_verify({
required bool isDataItem,
required String id,
required String owner,
required String signature,
required String target,
required String anchor,
required List<Tag> tags,
// TODO: maybe move this to a separate file? An utils file maybe?
// We problably have the same logic on ardrive-app project. Maybe we can
// create a package with this logic and use it on both projects.
Future<TransactionData> _getTransactionData({
required String txId,
required String gatewayHost,
}) async {
final gqlUrl = "https://$gatewayHost/graphql";

final gqlResponse = await post(
Uri.parse(gqlUrl),
headers: {'Content-Type': 'application/json'},
body: jsonEncode({'query': gqlGetTxInfo(txId)}),
);

if (gqlResponse.statusCode != 200) {
throw Exception('Failed to download $txId');
}

var txDataJson = jsonDecode(gqlResponse.body)['data']['transaction'];

return TransactionData.fromJson(txDataJson);
}

Future<bool> _verify({
required TransactionData txData,
required String txId,
required Stream<List<int>> dataStream,
required int reward,
required int quantity,
required int dataSize,
}) {
if (isDataItem) {
if (txData.isDataItem) {
return verifyDataItem(
id: id,
owner: owner,
signature: signature,
target: target,
anchor: anchor,
tags: tags,
id: txId,
owner: txData.owner,
signature: txData.signature,
target: txData.target,
anchor: txData.anchor,
tags: txData.tags,
dataStream: dataStream.map((list) => Uint8List.fromList(list)));
} else {
return verifyTransaction(
id: id,
owner: owner,
signature: signature,
target: target,
anchor: anchor,
tags: tags,
reward: reward,
quantity: quantity,
dataSize: dataSize,
id: txId,
owner: txData.owner,
signature: txData.signature,
target: txData.target,
anchor: txData.anchor,
tags: txData.tags,
reward: txData.reward,
quantity: txData.quantity,
dataSize: txData.dataSize,
dataStream: dataStream.map((list) => Uint8List.fromList(list)),
);
}
}

class TransactionData {
late String anchor;
late String owner;
late String target;
late String signature;
late int dataSize;
late bool isDataItem;
late int quantity;
late int reward;
late List<Tag> tags;

TransactionData.fromJson(Map<String, dynamic> json) {
parseData(json);
}

void parseData(Map<String, dynamic> jsonData) {
anchor = jsonData['anchor'];
owner = jsonData['owner']['key'];
target = jsonData['recipient'];
signature = jsonData['signature'];
dataSize = int.parse(jsonData['data']['size']);
isDataItem = jsonData['bundledIn'] != null;
quantity = int.parse(jsonData['quantity']['winston']);
reward = int.parse(jsonData['fee']['winston']);

var downloadedTags = jsonData['tags'];
tags = [];
for (var tag in downloadedTags) {
tags.add(createTag(tag['name'], tag['value']));
}
}
}
28 changes: 28 additions & 0 deletions lib/src/utils/graph_ql_utils.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
String gqlGetTxInfo(String txId) => '''
{
transaction(id: "$txId") {
owner {
key
}
data {
size
}
quantity {
winston
}
fee {
winston
}
anchor
signature
recipient
tags {
name
value
}
bundledIn {
id
}
}
}
''';
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: arweave
description: ""
version: 3.8.2
version: 3.8.3
environment:
sdk: ">=3.0.0 <4.0.0"
publish_to: none
Expand Down

0 comments on commit 41d5906

Please sign in to comment.