Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
use Mix.Config
import Config
15 changes: 11 additions & 4 deletions lib/downstream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ defmodule Downstream do
headers = Keyword.get(options, :headers, [])
http_options = Keyword.get(options, :http_options, [])

download_task = Task.async(Download, :stream, [io_device])
httpoison_options = Keyword.merge(http_options, stream_to: download_task.pid)
download_task =
Task.async(Download, :stream, [
%{type: :get, io_device: io_device, options: options}
])

httpoison_options = Keyword.merge(http_options, stream_to: download_task.pid, follow_redirect: true)

HTTPoison.get!(url, headers, httpoison_options)

Expand Down Expand Up @@ -63,9 +67,12 @@ defmodule Downstream do
headers = Keyword.get(options, :headers, [])
http_options = Keyword.get(options, :http_options, [])

download_task = Task.async(Download, :stream, [io_device])
download_task =
Task.async(Download, :stream, [
%{type: :post, io_device: io_device, body: body, options: options}
])

httpoison_options = Keyword.merge(http_options, stream_to: download_task.pid)
httpoison_options = Keyword.merge(http_options, stream_to: download_task.pid, follow_redirect: true)

HTTPoison.post!(url, body, headers, httpoison_options)

Expand Down
35 changes: 20 additions & 15 deletions lib/downstream/download.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,39 @@ defmodule Downstream.Download do

@dialyzer {:no_return, stream: 1, stream: 2}

def stream(io_device, response \\ %Response{}) do
def stream(request_params, response \\ %Response{}) do
receive do
response_chunk -> handle_response_chunk(response_chunk, io_device, response)
response_chunk -> handle_response_chunk(response_chunk, request_params, response)
end
end

defp handle_response_chunk(%AsyncStatus{code: 200}, io_device, response) do
stream(io_device, %Response{response | status_code: 200})
defp handle_response_chunk(%AsyncStatus{code: 200}, request_params, response) do
stream(request_params, %Response{response | status_code: 200})
end

defp handle_response_chunk(%AsyncStatus{code: code}, io_device, _response) do
{:error, %Error{device: io_device, reason: :invalid_status_code, status_code: code}}
defp handle_response_chunk(%AsyncStatus{code: code}, request_params, _response) do
{:error, %Error{device: request_params.io_device, reason: :invalid_status_code, status_code: code}}
end

defp handle_response_chunk(%AsyncHeaders{headers: headers}, io_device, response) do
stream(io_device, %Response{response | headers: headers})
defp handle_response_chunk(%AsyncHeaders{headers: headers}, request_params, response) do
stream(request_params, %Response{response | headers: headers})
end

defp handle_response_chunk(%AsyncChunk{chunk: data}, io_device, response) do
IO.binwrite(io_device, data)
stream(io_device, response)
defp handle_response_chunk(%AsyncChunk{chunk: data}, request_params, response) do
IO.binwrite(request_params.io_device, data)
stream(request_params, response)
end

defp handle_response_chunk(%AsyncEnd{}, io_device, response) do
{:ok, %Response{response | device: io_device}}
defp handle_response_chunk(%AsyncEnd{}, request_params, response) do
{:ok, %Response{response | device: request_params.io_device}}
end

defp handle_response_chunk(_, io_device, _response) do
{:error, %Error{device: io_device, reason: :unexpected_error}}
defp handle_response_chunk(%HTTPoison.AsyncRedirect{to: to}, request_params, _response) do
params = [to, request_params.io_device, request_params[:body], request_params.options] |> Enum.reject(&is_nil/1)
apply(Downstream, request_params.type, params)
end

defp handle_response_chunk(_, request_params, _response) do
{:error, %Error{device: request_params.io_device, reason: :unexpected_error}}
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Downstream.MixProject do
start_permanent: Mix.env() == :prod,
source_url: "https://github.com/mpiercy827/downstream",
test_coverage: [tool: ExCoveralls],
version: "1.1.0"
version: "1.2.0"
]
end

