001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2023 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.util; 020 021import java.io.IOException; 022import java.lang.ref.WeakReference; 023import java.nio.file.FileSystem; 024import java.nio.file.Files; 025import java.nio.file.NoSuchFileException; 026import java.nio.file.Path; 027import static java.nio.file.StandardWatchEventKinds.*; 028import java.nio.file.WatchKey; 029import java.nio.file.WatchService; 030import java.time.Instant; 031import java.util.ArrayList; 032import java.util.Collections; 033import java.util.List; 034import java.util.Map; 035import java.util.Optional; 036import java.util.concurrent.ConcurrentHashMap; 037import java.util.logging.Level; 038import java.util.logging.Logger; 039import org.jgrapes.core.Channel; 040import org.jgrapes.core.Component; 041import org.jgrapes.core.Event; 042import org.jgrapes.core.Manager; 043import org.jgrapes.core.annotation.Handler; 044import org.jgrapes.util.events.FileChanged; 045import org.jgrapes.util.events.WatchFile; 046 047/** 048 * A component that watches paths in the file system for changes 049 * and sends events if such changes occur. 050 */ 051@SuppressWarnings("PMD.DataflowAnomalyAnalysis") 052public class FileSystemWatcher extends Component { 053 054 @SuppressWarnings("PMD.FieldNamingConventions") 055 protected static final Logger logger 056 = Logger.getLogger(FileSystemWatcher.class.getName()); 057 058 private final WatcherRegistry watcherRegistry = new WatcherRegistry(); 059 private final Map<Path, DirectorySubscription> subscriptions 060 = new ConcurrentHashMap<>(); 061 062 /** 063 * Creates a new component base with its channel set to 064 * itself. 065 */ 066 public FileSystemWatcher() { 067 super(); 068 } 069 070 /** 071 * Creates a new component base with its channel set to the given 072 * channel. As a special case {@link Channel#SELF} can be 073 * passed to the constructor to make the component use itself 074 * as channel. The special value is necessary as you 075 * obviously cannot pass an object to be constructed to its 076 * constructor. 077 * 078 * @param componentChannel the channel that the component's 079 * handlers listen on by default and that 080 * {@link Manager#fire(Event, Channel...)} sends the event to 081 */ 082 public FileSystemWatcher(Channel componentChannel) { 083 super(componentChannel); 084 } 085 086 /** 087 * Register a path to wath. Subsequent {@link FileChanged} 088 * events will be fire on the channel(s) on which the 089 * {@link WatchFile} event was fired. 090 * 091 * The channel is stored using a weak reference, so no explicit 092 * "clear watch" is required. 093 * 094 * @param event the event 095 * @param channel the channel 096 * @throws IOException if an I/O exception occurs 097 */ 098 @Handler 099 public void onWatchFile(WatchFile event, Channel channel) 100 throws IOException { 101 final Path path = event.path().toAbsolutePath(); 102 synchronized (subscriptions) { 103 addSubscription(path, channel); 104 } 105 } 106 107 private Subscription addSubscription(Path watched, Channel channel) { 108 var subs = new Subscription(watched, channel); 109 try { 110 // Using computeIfAbsent causes recursive update 111 var watcher = subscriptions.get(watched.getParent()); 112 if (watcher == null) { 113 watcher = watcherRegistry.register(watched.getParent()); 114 } 115 watcher.add(subs); 116 if (Files.exists(watched)) { 117 Path real = watched.toRealPath(); 118 if (!real.equals(watched)) { 119 addSubscription(real, channel).linkedFrom(subs); 120 } 121 } 122 } catch (IOException e) { 123 logger.log(Level.WARNING, e, 124 () -> "Cannot watch: " + e.getMessage()); 125 } 126 return subs; 127 } 128 129 private void handleWatchEvent(Path directory) { 130 Optional.ofNullable(subscriptions.get(directory)) 131 .ifPresent(DirectorySubscription::directoryChanged); 132 } 133 134 /** 135 * The Class WatcherRegistry. 136 */ 137 private final class WatcherRegistry { 138 private final Map<FileSystem, Watcher> watchers 139 = new ConcurrentHashMap<>(); 140 141 private Watcher watcher(Path path) { 142 @SuppressWarnings("PMD.CloseResource") 143 Watcher watcher = watchers.get(path.getFileSystem()); 144 if (watcher == null) { 145 try { 146 watcher = new Watcher(path.getFileSystem()); 147 watchers.put(path.getFileSystem(), watcher); 148 } catch (IOException e) { 149 logger.log(Level.WARNING, e, 150 () -> "Cannot get watch service: " + e.getMessage()); 151 return null; 152 } 153 } 154 return watcher; 155 } 156 157 /** 158 * Register. 159 * 160 * @param toWatch the to watch 161 * @return the directory subscription 162 */ 163 public DirectorySubscription register(Path toWatch) { 164 Watcher watcher = watcher(toWatch); 165 if (watcher == null) { 166 return null; 167 } 168 try { 169 var watcherRef = new DirectorySubscription( 170 toWatch.register(watcher.watchService, 171 ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)); 172 subscriptions.put(toWatch, watcherRef); 173 return watcherRef; 174 } catch (IOException e) { 175 logger.log(Level.WARNING, e, 176 () -> "Cannot watch: " + e.getMessage()); 177 } 178 return null; 179 } 180 181 } 182 183 /** 184 * The Class Watcher. 185 */ 186 private final class Watcher { 187 private final WatchService watchService; 188 private final Thread thread; 189 190 private Watcher(FileSystem fileSystem) throws IOException { 191 watchService = fileSystem.newWatchService(); 192 thread = new Thread(() -> { 193 while (true) { 194 try { 195 WatchKey key = watchService.take(); 196 // Events have to be consumed 197 key.pollEvents(); 198 if (!(key.watchable() instanceof Path)) { 199 key.reset(); 200 continue; 201 } 202 handleWatchEvent((Path) key.watchable()); 203 key.reset(); 204 } catch (InterruptedException e) { 205 logger.log(Level.WARNING, e, 206 () -> "No WatchKey: " + e.getMessage()); 207 } 208 } 209 }); 210 thread.setDaemon(true); 211 thread.setName(fileSystem.toString() + " watcher"); 212 thread.start(); 213 } 214 215 } 216 217 /** 218 * The Class DirectorySubscription. 219 */ 220 private class DirectorySubscription { 221 private final WatchKey watchKey; 222 private final List<Subscription> watched; 223 224 /** 225 * Instantiates a new directory watcher. 226 * 227 * @param watchKey the watch key 228 */ 229 public DirectorySubscription(WatchKey watchKey) { 230 this.watchKey = watchKey; 231 watched = Collections.synchronizedList(new ArrayList<>()); 232 } 233 234 /** 235 * Adds the subscription. 236 * 237 * @param subs the subs 238 */ 239 public void add(Subscription subs) { 240 watched.add(subs); 241 } 242 243 /** 244 * Removes the subscription. 245 * 246 * @param subs the subs 247 */ 248 public void remove(Subscription subs) { 249 watched.remove(subs); 250 if (watched.isEmpty()) { 251 subscriptions.remove(subs.directory()); 252 watchKey.cancel(); 253 } 254 255 } 256 257 /** 258 * Directory changed. 259 */ 260 public void directoryChanged() { 261 // Prevent concurrent modification exception 262 List.copyOf(watched).forEach(Subscription::handleChange); 263 } 264 } 265 266 /** 267 * The Class Registree. 268 */ 269 private class Subscription { 270 private WeakReference<Channel> notifyOn; 271 private final Path path; 272 private Subscription linkedFrom; 273 private Subscription linksTo; 274 private Instant lastModified; 275 276 /** 277 * Instantiates a new subscription. 278 * 279 * @param path the path 280 * @param notifyOn the notify on 281 */ 282 @SuppressWarnings("PMD.UseVarargs") 283 public Subscription(Path path, Channel notifyOn) { 284 this.notifyOn = new WeakReference<>(notifyOn); 285 this.path = path; 286 updateLastModified(); 287 } 288 289 /** 290 * Return the directoy of this subscription's path. 291 * 292 * @return the path 293 */ 294 public Path directory() { 295 return path.getParent(); 296 } 297 298 /** 299 * Linked from. 300 * 301 * @param symLinkSubs the sym link subs 302 * @return the subscription 303 */ 304 public Subscription linkedFrom(Subscription symLinkSubs) { 305 linkedFrom = symLinkSubs; 306 symLinkSubs.linksTo = this; 307 notifyOn = null; 308 return this; 309 } 310 311 /** 312 * Removes the subscription. 313 */ 314 public void remove() { 315 synchronized (subscriptions) { 316 if (linksTo != null) { 317 linksTo.remove(); 318 } 319 var directory = path.getParent(); 320 var watchInfo = subscriptions.get(directory); 321 if (watchInfo == null) { 322 // Shouldn't happen, but... 323 return; 324 } 325 watchInfo.remove(this); 326 } 327 } 328 329 private void updateLastModified() { 330 try { 331 if (!Files.exists(path)) { 332 lastModified = null; 333 return; 334 } 335 lastModified = Files.getLastModifiedTime(path).toInstant(); 336 } catch (NoSuchFileException e) { 337 // There's a race condition here. 338 lastModified = null; 339 } catch (IOException e) { 340 logger.log(Level.WARNING, e, 341 () -> "Cannot get modified time: " + e.getMessage()); 342 } 343 } 344 345 /** 346 * Handle change. 347 */ 348 private void handleChange() { 349 Subscription watched = Optional.ofNullable(linkedFrom).orElse(this); 350 351 // Check if channel is still valid 352 Channel channel = watched.notifyOn.get(); 353 if (channel == null) { 354 watched.remove(); 355 return; 356 } 357 358 // Evaluate change from the perspective of "watched" 359 Instant prevModified = watched.lastModified; 360 watched.updateLastModified(); 361 if (prevModified == null) { 362 // Check if created 363 if (watched.lastModified != null) { 364 // Yes, created. 365 fire(new FileChanged(watched.path, 366 FileChanged.Kind.CREATED), channel); 367 checkLink(watched, channel); 368 } 369 return; 370 } 371 372 // File has existed (prevModified != null) 373 if (watched.lastModified == null) { 374 // ... but is now deleted 375 if (watched.linksTo != null) { 376 watched.linksTo.remove(); 377 } 378 fire(new FileChanged(watched.path, FileChanged.Kind.DELETED), 379 channel); 380 return; 381 } 382 383 // Check if modified 384 if (!prevModified.equals(watched.lastModified)) { 385 fire(new FileChanged(watched.path, FileChanged.Kind.MODIFIED), 386 channel); 387 checkLink(watched, channel); 388 } 389 } 390 391 private void checkLink(Subscription watched, Channel channel) { 392 try { 393 Path curTarget = watched.path.toRealPath(); 394 if (!curTarget.equals(watched.path)) { 395 // watched is symbolic link 396 if (watched.linksTo == null) { 397 addSubscription(curTarget, channel).linkedFrom(watched); 398 return; 399 } 400 if (!watched.linksTo.path.equals(curTarget)) { 401 // Link target has changed 402 watched.linksTo.remove(); 403 addSubscription(curTarget, channel).linkedFrom(watched); 404 } 405 406 } 407 } catch (IOException e) { // NOPMD 408 // Race condition, target deleted? 409 } 410 } 411 } 412}