001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2017-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.io.util; 020 021import java.lang.ref.WeakReference; 022import java.util.ArrayList; 023import java.util.Iterator; 024import java.util.LinkedList; 025import java.util.List; 026import java.util.concurrent.Semaphore; 027 028/** 029 * A class that manages a set of permits and notifies listeners 030 * about changes of availability. 031 * 032 * Listeners are added using {@link WeakReference}, so removing 033 * them isn't strictly necessary. 034 */ 035public class PermitsPool { 036 037 private final MySemaphore delegee; 038 private final List<WeakReference<AvailabilityListener>> listeners 039 = new LinkedList<>(); 040 private boolean lastNotification = true; 041 042 /** 043 * A variant of {@link Semaphore}. 044 */ 045 private static class MySemaphore extends Semaphore { 046 private static final long serialVersionUID = 8758302721594300704L; 047 048 /** 049 * Instantiates a new semaphore. 050 * 051 * @param permits the number of permits 052 */ 053 public MySemaphore(int permits) { 054 super(permits); 055 } 056 057 @Override 058 @SuppressWarnings("PMD.UselessOverridingMethod") 059 public void reducePermits(int reduction) { 060 super.reducePermits(reduction); 061 } 062 } 063 064 /** 065 * Instantiates a new permits pool. 066 * 067 * @param permits the permits 068 */ 069 public PermitsPool(int permits) { 070 delegee = new MySemaphore(permits); 071 } 072 073 /** 074 * Returns the number of currently available permits. 075 * 076 * @return the result 077 */ 078 public int availablePermits() { 079 return delegee.availablePermits(); 080 } 081 082 /** 083 * Adds the given number of permits to the pool. 084 * 085 * @param permits the number of permits to add 086 * @return the permits pool 087 */ 088 public PermitsPool augmentPermits(int permits) { 089 delegee.release(permits); 090 return this; 091 } 092 093 /** 094 * Remove the given number of permits from the pool. 095 * 096 * @param permits the number of permits to remove 097 * @return the permits pool 098 */ 099 public PermitsPool reducePermits(int permits) { 100 delegee.reducePermits(permits); 101 return this; 102 } 103 104 /** 105 * Adds an AvailabilityListener. 106 * 107 * @param listener the AvailabilityListener 108 * @return the permits pool 109 */ 110 public PermitsPool addListener(AvailabilityListener listener) { 111 synchronized (listeners) { 112 listeners.add(new WeakReference<>(listener)); 113 } 114 return this; 115 } 116 117 /** 118 * Removes the listener. 119 * 120 * @param listener the AvailabilityListener 121 * @return the permits pool 122 */ 123 public PermitsPool removeListener(AvailabilityListener listener) { 124 synchronized (listeners) { 125 for (Iterator<WeakReference<AvailabilityListener>> iter 126 = listeners.iterator(); iter.hasNext();) { 127 WeakReference<AvailabilityListener> item = iter.next(); 128 if (item.get() == null || item.get() == listener) { 129 iter.remove(); 130 } 131 } 132 } 133 return this; 134 } 135 136 private void notifyAvailabilityListeners() { 137 boolean available = availablePermits() > 0; 138 if (available == lastNotification) { 139 return; 140 } 141 lastNotification = available; 142 List<AvailabilityListener> copy = new ArrayList<>(); 143 synchronized (listeners) { 144 for (Iterator<WeakReference<AvailabilityListener>> iter 145 = listeners.iterator(); iter.hasNext();) { 146 WeakReference<AvailabilityListener> item = iter.next(); 147 AvailabilityListener listener = item.get(); 148 if (listener == null) { 149 iter.remove(); 150 continue; 151 } 152 copy.add(listener); 153 } 154 } 155 for (AvailabilityListener l : copy) { 156 l.availabilityChanged(this, available); 157 } 158 } 159 160 /** 161 * Release a previously obtained permit. 162 */ 163 public PermitsPool release() { 164 synchronized (this) { 165 delegee.release(); 166 notifyAvailabilityListeners(); 167 return this; 168 } 169 } 170 171 /** 172 * Acquire a permit, waiting until one becomes available. 173 * 174 * @return the permits pool 175 * @throws InterruptedException the interrupted exception 176 */ 177 public PermitsPool acquire() throws InterruptedException { 178 delegee.acquire(); 179 notifyAvailabilityListeners(); 180 return this; 181 } 182 183 /** 184 * Try to acquire a permit. 185 * 186 * @return `true` if successful 187 */ 188 public boolean tryAcquire() { 189 synchronized (this) { 190 boolean gotOne = delegee.tryAcquire(); 191 if (gotOne) { 192 notifyAvailabilityListeners(); 193 return true; 194 } 195 return false; 196 } 197 } 198 199}