diff --git a/src/CoreApi/PubSubApi.cs b/src/CoreApi/PubSubApi.cs index cae6f1b..1313f1c 100644 --- a/src/CoreApi/PubSubApi.cs +++ b/src/CoreApi/PubSubApi.cs @@ -8,6 +8,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; +using Multiformats.Base; namespace Ipfs.Http { @@ -21,7 +22,7 @@ internal PubSubApi(IpfsClient ipfs) this.ipfs = ipfs; } - public async Task> SubscribedTopicsAsync(CancellationToken cancel = default(CancellationToken)) + public async Task> SubscribedTopicsAsync(CancellationToken cancel = default) { var json = await ipfs.DoCommandAsync("pubsub/ls", cancel); var result = JObject.Parse(json); @@ -30,61 +31,63 @@ internal PubSubApi(IpfsClient ipfs) return strings.Select(s => (string)s); } - public async Task> PeersAsync(string topic = null, CancellationToken cancel = default(CancellationToken)) + public async Task> PeersAsync(string topic = null, CancellationToken cancel = default) { var json = await ipfs.DoCommandAsync("pubsub/peers", cancel, topic); var result = JObject.Parse(json); var strings = result["Strings"] as JArray; - if (strings == null) return new Peer[0]; + + if (strings == null) + return Array.Empty(); + return strings.Select(s => new Peer { Id = (string)s }); } - public Task PublishAsync(string topic, byte[] message, CancellationToken cancel = default(CancellationToken)) + public Task PublishAsync(string topic, byte[] message, CancellationToken cancel = default) { var url = new StringBuilder(); url.Append("/api/v0/pubsub/pub"); - url.Append("?arg="); - url.Append(System.Net.WebUtility.UrlEncode(topic)); - url.Append("&arg="); - var data = Encoding.ASCII.GetString(System.Net.WebUtility.UrlEncodeToBytes(message, 0, message.Length)); - url.Append(data); - return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), cancel); + url.Append("?arg=u"); + url.Append(Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic))); + + return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), message, cancel); } - public Task PublishAsync(string topic, Stream message, CancellationToken cancel = default(CancellationToken)) + public Task PublishAsync(string topic, Stream message, CancellationToken cancel = default) { - using (MemoryStream ms = new MemoryStream()) - { - message.CopyTo(ms); - return PublishAsync(topic, ms.ToArray(), cancel); - } + var url = new StringBuilder(); + url.Append("/api/v0/pubsub/pub"); + url.Append("?arg=u"); + url.Append(Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic))); + + return ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), message, cancel); } - public async Task PublishAsync(string topic, string message, CancellationToken cancel = default(CancellationToken)) + public async Task PublishAsync(string topic, string message, CancellationToken cancel = default) { - var _ = await ipfs.DoCommandAsync("pubsub/pub", cancel, topic, "arg=" + message); - return; + var url = new StringBuilder(); + url.Append("/api/v0/pubsub/pub"); + url.Append("?arg=u"); + url.Append(Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic))); + + await ipfs.DoCommandAsync(new Uri(ipfs.ApiUri, url.ToString()), message, cancel); } public async Task SubscribeAsync(string topic, Action handler, CancellationToken cancellationToken) { - var messageStream = await ipfs.PostDownloadAsync("pubsub/sub", cancellationToken, topic); + var messageStream = await ipfs.PostDownloadAsync("pubsub/sub", cancellationToken, $"u{Multibase.Encode(MultibaseEncoding.Base64Url, Encoding.UTF8.GetBytes(topic))}"); var sr = new StreamReader(messageStream); -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(() => ProcessMessages(topic, handler, sr, cancellationToken)); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - - return; + _ = Task.Run(() => ProcessMessages(topic, handler, sr, cancellationToken), cancellationToken); } void ProcessMessages(string topic, Action handler, StreamReader sr, CancellationToken ct) { - log.DebugFormat("Start listening for '{0}' messages", topic); + log.DebugFormat($"Start listening for '{topic}' messages"); // .Net needs a ReadLine(CancellationToken) // As a work-around, we register a function to close the stream - ct.Register(() => sr.Dispose()); + ct.Register(sr.Dispose); try { while (!sr.EndOfStream && !ct.IsCancellationRequested) @@ -92,6 +95,7 @@ void ProcessMessages(string topic, Action handler, StreamReade var json = sr.ReadLine(); if (json == null) break; + if (log.IsDebugEnabled) log.DebugFormat("PubSub message {0}", json); diff --git a/src/IpfsClient.cs b/src/IpfsClient.cs index b8a3e0d..3523df4 100644 --- a/src/IpfsClient.cs +++ b/src/IpfsClient.cs @@ -23,8 +23,7 @@ namespace Ipfs.Http /// IPFS API /// IPFS commands /// - /// IpfsClient is thread safe, only one instance is required - /// by the application. + /// IpfsClient is thread safe, only one instance is required by the application. /// public partial class IpfsClient : ICoreApi { @@ -61,6 +60,7 @@ public IpfsClient() var assembly = typeof(IpfsClient).GetTypeInfo().Assembly; var version = assembly.GetName().Version; + UserAgent = string.Format("{0}/{1}.{2}.{3}", assembly.GetName().Name, version.Major, version.Minor, version.Revision); TrustedPeers = new TrustedPeerCollection(this); @@ -115,7 +115,7 @@ public IpfsClient(string host) /// The list of peers that are initially trusted by IPFS. /// /// - /// This is equilivent to ipfs bootstrap list. + /// This is equivalent to ipfs bootstrap list. /// public TrustedPeerCollection TrustedPeers { get; private set; } @@ -174,6 +174,7 @@ Uri BuildCommand(string command, string arg = null, params string[] options) { var url = "/api/v0/" + command; var q = new StringBuilder(); + if (arg != null) { q.Append("&arg="); @@ -223,16 +224,17 @@ HttpClient Api() { if (api == null) { - var handler = new HttpClientHandler(); - if (handler.SupportsAutomaticDecompression) + if (HttpMessageHandler is HttpClientHandler handler && handler.SupportsAutomaticDecompression) { handler.AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate; } - api = new HttpClient(handler) + + api = new HttpClient(HttpMessageHandler) { - Timeout = System.Threading.Timeout.InfiniteTimeSpan + Timeout = Timeout.InfiniteTimeSpan }; + api.DefaultRequestHeaders.Add("User-Agent", UserAgent); } } @@ -240,6 +242,11 @@ HttpClient Api() return api; } + /// + /// The message handler to use for communicating over HTTP. + /// + public HttpMessageHandler HttpMessageHandler { get; set; } = new HttpClientHandler(); + /// /// Perform an IPFS API command returning a string. /// @@ -265,29 +272,49 @@ HttpClient Api() public async Task DoCommandAsync(string command, CancellationToken cancel, string arg = null, params string[] options) { var url = BuildCommand(command, arg, options); + if (log.IsDebugEnabled) - log.Debug("POST " + url.ToString()); + log.Debug("POST " + url); + using (var response = await Api().PostAsync(url, null, cancel)) { await ThrowOnErrorAsync(response); var body = await response.Content.ReadAsStringAsync(); + if (log.IsDebugEnabled) log.Debug("RSP " + body); + return body; } } - internal async Task DoCommandAsync(Uri url, CancellationToken cancel) + internal Task DoCommandAsync(Uri url, byte[] bytes, CancellationToken cancel) + { + return DoCommandAsync(url, new ByteArrayContent(bytes), cancel); + } + + internal Task DoCommandAsync(Uri url, Stream stream, CancellationToken cancel) + { + return DoCommandAsync(url, new StreamContent(stream), cancel); + } + + internal Task DoCommandAsync(Uri url, string str, CancellationToken cancel) + { + return DoCommandAsync(url, new StringContent(str), cancel); + } + + internal async Task DoCommandAsync(Uri url, HttpContent content, CancellationToken cancel) { if (log.IsDebugEnabled) - log.Debug("POST " + url.ToString()); - using (var response = await Api().PostAsync(url, null, cancel)) + log.Debug("POST " + url); + + using (var response = await Api().PostAsync(url, new MultipartFormDataContent { { content, "\"file\"" } }, cancel)) { await ThrowOnErrorAsync(response); var body = await response.Content.ReadAsStringAsync(); + if (log.IsDebugEnabled) log.Debug("RSP " + body); - return; } } @@ -353,12 +380,15 @@ public async Task DoCommandAsync(string command, CancellationToken cancel, public async Task PostDownloadAsync(string command, CancellationToken cancel, string arg = null, params string[] options) { var url = BuildCommand(command, arg, options); + if (log.IsDebugEnabled) - log.Debug("POST " + url.ToString()); - var request = new HttpRequestMessage(HttpMethod.Post, url); + log.Debug("POST " + url); + var request = new HttpRequestMessage(HttpMethod.Post, url); var response = await Api().SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancel); + await ThrowOnErrorAsync(response); + return await response.Content.ReadAsStreamAsync(); } @@ -388,10 +418,13 @@ public async Task PostDownloadAsync(string command, CancellationToken ca public async Task DownloadAsync(string command, CancellationToken cancel, string arg = null, params string[] options) { var url = BuildCommand(command, arg, options); + if (log.IsDebugEnabled) - log.Debug("GET " + url.ToString()); + log.Debug("GET " + url); + var response = await Api().GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancel); await ThrowOnErrorAsync(response); + return await response.Content.ReadAsStreamAsync(); } @@ -421,10 +454,13 @@ public async Task DownloadAsync(string command, CancellationToken cancel public async Task DownloadBytesAsync(string command, CancellationToken cancel, string arg = null, params string[] options) { var url = BuildCommand(command, arg, options); + if (log.IsDebugEnabled) - log.Debug("GET " + url.ToString()); + log.Debug("GET " + url); + var response = await Api().GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancel); await ThrowOnErrorAsync(response); + return await response.Content.ReadAsByteArrayAsync(); } @@ -460,7 +496,9 @@ public async Task UploadAsync(string command, CancellationToken cancel, { var content = new MultipartFormDataContent(); var streamContent = new StreamContent(data); + streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream"); + if (string.IsNullOrEmpty(name)) content.Add(streamContent, "file", unknownFilename); else @@ -468,13 +506,16 @@ public async Task UploadAsync(string command, CancellationToken cancel, var url = BuildCommand(command, null, options); if (log.IsDebugEnabled) - log.Debug("POST " + url.ToString()); + log.Debug("POST " + url); + using (var response = await Api().PostAsync(url, content, cancel)) { await ThrowOnErrorAsync(response); var json = await response.Content.ReadAsStringAsync(); + if (log.IsDebugEnabled) log.Debug("RSP " + json); + return json; } } @@ -510,17 +551,19 @@ public async Task Upload2Async(string command, CancellationToken cancel, { var content = new MultipartFormDataContent(); var streamContent = new StreamContent(data); + streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream"); - if (string.IsNullOrEmpty(name)) - content.Add(streamContent, "file", unknownFilename); - else - content.Add(streamContent, "file", name); + + content.Add(streamContent, "file", string.IsNullOrEmpty(name) ? unknownFilename : name); var url = BuildCommand(command, null, options); + if (log.IsDebugEnabled) - log.Debug("POST " + url.ToString()); + log.Debug("POST " + url); + var response = await Api().PostAsync(url, content, cancel); await ThrowOnErrorAsync(response); + return await response.Content.ReadAsStreamAsync(); } @@ -531,18 +574,24 @@ public async Task UploadAsync(string command, CancellationToken cancel, { var content = new MultipartFormDataContent(); var streamContent = new ByteArrayContent(data); + streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream"); content.Add(streamContent, "file", unknownFilename); var url = BuildCommand(command, null, options); + if (log.IsDebugEnabled) - log.Debug("POST " + url.ToString()); + log.Debug("POST " + url); + using (var response = await Api().PostAsync(url, content, cancel)) { await ThrowOnErrorAsync(response); + var json = await response.Content.ReadAsStringAsync(); + if (log.IsDebugEnabled) log.Debug("RSP " + json); + return json; } } @@ -562,24 +611,31 @@ async Task ThrowOnErrorAsync(HttpResponseMessage response) { if (response.IsSuccessStatusCode) return true; + if (response.StatusCode == HttpStatusCode.NotFound) { - var error = "Invalid IPFS command: " + response.RequestMessage.RequestUri.ToString(); + var error = "Invalid IPFS command: " + response.RequestMessage.RequestUri; + if (log.IsDebugEnabled) log.Debug("ERR " + error); + throw new HttpRequestException(error); } var body = await response.Content.ReadAsStringAsync(); + if (log.IsDebugEnabled) log.Debug("ERR " + body); + string message = body; + try { var res = JsonConvert.DeserializeObject(body); message = (string)res.Message; } catch { } + throw new HttpRequestException(message); } diff --git a/src/IpfsHttpClient.csproj b/src/IpfsHttpClient.csproj index c11524f..8272803 100644 --- a/src/IpfsHttpClient.csproj +++ b/src/IpfsHttpClient.csproj @@ -48,8 +48,7 @@ - - + diff --git a/src/PublishedMessage.cs b/src/PublishedMessage.cs index c0f945f..ca257ca 100644 --- a/src/PublishedMessage.cs +++ b/src/PublishedMessage.cs @@ -5,6 +5,7 @@ using System.Linq; using System.Runtime.Serialization; using System.Text; +using Multiformats.Base; namespace Ipfs.Http { @@ -27,11 +28,13 @@ public class PublishedMessage : IPublishedMessage public PublishedMessage(string json) { var o = JObject.Parse(json); - this.Sender = Convert.FromBase64String((string)o["from"]).ToBase58(); - this.SequenceNumber = Convert.FromBase64String((string)o["seqno"]); - this.DataBytes = Convert.FromBase64String((string)o["data"]); - var topics = (JArray)(o["topicIDs"]); - this.Topics = topics.Select(t => (string)t); + + this.Sender = (string)o["from"]; + this.SequenceNumber = Multibase.Decode((string)o["seqno"], out MultibaseEncoding _); + this.DataBytes = Multibase.Decode((string)o["data"], out MultibaseEncoding _); + + var topics = (JArray) (o["topicIDs"]); + this.Topics = topics.Select(t => Encoding.UTF8.GetString(Multibase.Decode((string)t, out MultibaseEncoding _))); } /// diff --git a/test/CoreApi/PubSubApiTest.cs b/test/CoreApi/PubSubApiTest.cs index 9d94f9c..135df6e 100644 --- a/test/CoreApi/PubSubApiTest.cs +++ b/test/CoreApi/PubSubApiTest.cs @@ -22,7 +22,7 @@ public void Api_Exists() public async Task Peers() { var ipfs = TestFixture.Ipfs; - var topic = "net-ipfs-http-client-test-" + Guid.NewGuid().ToString(); + var topic = "net-ipfs-http-client-test-" + Guid.NewGuid(); var cs = new CancellationTokenSource(); try { @@ -40,7 +40,7 @@ public async Task Peers() public void Peers_Unknown_Topic() { var ipfs = TestFixture.Ipfs; - var topic = "net-ipfs-http-client-test-unknown" + Guid.NewGuid().ToString(); + var topic = "net-ipfs-http-client-test-unknown" + Guid.NewGuid(); var peers = ipfs.PubSub.PeersAsync(topic).Result.ToArray(); Assert.AreEqual(0, peers.Length); } @@ -49,7 +49,7 @@ public void Peers_Unknown_Topic() public async Task Subscribed_Topics() { var ipfs = TestFixture.Ipfs; - var topic = "net-ipfs-http-client-test-" + Guid.NewGuid().ToString(); + var topic = "net-ipfs-http-client-test-" + Guid.NewGuid(); var cs = new CancellationTokenSource(); try { @@ -71,7 +71,7 @@ public async Task Subscribe() { messageCount = 0; var ipfs = TestFixture.Ipfs; - var topic = "net-ipfs-http-client-test-" + Guid.NewGuid().ToString(); + var topic = "net-ipfs-http-client-test-" + Guid.NewGuid(); var cs = new CancellationTokenSource(); try { @@ -96,7 +96,7 @@ public async Task Subscribe_Mutiple_Messages() messageCount = 0; var messages = "hello world this is pubsub".Split(); var ipfs = TestFixture.Ipfs; - var topic = "net-ipfs-http-client-test-" + Guid.NewGuid().ToString(); + var topic = "net-ipfs-http-client-test-" + Guid.NewGuid(); var cs = new CancellationTokenSource(); try { @@ -119,12 +119,12 @@ await ipfs.PubSub.SubscribeAsync(topic, msg => } [TestMethod] - public async Task Multiple_Subscribe_Mutiple_Messages() + public async Task Multiple_Subscribe_Multiple_Messages() { messageCount = 0; var messages = "hello world this is pubsub".Split(); var ipfs = TestFixture.Ipfs; - var topic = "net-ipfs-http-client-test-" + Guid.NewGuid().ToString(); + var topic = "net-ipfs-http-client-test-" + Guid.NewGuid(); var cs = new CancellationTokenSource(); Action processMessage = (msg) => { @@ -155,7 +155,7 @@ public async Task Unsubscribe() { messageCount1 = 0; var ipfs = TestFixture.Ipfs; - var topic = "net-ipfs-http-client-test-" + Guid.NewGuid().ToString(); + var topic = "net-ipfs-http-client-test-" + Guid.NewGuid(); var cs = new CancellationTokenSource(); await ipfs.PubSub.SubscribeAsync(topic, msg => { @@ -177,7 +177,7 @@ public async Task Subscribe_BinaryMessage() var messages = new List(); var expected = new byte[] { 0, 1, 2, 4, (byte)'a', (byte)'b', 0xfe, 0xff }; var ipfs = TestFixture.Ipfs; - var topic = "net-ipfs-http-client-test-" + Guid.NewGuid().ToString(); + var topic = "net-ipfs-http-client-test-" + Guid.NewGuid(); var cs = new CancellationTokenSource(); try { @@ -203,7 +203,7 @@ public async Task Subscribe_StreamMessage() var messages = new List(); var expected = new byte[] { 0, 1, 2, 4, (byte)'a', (byte)'b', 0xfe, 0xff }; var ipfs = TestFixture.Ipfs; - var topic = "net-ipfs-http-client-test-" + Guid.NewGuid().ToString(); + var topic = "net-ipfs-http-client-test-" + Guid.NewGuid(); var cs = new CancellationTokenSource(); try {