/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2022,2023 Michael N. Lipp
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.mail;

import jakarta.mail.AuthenticationFailedException;
import jakarta.mail.Authenticator;
import jakarta.mail.Folder;
import jakarta.mail.FolderClosedException;
import jakarta.mail.Message;
import jakarta.mail.MessagingException;
import jakarta.mail.NoSuchProviderException;
import jakarta.mail.PasswordAuthentication;
import jakarta.mail.Session;
import jakarta.mail.Store;
import jakarta.mail.event.ConnectionEvent;
import jakarta.mail.event.ConnectionListener;
import jakarta.mail.event.MessageCountAdapter;
import jakarta.mail.event.MessageCountEvent;
import jakarta.mail.event.StoreEvent;
import jakarta.mail.event.StoreListener;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import java.util.logging.Level;
import org.eclipse.angus.mail.imap.IMAPFolder;
import org.eclipse.angus.mail.imap.IdleManager;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Components;
import org.jgrapes.core.Components.Timer;
import org.jgrapes.core.Event;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.Subchannel;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.io.events.Closed;
import org.jgrapes.io.events.ConnectError;
import org.jgrapes.io.events.IOError;
import org.jgrapes.io.events.Opening;
import org.jgrapes.mail.events.MailFoldersUpdated;
import org.jgrapes.mail.events.MailMonitorOpened;
import org.jgrapes.mail.events.OpenMailMonitor;
import org.jgrapes.mail.events.UpdateMailFolders;
import org.jgrapes.util.Password;

/**
 * A component that opens mail stores and monitors mail folders for
 * mails. After establishing a connection to a store and selected
 * folders (see {@link #onOpenMailMonitor(OpenMailMonitor, Channel)}),
 * the existing and all subsequently arriving mails will be sent
 * downstream using {@link MailFoldersUpdated} events.
 *
 * This implementation uses the {@link IdleManager}. The
 * {@link IdleManager} works only if its {@link IdleManager#watch}
 * method is invoked (for a folder) after any operation on that folder.
 * Note that operations such as setting the deleted flag of
 * a message are also operations on a folder.
 *
 * Folders are updated in response to an {@link UpdateMailFolders} event
 * or when the store signals the arrival of new messages. Information
 * about the folders is delivered by a {@link MailFoldersUpdated} event.
 * Folders may be freely used while handling the event, because the
 * folders will be re-registered with the {@link IdleManager}
 * when the {@link MailFoldersUpdated} event completes.
 * Any usage of folders independent of handling the events mentioned
 * will result in a loss of the monitor function.
 *
 * If required, the monitor function may be reestablished any time
 * by firing a {@link UpdateMailFolders} event for the folders used.
 */
