001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2022 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.mail; 020 021import jakarta.mail.AuthenticationFailedException; 022import jakarta.mail.Authenticator; 023import jakarta.mail.Folder; 024import jakarta.mail.FolderClosedException; 025import jakarta.mail.Message; 026import jakarta.mail.MessagingException; 027import jakarta.mail.NoSuchProviderException; 028import jakarta.mail.PasswordAuthentication; 029import jakarta.mail.Session; 030import jakarta.mail.Store; 031import jakarta.mail.event.ConnectionEvent; 032import jakarta.mail.event.ConnectionListener; 033import jakarta.mail.event.MessageCountAdapter; 034import jakarta.mail.event.MessageCountEvent; 035import java.io.IOException; 036import java.time.Duration; 037import java.util.ArrayList; 038import java.util.Arrays; 039import java.util.HashMap; 040import java.util.HashSet; 041import java.util.List; 042import java.util.Map; 043import java.util.Optional; 044import java.util.Properties; 045import java.util.Set; 046import java.util.function.Consumer; 047import java.util.logging.Level; 048import java.util.logging.Logger; 049import org.eclipse.angus.mail.imap.IMAPFolder; 050import org.eclipse.angus.mail.imap.IdleManager; 051import org.jgrapes.core.Channel; 052import org.jgrapes.core.Components; 053import org.jgrapes.core.Components.Timer; 054import org.jgrapes.core.Event; 055import org.jgrapes.core.EventPipeline; 056import org.jgrapes.core.Subchannel; 057import org.jgrapes.core.annotation.Handler; 058import org.jgrapes.io.events.Closed; 059import org.jgrapes.io.events.ConnectError; 060import org.jgrapes.io.events.IOError; 061import org.jgrapes.io.events.Opening; 062import org.jgrapes.mail.events.MailFoldersUpdated; 063import org.jgrapes.mail.events.MailMonitorOpened; 064import org.jgrapes.mail.events.OpenMailMonitor; 065import org.jgrapes.mail.events.UpdateMailFolders; 066import org.jgrapes.util.Password; 067 068/** 069 * A component that opens mail stores and monitors mail folders for 070 * mails. After establishing a connection to a store and selected 071 * folders (see {@link #onOpenMailMonitor(OpenMailMonitor, Channel)}), 072 * the existing and all subsequently arriving mails will be sent 073 * downstream using {@link MailFoldersUpdated} events. 074 * 075 * This implementation uses the {@link IdleManager}. The 076 * {@link IdleManager} works only if its {@link IdleManager#watch} 077 * method is invoked (for a folder) after any operation on that folder. 078 * Note that operations such as e.g. setting the deleted flag of 079 * a message is also an operation on a folder. 080 * 081 * Folders are updated in response to an {@link UpdateMailFolders} event 082 * or when the store signals the arrival of new messages. Information 083 * about the folders is delivered by a {@link MailFoldersUpdated} event. 084 * Folders may be freely used while handling the event, because the 085 * folders will be re-registered with the {@link IdleManager} 086 * when the {@link MailFoldersUpdated} event completes. 087 * Any usage of folders independent of handling the events mentioned 088 * will result in a loss of the monitor function. 089 * 090 * If required, the monitor function may be reestablished any time 091 * by firing a {@link UpdateMailFolders} event for the folders used. 092 */ 093@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", 094 "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" }) 095public class MailStoreMonitor extends MailConnectionManager< 096 MailStoreMonitor.MonitorChannel, OpenMailMonitor> { 097 098 @SuppressWarnings("PMD.FieldNamingConventions") 099 private static final Logger logger 100 = Logger.getLogger(MailStoreMonitor.class.getName()); 101 102 private Duration maxIdleTime = Duration.ofMinutes(25); 103 private static IdleManager idleManager; 104 private final EventPipeline retrievals = newEventPipeline(); 105 106 /** 107 * Creates a new server using the given channel. 108 * 109 * @param componentChannel the component's channel 110 */ 111 public MailStoreMonitor(Channel componentChannel) { 112 super(componentChannel); 113 } 114 115 @Override 116 protected boolean connectionsGenerate() { 117 return true; 118 } 119 120 /** 121 * Sets the maximum idle time. A running {@link IMAPFolder#idle()} 122 * is terminated and renewed after this time. Defaults to 25 minutes. 123 * 124 * @param maxIdleTime the new max idle time 125 */ 126 public MailStoreMonitor setMaxIdleTime(Duration maxIdleTime) { 127 this.maxIdleTime = maxIdleTime; 128 return this; 129 } 130 131 /** 132 * Returns the max idle time. 133 * 134 * @return the duration 135 */ 136 public Duration maxIdleTime() { 137 return maxIdleTime; 138 } 139 140 /** 141 * Configure the component. Currently, only max idle time 142 * is supported. 143 * 144 * @param values the values 145 */ 146 @Override 147 protected void configureComponent(Map<String, String> values) { 148 Optional.ofNullable(values.get("maxIdleTime")) 149 .map(Integer::parseInt).map(Duration::ofSeconds) 150 .ifPresent(d -> setMaxIdleTime(d)); 151 } 152 153 /** 154 * Open a store as specified by the event and monitor the folders 155 * (also specified by the event). Information about all existing 156 * and all subsequently arriving mails will be signaled downstream 157 * using {@link MailFoldersUpdated} events. 158 * 159 * @param event the event 160 * @param channel the channel 161 */ 162 @Handler 163 public void onOpenMailMonitor(OpenMailMonitor event, Channel channel) { 164 Properties sessionProps = new Properties(mailProps); 165 sessionProps.putAll(event.mailProperties()); 166 sessionProps.put("mail.imap.usesocketchannels", true); 167 Session session = Session.getInstance(sessionProps, 168 // Workaround for class loading problem in OSGi with j.m. 2.1. 169 // Authenticator's classpath allows accessing provider's service. 170 // See https://github.com/eclipse-ee4j/mail/issues/631 171 new Authenticator() { 172 @Override 173 protected PasswordAuthentication 174 getPasswordAuthentication() { 175 return new PasswordAuthentication( 176 sessionProps.getProperty("mail.user"), 177 new String(event.password().or(() -> password()) 178 .map(Password::password).orElse(new char[0]))); 179 } 180 }); 181 182 try { 183 synchronized (MailStoreMonitor.class) { 184 // Cannot be created earlier, need session. 185 if (idleManager == null) { 186 idleManager = new IdleManager(session, 187 Components.defaultExecutorService()); 188 } 189 } 190 new MonitorChannel(event, channel, session.getStore(), 191 sessionProps.getProperty("mail.user"), 192 event.password().or(this::password).orElse(null)); 193 } catch (NoSuchProviderException e) { 194 fire(new ConnectError(event, "Cannot create store.", e)); 195 } catch (IOException e) { 196 fire(new IOError(event, "Cannot create resource.", e)); 197 } 198 } 199 200 /** 201 * Retrieves the folders specified in the event. 202 * 203 * @param event the event 204 * @param channel the channel 205 */ 206 @Handler 207 public void onUpdateFolders(UpdateMailFolders event, MailChannel channel) { 208 if (!connections.contains(channel)) { 209 return; 210 } 211 // This can take very long. 212 retrievals 213 .submit(() -> ((MonitorChannel) channel).onUpdateFolders(event)); 214 } 215 216 /** 217 * The Enum ChannelState. 218 */ 219 @SuppressWarnings("PMD.FieldNamingConventions") 220 private enum ChannelState { 221 Opening { 222 @Override 223 public boolean isOpening() { 224 return true; 225 } 226 }, 227 Open { 228 @Override 229 public boolean isOpen() { 230 return true; 231 } 232 }, 233 Reopening { 234 @Override 235 public boolean isOpening() { 236 return true; 237 } 238 }, 239 Reopened { 240 @Override 241 public boolean isOpen() { 242 return true; 243 } 244 }, 245 Closing, 246 Closed; 247 248 /** 249 * Checks if is open. 250 * 251 * @return true, if is open 252 */ 253 public boolean isOpen() { 254 return false; 255 } 256 257 /** 258 * Checks if is opening. 259 * 260 * @return true, if is opening 261 */ 262 public boolean isOpening() { 263 return false; 264 } 265 } 266 267 /** 268 * The specific implementation of the {@link MailChannel}. 269 */ 270 protected class MonitorChannel extends 271 MailConnectionManager<MailStoreMonitor.MonitorChannel, 272 OpenMailMonitor>.AbstractMailChannel 273 implements ConnectionListener { 274 275 private final EventPipeline requestPipeline; 276 private ChannelState state = ChannelState.Opening; 277 private final Store store; 278 private final String user; 279 private final Password password; 280 private final String[] subscribed; 281 @SuppressWarnings("PMD.UseConcurrentHashMap") 282 private final Map<String, Folder> folderCache = new HashMap<>(); 283 private final Timer idleTimer; 284 285 /** 286 * Instantiates a new monitor channel. 287 * 288 * @param event the event that triggered the creation 289 * @param mainChannel the main channel (of this {@link Subchannel}) 290 * @param store the store 291 * @param user the user 292 * @param password the password 293 */ 294 public MonitorChannel(OpenMailMonitor event, Channel mainChannel, 295 Store store, String user, Password password) { 296 super(event, mainChannel); 297 this.store = store; 298 this.user = user; 299 this.password = password; 300 this.subscribed = event.folderNames(); 301 requestPipeline = event.processedBy().get(); 302 store.addConnectionListener(this); 303 idleTimer = Components.schedule(t -> { 304 requestPipeline.fire(new UpdateMailFolders(), this); 305 }, maxIdleTime); 306 connect( 307 t -> downPipeline().fire(new ConnectError(event, t), 308 mainChannel)); 309 } 310 311 /** 312 * Attempt connections until connected. Attempts are stopped 313 * if it is the first time that the connection is to be 314 * established and the error indicates that the connection 315 * will never succeed (e.g. due to an authentication 316 * problem). 317 * 318 * @param onOpenFailed the on open failed 319 */ 320 private void connect(Consumer<Throwable> onOpenFailed) { 321 synchronized (this) { 322 if (state.isOpen()) { 323 return; 324 } 325 activeEventPipeline().executorService().submit(() -> { 326 while (state.isOpening()) { 327 try { 328 attemptConnect(onOpenFailed); 329 } catch (InterruptedException e) { 330 break; 331 } 332 } 333 }); 334 } 335 } 336 337 /** 338 * Single connection attempt. 339 * 340 * @param onOpenFailed the on open failed 341 * @throws InterruptedException the interrupted exception 342 */ 343 @SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause") 344 private void attemptConnect(Consumer<Throwable> onOpenFailed) 345 throws InterruptedException { 346 try { 347 store.connect(user, new String(password.password())); 348 synchronized (this) { 349 if (state == ChannelState.Opening) { 350 state = ChannelState.Open; 351 } else { 352 state = ChannelState.Reopened; 353 return; 354 } 355 } 356 } catch (MessagingException e) { 357 synchronized (this) { 358 if (state == ChannelState.Opening 359 && (e instanceof AuthenticationFailedException 360 || e instanceof NoSuchProviderException)) { 361 logger.log(Level.WARNING, 362 "Connecting to store failed, closing.", e); 363 state = ChannelState.Closed; 364 super.close(); 365 if (onOpenFailed != null) { 366 onOpenFailed.accept(e); 367 } 368 return; 369 } 370 } 371 logger.log(Level.WARNING, 372 "(Re)connecting to store failed, retrying.", e); 373 Thread.sleep(5000); 374 } 375 } 376 377 /** 378 * Close the connection to the store. 379 */ 380 @Override 381 public void close() { 382 synchronized (this) { 383 if (state == ChannelState.Closing 384 || state == ChannelState.Closed) { 385 return; 386 } 387 state = ChannelState.Closing; 388 } 389 390 idleTimer.cancel(); 391 try { 392 // Initiate close, callback will inform downstream components. 393 store.close(); 394 } catch (MessagingException e) { 395 // According to the documentation, the listeners should 396 // be invoked nevertheless. 397 logger.log(Level.WARNING, "Cannot close connection properly.", 398 e); 399 } 400 } 401 402 /** 403 * Callback from store.connect is the connection is successful. 404 * 405 * @param event the event 406 */ 407 @Override 408 @SuppressWarnings({ "PMD.GuardLogStatement", 409 "PMD.AvoidDuplicateLiterals" }) 410 public void opened(ConnectionEvent event) { 411 folderCache.clear(); 412 if (state == ChannelState.Reopened) { 413 // This is a re-open, only retrieve messages. 414 requestPipeline.fire(new UpdateMailFolders(), this); 415 return; 416 } 417 // (1) Opening, (2) Opened, (3) start retrieving mails 418 downPipeline().fire(Event.onCompletion(new Opening<Void>(), 419 o -> downPipeline().fire( 420 Event.onCompletion( 421 new MailMonitorOpened(openEvent(), store), 422 p -> requestPipeline 423 .fire(new UpdateMailFolders(), this)), 424 this)), 425 this); 426 } 427 428 /** 429 * According to the documentation, 430 * {@link ConnectionEvent#DISCONNECTED} is currently not 431 * used. It's implemented nevertheless and called explicitly. 432 * 433 * @param event the event or `null` if called explicitly 434 */ 435 @Override 436 public void disconnected(ConnectionEvent event) { 437 synchronized (this) { 438 folderCache.clear(); 439 if (state.isOpen()) { 440 state = ChannelState.Reopening; 441 connect(null); 442 } 443 } 444 } 445 446 /** 447 * Callback that indicates the connection close, 448 * can be called any time by jakarta mail. 449 * 450 * Whether closing is intended (callback after a call to 451 * {@link #close}) can be checked by looking at the state. 452 * 453 * @param event the event 454 */ 455 @Override 456 public void closed(ConnectionEvent event) { 457 // Ignore if already closed. 458 if (state == ChannelState.Closed) { 459 return; 460 } 461 462 // Handle involuntary close by reopening. 463 if (state != ChannelState.Closing) { 464 disconnected(event); 465 return; 466 } 467 468 // Cleanup and remove channel. 469 synchronized (this) { 470 state = ChannelState.Closed; 471 folderCache.clear(); 472 } 473 downPipeline().fire(new Closed<Void>(), this); 474 super.close(); 475 } 476 477 /** 478 * Retrieve the new messages from the folders specified in the 479 * event. 480 * 481 * @param event 482 */ 483 @SuppressWarnings({ "PMD.CognitiveComplexity", 484 "PMD.AvoidInstantiatingObjectsInLoops", 485 "PMD.AvoidDuplicateLiterals" }) 486 public void onUpdateFolders(UpdateMailFolders event) { 487 List<Folder> folders = new ArrayList<>(); 488 List<Message> newMsgs = new ArrayList<>(); 489 if (store.isConnected()) { 490 Set<String> folderNames 491 = new HashSet<>(Arrays.asList(subscribed)); 492 if (event.folderNames().length > 0) { 493 folderNames.retainAll(Arrays.asList(event.folderNames())); 494 } 495 try { 496 for (var folderName : folderNames) { 497 @SuppressWarnings("PMD.CloseResource") 498 Folder folder = getFolder(folderName); 499 if (folder == null) { 500 continue; 501 } 502 folders.add(folder); 503 } 504 } catch (FolderClosedException e) { 505 disconnected(null); 506 } 507 } else { 508 disconnected(null); 509 } 510 event.setResult(folders); 511 Event.onCompletion(event, e -> downPipeline().fire(Event 512 .onCompletion(new MailFoldersUpdated(folders, newMsgs), 513 evt -> refreshWatches(evt)), 514 this)); 515 } 516 517 @SuppressWarnings({ "PMD.GuardLogStatement", 518 "PMD.AvoidRethrowingException", "PMD.CloseResource" }) 519 private Folder getFolder(String folderName) 520 throws FolderClosedException { 521 synchronized (folderCache) { 522 Folder folder = folderCache.get(folderName); 523 if (folder != null) { 524 return folder; 525 } 526 try { 527 folder = store.getFolder(folderName); 528 if (folder == null || !folder.exists()) { 529 logger.fine(() -> "No folder \"" + folderName 530 + "\" in store " + store); 531 return null; 532 } 533 folder.open(Folder.READ_WRITE); 534 folderCache.put(folderName, folder); 535 // Add MessageCountListener to listen for new messages. 536 folder.addMessageCountListener(new MessageCountAdapter() { 537 @Override 538 public void 539 messagesAdded(MessageCountEvent countEvent) { 540 retrievals.submit("UpdateFolder", 541 () -> updateFolders(countEvent)); 542 } 543 544 @Override 545 public void 546 messagesRemoved(MessageCountEvent countEvent) { 547 retrievals.submit("UpdateFolder", 548 () -> updateFolders(countEvent)); 549 } 550 }); 551 return folder; 552 } catch (FolderClosedException e) { 553 throw e; 554 } catch (MessagingException e) { 555 logger.log(Level.FINE, 556 "Cannot open folder: " + e.getMessage(), e); 557 } 558 return null; 559 } 560 } 561 562 @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops", 563 "PMD.GuardLogStatement" }) 564 private void updateFolders(MessageCountEvent event) { 565 List<Message> newMsgs = new ArrayList<>(); 566 if (event.getType() == MessageCountEvent.ADDED) { 567 newMsgs.addAll(Arrays.asList(event.getMessages())); 568 } else if (event.getType() != MessageCountEvent.REMOVED) { 569 return; 570 } 571 downPipeline().fire( 572 Event.onCompletion( 573 new MailFoldersUpdated( 574 new ArrayList<>(folderCache.values()), 575 newMsgs), 576 evt -> refreshWatches(evt)), 577 this); 578 } 579 580 /** 581 * Registers the folders from which messages have been received 582 * with the {@link IdleManager}. 583 * 584 * @param event the event 585 */ 586 @SuppressWarnings("PMD.CloseResource") 587 private void refreshWatches(MailFoldersUpdated event) { 588 if (!state.isOpen()) { 589 return; 590 } 591 for (Folder folder : event.folders()) { 592 try { 593 idleManager.watch(getFolder(folder.getFullName())); 594 } catch (MessagingException e) { 595 logger.log(Level.WARNING, "Cannot watch folder.", 596 e); 597 } 598 } 599 idleTimer.reschedule(maxIdleTime); 600 } 601 } 602 603 @Override 604 public String toString() { 605 return Components.objectName(this); 606 } 607 608}