001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2023 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.util;
020
021import java.io.File;
022import java.io.IOException;
023import java.lang.ref.WeakReference;
024import java.nio.file.FileSystem;
025import java.nio.file.Files;
026import java.nio.file.NoSuchFileException;
027import java.nio.file.Path;
028import static java.nio.file.StandardWatchEventKinds.*;
029import java.nio.file.WatchKey;
030import java.nio.file.WatchService;
031import java.time.Instant;
032import java.util.ArrayList;
033import java.util.Collections;
034import java.util.List;
035import java.util.Map;
036import java.util.Optional;
037import java.util.concurrent.ConcurrentHashMap;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040import java.util.stream.Collectors;
041import java.util.stream.StreamSupport;
042import org.jgrapes.core.Channel;
043import org.jgrapes.core.Component;
044import org.jgrapes.core.Components;
045import org.jgrapes.core.Event;
046import org.jgrapes.core.Manager;
047import org.jgrapes.core.annotation.Handler;
048import org.jgrapes.util.events.FileChanged;
049import org.jgrapes.util.events.WatchFile;
050
051/**
052 * A component that watches paths in the file system for changes
053 * and sends events if such changes occur. 
054 */
055@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
056public class FileSystemWatcher extends Component {
057
058    @SuppressWarnings("PMD.FieldNamingConventions")
059    protected static final Logger logger
060        = Logger.getLogger(FileSystemWatcher.class.getName());
061
062    private final WatcherRegistry watcherRegistry = new WatcherRegistry();
063    private final Map<Path, DirectorySubscription> subscriptions
064        = new ConcurrentHashMap<>();
065
066    /**
067     * Creates a new component base with its channel set to
068     * itself.
069     */
070    public FileSystemWatcher() {
071        super();
072    }
073
074    /**
075     * Creates a new component base with its channel set to the given 
076     * channel. As a special case {@link Channel#SELF} can be
077     * passed to the constructor to make the component use itself
078     * as channel. The special value is necessary as you 
079     * obviously cannot pass an object to be constructed to its 
080     * constructor.
081     *
082     * @param componentChannel the channel that the component's
083     * handlers listen on by default and that 
084     * {@link Manager#fire(Event, Channel...)} sends the event to
085     */
086    public FileSystemWatcher(Channel componentChannel) {
087        super(componentChannel);
088    }
089
090    /**
091     * Register a path to wath. Subsequent {@link FileChanged} 
092     * events will be fire on the channel(s) on which the
093     * {@link WatchFile} event was fired.
094     * 
095     * The channel is stored using a weak reference, so no explicit
096     * "clear watch" is required.
097     *
098     * @param event the event
099     * @param channel the channel
100     * @throws IOException if an I/O exception occurs
101     */
102    @Handler
103    public void onWatchFile(WatchFile event, Channel channel)
104            throws IOException {
105        final Path path = event.path().toAbsolutePath();
106        synchronized (subscriptions) {
107            addSubscription(path, channel);
108        }
109    }
110
111    private Subscription addSubscription(Path watched, Channel channel) {
112        var subs = new Subscription(watched, channel);
113        try {
114            // Using computeIfAbsent causes recursive update
115            var watcher = subscriptions.get(watched.getParent());
116            if (watcher == null) {
117                watcher = watcherRegistry.register(watched.getParent());
118            }
119            watcher.add(subs);
120            if (Files.exists(watched)) {
121                Path real = watched.toRealPath();
122                if (!real.equals(watched)) {
123                    addSubscription(real, channel).linkedFrom(subs);
124                }
125            }
126        } catch (IOException e) {
127            logger.log(Level.WARNING, e,
128                () -> "Cannot watch: " + e.getMessage());
129        }
130        return subs;
131    }
132
133    private void handleWatchEvent(Path directory) {
134        Optional.ofNullable(subscriptions.get(directory))
135            .ifPresent(DirectorySubscription::directoryChanged);
136    }
137
138    /**
139     * The Class WatcherRegistry.
140     */
141    private final class WatcherRegistry {
142        private final Map<FileSystem, Watcher> watchers
143            = new ConcurrentHashMap<>();
144
145        private Watcher watcher(Path path) {
146            @SuppressWarnings("PMD.CloseResource")
147            Watcher watcher = watchers.get(path.getFileSystem());
148            if (watcher == null) {
149                try {
150                    watcher = new Watcher(path.getFileSystem());
151                    watchers.put(path.getFileSystem(), watcher);
152                } catch (IOException e) {
153                    logger.log(Level.WARNING, e,
154                        () -> "Cannot get watch service: " + e.getMessage());
155                    return null;
156                }
157            }
158            return watcher;
159        }
160
161        /**
162         * Register.
163         *
164         * @param toWatch the to watch
165         * @return the directory subscription
166         */
167        public DirectorySubscription register(Path toWatch) {
168            Watcher watcher = watcher(toWatch);
169            if (watcher == null) {
170                return null;
171            }
172            try {
173                var watcherRef = new DirectorySubscription(
174                    toWatch.register(watcher.watchService,
175                        ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY));
176                subscriptions.put(toWatch, watcherRef);
177                return watcherRef;
178            } catch (IOException e) {
179                logger.log(Level.WARNING, e,
180                    () -> "Cannot watch: " + e.getMessage());
181            }
182            return null;
183        }
184
185    }
186
187    /**
188     * The Class Watcher.
189     */
190    private final class Watcher {
191        private final WatchService watchService;
192
193        private Watcher(FileSystem fileSystem) throws IOException {
194            watchService = fileSystem.newWatchService();
195            var roots = StreamSupport
196                .stream(fileSystem.getRootDirectories().spliterator(), false)
197                .map(Path::toString)
198                .collect(Collectors.joining(File.pathSeparator));
199            (Components.useVirtualThreads() ? Thread.ofVirtual()
200                : Thread.ofPlatform()).name(roots + " watcher")
201                    .start(() -> {
202                        while (true) {
203                            try {
204                                WatchKey key = watchService.take();
205                                // Events have to be consumed
206                                key.pollEvents();
207                                if (!(key.watchable() instanceof Path)) {
208                                    key.reset();
209                                    continue;
210                                }
211                                handleWatchEvent((Path) key.watchable());
212                                key.reset();
213                            } catch (InterruptedException e) {
214                                logger.log(Level.WARNING, e,
215                                    () -> "No WatchKey: " + e.getMessage());
216                            }
217                        }
218                    });
219        }
220    }
221
222    /**
223     * The Class DirectorySubscription.
224     */
225    private class DirectorySubscription {
226        private final WatchKey watchKey;
227        private final List<Subscription> watched;
228
229        /**
230         * Instantiates a new directory watcher.
231         *
232         * @param watchKey the watch key
233         */
234        public DirectorySubscription(WatchKey watchKey) {
235            this.watchKey = watchKey;
236            watched = Collections.synchronizedList(new ArrayList<>());
237        }
238
239        /**
240         * Adds the subscription.
241         *
242         * @param subs the subs
243         */
244        public void add(Subscription subs) {
245            watched.add(subs);
246        }
247
248        /**
249         * Removes the subscription.
250         *
251         * @param subs the subs
252         */
253        public void remove(Subscription subs) {
254            watched.remove(subs);
255            if (watched.isEmpty()) {
256                subscriptions.remove(subs.directory());
257                watchKey.cancel();
258            }
259
260        }
261
262        /**
263         * Directory changed.
264         */
265        public void directoryChanged() {
266            // Prevent concurrent modification exception
267            List.copyOf(watched).forEach(Subscription::handleChange);
268        }
269    }
270
271    /**
272     * The Class Registree.
273     */
274    private class Subscription {
275        private WeakReference<Channel> notifyOn;
276        private final Path path;
277        private Subscription linkedFrom;
278        private Subscription linksTo;
279        private Instant lastModified;
280
281        /**
282         * Instantiates a new subscription.
283         *
284         * @param path the path
285         * @param notifyOn the notify on
286         */
287        @SuppressWarnings("PMD.UseVarargs")
288        public Subscription(Path path, Channel notifyOn) {
289            this.notifyOn = new WeakReference<>(notifyOn);
290            this.path = path;
291            updateLastModified();
292        }
293
294        /**
295         * Return the directoy of this subscription's path.
296         *
297         * @return the path
298         */
299        public Path directory() {
300            return path.getParent();
301        }
302
303        /**
304         * Linked from.
305         *
306         * @param symLinkSubs the sym link subs
307         * @return the subscription
308         */
309        public Subscription linkedFrom(Subscription symLinkSubs) {
310            linkedFrom = symLinkSubs;
311            symLinkSubs.linksTo = this;
312            notifyOn = null;
313            return this;
314        }
315
316        /**
317         * Removes the subscription.
318         */
319        public void remove() {
320            synchronized (subscriptions) {
321                if (linksTo != null) {
322                    linksTo.remove();
323                }
324                var directory = path.getParent();
325                var watchInfo = subscriptions.get(directory);
326                if (watchInfo == null) {
327                    // Shouldn't happen, but...
328                    return;
329                }
330                watchInfo.remove(this);
331            }
332        }
333
334        private void updateLastModified() {
335            try {
336                if (!Files.exists(path)) {
337                    lastModified = null;
338                    return;
339                }
340                lastModified = Files.getLastModifiedTime(path).toInstant();
341            } catch (NoSuchFileException e) {
342                // There's a race condition here.
343                lastModified = null;
344            } catch (IOException e) {
345                logger.log(Level.WARNING, e,
346                    () -> "Cannot get modified time: " + e.getMessage());
347            }
348        }
349
350        /**
351         * Handle change.
352         */
353        private void handleChange() {
354            Subscription watched = Optional.ofNullable(linkedFrom).orElse(this);
355
356            // Check if channel is still valid
357            Channel channel = watched.notifyOn.get();
358            if (channel == null) {
359                watched.remove();
360                return;
361            }
362
363            // Evaluate change from the perspective of "watched"
364            Instant prevModified = watched.lastModified;
365            watched.updateLastModified();
366            if (prevModified == null) {
367                // Check if created
368                if (watched.lastModified != null) {
369                    // Yes, created.
370                    fire(new FileChanged(watched.path,
371                        FileChanged.Kind.CREATED), channel);
372                    checkLink(watched, channel);
373                }
374                return;
375            }
376
377            // File has existed (prevModified != null)
378            if (watched.lastModified == null) {
379                // ... but is now deleted
380                if (watched.linksTo != null) {
381                    watched.linksTo.remove();
382                }
383                fire(new FileChanged(watched.path, FileChanged.Kind.DELETED),
384                    channel);
385                return;
386            }
387
388            // Check if modified
389            if (!prevModified.equals(watched.lastModified)) {
390                fire(new FileChanged(watched.path, FileChanged.Kind.MODIFIED),
391                    channel);
392                checkLink(watched, channel);
393            }
394        }
395
396        private void checkLink(Subscription watched, Channel channel) {
397            try {
398                Path curTarget = watched.path.toRealPath();
399                if (!curTarget.equals(watched.path)) {
400                    // watched is symbolic link
401                    if (watched.linksTo == null) {
402                        addSubscription(curTarget, channel).linkedFrom(watched);
403                        return;
404                    }
405                    if (!watched.linksTo.path.equals(curTarget)) {
406                        // Link target has changed
407                        watched.linksTo.remove();
408                        addSubscription(curTarget, channel).linkedFrom(watched);
409                    }
410
411                }
412            } catch (IOException e) { // NOPMD
413                // Race condition, target deleted?
414            }
415        }
416    }
417}