001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2023 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.util;
020
021import java.io.IOException;
022import java.lang.ref.WeakReference;
023import java.nio.file.FileSystem;
024import java.nio.file.Files;
025import java.nio.file.NoSuchFileException;
026import java.nio.file.Path;
027import static java.nio.file.StandardWatchEventKinds.*;
028import java.nio.file.WatchKey;
029import java.nio.file.WatchService;
030import java.time.Instant;
031import java.util.ArrayList;
032import java.util.Collections;
033import java.util.List;
034import java.util.Map;
035import java.util.Optional;
036import java.util.concurrent.ConcurrentHashMap;
037import java.util.logging.Level;
038import java.util.logging.Logger;
039import org.jgrapes.core.Channel;
040import org.jgrapes.core.Component;
041import org.jgrapes.core.Event;
042import org.jgrapes.core.Manager;
043import org.jgrapes.core.annotation.Handler;
044import org.jgrapes.util.events.FileChanged;
045import org.jgrapes.util.events.WatchFile;
046
047/**
048 * A component that watches paths in the file system for changes
049 * and sends events if such changes occur. 
050 */
051@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
052public class FileSystemWatcher extends Component {
053
    /** Logger used by this component and its nested helper classes. */
    @SuppressWarnings("PMD.FieldNamingConventions")
    protected static final Logger logger
        = Logger.getLogger(FileSystemWatcher.class.getName());

    /** Registers directories with the watch service of their file system. */
    private final WatcherRegistry watcherRegistry = new WatcherRegistry();
    /**
     * The directory subscriptions, keyed by the watched directory.
     * The map object is also used as the monitor in the
     * {@code synchronized} blocks of this class.
     */
    private final Map<Path, DirectorySubscription> subscriptions
        = new ConcurrentHashMap<>();
061
    /**
     * Creates a new file system watcher by invoking the no-argument
     * constructor of {@link Component}. According to the documentation
     * taken over from the base class, the component's channel is then 
     * set to the component itself.
     */
    public FileSystemWatcher() {
        super();
    }
069
    /**
     * Creates a new file system watcher with its channel set to the 
     * given channel. As a special case {@link Channel#SELF} can be
     * passed to the constructor to make the component use itself
     * as channel. The special value is necessary as you 
     * obviously cannot pass an object to be constructed to its 
     * constructor.
     *
     * @param componentChannel the channel that the component's
     * handlers listen on by default and that 
     * {@link Manager#fire(Event, Channel...)} sends the event to
     */
    public FileSystemWatcher(Channel componentChannel) {
        super(componentChannel);
    }
085
086    /**
087     * Register a path to wath. Subsequent {@link FileChanged} 
088     * events will be fire on the channel(s) on which the
089     * {@link WatchFile} event was fired.
090     * 
091     * The channel is stored using a weak reference, so no explicit
092     * "clear watch" is required.
093     *
094     * @param event the event
095     * @param channel the channel
096     * @throws IOException if an I/O exception occurs
097     */
098    @Handler
099    public void onWatchFile(WatchFile event, Channel channel)
100            throws IOException {
101        final Path path = event.path().toAbsolutePath();
102        synchronized (subscriptions) {
103            addSubscription(path, channel);
104        }
105    }
106
107    private Subscription addSubscription(Path watched, Channel channel) {
108        var subs = new Subscription(watched, channel);
109        try {
110            // Using computeIfAbsent causes recursive update
111            var watcher = subscriptions.get(watched.getParent());
112            if (watcher == null) {
113                watcher = watcherRegistry.register(watched.getParent());
114            }
115            watcher.add(subs);
116            if (Files.exists(watched)) {
117                Path real = watched.toRealPath();
118                if (!real.equals(watched)) {
119                    addSubscription(real, channel).linkedFrom(subs);
120                }
121            }
122        } catch (IOException e) {
123            logger.log(Level.WARNING, e,
124                () -> "Cannot watch: " + e.getMessage());
125        }
126        return subs;
127    }
128
129    private void handleWatchEvent(Path directory) {
130        Optional.ofNullable(subscriptions.get(directory))
131            .ifPresent(DirectorySubscription::directoryChanged);
132    }
133
134    /**
135     * The Class WatcherRegistry.
136     */
137    private final class WatcherRegistry {
138        private final Map<FileSystem, Watcher> watchers
139            = new ConcurrentHashMap<>();
140
141        private Watcher watcher(Path path) {
142            @SuppressWarnings("PMD.CloseResource")
143            Watcher watcher = watchers.get(path.getFileSystem());
144            if (watcher == null) {
145                try {
146                    watcher = new Watcher(path.getFileSystem());
147                    watchers.put(path.getFileSystem(), watcher);
148                } catch (IOException e) {
149                    logger.log(Level.WARNING, e,
150                        () -> "Cannot get watch service: " + e.getMessage());
151                    return null;
152                }
153            }
154            return watcher;
155        }
156
157        /**
158         * Register.
159         *
160         * @param toWatch the to watch
161         * @return the directory subscription
162         */
163        public DirectorySubscription register(Path toWatch) {
164            Watcher watcher = watcher(toWatch);
165            if (watcher == null) {
166                return null;
167            }
168            try {
169                var watcherRef = new DirectorySubscription(
170                    toWatch.register(watcher.watchService,
171                        ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY));
172                subscriptions.put(toWatch, watcherRef);
173                return watcherRef;
174            } catch (IOException e) {
175                logger.log(Level.WARNING, e,
176                    () -> "Cannot watch: " + e.getMessage());
177            }
178            return null;
179        }
180
181    }
182
183    /**
184     * The Class Watcher.
185     */
186    private final class Watcher {
187        private final WatchService watchService;
188        private final Thread thread;
189
190        private Watcher(FileSystem fileSystem) throws IOException {
191            watchService = fileSystem.newWatchService();
192            thread = new Thread(() -> {
193                while (true) {
194                    try {
195                        WatchKey key = watchService.take();
196                        // Events have to be consumed
197                        key.pollEvents();
198                        if (!(key.watchable() instanceof Path)) {
199                            key.reset();
200                            continue;
201                        }
202                        handleWatchEvent((Path) key.watchable());
203                        key.reset();
204                    } catch (InterruptedException e) {
205                        logger.log(Level.WARNING, e,
206                            () -> "No WatchKey: " + e.getMessage());
207                    }
208                }
209            });
210            thread.setDaemon(true);
211            thread.setName(fileSystem.toString() + " watcher");
212            thread.start();
213        }
214
215    }
216
217    /**
218     * The Class DirectorySubscription.
219     */
220    private class DirectorySubscription {
221        private final WatchKey watchKey;
222        private final List<Subscription> watched;
223
224        /**
225         * Instantiates a new directory watcher.
226         *
227         * @param watchKey the watch key
228         */
229        public DirectorySubscription(WatchKey watchKey) {
230            this.watchKey = watchKey;
231            watched = Collections.synchronizedList(new ArrayList<>());
232        }
233
234        /**
235         * Adds the subscription.
236         *
237         * @param subs the subs
238         */
239        public void add(Subscription subs) {
240            watched.add(subs);
241        }
242
243        /**
244         * Removes the subscription.
245         *
246         * @param subs the subs
247         */
248        public void remove(Subscription subs) {
249            watched.remove(subs);
250            if (watched.isEmpty()) {
251                subscriptions.remove(subs.directory());
252                watchKey.cancel();
253            }
254
255        }
256
257        /**
258         * Directory changed.
259         */
260        public void directoryChanged() {
261            // Prevent concurrent modification exception
262            List.copyOf(watched).forEach(Subscription::handleChange);
263        }
264    }
265
266    /**
267     * The Class Registree.
268     */
269    private class Subscription {
270        private WeakReference<Channel> notifyOn;
271        private final Path path;
272        private Subscription linkedFrom;
273        private Subscription linksTo;
274        private Instant lastModified;
275
276        /**
277         * Instantiates a new subscription.
278         *
279         * @param path the path
280         * @param notifyOn the notify on
281         */
282        @SuppressWarnings("PMD.UseVarargs")
283        public Subscription(Path path, Channel notifyOn) {
284            this.notifyOn = new WeakReference<>(notifyOn);
285            this.path = path;
286            updateLastModified();
287        }
288
289        /**
290         * Return the directoy of this subscription's path.
291         *
292         * @return the path
293         */
294        public Path directory() {
295            return path.getParent();
296        }
297
298        /**
299         * Linked from.
300         *
301         * @param symLinkSubs the sym link subs
302         * @return the subscription
303         */
304        public Subscription linkedFrom(Subscription symLinkSubs) {
305            linkedFrom = symLinkSubs;
306            symLinkSubs.linksTo = this;
307            notifyOn = null;
308            return this;
309        }
310
311        /**
312         * Removes the subscription.
313         */
314        public void remove() {
315            synchronized (subscriptions) {
316                if (linksTo != null) {
317                    linksTo.remove();
318                }
319                var directory = path.getParent();
320                var watchInfo = subscriptions.get(directory);
321                if (watchInfo == null) {
322                    // Shouldn't happen, but...
323                    return;
324                }
325                watchInfo.remove(this);
326            }
327        }
328
329        private void updateLastModified() {
330            try {
331                if (!Files.exists(path)) {
332                    lastModified = null;
333                    return;
334                }
335                lastModified = Files.getLastModifiedTime(path).toInstant();
336            } catch (NoSuchFileException e) {
337                // There's a race condition here.
338                lastModified = null;
339            } catch (IOException e) {
340                logger.log(Level.WARNING, e,
341                    () -> "Cannot get modified time: " + e.getMessage());
342            }
343        }
344
345        /**
346         * Handle change.
347         */
348        private void handleChange() {
349            Subscription watched = Optional.ofNullable(linkedFrom).orElse(this);
350
351            // Check if channel is still valid
352            Channel channel = watched.notifyOn.get();
353            if (channel == null) {
354                watched.remove();
355                return;
356            }
357
358            // Evaluate change from the perspective of "watched"
359            Instant prevModified = watched.lastModified;
360            watched.updateLastModified();
361            if (prevModified == null) {
362                // Check if created
363                if (watched.lastModified != null) {
364                    // Yes, created.
365                    fire(new FileChanged(watched.path,
366                        FileChanged.Kind.CREATED), channel);
367                    checkLink(watched, channel);
368                }
369                return;
370            }
371
372            // File has existed (prevModified != null)
373            if (watched.lastModified == null) {
374                // ... but is now deleted
375                if (watched.linksTo != null) {
376                    watched.linksTo.remove();
377                }
378                fire(new FileChanged(watched.path, FileChanged.Kind.DELETED),
379                    channel);
380                return;
381            }
382
383            // Check if modified
384            if (!prevModified.equals(watched.lastModified)) {
385                fire(new FileChanged(watched.path, FileChanged.Kind.MODIFIED),
386                    channel);
387                checkLink(watched, channel);
388            }
389        }
390
391        private void checkLink(Subscription watched, Channel channel) {
392            try {
393                Path curTarget = watched.path.toRealPath();
394                if (!curTarget.equals(watched.path)) {
395                    // watched is symbolic link
396                    if (watched.linksTo == null) {
397                        addSubscription(curTarget, channel).linkedFrom(watched);
398                        return;
399                    }
400                    if (!watched.linksTo.path.equals(curTarget)) {
401                        // Link target has changed
402                        watched.linksTo.remove();
403                        addSubscription(curTarget, channel).linkedFrom(watched);
404                    }
405
406                }
407            } catch (IOException e) { // NOPMD
408                // Race condition, target deleted?
409            }
410        }
411    }
412}