/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2023 Michael N. Lipp
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Affero General Public License along
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.util;

import java.io.File;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import static java.nio.file.StandardWatchEventKinds.*;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.Components;
import org.jgrapes.core.Event;
import org.jgrapes.core.Manager;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.util.events.FileChanged;
import org.jgrapes.util.events.WatchFile;

/**
 * A component that watches paths in the file system for changes
 * and sends events if such changes occur.
 */
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class FileSystemWatcher extends Component {

    @SuppressWarnings("PMD.FieldNamingConventions")
    protected static final Logger logger
        = Logger.getLogger(FileSystemWatcher.class.getName());

    private final WatcherRegistry watcherRegistry = new WatcherRegistry();
    private final Map<Path, DirectorySubscription> subscriptions
        = new ConcurrentHashMap<>();

    /**
     * Creates a new component base with its channel set to
     * itself.
     */
    public FileSystemWatcher() {
        super();
    }

    /**
     * Creates a new component base with its channel set to the given
     * channel. As a special case {@link Channel#SELF} can be
     * passed to the constructor to make the component use itself
     * as channel. The special value is necessary as you
     * obviously cannot pass an object to be constructed to its
     * constructor.
     *
     * @param componentChannel the channel that the component's
     * handlers listen on by default and that
     * {@link Manager#fire(Event, Channel...)} sends the event to
     */
    public FileSystemWatcher(Channel componentChannel) {
        super(componentChannel);
    }

    /**
     * Registers a path to watch. Subsequent {@link FileChanged}
     * events will be fired on the channel(s) on which the
     * {@link WatchFile} event was fired.
     *
     * The channel is stored using a weak reference, so no explicit
     * "clear watch" is required.
     *
     * @param event the event
     * @param channel the channel
     * @throws IOException if an I/O exception occurs
     */
    @Handler
    public void onWatchFile(WatchFile event, Channel channel)
            throws IOException {
        final Path path = event.path().toAbsolutePath();
        synchronized (subscriptions) {
            addSubscription(path, channel);
        }
    }

    /**
     * Creates a subscription for the given path and attaches it to the
     * subscription of the path's directory, registering the directory
     * with a watch service if that has not happened yet. If the path
     * exists and its real path differs (e.g. it is a symbolic link),
     * the real path is subscribed as well and linked to the new
     * subscription.
     *
     * If the directory cannot be watched (the path has no parent, or
     * the registration fails) a warning is logged and the returned
     * subscription is not attached to any directory.
     *
     * @param watched the path to watch
     * @param channel the channel to notify
     * @return the subscription created for {@code watched}
     */
    private Subscription addSubscription(Path watched, Channel channel) {
        var subs = new Subscription(watched, channel);
        Path directory = watched.getParent();
        if (directory == null) {
            // A root has no parent directory that could be registered
            // (and the concurrent map does not accept a null key).
            logger.warning(
                () -> "Cannot watch (no parent directory): " + watched);
            return subs;
        }
        try {
            // Using computeIfAbsent causes recursive update
            var watcher = subscriptions.get(directory);
            if (watcher == null) {
                watcher = watcherRegistry.register(directory);
            }
            if (watcher == null) {
                // Registration failed, the cause has already been logged.
                return subs;
            }
            watcher.add(subs);
            if (Files.exists(watched)) {
                Path real = watched.toRealPath();
                if (!real.equals(watched)) {
                    addSubscription(real, channel).linkedFrom(subs);
                }
            }
        } catch (IOException e) {
            logger.log(Level.WARNING, e,
                () -> "Cannot watch: " + e.getMessage());
        }
        return subs;
    }

    /**
     * Forwards a change notification for the given directory to the
     * directory's subscription, if there is one.
     *
     * @param directory the directory that has changed
     */
    private void handleWatchEvent(Path directory) {
        Optional.ofNullable(subscriptions.get(directory))
            .ifPresent(DirectorySubscription::directoryChanged);
    }

    /**
     * Maintains one {@link Watcher} per {@link FileSystem} and
     * registers directories with it.
     */
    private final class WatcherRegistry {
        private final Map<FileSystem, Watcher> watchers
            = new ConcurrentHashMap<>();

        /**
         * Returns the watcher for the file system of the given path,
         * creating it if necessary.
         *
         * @param path the path
         * @return the watcher or {@code null} if no watch service
         * could be obtained (a warning is logged in this case)
         */
        private Watcher watcher(Path path) {
            @SuppressWarnings("PMD.CloseResource")
            Watcher watcher = watchers.get(path.getFileSystem());
            if (watcher == null) {
                try {
                    watcher = new Watcher(path.getFileSystem());
                    watchers.put(path.getFileSystem(), watcher);
                } catch (IOException e) {
                    logger.log(Level.WARNING, e,
                        () -> "Cannot get watch service: " + e.getMessage());
                    return null;
                }
            }
            return watcher;
        }

        /**
         * Registers the given directory with the watch service of its
         * file system and records the resulting directory subscription
         * in the enclosing component's subscriptions.
         *
         * @param toWatch the directory to watch
         * @return the directory subscription or {@code null} if the
         * directory could not be registered (a warning is logged in
         * this case)
         */
        public DirectorySubscription register(Path toWatch) {
            Watcher watcher = watcher(toWatch);
            if (watcher == null) {
                return null;
            }
            try {
                var watcherRef = new DirectorySubscription(
                    toWatch.register(watcher.watchService,
                        ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY));
                subscriptions.put(toWatch, watcherRef);
                return watcherRef;
            } catch (IOException e) {
                logger.log(Level.WARNING, e,
                    () -> "Cannot watch: " + e.getMessage());
            }
            return null;
        }

    }

    /**
     * Wraps the watch service of a file system together with the
     * thread that takes the watch keys from the service.
     */
    private final class Watcher {
        private final WatchService watchService;

        /**
         * Creates the watch service for the given file system and
         * starts the thread that processes its watch keys.
         *
         * @param fileSystem the file system
         * @throws IOException if the watch service cannot be created
         */
        private Watcher(FileSystem fileSystem) throws IOException {
            watchService = fileSystem.newWatchService();
            var roots = StreamSupport
                .stream(fileSystem.getRootDirectories().spliterator(), false)
                .map(Path::toString)
                .collect(Collectors.joining(File.pathSeparator));
            (Components.useVirtualThreads() ? Thread.ofVirtual()
                : Thread.ofPlatform()).name(roots + " watcher")
                    .start(this::processWatchKeys);
        }

        /**
         * Takes watch keys from the watch service and forwards them
         * until the thread is interrupted.
         */
        private void processWatchKeys() {
            while (true) {
                WatchKey key;
                try {
                    key = watchService.take();
                } catch (InterruptedException e) {
                    logger.log(Level.WARNING, e,
                        () -> "No WatchKey: " + e.getMessage());
                    // An interrupt is a request to stop. Restore the
                    // flag for whoever inspects the thread and terminate
                    // instead of blocking in take() again.
                    Thread.currentThread().interrupt();
                    return;
                }
                // Events have to be consumed
                key.pollEvents();
                if (key.watchable() instanceof Path directory) {
                    handleWatchEvent(directory);
                }
                key.reset();
            }
        }
    }

    /**
     * The subscriptions for the paths in one watched directory
     * together with the directory's watch key.
     */
    private class DirectorySubscription {
        private final WatchKey watchKey;
        private final List<Subscription> watched;

        /**
         * Instantiates a new directory subscription.
         *
         * @param watchKey the watch key obtained when registering
         * the directory
         */
        public DirectorySubscription(WatchKey watchKey) {
            this.watchKey = watchKey;
            watched = Collections.synchronizedList(new ArrayList<>());
        }

        /**
         * Adds the subscription.
         *
         * @param subs the subscription
         */
        public void add(Subscription subs) {
            watched.add(subs);
        }

        /**
         * Removes the subscription. If no subscriptions remain, the
         * directory is removed from the component's subscriptions and
         * the watch key is cancelled.
         *
         * @param subs the subscription
         */
        public void remove(Subscription subs) {
            watched.remove(subs);
            if (watched.isEmpty()) {
                subscriptions.remove(subs.directory());
                watchKey.cancel();
            }

        }

        /**
         * Lets every subscription in this directory check whether
         * its path has changed.
         */
        public void directoryChanged() {
            // Iterate over a snapshot, handlers may remove subscriptions
            // (prevents a concurrent modification exception).
            List.copyOf(watched).forEach(Subscription::handleChange);
        }
    }

    /**
     * The subscription for a single path. If the watched path resolves
     * to a different real path, two subscriptions exist that are
     * linked with each other: one for the watched path and one for
     * the real path.
     */
    private class Subscription {
        private WeakReference<Channel> notifyOn;
        private final Path path;
        private Subscription linkedFrom;
        private Subscription linksTo;
        private Instant lastModified;

        /**
         * Instantiates a new subscription and records the path's
         * current modification time.
         *
         * @param path the path
         * @param notifyOn the channel to notify, held weakly
         */
        @SuppressWarnings("PMD.UseVarargs")
        public Subscription(Path path, Channel notifyOn) {
            this.notifyOn = new WeakReference<>(notifyOn);
            this.path = path;
            updateLastModified();
        }

        /**
         * Returns the directory of this subscription's path.
         *
         * @return the parent of the path
         */
        public Path directory() {
            return path.getParent();
        }

        /**
         * Marks this subscription as the target of the given
         * subscription. Afterwards, changes are evaluated and reported
         * from the perspective of {@code symLinkSubs}, therefore this
         * subscription's own channel reference is dropped.
         *
         * @param symLinkSubs the subscription that links to this one
         * @return this subscription
         */
        public Subscription linkedFrom(Subscription symLinkSubs) {
            linkedFrom = symLinkSubs;
            symLinkSubs.linksTo = this;
            notifyOn = null;
            return this;
        }

        /**
         * Removes this subscription (and the subscription that it
         * links to, if any) from its directory's subscription.
         */
        public void remove() {
            synchronized (subscriptions) {
                if (linksTo != null) {
                    linksTo.remove();
                }
                var directory = path.getParent();
                if (directory == null) {
                    // A root path is never attached to a directory
                    // (and the concurrent map does not accept null).
                    return;
                }
                var watchInfo = subscriptions.get(directory);
                if (watchInfo == null) {
                    // Shouldn't happen, but...
                    return;
                }
                watchInfo.remove(this);
            }
        }

        /**
         * Updates {@code lastModified} from the file system. The value
         * is set to {@code null} if the path does not exist and left
         * unchanged if the modification time cannot be read.
         */
        private void updateLastModified() {
            try {
                if (!Files.exists(path)) {
                    lastModified = null;
                    return;
                }
                lastModified = Files.getLastModifiedTime(path).toInstant();
            } catch (NoSuchFileException e) {
                // There's a race condition here.
                lastModified = null;
            } catch (IOException e) {
                logger.log(Level.WARNING, e,
                    () -> "Cannot get modified time: " + e.getMessage());
            }
        }

        /**
         * Checks whether the watched path has been created, deleted
         * or modified and fires the respective {@link FileChanged}
         * event. If this subscription is the target of a link, the
         * change is evaluated for the linking subscription.
         */
        private void handleChange() {
            Subscription watched = Optional.ofNullable(linkedFrom).orElse(this);

            // Check if channel is still valid
            Channel channel = watched.notifyOn.get();
            if (channel == null) {
                watched.remove();
                return;
            }

            // Evaluate change from the perspective of "watched"
            Instant prevModified = watched.lastModified;
            watched.updateLastModified();
            if (prevModified == null) {
                // Check if created
                if (watched.lastModified != null) {
                    // Yes, created.
                    fire(new FileChanged(watched.path,
                        FileChanged.Kind.CREATED), channel);
                    checkLink(watched, channel);
                }
                return;
            }

            // File has existed (prevModified != null)
            if (watched.lastModified == null) {
                // ... but is now deleted
                if (watched.linksTo != null) {
                    watched.linksTo.remove();
                    // Forget the removed target. Otherwise a link that
                    // is re-created with the same target looks unchanged
                    // to checkLink and the target is not watched again.
                    watched.linksTo = null;
                }
                fire(new FileChanged(watched.path, FileChanged.Kind.DELETED),
                    channel);
                return;
            }

            // Check if modified
            if (!prevModified.equals(watched.lastModified)) {
                fire(new FileChanged(watched.path, FileChanged.Kind.MODIFIED),
                    channel);
                checkLink(watched, channel);
            }
        }

        /**
         * Makes sure that the real path of the given subscription's
         * path is watched if it differs from the path itself, replacing
         * the linked subscription if the real path has changed.
         *
         * @param watched the subscription to check
         * @param channel the channel to use for a new subscription
         */
        private void checkLink(Subscription watched, Channel channel) {
            try {
                Path curTarget = watched.path.toRealPath();
                if (!curTarget.equals(watched.path)) {
                    // watched is symbolic link
                    if (watched.linksTo == null) {
                        addSubscription(curTarget, channel).linkedFrom(watched);
                        return;
                    }
                    if (!watched.linksTo.path.equals(curTarget)) {
                        // Link target has changed
                        watched.linksTo.remove();
                        addSubscription(curTarget, channel).linkedFrom(watched);
                    }

                }
            } catch (IOException e) { // NOPMD
                // Race condition, target deleted?
            }
        }
    }
}