@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
public class MailMonitor extends MailConnectionManager<
        MailMonitor.MonitorChannel, OpenMailMonitor> {

    private Duration maxIdleTime = Duration.ofMinutes(25);
    // Shared by all instances; created lazily in onOpenMailMonitor
    // (guarded by MailMonitor.class) because a session is required.
    private static IdleManager idleManager;
    // Folder retrievals can take long, so they get their own pipeline.
    private final EventPipeline retrievals = newEventPipeline();

    /**
     * Creates a new mail monitor using the given channel.
     *
     * @param componentChannel the component's channel
     */
    public MailMonitor(Channel componentChannel) {
        super(componentChannel);
    }

    @Override
    protected boolean connectionsGenerate() {
        return true;
    }

    /**
     * Sets the maximum idle time. A running {@link IMAPFolder#idle()}
     * is terminated and renewed after this time. Defaults to 25 minutes.
     *
     * @param maxIdleTime the new max idle time
     * @return the mail monitor for easy chaining
     */
    public MailMonitor setMaxIdleTime(Duration maxIdleTime) {
        this.maxIdleTime = maxIdleTime;
        return this;
    }

    /**
     * Returns the max idle time.
     *
     * @return the duration
     */
    public Duration maxIdleTime() {
        return maxIdleTime;
    }

    /**
     * Configure the component. Currently, only max idle time
     * (key "maxIdleTime", value in seconds) is supported.
     *
     * @param values the values
     */
    @Override
    protected void configureComponent(Map<String, String> values) {
        Optional.ofNullable(values.get("maxIdleTime"))
            .map(Integer::parseInt).map(Duration::ofSeconds)
            .ifPresent(this::setMaxIdleTime);
    }

    /**
     * Open a store as specified by the event and monitor the folders
     * (also specified by the event). Information about all existing
     * and all subsequently arriving mails will be signaled downstream
     * using {@link MailFoldersUpdated} events.
     *
     * @param event the event
     * @param channel the channel
     */
    @Handler
    public void onOpenMailMonitor(OpenMailMonitor event, Channel channel) {
        Properties sessionProps = new Properties(mailProps);
        sessionProps.putAll(event.mailProperties());
        // Properties values are expected to be strings; a Boolean
        // object is invisible to Properties#getProperty.
        sessionProps.setProperty("mail.imap.usesocketchannels", "true");
        Session session = Session.getInstance(sessionProps,
            // Workaround for class loading problem in OSGi with j.m. 2.1.
            // Authenticator's classpath allows accessing provider's service.
            // See https://github.com/eclipse-ee4j/mail/issues/631
            new Authenticator() {
                @Override
                protected PasswordAuthentication
                        getPasswordAuthentication() {
                    return new PasswordAuthentication(
                        sessionProps.getProperty("mail.user"),
                        new String(event.password().or(() -> password())
                            .map(Password::password).orElse(new char[0])));
                }
            });

        try {
            synchronized (MailMonitor.class) {
                // Cannot be created earlier, need session.
                if (idleManager == null) {
                    idleManager = new IdleManager(session,
                        Components.defaultExecutorService());
                }
            }
            // The channel registers itself and starts connecting
            // from within its constructor.
            new MonitorChannel(event, channel, session.getStore(),
                sessionProps.getProperty("mail.user"),
                event.password().or(this::password).orElse(null));
        } catch (NoSuchProviderException e) {
            fire(new ConnectError(event, "Cannot create store.", e));
        } catch (IOException e) {
            fire(new IOError(event, "Cannot create resource.", e));
        }
    }

    /**
     * Retrieves the folders specified in the event. Events on
     * channels that are not managed by this component are ignored.
     *
     * @param event the event
     * @param channel the channel
     */
    @Handler
    public void onUpdateFolders(UpdateMailFolders event, MailChannel channel) {
        if (!connections.contains(channel)) {
            return;
        }
        // This can take very long.
        retrievals
            .submit(() -> ((MonitorChannel) channel).onUpdateFolders(event));
    }

    /**
     * The states of a {@link MonitorChannel}. "Opening" and "Reopening"
     * report {@link #isOpening()}, "Open" and "Reopened" report
     * {@link #isOpen()}.
     */
    @SuppressWarnings("PMD.FieldNamingConventions")
    private enum ChannelState {
        Opening {
            @Override
            public boolean isOpening() {
                return true;
            }
        },
        Open {
            @Override
            public boolean isOpen() {
                return true;
            }
        },
        Reopening {
            @Override
            public boolean isOpening() {
                return true;
            }
        },
        Reopened {
            @Override
            public boolean isOpen() {
                return true;
            }
        },
        Closing,
        Closed;

        /**
         * Checks if is open.
         *
         * @return true, if is open
         */
        public boolean isOpen() {
            return false;
        }

        /**
         * Checks if is opening.
         *
         * @return true, if is opening
         */
        public boolean isOpening() {
            return false;
        }
    }

    /**
     * The specific implementation of the {@link MailChannel}.
     */
    protected class MonitorChannel extends
            MailConnectionManager<MailMonitor.MonitorChannel,
                    OpenMailMonitor>.AbstractMailChannel
            implements ConnectionListener, StoreListener {

        private final EventPipeline requestPipeline;
        // Written under the channel lock, but also read without it
        // (connect loop, listener callbacks); volatile makes those
        // reads see the latest value.
        private volatile ChannelState state = ChannelState.Opening;
        private final Store store;
        private final String user;
        private final Password password;
        private final String[] subscribed;
        // NOTE(review): getFolder locks on folderCache, while the
        // clear() calls happen under the channel lock or without any
        // lock -- confirm whether a single lock is intended.
        @SuppressWarnings("PMD.UseConcurrentHashMap")
        private final Map<String, Folder> folderCache = new HashMap<>();
        private final Timer idleTimer;

        /**
         * Instantiates a new monitor channel, registers it as listener
         * with the store, schedules the idle timer and starts
         * connecting.
         *
         * @param event the event that triggered the creation
         * @param mainChannel the main channel (of this {@link Subchannel})
         * @param store the store
         * @param user the user
         * @param password the password, may be {@code null}
         */
        public MonitorChannel(OpenMailMonitor event, Channel mainChannel,
                Store store, String user, Password password) {
            super(event, mainChannel);
            this.store = store;
            this.user = user;
            this.password = password;
            this.subscribed = event.folderNames();
            requestPipeline = event.processedBy().get();
            store.addConnectionListener(this);
            store.addStoreListener(this);
            // Forces a folder update (and thus a renewal of the
            // watches) when nothing happened for maxIdleTime.
            idleTimer = Components.schedule(t -> {
                requestPipeline.fire(new UpdateMailFolders(), this);
            }, maxIdleTime);
            connect(
                t -> downPipeline().fire(new ConnectError(event, t),
                    mainChannel));
        }

        /**
         * Attempt connections until connected. Attempts are stopped
         * if it is the first time that the connection is to be
         * established and the error indicates that the connection
         * will never succeed (e.g. due to an authentication
         * problem).
         *
         * @param onOpenFailed invoked when the initial open fails
         * permanently, may be {@code null}
         */
        private void connect(Consumer<Throwable> onOpenFailed) {
            synchronized (this) {
                if (state.isOpen()) {
                    return;
                }
                activeEventPipeline().executorService().submit(() -> {
                    while (state.isOpening()) {
                        try {
                            attemptConnect(onOpenFailed);
                        } catch (InterruptedException e) {
                            // Preserve the interrupt for the executor
                            // and give up connecting.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                });
            }
        }

        /**
         * Single connection attempt. On success, the state changes
         * to "Open" (initial connect) or "Reopened". On an
         * authentication or provider failure during the initial
         * connect, the channel is closed and {@code onOpenFailed}
         * is invoked. Any other failure is logged and followed by
         * a 5 second pause, after which the caller may retry.
         *
         * @param onOpenFailed invoked when the initial open fails
         * permanently, may be {@code null}
         * @throws InterruptedException if interrupted while pausing
         */
        @SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause")
        private void attemptConnect(Consumer<Throwable> onOpenFailed)
                throws InterruptedException {
            try {
                // The password is optional (see onOpenMailMonitor), so
                // avoid a NullPointerException here. NOTE(review): with
                // a null password Jakarta Mail is expected to fall back
                // to the session's Authenticator -- confirm.
                store.connect(user, password == null ? null
                    : new String(password.password()));
                synchronized (this) {
                    if (state == ChannelState.Opening) {
                        state = ChannelState.Open;
                    } else {
                        // NOTE(review): this also applies if close()
                        // was called during the attempt -- confirm
                        // that "Reopened" is intended in that case.
                        state = ChannelState.Reopened;
                        return;
                    }
                }
            } catch (MessagingException e) {
                synchronized (this) {
                    if (state == ChannelState.Opening
                        && (e instanceof AuthenticationFailedException
                            || e instanceof NoSuchProviderException)) {
                        logger.log(Level.WARNING,
                            "Connecting to store failed, closing.", e);
                        state = ChannelState.Closed;
                        super.close();
                        if (onOpenFailed != null) {
                            onOpenFailed.accept(e);
                        }
                        return;
                    }
                }
                logger.log(Level.WARNING,
                    "(Re)connecting to store failed, retrying.", e);
                Thread.sleep(5000);
            }
        }

        /**
         * Close the connection to the store. Does nothing if the
         * channel is already closing or closed.
         */
        @Override
        public void close() {
            synchronized (this) {
                if (state == ChannelState.Closing
                    || state == ChannelState.Closed) {
                    return;
                }
                state = ChannelState.Closing;
            }

            idleTimer.cancel();
            try {
                // Initiate close, callback will inform downstream components.
                store.close();
            } catch (MessagingException e) {
                // According to the documentation, the listeners should
                // be invoked nevertheless.
                logger.log(Level.WARNING, "Cannot close connection properly.",
                    e);
            }
        }

        /**
         * Callback from store.connect if the connection is successful.
         *
         * @param event the event
         */
        @Override
        @SuppressWarnings({ "PMD.GuardLogStatement",
            "PMD.AvoidDuplicateLiterals" })
        public void opened(ConnectionEvent event) {
            folderCache.clear();
            if (state == ChannelState.Reopened) {
                // This is a re-open, only retrieve messages.
                requestPipeline.fire(new UpdateMailFolders(), this);
                return;
            }
            // (1) Opening, (2) Opened, (3) start retrieving mails
            downPipeline().fire(Event.onCompletion(new Opening<Void>(),
                o -> downPipeline().fire(
                    Event.onCompletion(
                        new MailMonitorOpened(openEvent(), store),
                        p -> requestPipeline
                            .fire(new UpdateMailFolders(), this)),
                    this)),
                this);
        }

        /**
         * According to the documentation,
         * {@link ConnectionEvent#DISCONNECTED} is currently not
         * used. It's implemented nevertheless and called explicitly.
         *
         * @param event the event or `null` if called explicitly
         */
        @Override
        public void disconnected(ConnectionEvent event) {
            synchronized (this) {
                folderCache.clear();
                if (state.isOpen()) {
                    state = ChannelState.Reopening;
                    connect(null);
                }
            }
        }

        /**
         * Callback that indicates the connection close,
         * can be called any time by jakarta mail.
         *
         * Whether closing is intended (callback after a call to
         * {@link #close}) can be checked by looking at the state.
         *
         * @param event the event
         */
        @Override
        public void closed(ConnectionEvent event) {
            // Ignore if already closed.
            if (state == ChannelState.Closed) {
                return;
            }

            // Handle involuntary close by reopening.
            if (state != ChannelState.Closing) {
                disconnected(event);
                return;
            }

            // Cleanup and remove channel.
            synchronized (this) {
                state = ChannelState.Closed;
                folderCache.clear();
            }
            downPipeline().fire(new Closed<Void>(), this);
            super.close();
        }

        /**
         * Handles notifications from the store. Only notifications
         * whose message mentions a "SocketException" are processed:
         * if the store is still connected, the folders are updated,
         * else a reconnect is triggered.
         *
         * @param event the event
         */
        @Override
        public void notification(StoreEvent event) {
            String message = event.getMessage();
            // Guard against notifications without a message text.
            if (message == null || !message.contains("SocketException")) {
                return;
            }
            logger.fine(() -> "Problem with store: " + message);
            if (store.isConnected()) {
                logger.fine(() -> "Updating folders to resume");
                requestPipeline.fire(new UpdateMailFolders(), this);
                return;
            }
            logger.fine(() -> "Reconnecting to resume");
            disconnected(null);
        }

        /**
         * Retrieve the folders specified in the event. Only
         * subscribed folders are considered; if the event names
         * folders, the result is further restricted to those.
         * When the event completes, a {@link MailFoldersUpdated}
         * event is fired downstream.
         *
         * @param event the event
         */
        @SuppressWarnings({ "PMD.CognitiveComplexity",
            "PMD.AvoidInstantiatingObjectsInLoops",
            "PMD.AvoidDuplicateLiterals" })
        public void onUpdateFolders(UpdateMailFolders event) {
            List<Folder> folders = new ArrayList<>();
            // No new messages are determined here, the list stays empty.
            List<Message> newMsgs = new ArrayList<>();
            if (store.isConnected()) {
                Set<String> folderNames
                    = new HashSet<>(Arrays.asList(subscribed));
                if (event.folderNames().length > 0) {
                    folderNames.retainAll(Arrays.asList(event.folderNames()));
                }
                try {
                    for (var folderName : folderNames) {
                        @SuppressWarnings("PMD.CloseResource")
                        Folder folder = getFolder(folderName);
                        if (folder == null) {
                            continue;
                        }
                        folders.add(folder);
                    }
                } catch (FolderClosedException e) {
                    disconnected(null);
                }
            } else {
                disconnected(null);
            }
            event.setResult(folders);
            Event.onCompletion(event, e -> downPipeline().fire(Event
                .onCompletion(new MailFoldersUpdated(folders, newMsgs),
                    evt -> refreshWatches(evt)),
                this));
        }

        /**
         * Returns the open folder with the given name, using the
         * cache if possible. A newly opened folder gets a listener
         * that triggers an update when messages are added or removed.
         *
         * @param folderName the folder name
         * @return the folder or {@code null} if it does not exist
         * or cannot be opened
         * @throws FolderClosedException if reported by the store
         */
        @SuppressWarnings({ "PMD.GuardLogStatement",
            "PMD.AvoidRethrowingException", "PMD.CloseResource" })
        private Folder getFolder(String folderName)
                throws FolderClosedException {
            synchronized (folderCache) {
                Folder folder = folderCache.get(folderName);
                if (folder != null) {
                    return folder;
                }
                try {
                    folder = store.getFolder(folderName);
                    if (folder == null || !folder.exists()) {
                        logger.fine(() -> "No folder \"" + folderName
                            + "\" in store " + store);
                        return null;
                    }
                    folder.open(Folder.READ_WRITE);
                    folderCache.put(folderName, folder);
                    // Add MessageCountListener to listen for new messages.
                    folder.addMessageCountListener(new MessageCountAdapter() {
                        @Override
                        public void
                                messagesAdded(MessageCountEvent countEvent) {
                            retrievals.submit("UpdateFolder",
                                () -> updateFolders(countEvent));
                        }

                        @Override
                        public void
                                messagesRemoved(MessageCountEvent countEvent) {
                            retrievals.submit("UpdateFolder",
                                () -> updateFolders(countEvent));
                        }
                    });
                    return folder;
                } catch (FolderClosedException e) {
                    throw e;
                } catch (MessagingException e) {
                    logger.log(Level.FINE,
                        "Cannot open folder: " + e.getMessage(), e);
                }
                return null;
            }
        }

        /**
         * Fires a {@link MailFoldersUpdated} event with all cached
         * folders in response to a message count change. Added
         * messages are reported as new messages.
         *
         * @param event the message count event
         */
        @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
            "PMD.GuardLogStatement" })
        private void updateFolders(MessageCountEvent event) {
            List<Message> newMsgs = new ArrayList<>();
            if (event.getType() == MessageCountEvent.ADDED) {
                newMsgs.addAll(Arrays.asList(event.getMessages()));
            } else if (event.getType() != MessageCountEvent.REMOVED) {
                return;
            }
            downPipeline().fire(
                Event.onCompletion(
                    new MailFoldersUpdated(
                        new ArrayList<>(folderCache.values()),
                        newMsgs),
                    evt -> refreshWatches(evt)),
                this);
        }

        /**
         * Registers the folders from which messages have been received
         * with the {@link IdleManager} and restarts the idle timer.
         * Does nothing unless the channel is open.
         *
         * @param event the event
         */
        @SuppressWarnings("PMD.CloseResource")
        private void refreshWatches(MailFoldersUpdated event) {
            if (!state.isOpen()) {
                return;
            }
            for (Folder folder : event.folders()) {
                try {
                    Folder watched = getFolder(folder.getFullName());
                    if (watched == null) {
                        // The folder vanished or cannot be opened
                        // (already logged by getFolder).
                        continue;
                    }
                    idleManager.watch(watched);
                } catch (MessagingException e) {
                    logger.log(Level.WARNING, "Cannot watch folder.",
                        e);
                }
            }
            idleTimer.reschedule(maxIdleTime);
        }
    }

    @Override
    public String toString() {
        return Components.objectName(this);
    }

}