diff --git a/src/Curl/Easy.jl b/src/Curl/Easy.jl index 0dd1108..8ad57fa 100644 --- a/src/Curl/Easy.jl +++ b/src/Curl/Easy.jl @@ -1,26 +1,28 @@ mutable struct Easy handle :: Ptr{Cvoid} - input :: Union{Vector{UInt8},Nothing} - ready :: Threads.Event + input :: IO + done :: Threads.Event seeker :: Union{Function,Nothing} - output :: Channel{Vector{UInt8}} - progress :: Channel{NTuple{4,Int}} + output :: IO + progress :: Function req_hdrs :: Ptr{curl_slist_t} res_hdrs :: Vector{String} code :: CURLcode errbuf :: Vector{UInt8} end -const EMPTY_BYTE_VECTOR = UInt8[] - -function Easy() +function Easy( + input :: IO, + output :: IO, + progress :: Union{Function,Nothing}, +) easy = Easy( curl_easy_init(), - EMPTY_BYTE_VECTOR, + input, Threads.Event(), nothing, - Channel{Vector{UInt8}}(Inf), - Channel{NTuple{4,Int}}(Inf), + output, + something(progress, (_, _, _, _) -> nothing), C_NULL, String[], typemax(CURLcode), @@ -284,50 +286,28 @@ end # callbacks function header_callback( - data :: Ptr{Cchar}, + data :: Ptr{UInt8}, size :: Csize_t, count :: Csize_t, easy_p :: Ptr{Cvoid}, )::Csize_t easy = unsafe_pointer_to_objref(easy_p)::Easy - n = size * count + n = size*count hdr = unsafe_string(data, n) push!(easy.res_hdrs, hdr) return n end -# feed data to read_callback -function upload_data(easy::Easy, input::IO) - while true - data = eof(input) ? nothing : readavailable(input) - easy.input === nothing && break - easy.input = data - curl_easy_pause(easy.handle, Curl.CURLPAUSE_CONT) - wait(easy.ready) - easy.input === nothing && break - easy.ready = Threads.Event() - end -end - function read_callback( - data :: Ptr{Cchar}, + data :: Ptr{UInt8}, size :: Csize_t, count :: Csize_t, easy_p :: Ptr{Cvoid}, )::Csize_t easy = unsafe_pointer_to_objref(easy_p)::Easy - buf = easy.input - if buf === nothing - notify(easy.ready) - return 0 # done uploading - end - if isempty(buf) - notify(easy.ready) - return CURL_READFUNC_PAUSE # wait for more data - end - n = min(size * count, length(buf)) - ccall(:memcpy, Ptr{Cvoid}, (Ptr{Cvoid}, Ptr{Cvoid}, Csize_t), data, buf, n) - deleteat!(buf, 1:n) + eof(easy.input) && return 0 + buf = unsafe_wrap(Vector{UInt8}, data, size*count) + n = readbytes!(easy.input, buf, size*count) return n end @@ -344,24 +324,21 @@ function seek_callback( easy.seeker === nothing && return CURL_SEEKFUNC_CANTSEEK try easy.seeker(offset) catch err - @async @error("seek_callback: seeker failed", err) + @async @error("seek_callback: seek failed", err) return CURL_SEEKFUNC_FAIL end return CURL_SEEKFUNC_OK end function write_callback( - data :: Ptr{Cchar}, + data :: Ptr{UInt8}, size :: Csize_t, count :: Csize_t, easy_p :: Ptr{Cvoid}, )::Csize_t easy = unsafe_pointer_to_objref(easy_p)::Easy - n = size * count - buf = Array{UInt8}(undef, n) - ccall(:memcpy, Ptr{Cvoid}, (Ptr{Cvoid}, Ptr{Cvoid}, Csize_t), buf, data, n) - put!(easy.output, buf) - return n + unsafe_write(easy.output, data, size*count) + return size*count end function progress_callback( @@ -372,7 +349,7 @@ function progress_callback( ul_now :: curl_off_t, )::Cint easy = unsafe_pointer_to_objref(easy_p)::Easy - put!(easy.progress, (dl_total, dl_now, ul_total, ul_now)) + easy.progress(dl_total, dl_now, ul_total, ul_now) return 0 end @@ -387,13 +364,13 @@ function add_callbacks(easy::Easy) # set header callback header_cb = @cfunction(header_callback, - Csize_t, (Ptr{Cchar}, Csize_t, Csize_t, Ptr{Cvoid})) + Csize_t, (Ptr{UInt8}, Csize_t, Csize_t, Ptr{Cvoid})) setopt(easy, CURLOPT_HEADERFUNCTION, header_cb) setopt(easy, CURLOPT_HEADERDATA, easy_p) # set write callback write_cb = @cfunction(write_callback, - Csize_t, (Ptr{Cchar}, Csize_t, Csize_t, Ptr{Cvoid})) + Csize_t, (Ptr{UInt8}, Csize_t, Csize_t, Ptr{Cvoid})) setopt(easy, CURLOPT_WRITEFUNCTION, write_cb) setopt(easy, CURLOPT_WRITEDATA, easy_p) @@ -410,7 +387,7 @@ function add_upload_callbacks(easy::Easy) # set read callback read_cb = @cfunction(read_callback, - Csize_t, (Ptr{Cchar}, Csize_t, Csize_t, Ptr{Cvoid})) + Csize_t, (Ptr{UInt8}, Csize_t, Csize_t, Ptr{Cvoid})) setopt(easy, CURLOPT_READFUNCTION, read_cb) setopt(easy, CURLOPT_READDATA, easy_p) end diff --git a/src/Curl/Multi.jl b/src/Curl/Multi.jl index c56b94a..bc74955 100644 --- a/src/Curl/Multi.jl +++ b/src/Curl/Multi.jl @@ -85,10 +85,7 @@ function check_multi_info(multi::Multi) easy = unsafe_pointer_to_objref(easy_p_ref[])::Easy @assert easy_handle == easy.handle easy.code = message.code - close(easy.progress) - close(easy.output) - easy.input = nothing - notify(easy.ready) + notify(easy.done) else @async @error("curl_multi_info_read: unknown message", message) end diff --git a/src/Downloads.jl b/src/Downloads.jl index b56561d..f6ec990 100644 --- a/src/Downloads.jl +++ b/src/Downloads.jl @@ -308,7 +308,7 @@ function request( progress = p_func(progress, input, output) arg_read(input) do input arg_write(output) do output - with_handle(Easy()) do easy + with_handle(Easy(input, output, progress)) do easy # setup the request set_url(easy, url) set_timeout(easy, timeout) @@ -343,23 +343,8 @@ function request( # do the request add_handle(downloader.multi, easy) - try # ensure handle is removed - @sync begin - @async for buf in easy.output - write(output, buf) - end - if progress !== nothing - @async for prog in easy.progress - progress(prog...) - end - end - if have_input - @async upload_data(easy, input) - end - end - finally - remove_handle(downloader.multi, easy) - end + wait(easy.done) + remove_handle(downloader.multi, easy) # return the response or throw an error response = Response(get_response_info(easy)...)