001// License: GPL. For details, see LICENSE file.
002package org.openstreetmap.josm.data.cache;
003
004import java.io.IOException;
005import java.net.URL;
006import java.util.Iterator;
007import java.util.Map;
008import java.util.concurrent.ConcurrentHashMap;
009import java.util.concurrent.LinkedBlockingDeque;
010import java.util.concurrent.Semaphore;
011import java.util.concurrent.TimeUnit;
012
013import org.openstreetmap.josm.Main;
014
015/**
016 * @author Wiktor Niesiobędzki
017 *
018 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task
019 * and it will set a runnable task with semaphore release, when job has finished.
020 *
021 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't
022 * guarantee that all threads will be busy, when there is work for them[2].
023 *
024 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus
025 *     tasks do not go through the Queue
026 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread
027 *     take the first available job and wait for semaphore. It might be the case, that semaphore was released
028 *     for some task further in queue, but this implementation doesn't try to detect such situation
029 *
030 *
031 */
032public class HostLimitQueue extends LinkedBlockingDeque<Runnable> {
033    private static final long serialVersionUID = 1L;
034
035    private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>();
036    private final int hostLimit;
037
038    /**
039     * Creates an unbounded queue
040     * @param hostLimit how many parallel calls to host to allow
041     */
042    public HostLimitQueue(int hostLimit) {
043        super(); // create unbounded queue
044        this.hostLimit = hostLimit;
045    }
046
047    private JCSCachedTileLoaderJob<?, ?> findJob() {
048        for (Iterator<Runnable> it = iterator(); it.hasNext();) {
049            Runnable r = it.next();
050            if (r instanceof JCSCachedTileLoaderJob) {
051                JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r;
052                if (tryAcquireSemaphore(job)) {
053                    if (remove(job)) {
054                        return job;
055                    } else {
056                        // we have acquired the semaphore, but we didn't manage to remove job, as someone else did
057                        // release the semaphore and look for another candidate
058                        releaseSemaphore(job);
059                    }
060                } else {
061                    URL url = null;
062                    try {
063                        url = job.getUrl();
064                    } catch (IOException e) {
065                        Main.debug(e);
066                    }
067                    Main.debug("TMS - Skipping job {0} because host limit reached", url);
068                }
069            }
070        }
071        return null;
072    }
073
074    @Override
075    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
076        Runnable job = findJob();
077        if (job != null) {
078            return job;
079        }
080        job = pollFirst(timeout, unit);
081        if (job != null) {
082            acquireSemaphore(job);
083        }
084        return job;
085    }
086
087    @Override
088    public Runnable take() throws InterruptedException {
089        Runnable job = findJob();
090        if (job != null) {
091            return job;
092        }
093        job = takeFirst();
094        if (job != null) {
095            acquireSemaphore(job);
096        }
097        return job;
098    }
099
100    private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
101        String host;
102        try {
103            host = job.getUrl().getHost();
104        } catch (IOException e) {
105            // do not pass me illegal URL's
106            throw new IllegalArgumentException(e);
107        }
108        Semaphore limit = hostSemaphores.get(host);
109        if (limit == null) {
110            synchronized (hostSemaphores) {
111                limit = hostSemaphores.get(host);
112                if (limit == null) {
113                    limit = new Semaphore(hostLimit);
114                    hostSemaphores.put(host, limit);
115                }
116            }
117        }
118        return limit;
119    }
120
121    private void acquireSemaphore(Runnable job) throws InterruptedException {
122        if (job instanceof JCSCachedTileLoaderJob) {
123            final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job;
124            Semaphore limit = getSemaphore(jcsJob);
125            if (limit != null) {
126                limit.acquire();
127                jcsJob.setFinishedTask(new Runnable() {
128                    @Override
129                    public void run() {
130                        releaseSemaphore(jcsJob);
131                    }
132                });
133            }
134        }
135    }
136
137    private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) {
138        boolean ret = true;
139        Semaphore limit = getSemaphore(job);
140        if (limit != null) {
141            ret = limit.tryAcquire();
142            if (ret) {
143                job.setFinishedTask(new Runnable() {
144                    @Override
145                    public void run() {
146                        releaseSemaphore(job);
147                    }
148                });
149            }
150        }
151        return ret;
152    }
153
154    private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) {
155        Semaphore limit = getSemaphore(job);
156        if (limit != null) {
157            limit.release();
158            if (limit.availablePermits() > hostLimit) {
159                Main.warn("More permits than it should be");
160            }
161        }
162    }
163}