Expand Down
33 changes: 17 additions & 16 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
%{
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
"certifi": {:hex, :certifi, "2.4.2", "75424ff0f3baaccfd34b1214184b6ef616d89e420b258bb0a5ea7d7bc628f7f0", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "01d479edba0569a7b7a2c8bf923feeb6dc6a358edc2965ef69aea9ba288bb243"},
"credo": {:hex, :credo, "1.0.2", "88bc918f215168bf6ce7070610a6173c45c82f32baa08bdfc80bf58df2d103b6", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "df2e5ad6f6d40b140fa109b465325ee718d54f23f56a4aa9178796ede2a1ab83"},
"certifi": {:hex, :certifi, "2.8.0", "d4fb0a6bb20b7c9c3643e22507e42f356ac090a1dcea9ab99e27e0376d695eba", [:rebar3], [], "hexpm", "6ac7efc1c6f8600b08d625292d4bbf584e14847ce1b6b5c44d983d273e1097ea"},
"credo": {:hex, :credo, "1.6.1", "7dc76dcdb764a4316c1596804c48eada9fff44bd4b733a91ccbf0c0f368be61e", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "698607fb5993720c7e93d2d8e76f2175bba024de964e160e2f7151ef3ab82ac5"},
"dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"},
"earmark": {:hex, :earmark, "1.3.1", "73812f447f7a42358d3ba79283cfa3075a7580a3a2ed457616d6517ac3738cb9", [:mix], [], "hexpm", "000aaeff08919e95e7aea13e4af7b2b9734577b3e6a7c50ee31ee88cab6ec4fb"},
"earmark_parser": {:hex, :earmark_parser, "1.4.13", "0c98163e7d04a15feb62000e1a891489feb29f3d10cb57d4f845c405852bbef8", [:mix], [], "hexpm", "d602c26af3a0af43d2f2645613f65841657ad6efc9f0e361c3b6c06b578214ba"},
"ex_doc": {:hex, :ex_doc, "0.24.2", "e4c26603830c1a2286dae45f4412a4d1980e1e89dc779fcd0181ed1d5a05c8d9", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "e134e1d9e821b8d9e4244687fb2ace58d479b67b282de5158333b0d57c6fb7da"},
"excoveralls": {:hex, :excoveralls, "0.10.5", "7c912c4ec0715a6013647d835c87cde8154855b9b84e256bc7a63858d5f284e3", [:mix], [{:hackney, "~> 1.13", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "176052589b681f44de12fb85182ccf0296030f619b6939926bc647dabedf9320"},
"earmark_parser": {:hex, :earmark_parser, "1.4.18", "e1b2be73eb08a49fb032a0208bf647380682374a725dfb5b9e510def8397f6f2", [:mix], [], "hexpm", "114a0e85ec3cf9e04b811009e73c206394ffecfcc313e0b346de0d557774ee97"},
"ex_doc": {:hex, :ex_doc, "0.26.0", "1922164bac0b18b02f84d6f69cab1b93bc3e870e2ad18d5dacb50a9e06b542a3", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2775d66e494a9a48355db7867478ffd997864c61c65a47d31c4949459281c78d"},
"excoveralls": {:hex, :excoveralls, "0.14.4", "295498f1ae47bdc6dce59af9a585c381e1aefc63298d48172efaaa90c3d251db", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e3ab02f2df4c1c7a519728a6f0a747e71d7d6e846020aae338173619217931c1"},
"exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, optional: false]}]},
"hackney": {:hex, :hackney, "1.15.0", "287a5d2304d516f63e56c469511c42b016423bcb167e61b611f6bad47e3ca60e", [:rebar3], [{:certifi, "2.4.2", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.4", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "b69d97134f1876ba8e4e2f405e9da8cba7cf4f2da0b7cc24a5ccef8dcf1b46b2"},
"httpoison": {:hex, :httpoison, "1.5.0", "71ae9f304bdf7f00e9cd1823f275c955bdfc68282bc5eb5c85c3a9ade865d68e", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "e9d994aea63fab9e29307920492ab95f87339b56fbc5c8c4b1f65ea20d3ba9a4"},
"idna": {:hex, :idna, "6.0.0", "689c46cbcdf3524c44d5f3dde8001f364cd7608a99556d8fbd8239a5798d4c10", [:rebar3], [{:unicode_util_compat, "0.4.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "4bdd305eb64e18b0273864920695cb18d7a2021f31a11b9c5fbcd9a253f936e2"},
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fdf843bca858203ae1de16da2ee206f53416bbda5dc8c9e78f43243de4bc3afe"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"hackney": {:hex, :hackney, "1.18.0", "c4443d960bb9fba6d01161d01cd81173089686717d9490e5d3606644c48d121f", [:rebar3], [{:certifi, "~>2.8.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "9afcda620704d720db8c6a3123e9848d09c87586dc1c10479c42627b905b5c5e"},
"httpoison": {:hex, :httpoison, "1.8.0", "6b85dea15820b7804ef607ff78406ab449dd78bed923a49c7160e1886e987a3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "28089eaa98cf90c66265b6b5ad87c59a3729bea2e74e9d08f9b51eb9729b3c3a"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"},
"jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [:mix, :rebar3], []},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.2", "dc72dfe17eb240552857465cc00cce390960d9a0c055c4ccd38b70629227e97c", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "fd23ae48d09b32eff49d4ced2b43c9f086d402ee4fd4fcb2d7fad97fa8823e75"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], [], "hexpm", "7a4c8e1115a2732a67d7624e28cf6c9f30c66711a9e92928e745c255887ba465"},
"mimic": {:hex, :mimic, "0.2.0", "66601fa5ce58db59e88fa0bef9d35d8169f11afcc3c8f0c77f911093f4469785", [:mix], [], "hexpm", "887d7b0c686ed0bae09e1988590778f0b896093b288c3c6f48cae964a511009a"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.4", "f0eafff810d2041e93f915ef59899c923f4568f4585904d010387ed74988e77b", [:make, :mix, :rebar3], [], "hexpm", "603561dc0fd62f4f2ea9b890f4e20e1a0d388746d6e20557cafb1b16950de88c"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.4.1", "d869e4c68901dd9531385bb0c8c40444ebf624e60b6962d95952775cac5e90cd", [:rebar3], [], "hexpm", "1d1848c40487cdb0b30e8ed975e34e025860c02e419cb615d255849f3427439d"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mimic": {:hex, :mimic, "0.3.0", "584dfd5ac2ce3347f56b0de3b7906554e2f6020007efed37e721b968f27ebbc0", [:mix], [], "hexpm", "6e817cbd79bca8c70b90b1e3268fb54bbd0c89387e2143236f0233bf935cc5a1"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.0", "b44d75e2a6542dcb6acf5d71c32c74ca88960421b6874777f79153bbbbd7dccc", [:mix], [], "hexpm", "52b2871a7515a5ac49b00f214e4165a40724cf99798d8e4a65e4fd64ebd002c1"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
}
25 changes: 16 additions & 9 deletions test/downstream/download_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,59 @@ defmodule Downstream.DownloadTest do
use ExUnit.Case

alias Downstream.{Download, Error, Response}
alias HTTPoison.{AsyncChunk, AsyncEnd, AsyncHeaders, AsyncStatus}
alias HTTPoison.{AsyncChunk, AsyncEnd, AsyncHeaders, AsyncStatus, AsyncRedirect}

describe "stream/1" do
setup _context do
{:ok, pid} = StringIO.open("test")

[io_device: pid]
params = %{
request_type: :get,
io_device: pid,
options: []
}

%{params: params}
end

test "returns file_path after normal request flow", context do
task = Task.async(Download, :stream, [context.io_device])
task = Task.async(Download, :stream, [context.params])

Process.send(task.pid, %AsyncStatus{code: 200}, [:nosuspend])
Process.send(task.pid, %AsyncHeaders{headers: []}, [:nosuspend])
Process.send(task.pid, %AsyncChunk{chunk: "data"}, [:nosuspend])
Process.send(task.pid, %AsyncEnd{}, [:nosuspend])
Process.send(task.pid, %AsyncRedirect{}, [:nosuspend])

{:ok, %Response{device: device, headers: headers, status_code: code}} = Task.await(task)

assert device == context.io_device
assert device == context.params.io_device
assert code == 200
assert is_list(headers)

assert StringIO.flush(context.io_device) == "data"
assert StringIO.flush(context.params.io_device) == "data"
end

test "handles unexpected status codes", context do
task = Task.async(Download, :stream, [context.io_device])
task = Task.async(Download, :stream, [context.params])

Process.send(task.pid, %AsyncStatus{code: 401}, [:nosuspend])

{:error, %Error{device: device, reason: :invalid_status_code, status_code: code}} =
Task.await(task)

assert device == context.io_device
assert device == context.params.io_device
assert code == 401
end

test "handles unexpected messages", context do
task = Task.async(Download, :stream, [context.io_device])
task = Task.async(Download, :stream, [context.params])

Process.send(task.pid, :invalid_message, [:nosuspend])

{:error, %Error{device: device, reason: error}} = Task.await(task)

assert device == context.io_device
assert device == context.params.io_device
assert error == :unexpected_error
end
end
Expand Down
Loading