From 7b0a3119b499989c31611a8fb944015aa9a400c5 Mon Sep 17 00:00:00 2001 From: sphill99 Date: Fri, 21 Aug 2020 13:56:40 -0400 Subject: [PATCH] AsyncRequestQueue implementation + some tests (#361) Co-authored-by: Scott Phillips --- .../com/android/volley/AsyncRequestQueue.java | 571 ++++++++++++++++++ .../com/android/volley/CacheDispatcher.java | 115 +--- .../java/com/android/volley/RequestQueue.java | 20 +- .../java/com/android/volley/RequestTask.java | 15 + .../android/volley/WaitingRequestManager.java | 176 ++++++ .../volley/cronet/CronetHttpStack.java | 47 +- .../volley/toolbox/BasicAsyncNetwork.java | 77 ++- .../android/volley/toolbox/NoAsyncCache.java | 37 ++ .../android/volley/toolbox/ThrowingCache.java | 54 ++ .../android/volley/AsyncRequestQueueTest.java | 156 +++++ 10 files changed, 1115 insertions(+), 153 deletions(-) create mode 100644 src/main/java/com/android/volley/AsyncRequestQueue.java create mode 100644 src/main/java/com/android/volley/RequestTask.java create mode 100644 src/main/java/com/android/volley/WaitingRequestManager.java create mode 100644 src/main/java/com/android/volley/toolbox/NoAsyncCache.java create mode 100644 src/main/java/com/android/volley/toolbox/ThrowingCache.java create mode 100644 src/test/java/com/android/volley/AsyncRequestQueueTest.java diff --git a/src/main/java/com/android/volley/AsyncRequestQueue.java b/src/main/java/com/android/volley/AsyncRequestQueue.java new file mode 100644 index 00000000..bbbcf927 --- /dev/null +++ b/src/main/java/com/android/volley/AsyncRequestQueue.java @@ -0,0 +1,571 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.android.volley; + +import android.os.Handler; +import android.os.Looper; +import android.os.SystemClock; +import androidx.annotation.NonNull; +import androidx.annotation.Nullable; +import com.android.volley.AsyncCache.OnGetCompleteCallback; +import com.android.volley.AsyncNetwork.OnRequestComplete; +import com.android.volley.Cache.Entry; +import com.android.volley.toolbox.ThrowingCache; +import java.net.HttpURLConnection; +import java.util.Comparator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * An asynchronous request dispatch queue. + * + *

Add requests to the queue with {@link #add(Request)}. Once completed, responses will be + * delivered on the main thread (unless a custom {@link ResponseDelivery} has been provided) + */ +public class AsyncRequestQueue extends RequestQueue { + /** Default number of blocking threads to start. */ + private static final int DEFAULT_BLOCKING_THREAD_POOL_SIZE = 4; + + /** + * AsyncCache used to retrieve and store responses. + * + *

{@code null} indicates use of blocking Cache. + */ + @Nullable private final AsyncCache mAsyncCache; + + /** AsyncNetwork used to perform nework requests. */ + private final AsyncNetwork mNetwork; + + /** Executor for non-blocking tasks. */ + private ExecutorService mNonBlockingExecutor; + + /** + * Executor for blocking tasks. + * + *

Some tasks in handling requests may not be easy to implement in a non-blocking way, such + * as reading or parsing the response data. This executor is used to run these tasks. + */ + private ExecutorService mBlockingExecutor; + + /** + * This interface may be used by advanced applications to provide custom executors according to + * their needs. Apps must create ExecutorServices dynamically given a blocking queue rather than + * providing them directly so that Volley can provide a PriorityQueue which will prioritize + * requests according to Request#getPriority. + */ + private ExecutorFactory mExecutorFactory; + + /** Manage list of waiting requests and de-duplicate requests with same cache key. */ + private final WaitingRequestManager mWaitingRequestManager = new WaitingRequestManager(this); + + /** + * Sets all the variables, but processing does not begin until {@link #start()} is called. + * + * @param cache to use for persisting responses to disk. If an AsyncCache was provided, then + * this will be a {@link ThrowingCache} + * @param network to perform HTTP requests + * @param asyncCache to use for persisting responses to disk. May be null to indicate use of + * blocking cache + * @param responseDelivery interface for posting responses and errors + * @param executorFactory Interface to be used to provide custom executors according to the + * users needs. + */ + private AsyncRequestQueue( + Cache cache, + AsyncNetwork network, + @Nullable AsyncCache asyncCache, + ResponseDelivery responseDelivery, + ExecutorFactory executorFactory) { + super(cache, network, /* threadPoolSize= */ 0, responseDelivery); + mAsyncCache = asyncCache; + mNetwork = network; + mExecutorFactory = executorFactory; + } + + /** Sets the executors and initializes the cache. */ + @Override + public void start() { + stop(); // Make sure any currently running threads are stopped + + // Create blocking / non-blocking executors and set them in the network and stack. + mNonBlockingExecutor = mExecutorFactory.createNonBlockingExecutor(getBlockingQueue()); + mBlockingExecutor = mExecutorFactory.createBlockingExecutor(getBlockingQueue()); + mNetwork.setBlockingExecutor(mBlockingExecutor); + mNetwork.setNonBlockingExecutor(mNonBlockingExecutor); + + mNonBlockingExecutor.execute( + new Runnable() { + @Override + public void run() { + // This is intentionally blocking, because we don't want to process any + // requests until the cache is initialized. + if (mAsyncCache != null) { + final CountDownLatch latch = new CountDownLatch(1); + mAsyncCache.initialize( + new AsyncCache.OnWriteCompleteCallback() { + @Override + public void onWriteComplete() { + latch.countDown(); + } + }); + try { + latch.await(); + } catch (InterruptedException e) { + VolleyLog.e( + e, "Thread was interrupted while initializing the cache."); + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } else { + getCache().initialize(); + } + } + }); + } + + /** Shuts down and nullifies both executors */ + @Override + public void stop() { + if (mNonBlockingExecutor != null) { + mNonBlockingExecutor.shutdownNow(); + mNonBlockingExecutor = null; + } + if (mBlockingExecutor != null) { + mBlockingExecutor.shutdownNow(); + mBlockingExecutor = null; + } + } + + /** Begins the request by sending it to the Cache or Network. */ + @Override + void beginRequest(Request request) { + // If the request is uncacheable, send it over the network. + if (request.shouldCache()) { + if (mAsyncCache != null) { + mNonBlockingExecutor.execute(new CacheTask<>(request)); + } else { + mBlockingExecutor.execute(new CacheTask<>(request)); + } + } else { + sendRequestOverNetwork(request); + } + } + + @Override + void sendRequestOverNetwork(Request request) { + mNonBlockingExecutor.execute(new NetworkTask<>(request)); + } + + /** Runnable that gets an entry from the cache. */ + private class CacheTask extends RequestTask { + CacheTask(Request request) { + super(request); + } + + @Override + public void run() { + // If the request has been canceled, don't bother dispatching it. + if (mRequest.isCanceled()) { + mRequest.finish("cache-discard-canceled"); + return; + } + + mRequest.addMarker("cache-queue-take"); + + // Attempt to retrieve this item from cache. + if (mAsyncCache != null) { + mAsyncCache.get( + mRequest.getCacheKey(), + new OnGetCompleteCallback() { + @Override + public void onGetComplete(Entry entry) { + handleEntry(entry, mRequest); + } + }); + } else { + Entry entry = getCache().get(mRequest.getCacheKey()); + handleEntry(entry, mRequest); + } + } + } + + /** Helper method that handles the cache entry after getting it from the Cache. */ + private void handleEntry(final Entry entry, final Request mRequest) { + if (entry == null) { + mRequest.addMarker("cache-miss"); + // Cache miss; send off to the network dispatcher. + if (!mWaitingRequestManager.maybeAddToWaitingRequests(mRequest)) { + sendRequestOverNetwork(mRequest); + } + return; + } + + // If it is completely expired, just send it to the network. + if (entry.isExpired()) { + mRequest.addMarker("cache-hit-expired"); + mRequest.setCacheEntry(entry); + if (!mWaitingRequestManager.maybeAddToWaitingRequests(mRequest)) { + sendRequestOverNetwork(mRequest); + } + return; + } + + // We have a cache hit; parse its data for delivery back to the request. + mBlockingExecutor.execute(new CacheParseTask<>(mRequest, entry)); + } + + private class CacheParseTask extends RequestTask { + Cache.Entry entry; + + CacheParseTask(Request request, Cache.Entry entry) { + super(request); + this.entry = entry; + } + + @Override + public void run() { + mRequest.addMarker("cache-hit"); + Response response = + mRequest.parseNetworkResponse( + new NetworkResponse( + HttpURLConnection.HTTP_OK, + entry.data, + /* notModified= */ false, + /* networkTimeMs= */ 0, + entry.allResponseHeaders)); + mRequest.addMarker("cache-hit-parsed"); + + if (!entry.refreshNeeded()) { + // Completely unexpired cache hit. Just deliver the response. + getResponseDelivery().postResponse(mRequest, response); + } else { + // Soft-expired cache hit. We can deliver the cached response, + // but we need to also send the request to the network for + // refreshing. + mRequest.addMarker("cache-hit-refresh-needed"); + mRequest.setCacheEntry(entry); + // Mark the response as intermediate. + response.intermediate = true; + + if (!mWaitingRequestManager.maybeAddToWaitingRequests(mRequest)) { + // Post the intermediate response back to the user and have + // the delivery then forward the request along to the network. + getResponseDelivery() + .postResponse( + mRequest, + response, + new Runnable() { + @Override + public void run() { + sendRequestOverNetwork(mRequest); + } + }); + } else { + // request has been added to list of waiting requests + // to receive the network response from the first request once it + // returns. + getResponseDelivery().postResponse(mRequest, response); + } + } + } + } + + private class ParseErrorTask extends RequestTask { + VolleyError volleyError; + + ParseErrorTask(Request request, VolleyError volleyError) { + super(request); + this.volleyError = volleyError; + } + + @Override + public void run() { + VolleyError parsedError = mRequest.parseNetworkError(volleyError); + getResponseDelivery().postError(mRequest, parsedError); + mRequest.notifyListenerResponseNotUsable(); + } + } + + /** Runnable that performs the network request */ + private class NetworkTask extends RequestTask { + NetworkTask(Request request) { + super(request); + } + + @Override + public void run() { + // If the request was cancelled already, do not perform the network request. + if (mRequest.isCanceled()) { + mRequest.finish("network-discard-cancelled"); + mRequest.notifyListenerResponseNotUsable(); + return; + } + + final long startTimeMs = SystemClock.elapsedRealtime(); + mRequest.addMarker("network-queue-take"); + + // TODO: Figure out what to do with traffic stats tags. Can this be pushed to the + // HTTP stack, or is it no longer feasible to support? + + // Perform the network request. + mNetwork.performRequest( + mRequest, + new OnRequestComplete() { + @Override + public void onSuccess(final NetworkResponse networkResponse) { + mRequest.addMarker("network-http-complete"); + + // If the server returned 304 AND we delivered a response already, + // we're done -- don't deliver a second identical response. + if (networkResponse.notModified && mRequest.hasHadResponseDelivered()) { + mRequest.finish("not-modified"); + mRequest.notifyListenerResponseNotUsable(); + return; + } + + // Parse the response here on the worker thread. + mBlockingExecutor.execute( + new NetworkParseTask<>(mRequest, networkResponse)); + } + + @Override + public void onError(final VolleyError volleyError) { + volleyError.setNetworkTimeMs( + SystemClock.elapsedRealtime() - startTimeMs); + mBlockingExecutor.execute(new ParseErrorTask<>(mRequest, volleyError)); + } + }); + } + } + + /** Runnable that parses a network response. */ + private class NetworkParseTask extends RequestTask { + NetworkResponse networkResponse; + + NetworkParseTask(Request request, NetworkResponse networkResponse) { + super(request); + this.networkResponse = networkResponse; + } + + @Override + public void run() { + final Response response = mRequest.parseNetworkResponse(networkResponse); + mRequest.addMarker("network-parse-complete"); + + // Write to cache if applicable. + // TODO: Only update cache metadata instead of entire + // record for 304s. + if (mRequest.shouldCache() && response.cacheEntry != null) { + if (mAsyncCache != null) { + mNonBlockingExecutor.execute(new CachePutTask<>(mRequest, response)); + } else { + mBlockingExecutor.execute(new CachePutTask<>(mRequest, response)); + } + } else { + finishRequest(mRequest, response, /* cached= */ false); + } + } + } + + private class CachePutTask extends RequestTask { + Response response; + + CachePutTask(Request request, Response response) { + super(request); + this.response = response; + } + + @Override + public void run() { + if (mAsyncCache != null) { + mAsyncCache.put( + mRequest.getCacheKey(), + response.cacheEntry, + new AsyncCache.OnWriteCompleteCallback() { + @Override + public void onWriteComplete() { + finishRequest(mRequest, response, /* cached= */ true); + } + }); + } else { + getCache().put(mRequest.getCacheKey(), response.cacheEntry); + finishRequest(mRequest, response, /* cached= */ true); + } + } + } + + /** Posts response and notifies listener */ + private void finishRequest(Request mRequest, Response response, boolean cached) { + if (cached) { + mRequest.addMarker("network-cache-written"); + } + // Post the response back. + mRequest.markDelivered(); + getResponseDelivery().postResponse(mRequest, response); + mRequest.notifyListenerResponseReceived(response); + } + + /** + * This interface may be used by advanced applications to provide custom executors according to + * their needs. Apps must create ExecutorServices dynamically given a blocking queue rather than + * providing them directly so that Volley can provide a PriorityQueue which will prioritize + * requests according to Request#getPriority. + */ + public interface ExecutorFactory { + ExecutorService createNonBlockingExecutor(BlockingQueue taskQueue); + + ExecutorService createBlockingExecutor(BlockingQueue taskQueue); + } + + /** Provides a BlockingQueue to be used to create executors. */ + private static PriorityBlockingQueue getBlockingQueue() { + return new PriorityBlockingQueue<>( + /* initialCapacity= */ 11, + new Comparator() { + @Override + public int compare(Runnable r1, Runnable r2) { + // Vanilla runnables are prioritized first, then RequestTasks are ordered + // by the underlying Request. + if (r1 instanceof RequestTask) { + if (r2 instanceof RequestTask) { + return ((RequestTask) r1).compareTo(((RequestTask) r2)); + } + return 1; + } + return r2 instanceof RequestTask ? -1 : 0; + } + }); + } + + /** + * Builder is used to build an instance of {@link AsyncRequestQueue} from values configured by + * the setters. + */ + public static class Builder { + @Nullable private AsyncCache mAsyncCache = null; + private final AsyncNetwork mNetwork; + @Nullable private Cache mCache = null; + @Nullable private ExecutorFactory mExecutorFactory = null; + @Nullable private ResponseDelivery mResponseDelivery = null; + + public Builder(AsyncNetwork asyncNetwork) { + if (asyncNetwork == null) { + throw new IllegalArgumentException("Network cannot be null"); + } + mNetwork = asyncNetwork; + } + + /** + * Sets the executor factory to be used by the AsyncRequestQueue. If this is not called, + * Volley will create suitable private thread pools. + */ + public Builder setExecutorFactory(ExecutorFactory executorFactory) { + mExecutorFactory = executorFactory; + return this; + } + + /** + * Sets the response deliver to be used by the AsyncRequestQueue. If this is not called, we + * will default to creating a new {@link ExecutorDelivery} with the application's main + * thread. + */ + public Builder setResponseDelivery(ResponseDelivery responseDelivery) { + mResponseDelivery = responseDelivery; + return this; + } + + /** Sets the AsyncCache to be used by the AsyncRequestQueue. */ + public Builder setAsyncCache(AsyncCache asyncCache) { + mAsyncCache = asyncCache; + return this; + } + + /** Sets the Cache to be used by the AsyncRequestQueue. */ + public Builder setCache(Cache cache) { + mCache = cache; + return this; + } + + /** Provides a default ExecutorFactory to use, if one is never set. */ + private ExecutorFactory getDefaultExecutorFactory() { + return new ExecutorFactory() { + @Override + public ExecutorService createNonBlockingExecutor( + BlockingQueue taskQueue) { + return getNewThreadPoolExecutor( + /* maximumPoolSize= */ 1, + /* threadNameSuffix= */ "Non-BlockingExecutor", + taskQueue); + } + + @Override + public ExecutorService createBlockingExecutor(BlockingQueue taskQueue) { + return getNewThreadPoolExecutor( + /* maximumPoolSize= */ DEFAULT_BLOCKING_THREAD_POOL_SIZE, + /* threadNameSuffix= */ "BlockingExecutor", + taskQueue); + } + + private ThreadPoolExecutor getNewThreadPoolExecutor( + int maximumPoolSize, + final String threadNameSuffix, + BlockingQueue taskQueue) { + return new ThreadPoolExecutor( + /* corePoolSize= */ 0, + /* maximumPoolSize= */ maximumPoolSize, + /* keepAliveTime= */ 60, + /* unit= */ TimeUnit.SECONDS, + taskQueue, + new ThreadFactory() { + @Override + public Thread newThread(@NonNull Runnable runnable) { + Thread t = Executors.defaultThreadFactory().newThread(runnable); + t.setName("Volley-" + threadNameSuffix); + return t; + } + }); + } + }; + } + + public AsyncRequestQueue build() { + // If neither cache is set by the caller, throw an illegal argument exception. + if (mCache == null && mAsyncCache == null) { + throw new IllegalArgumentException("You must set one of the cache objects"); + } + if (mCache == null) { + // if no cache is provided, we will provide one that throws + // UnsupportedOperationExceptions to pass into the parent class. + mCache = new ThrowingCache(); + } + if (mResponseDelivery == null) { + mResponseDelivery = new ExecutorDelivery(new Handler(Looper.getMainLooper())); + } + if (mExecutorFactory == null) { + mExecutorFactory = getDefaultExecutorFactory(); + } + return new AsyncRequestQueue( + mCache, mNetwork, mAsyncCache, mResponseDelivery, mExecutorFactory); + } + } +} diff --git a/src/main/java/com/android/volley/CacheDispatcher.java b/src/main/java/com/android/volley/CacheDispatcher.java index 12b10353..1bfc0ea5 100644 --- a/src/main/java/com/android/volley/CacheDispatcher.java +++ b/src/main/java/com/android/volley/CacheDispatcher.java @@ -18,10 +18,6 @@ import android.os.Process; import androidx.annotation.VisibleForTesting; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.BlockingQueue; /** @@ -72,7 +68,7 @@ public CacheDispatcher( mNetworkQueue = networkQueue; mCache = cache; mDelivery = delivery; - mWaitingRequestManager = new WaitingRequestManager(this); + mWaitingRequestManager = new WaitingRequestManager(this, networkQueue, delivery); } /** @@ -207,113 +203,4 @@ public void run() { request.sendEvent(RequestQueue.RequestEvent.REQUEST_CACHE_LOOKUP_FINISHED); } } - - private static class WaitingRequestManager implements Request.NetworkRequestCompleteListener { - - /** - * Staging area for requests that already have a duplicate request in flight. - * - *

- */ - private final Map>> mWaitingRequests = new HashMap<>(); - - private final CacheDispatcher mCacheDispatcher; - - WaitingRequestManager(CacheDispatcher cacheDispatcher) { - mCacheDispatcher = cacheDispatcher; - } - - /** Request received a valid response that can be used by other waiting requests. */ - @Override - public void onResponseReceived(Request request, Response response) { - if (response.cacheEntry == null || response.cacheEntry.isExpired()) { - onNoUsableResponseReceived(request); - return; - } - String cacheKey = request.getCacheKey(); - List> waitingRequests; - synchronized (this) { - waitingRequests = mWaitingRequests.remove(cacheKey); - } - if (waitingRequests != null) { - if (VolleyLog.DEBUG) { - VolleyLog.v( - "Releasing %d waiting requests for cacheKey=%s.", - waitingRequests.size(), cacheKey); - } - // Process all queued up requests. - for (Request waiting : waitingRequests) { - mCacheDispatcher.mDelivery.postResponse(waiting, response); - } - } - } - - /** No valid response received from network, release waiting requests. */ - @Override - public synchronized void onNoUsableResponseReceived(Request request) { - String cacheKey = request.getCacheKey(); - List> waitingRequests = mWaitingRequests.remove(cacheKey); - if (waitingRequests != null && !waitingRequests.isEmpty()) { - if (VolleyLog.DEBUG) { - VolleyLog.v( - "%d waiting requests for cacheKey=%s; resend to network", - waitingRequests.size(), cacheKey); - } - Request nextInLine = waitingRequests.remove(0); - mWaitingRequests.put(cacheKey, waitingRequests); - nextInLine.setNetworkRequestCompleteListener(this); - try { - mCacheDispatcher.mNetworkQueue.put(nextInLine); - } catch (InterruptedException iex) { - VolleyLog.e("Couldn't add request to queue. %s", iex.toString()); - // Restore the interrupted status of the calling thread (i.e. NetworkDispatcher) - Thread.currentThread().interrupt(); - // Quit the current CacheDispatcher thread. - mCacheDispatcher.quit(); - } - } - } - - /** - * For cacheable requests, if a request for the same cache key is already in flight, add it - * to a queue to wait for that in-flight request to finish. - * - * @return whether the request was queued. If false, we should continue issuing the request - * over the network. If true, we should put the request on hold to be processed when the - * in-flight request finishes. - */ - private synchronized boolean maybeAddToWaitingRequests(Request request) { - String cacheKey = request.getCacheKey(); - // Insert request into stage if there's already a request with the same cache key - // in flight. - if (mWaitingRequests.containsKey(cacheKey)) { - // There is already a request in flight. Queue up. - List> stagedRequests = mWaitingRequests.get(cacheKey); - if (stagedRequests == null) { - stagedRequests = new ArrayList<>(); - } - request.addMarker("waiting-for-response"); - stagedRequests.add(request); - mWaitingRequests.put(cacheKey, stagedRequests); - if (VolleyLog.DEBUG) { - VolleyLog.d("Request for cacheKey=%s is in flight, putting on hold.", cacheKey); - } - return true; - } else { - // Insert 'null' queue for this cacheKey, indicating there is now a request in - // flight. - mWaitingRequests.put(cacheKey, null); - request.setNetworkRequestCompleteListener(this); - if (VolleyLog.DEBUG) { - VolleyLog.d("new request, sending to network %s", cacheKey); - } - return false; - } - } - } } diff --git a/src/main/java/com/android/volley/RequestQueue.java b/src/main/java/com/android/volley/RequestQueue.java index c127c7f1..6db0b1cc 100644 --- a/src/main/java/com/android/volley/RequestQueue.java +++ b/src/main/java/com/android/volley/RequestQueue.java @@ -263,13 +263,17 @@ public Request add(Request request) { request.addMarker("add-to-queue"); sendRequestEvent(request, RequestEvent.REQUEST_QUEUED); + beginRequest(request); + return request; + } + + void beginRequest(Request request) { // If the request is uncacheable, skip the cache queue and go straight to the network. if (!request.shouldCache()) { - mNetworkQueue.add(request); - return request; + sendRequestOverNetwork(request); + } else { + mCacheQueue.add(request); } - mCacheQueue.add(request); - return request; } /** @@ -327,4 +331,12 @@ public void removeRequestFinishedListener(RequestFinishedListener listene mFinishedListeners.remove(listener); } } + + public ResponseDelivery getResponseDelivery() { + return mDelivery; + } + + void sendRequestOverNetwork(Request request) { + mNetworkQueue.add(request); + } } diff --git a/src/main/java/com/android/volley/RequestTask.java b/src/main/java/com/android/volley/RequestTask.java new file mode 100644 index 00000000..8eeaf2c5 --- /dev/null +++ b/src/main/java/com/android/volley/RequestTask.java @@ -0,0 +1,15 @@ +package com.android.volley; + +/** Abstract runnable that's a task to be completed by the RequestQueue. */ +public abstract class RequestTask implements Runnable { + final Request mRequest; + + public RequestTask(Request request) { + mRequest = request; + } + + @SuppressWarnings("unchecked") + public int compareTo(RequestTask other) { + return mRequest.compareTo((Request) other.mRequest); + } +} diff --git a/src/main/java/com/android/volley/WaitingRequestManager.java b/src/main/java/com/android/volley/WaitingRequestManager.java new file mode 100644 index 00000000..682e3399 --- /dev/null +++ b/src/main/java/com/android/volley/WaitingRequestManager.java @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.android.volley; + +import androidx.annotation.NonNull; +import androidx.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +/** + * Callback to notify the caller when the network request returns. Valid responses can be used by + * all duplicate requests. + */ +class WaitingRequestManager implements Request.NetworkRequestCompleteListener { + + /** + * Staging area for requests that already have a duplicate request in flight. + * + *
    + *
  • containsKey(cacheKey) indicates that there is a request in flight for the given cache + * key. + *
  • get(cacheKey) returns waiting requests for the given cache key. The in flight request + * is not contained in that list. Is null if no requests are staged. + *
+ */ + private final Map>> mWaitingRequests = new HashMap<>(); + + private final ResponseDelivery mResponseDelivery; + + /** + * RequestQueue that is passed in by the AsyncRequestQueue. This is null when this instance is + * initialized by the {@link CacheDispatcher} + */ + @Nullable private final RequestQueue mRequestQueue; + + /** + * CacheDispacter that is passed in by the CacheDispatcher. This is null when this instance is + * initialized by the {@link AsyncRequestQueue} + */ + @Nullable private final CacheDispatcher mCacheDispatcher; + + /** + * BlockingQueue that is passed in by the CacheDispatcher. This is null when this instance is + * initialized by the {@link AsyncRequestQueue} + */ + @Nullable private final BlockingQueue> mNetworkQueue; + + WaitingRequestManager(@NonNull RequestQueue requestQueue) { + mRequestQueue = requestQueue; + mResponseDelivery = mRequestQueue.getResponseDelivery(); + mCacheDispatcher = null; + mNetworkQueue = null; + } + + WaitingRequestManager( + @NonNull CacheDispatcher cacheDispatcher, + @NonNull BlockingQueue> networkQueue, + ResponseDelivery responseDelivery) { + mRequestQueue = null; + mResponseDelivery = responseDelivery; + mCacheDispatcher = cacheDispatcher; + mNetworkQueue = networkQueue; + } + + /** Request received a valid response that can be used by other waiting requests. */ + @Override + public void onResponseReceived(Request request, Response response) { + if (response.cacheEntry == null || response.cacheEntry.isExpired()) { + onNoUsableResponseReceived(request); + return; + } + String cacheKey = request.getCacheKey(); + List> waitingRequests; + synchronized (this) { + waitingRequests = mWaitingRequests.remove(cacheKey); + } + if (waitingRequests != null) { + if (VolleyLog.DEBUG) { + VolleyLog.v( + "Releasing %d waiting requests for cacheKey=%s.", + waitingRequests.size(), cacheKey); + } + // Process all queued up requests. + for (Request waiting : waitingRequests) { + mResponseDelivery.postResponse(waiting, response); + } + } + } + + /** No valid response received from network, release waiting requests. */ + @Override + public synchronized void onNoUsableResponseReceived(Request request) { + String cacheKey = request.getCacheKey(); + List> waitingRequests = mWaitingRequests.remove(cacheKey); + if (waitingRequests != null && !waitingRequests.isEmpty()) { + if (VolleyLog.DEBUG) { + VolleyLog.v( + "%d waiting requests for cacheKey=%s; resend to network", + waitingRequests.size(), cacheKey); + } + Request nextInLine = waitingRequests.remove(0); + mWaitingRequests.put(cacheKey, waitingRequests); + nextInLine.setNetworkRequestCompleteListener(this); + // RequestQueue will be non-null if this instance was created in AsyncRequestQueue. + if (mRequestQueue != null) { + // Will send the network request from the RequestQueue. + mRequestQueue.sendRequestOverNetwork(nextInLine); + } else if (mCacheDispatcher != null && mNetworkQueue != null) { + // If we're not using the AsyncRequestQueue, then submit it to the network queue. + try { + mNetworkQueue.put(nextInLine); + } catch (InterruptedException iex) { + VolleyLog.e("Couldn't add request to queue. %s", iex.toString()); + // Restore the interrupted status of the calling thread (i.e. NetworkDispatcher) + Thread.currentThread().interrupt(); + // Quit the current CacheDispatcher thread. + mCacheDispatcher.quit(); + } + } + } + } + + /** + * For cacheable requests, if a request for the same cache key is already in flight, add it to a + * queue to wait for that in-flight request to finish. + * + * @return whether the request was queued. If false, we should continue issuing the request over + * the network. If true, we should put the request on hold to be processed when the + * in-flight request finishes. + */ + synchronized boolean maybeAddToWaitingRequests(Request request) { + String cacheKey = request.getCacheKey(); + // Insert request into stage if there's already a request with the same cache key + // in flight. + if (mWaitingRequests.containsKey(cacheKey)) { + // There is already a request in flight. Queue up. + List> stagedRequests = mWaitingRequests.get(cacheKey); + if (stagedRequests == null) { + stagedRequests = new ArrayList<>(); + } + request.addMarker("waiting-for-response"); + stagedRequests.add(request); + mWaitingRequests.put(cacheKey, stagedRequests); + if (VolleyLog.DEBUG) { + VolleyLog.d("Request for cacheKey=%s is in flight, putting on hold.", cacheKey); + } + return true; + } else { + // Insert 'null' queue for this cacheKey, indicating there is now a request in + // flight. + mWaitingRequests.put(cacheKey, null); + request.setNetworkRequestCompleteListener(this); + if (VolleyLog.DEBUG) { + VolleyLog.d("new request, sending to network %s", cacheKey); + } + return false; + } + } +} diff --git a/src/main/java/com/android/volley/cronet/CronetHttpStack.java b/src/main/java/com/android/volley/cronet/CronetHttpStack.java index fcb2b634..c950ac0f 100644 --- a/src/main/java/com/android/volley/cronet/CronetHttpStack.java +++ b/src/main/java/com/android/volley/cronet/CronetHttpStack.java @@ -22,6 +22,7 @@ import com.android.volley.AuthFailureError; import com.android.volley.Header; import com.android.volley.Request; +import com.android.volley.RequestTask; import com.android.volley.toolbox.AsyncHttpStack; import com.android.volley.toolbox.ByteArrayPool; import com.android.volley.toolbox.HttpResponse; @@ -145,20 +146,38 @@ public void onFailed( .setPriority(getPriority(request)); // request.getHeaders() may be blocking, so submit it to the blocking executor. getBlockingExecutor() - .execute( - new Runnable() { - @Override - public void run() { - try { - setHttpMethod(request, builder); - setRequestHeaders(request, additionalHeaders, builder); - UrlRequest urlRequest = builder.build(); - urlRequest.start(); - } catch (AuthFailureError authFailureError) { - callback.onAuthError(authFailureError); - } - } - }); + .execute(new SetUpRequestTask<>(request, builder, additionalHeaders, callback)); + } + + private class SetUpRequestTask extends RequestTask { + UrlRequest.Builder builder; + Map additionalHeaders; + OnRequestComplete callback; + Request request; + + SetUpRequestTask( + Request request, + UrlRequest.Builder builder, + Map additionalHeaders, + OnRequestComplete callback) { + super(request); + this.builder = builder; + this.additionalHeaders = additionalHeaders; + this.callback = callback; + this.request = request; + } + + @Override + public void run() { + try { + setHttpMethod(request, builder); + setRequestHeaders(request, additionalHeaders, builder); + UrlRequest urlRequest = builder.build(); + urlRequest.start(); + } catch (AuthFailureError authFailureError) { + callback.onAuthError(authFailureError); + } + } } @VisibleForTesting diff --git a/src/main/java/com/android/volley/toolbox/BasicAsyncNetwork.java b/src/main/java/com/android/volley/toolbox/BasicAsyncNetwork.java index 67492dd7..cbc3c843 100644 --- a/src/main/java/com/android/volley/toolbox/BasicAsyncNetwork.java +++ b/src/main/java/com/android/volley/toolbox/BasicAsyncNetwork.java @@ -26,6 +26,7 @@ import com.android.volley.Header; import com.android.volley.NetworkResponse; import com.android.volley.Request; +import com.android.volley.RequestTask; import com.android.volley.VolleyError; import java.io.IOException; import java.io.InputStream; @@ -87,31 +88,16 @@ private void onRequestSucceeded( // a byte array, so we need to submit a blocking task to copy the response from the // InputStream instead. final InputStream inputStream = httpResponse.getContent(); - Runnable run = - new Runnable() { - @Override - public void run() { - byte[] finalResponseContents; - try { - finalResponseContents = - NetworkUtility.inputStreamToBytes( - inputStream, httpResponse.getContentLength(), mPool); - } catch (IOException e) { - onRequestFailed( - request, callback, e, requestStartMs, httpResponse, null); - return; - } - onResponseRead( - requestStartMs, - statusCode, + getBlockingExecutor() + .execute( + new ResponseParsingTask<>( + inputStream, httpResponse, request, callback, + requestStartMs, responseHeaders, - finalResponseContents); - } - }; - getBlockingExecutor().execute(run); + statusCode)); } /* Method to be called after a failed network request */ @@ -203,6 +189,55 @@ private void onResponseRead( responseHeaders)); } + private class ResponseParsingTask extends RequestTask { + InputStream inputStream; + HttpResponse httpResponse; + Request request; + OnRequestComplete callback; + long requestStartMs; + List
responseHeaders; + int statusCode; + + ResponseParsingTask( + InputStream inputStream, + HttpResponse httpResponse, + Request request, + OnRequestComplete callback, + long requestStartMs, + List
responseHeaders, + int statusCode) { + super(request); + this.inputStream = inputStream; + this.httpResponse = httpResponse; + this.request = request; + this.callback = callback; + this.requestStartMs = requestStartMs; + this.responseHeaders = responseHeaders; + this.statusCode = statusCode; + } + + @Override + public void run() { + byte[] finalResponseContents; + try { + finalResponseContents = + NetworkUtility.inputStreamToBytes( + inputStream, httpResponse.getContentLength(), mPool); + } catch (IOException e) { + onRequestFailed(request, callback, e, requestStartMs, httpResponse, null); + return; + } + onResponseRead( + requestStartMs, + statusCode, + httpResponse, + request, + callback, + responseHeaders, + finalResponseContents); + } + } + /** * Builder is used to build an instance of {@link BasicAsyncNetwork} from values configured by * the setters. diff --git a/src/main/java/com/android/volley/toolbox/NoAsyncCache.java b/src/main/java/com/android/volley/toolbox/NoAsyncCache.java new file mode 100644 index 00000000..aa4aeead --- /dev/null +++ b/src/main/java/com/android/volley/toolbox/NoAsyncCache.java @@ -0,0 +1,37 @@ +package com.android.volley.toolbox; + +import com.android.volley.AsyncCache; +import com.android.volley.Cache; + +/** An AsyncCache that doesn't cache anything. */ +public class NoAsyncCache extends AsyncCache { + @Override + public void get(String key, OnGetCompleteCallback callback) { + callback.onGetComplete(null); + } + + @Override + public void put(String key, Cache.Entry entry, OnWriteCompleteCallback callback) { + callback.onWriteComplete(); + } + + @Override + public void clear(OnWriteCompleteCallback callback) { + callback.onWriteComplete(); + } + + @Override + public void initialize(OnWriteCompleteCallback callback) { + callback.onWriteComplete(); + } + + @Override + public void invalidate(String key, boolean fullExpire, OnWriteCompleteCallback callback) { + callback.onWriteComplete(); + } + + @Override + public void remove(String key, OnWriteCompleteCallback callback) { + callback.onWriteComplete(); + } +} diff --git a/src/main/java/com/android/volley/toolbox/ThrowingCache.java b/src/main/java/com/android/volley/toolbox/ThrowingCache.java new file mode 100644 index 00000000..d51af5de --- /dev/null +++ b/src/main/java/com/android/volley/toolbox/ThrowingCache.java @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2020 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.android.volley.toolbox; + +import androidx.annotation.RestrictTo; +import com.android.volley.Cache; + +/** A cache that throws an error if a method is called. */ +@RestrictTo(RestrictTo.Scope.LIBRARY) +public class ThrowingCache implements Cache { + @Override + public Entry get(String key) { + throw new UnsupportedOperationException(); + } + + @Override + public void put(String key, Entry entry) { + throw new UnsupportedOperationException(); + } + + @Override + public void initialize() { + throw new UnsupportedOperationException(); + } + + @Override + public void invalidate(String key, boolean fullExpire) { + throw new UnsupportedOperationException(); + } + + @Override + public void remove(String key) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } +} diff --git a/src/test/java/com/android/volley/AsyncRequestQueueTest.java b/src/test/java/com/android/volley/AsyncRequestQueueTest.java new file mode 100644 index 00000000..05ecf133 --- /dev/null +++ b/src/test/java/com/android/volley/AsyncRequestQueueTest.java @@ -0,0 +1,156 @@ +/* + * Copyright (C) 2011 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.android.volley; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +import com.android.volley.mock.ShadowSystemClock; +import com.android.volley.toolbox.NoAsyncCache; +import com.android.volley.toolbox.StringRequest; +import com.android.volley.utils.ImmediateResponseDelivery; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.robolectric.RobolectricTestRunner; +import org.robolectric.annotation.Config; + +/** Unit tests for AsyncRequestQueue, with all dependencies mocked out */ +@RunWith(RobolectricTestRunner.class) +@Config(shadows = {ShadowSystemClock.class}) +public class AsyncRequestQueueTest { + + @Mock private AsyncNetwork mMockNetwork; + private AsyncRequestQueue queue; + + @Before + public void setUp() throws Exception { + ResponseDelivery mDelivery = new ImmediateResponseDelivery(); + initMocks(this); + queue = + new AsyncRequestQueue.Builder(mMockNetwork) + .setAsyncCache(new NoAsyncCache()) + .setResponseDelivery(mDelivery) + .setExecutorFactory( + new AsyncRequestQueue.ExecutorFactory() { + @Override + public ExecutorService createNonBlockingExecutor( + BlockingQueue taskQueue) { + return MoreExecutors.newDirectExecutorService(); + } + + @Override + public ExecutorService createBlockingExecutor( + BlockingQueue taskQueue) { + return MoreExecutors.newDirectExecutorService(); + } + }) + .build(); + } + + @Test + public void cancelAll_onlyCorrectTag() throws Exception { + queue.start(); + Object tagA = new Object(); + Object tagB = new Object(); + StringRequest req1 = mock(StringRequest.class); + when(req1.getTag()).thenReturn(tagA); + StringRequest req2 = mock(StringRequest.class); + when(req2.getTag()).thenReturn(tagB); + StringRequest req3 = mock(StringRequest.class); + when(req3.getTag()).thenReturn(tagA); + StringRequest req4 = mock(StringRequest.class); + when(req4.getTag()).thenReturn(tagA); + + queue.add(req1); // A + queue.add(req2); // B + queue.add(req3); // A + queue.cancelAll(tagA); + queue.add(req4); // A + + verify(req1).cancel(); // A cancelled + verify(req3).cancel(); // A cancelled + verify(req2, never()).cancel(); // B not cancelled + verify(req4, never()).cancel(); // A added after cancel not cancelled + queue.stop(); + } + + @Test + public void add_notifiesListener() throws Exception { + RequestQueue.RequestEventListener listener = mock(RequestQueue.RequestEventListener.class); + queue.start(); + queue.addRequestEventListener(listener); + StringRequest req = mock(StringRequest.class); + + queue.add(req); + + verify(listener).onRequestEvent(req, RequestQueue.RequestEvent.REQUEST_QUEUED); + verifyNoMoreInteractions(listener); + queue.stop(); + } + + @Test + public void finish_notifiesListener() throws Exception { + RequestQueue.RequestEventListener listener = mock(RequestQueue.RequestEventListener.class); + queue.start(); + queue.addRequestEventListener(listener); + StringRequest req = mock(StringRequest.class); + + queue.finish(req); + + verify(listener).onRequestEvent(req, RequestQueue.RequestEvent.REQUEST_FINISHED); + verifyNoMoreInteractions(listener); + queue.stop(); + } + + @Test + public void sendRequestEvent_notifiesListener() throws Exception { + StringRequest req = mock(StringRequest.class); + RequestQueue.RequestEventListener listener = mock(RequestQueue.RequestEventListener.class); + queue.start(); + queue.addRequestEventListener(listener); + + queue.sendRequestEvent(req, RequestQueue.RequestEvent.REQUEST_NETWORK_DISPATCH_STARTED); + + verify(listener) + .onRequestEvent(req, RequestQueue.RequestEvent.REQUEST_NETWORK_DISPATCH_STARTED); + verifyNoMoreInteractions(listener); + queue.stop(); + } + + @Test + public void removeRequestEventListener_removesListener() throws Exception { + StringRequest req = mock(StringRequest.class); + RequestQueue.RequestEventListener listener = mock(RequestQueue.RequestEventListener.class); + queue.start(); + queue.addRequestEventListener(listener); + queue.removeRequestEventListener(listener); + + queue.sendRequestEvent(req, RequestQueue.RequestEvent.REQUEST_NETWORK_DISPATCH_STARTED); + + verifyNoMoreInteractions(listener); + queue.stop(); + } +}