001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2022 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.mail;
020
021import jakarta.mail.AuthenticationFailedException;
022import jakarta.mail.Authenticator;
023import jakarta.mail.Folder;
024import jakarta.mail.FolderClosedException;
025import jakarta.mail.Message;
026import jakarta.mail.MessagingException;
027import jakarta.mail.NoSuchProviderException;
028import jakarta.mail.PasswordAuthentication;
029import jakarta.mail.Session;
030import jakarta.mail.Store;
031import jakarta.mail.event.ConnectionEvent;
032import jakarta.mail.event.ConnectionListener;
033import jakarta.mail.event.MessageCountAdapter;
034import jakarta.mail.event.MessageCountEvent;
035import java.io.IOException;
036import java.time.Duration;
037import java.util.ArrayList;
038import java.util.Arrays;
039import java.util.HashMap;
040import java.util.HashSet;
041import java.util.List;
042import java.util.Map;
043import java.util.Optional;
044import java.util.Properties;
045import java.util.Set;
046import java.util.function.Consumer;
047import java.util.logging.Level;
048import java.util.logging.Logger;
049import org.eclipse.angus.mail.imap.IMAPFolder;
050import org.eclipse.angus.mail.imap.IdleManager;
051import org.jgrapes.core.Channel;
052import org.jgrapes.core.Components;
053import org.jgrapes.core.Components.Timer;
054import org.jgrapes.core.Event;
055import org.jgrapes.core.EventPipeline;
056import org.jgrapes.core.Subchannel;
057import org.jgrapes.core.annotation.Handler;
058import org.jgrapes.io.events.Closed;
059import org.jgrapes.io.events.ConnectError;
060import org.jgrapes.io.events.IOError;
061import org.jgrapes.io.events.Opening;
062import org.jgrapes.mail.events.MailFoldersUpdated;
063import org.jgrapes.mail.events.MailMonitorOpened;
064import org.jgrapes.mail.events.OpenMailMonitor;
065import org.jgrapes.mail.events.UpdateMailFolders;
066import org.jgrapes.util.Password;
067
068/**
069 * A component that opens mail stores and monitors mail folders for 
070 * mails. After establishing a connection to a store and selected 
071 * folders (see {@link #onOpenMailMonitor(OpenMailMonitor, Channel)}), 
072 * the existing and all subsequently arriving mails will be sent 
073 * downstream using {@link MailFoldersUpdated} events.
074 * 
075 * This implementation uses the {@link IdleManager}. The 
076 * {@link IdleManager} works only if its {@link IdleManager#watch}
077 * method is invoked (for a folder) after any operation on that folder. 
078 * Note that operations such as e.g. setting the deleted flag of 
079 * a message is also an operation on a folder.
080 * 
081 * Folders are updated in response to an {@link UpdateMailFolders} event 
082 * or when the store signals the arrival of new messages. Information 
083 * about the folders is delivered by a {@link MailFoldersUpdated} event. 
084 * Folders may be freely used while handling the event, because the
085 * folders will be re-registered with the {@link IdleManager}
086 * when the {@link MailFoldersUpdated} event completes.
087 * Any usage of folders independent of handling the events mentioned
088 * will result in a loss of the monitor function.
089 * 
090 * If required, the monitor function may be reestablished any time
091 * by firing a {@link UpdateMailFolders} event for the folders used.
092 */
093@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
094    "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
095public class MailStoreMonitor extends MailConnectionManager<
096        MailStoreMonitor.MonitorChannel, OpenMailMonitor> {
097
098    @SuppressWarnings("PMD.FieldNamingConventions")
099    private static final Logger logger
100        = Logger.getLogger(MailStoreMonitor.class.getName());
101
102    private Duration maxIdleTime = Duration.ofMinutes(25);
103    private static IdleManager idleManager;
104    private final EventPipeline retrievals = newEventPipeline();
105
106    /**
107     * Creates a new server using the given channel.
108     * 
109     * @param componentChannel the component's channel
110     */
111    public MailStoreMonitor(Channel componentChannel) {
112        super(componentChannel);
113    }
114
115    @Override
116    protected boolean connectionsGenerate() {
117        return true;
118    }
119
120    /**
121     * Sets the maximum idle time. A running {@link IMAPFolder#idle()}
122     * is terminated and renewed after this time. Defaults to 25 minutes.
123     *
124     * @param maxIdleTime the new max idle time
125     */
126    public MailStoreMonitor setMaxIdleTime(Duration maxIdleTime) {
127        this.maxIdleTime = maxIdleTime;
128        return this;
129    }
130
131    /**
132     * Returns the max idle time.
133     *
134     * @return the duration
135     */
136    public Duration maxIdleTime() {
137        return maxIdleTime;
138    }
139
140    /**
141     * Configure the component. Currently, only max idle time
142     * is supported.
143     *
144     * @param values the values
145     */
146    @Override
147    protected void configureComponent(Map<String, String> values) {
148        Optional.ofNullable(values.get("maxIdleTime"))
149            .map(Integer::parseInt).map(Duration::ofSeconds)
150            .ifPresent(d -> setMaxIdleTime(d));
151    }
152
153    /**
154     * Open a store as specified by the event and monitor the folders
155     * (also specified by the event). Information about all existing 
156     * and all subsequently arriving mails will be signaled downstream 
157     * using {@link MailFoldersUpdated} events.
158     *
159     * @param event the event
160     * @param channel the channel
161     */
162    @Handler
163    public void onOpenMailMonitor(OpenMailMonitor event, Channel channel) {
164        Properties sessionProps = new Properties(mailProps);
165        sessionProps.putAll(event.mailProperties());
166        sessionProps.put("mail.imap.usesocketchannels", true);
167        Session session = Session.getInstance(sessionProps,
168            // Workaround for class loading problem in OSGi with j.m. 2.1.
169            // Authenticator's classpath allows accessing provider's service.
170            // See https://github.com/eclipse-ee4j/mail/issues/631
171            new Authenticator() {
172                @Override
173                protected PasswordAuthentication
174                        getPasswordAuthentication() {
175                    return new PasswordAuthentication(
176                        sessionProps.getProperty("mail.user"),
177                        new String(event.password().or(() -> password())
178                            .map(Password::password).orElse(new char[0])));
179                }
180            });
181
182        try {
183            synchronized (MailStoreMonitor.class) {
184                // Cannot be created earlier, need session.
185                if (idleManager == null) {
186                    idleManager = new IdleManager(session,
187                        Components.defaultExecutorService());
188                }
189            }
190            new MonitorChannel(event, channel, session.getStore(),
191                sessionProps.getProperty("mail.user"),
192                event.password().or(this::password).orElse(null));
193        } catch (NoSuchProviderException e) {
194            fire(new ConnectError(event, "Cannot create store.", e));
195        } catch (IOException e) {
196            fire(new IOError(event, "Cannot create resource.", e));
197        }
198    }
199
200    /**
201     * Retrieves the folders specified in the event.
202     *
203     * @param event the event
204     * @param channel the channel
205     */
206    @Handler
207    public void onUpdateFolders(UpdateMailFolders event, MailChannel channel) {
208        if (!connections.contains(channel)) {
209            return;
210        }
211        // This can take very long.
212        retrievals
213            .submit(() -> ((MonitorChannel) channel).onUpdateFolders(event));
214    }
215
216    /**
217     * The Enum ChannelState.
218     */
219    @SuppressWarnings("PMD.FieldNamingConventions")
220    private enum ChannelState {
221        Opening {
222            @Override
223            public boolean isOpening() {
224                return true;
225            }
226        },
227        Open {
228            @Override
229            public boolean isOpen() {
230                return true;
231            }
232        },
233        Reopening {
234            @Override
235            public boolean isOpening() {
236                return true;
237            }
238        },
239        Reopened {
240            @Override
241            public boolean isOpen() {
242                return true;
243            }
244        },
245        Closing,
246        Closed;
247
248        /**
249         * Checks if is open.
250         *
251         * @return true, if is open
252         */
253        public boolean isOpen() {
254            return false;
255        }
256
257        /**
258         * Checks if is opening.
259         *
260         * @return true, if is opening
261         */
262        public boolean isOpening() {
263            return false;
264        }
265    }
266
267    /**
268     * The specific implementation of the {@link MailChannel}.
269     */
270    protected class MonitorChannel extends
271            MailConnectionManager<MailStoreMonitor.MonitorChannel,
272                    OpenMailMonitor>.AbstractMailChannel
273            implements ConnectionListener {
274
275        private final EventPipeline requestPipeline;
276        private ChannelState state = ChannelState.Opening;
277        private final Store store;
278        private final String user;
279        private final Password password;
280        private final String[] subscribed;
281        @SuppressWarnings("PMD.UseConcurrentHashMap")
282        private final Map<String, Folder> folderCache = new HashMap<>();
283        private final Timer idleTimer;
284
285        /**
286         * Instantiates a new monitor channel.
287         *
288         * @param event the event that triggered the creation
289         * @param mainChannel the main channel (of this {@link Subchannel})
290         * @param store the store
291         * @param user the user
292         * @param password the password
293         */
294        public MonitorChannel(OpenMailMonitor event, Channel mainChannel,
295                Store store, String user, Password password) {
296            super(event, mainChannel);
297            this.store = store;
298            this.user = user;
299            this.password = password;
300            this.subscribed = event.folderNames();
301            requestPipeline = event.processedBy().get();
302            store.addConnectionListener(this);
303            idleTimer = Components.schedule(t -> {
304                requestPipeline.fire(new UpdateMailFolders(), this);
305            }, maxIdleTime);
306            connect(
307                t -> downPipeline().fire(new ConnectError(event, t),
308                    mainChannel));
309        }
310
311        /**
312         * Attempt connections until connected. Attempts are stopped
313         * if it is the first time that the connection is to be
314         * established and the error indicates that the connection
315         * will never succeed (e.g. due to an authentication
316         * problem).
317         *
318         * @param onOpenFailed the on open failed
319         */
320        private void connect(Consumer<Throwable> onOpenFailed) {
321            synchronized (this) {
322                if (state.isOpen()) {
323                    return;
324                }
325                activeEventPipeline().executorService().submit(() -> {
326                    while (state.isOpening()) {
327                        try {
328                            attemptConnect(onOpenFailed);
329                        } catch (InterruptedException e) {
330                            break;
331                        }
332                    }
333                });
334            }
335        }
336
337        /**
338         * Single connection attempt.
339         *
340         * @param onOpenFailed the on open failed
341         * @throws InterruptedException the interrupted exception
342         */
343        @SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause")
344        private void attemptConnect(Consumer<Throwable> onOpenFailed)
345                throws InterruptedException {
346            try {
347                store.connect(user, new String(password.password()));
348                synchronized (this) {
349                    if (state == ChannelState.Opening) {
350                        state = ChannelState.Open;
351                    } else {
352                        state = ChannelState.Reopened;
353                        return;
354                    }
355                }
356            } catch (MessagingException e) {
357                synchronized (this) {
358                    if (state == ChannelState.Opening
359                        && (e instanceof AuthenticationFailedException
360                            || e instanceof NoSuchProviderException)) {
361                        logger.log(Level.WARNING,
362                            "Connecting to store failed, closing.", e);
363                        state = ChannelState.Closed;
364                        super.close();
365                        if (onOpenFailed != null) {
366                            onOpenFailed.accept(e);
367                        }
368                        return;
369                    }
370                }
371                logger.log(Level.WARNING,
372                    "(Re)connecting to store failed, retrying.", e);
373                Thread.sleep(5000);
374            }
375        }
376
377        /**
378         * Close the connection to the store.
379         */
380        @Override
381        public void close() {
382            synchronized (this) {
383                if (state == ChannelState.Closing
384                    || state == ChannelState.Closed) {
385                    return;
386                }
387                state = ChannelState.Closing;
388            }
389
390            idleTimer.cancel();
391            try {
392                // Initiate close, callback will inform downstream components.
393                store.close();
394            } catch (MessagingException e) {
395                // According to the documentation, the listeners should
396                // be invoked nevertheless.
397                logger.log(Level.WARNING, "Cannot close connection properly.",
398                    e);
399            }
400        }
401
402        /**
403         * Callback from store.connect is the connection is successful.
404         *
405         * @param event the event
406         */
407        @Override
408        @SuppressWarnings({ "PMD.GuardLogStatement",
409            "PMD.AvoidDuplicateLiterals" })
410        public void opened(ConnectionEvent event) {
411            folderCache.clear();
412            if (state == ChannelState.Reopened) {
413                // This is a re-open, only retrieve messages.
414                requestPipeline.fire(new UpdateMailFolders(), this);
415                return;
416            }
417            // (1) Opening, (2) Opened, (3) start retrieving mails
418            downPipeline().fire(Event.onCompletion(new Opening<Void>(),
419                o -> downPipeline().fire(
420                    Event.onCompletion(
421                        new MailMonitorOpened(openEvent(), store),
422                        p -> requestPipeline
423                            .fire(new UpdateMailFolders(), this)),
424                    this)),
425                this);
426        }
427
428        /**
429         * According to the documentation,
430         * {@link ConnectionEvent#DISCONNECTED} is currently not
431         * used. It's implemented nevertheless and called explicitly.
432         *
433         * @param event the event or `null` if called explicitly
434         */
435        @Override
436        public void disconnected(ConnectionEvent event) {
437            synchronized (this) {
438                folderCache.clear();
439                if (state.isOpen()) {
440                    state = ChannelState.Reopening;
441                    connect(null);
442                }
443            }
444        }
445
446        /**
447         * Callback that indicates the connection close,
448         * can be called any time by jakarta mail.
449         * 
450         * Whether closing is intended (callback after a call to 
451         * {@link #close}) can be checked by looking at the state. 
452         *
453         * @param event the event
454         */
455        @Override
456        public void closed(ConnectionEvent event) {
457            // Ignore if already closed.
458            if (state == ChannelState.Closed) {
459                return;
460            }
461
462            // Handle involuntary close by reopening.
463            if (state != ChannelState.Closing) {
464                disconnected(event);
465                return;
466            }
467
468            // Cleanup and remove channel.
469            synchronized (this) {
470                state = ChannelState.Closed;
471                folderCache.clear();
472            }
473            downPipeline().fire(new Closed<Void>(), this);
474            super.close();
475        }
476
477        /**
478         * Retrieve the new messages from the folders specified in the
479         * event.
480         * 
481         * @param event
482         */
483        @SuppressWarnings({ "PMD.CognitiveComplexity",
484            "PMD.AvoidInstantiatingObjectsInLoops",
485            "PMD.AvoidDuplicateLiterals" })
486        public void onUpdateFolders(UpdateMailFolders event) {
487            List<Folder> folders = new ArrayList<>();
488            List<Message> newMsgs = new ArrayList<>();
489            if (store.isConnected()) {
490                Set<String> folderNames
491                    = new HashSet<>(Arrays.asList(subscribed));
492                if (event.folderNames().length > 0) {
493                    folderNames.retainAll(Arrays.asList(event.folderNames()));
494                }
495                try {
496                    for (var folderName : folderNames) {
497                        @SuppressWarnings("PMD.CloseResource")
498                        Folder folder = getFolder(folderName);
499                        if (folder == null) {
500                            continue;
501                        }
502                        folders.add(folder);
503                    }
504                } catch (FolderClosedException e) {
505                    disconnected(null);
506                }
507            } else {
508                disconnected(null);
509            }
510            event.setResult(folders);
511            Event.onCompletion(event, e -> downPipeline().fire(Event
512                .onCompletion(new MailFoldersUpdated(folders, newMsgs),
513                    evt -> refreshWatches(evt)),
514                this));
515        }
516
517        @SuppressWarnings({ "PMD.GuardLogStatement",
518            "PMD.AvoidRethrowingException", "PMD.CloseResource" })
519        private Folder getFolder(String folderName)
520                throws FolderClosedException {
521            synchronized (folderCache) {
522                Folder folder = folderCache.get(folderName);
523                if (folder != null) {
524                    return folder;
525                }
526                try {
527                    folder = store.getFolder(folderName);
528                    if (folder == null || !folder.exists()) {
529                        logger.fine(() -> "No folder \"" + folderName
530                            + "\" in store " + store);
531                        return null;
532                    }
533                    folder.open(Folder.READ_WRITE);
534                    folderCache.put(folderName, folder);
535                    // Add MessageCountListener to listen for new messages.
536                    folder.addMessageCountListener(new MessageCountAdapter() {
537                        @Override
538                        public void
539                                messagesAdded(MessageCountEvent countEvent) {
540                            retrievals.submit("UpdateFolder",
541                                () -> updateFolders(countEvent));
542                        }
543
544                        @Override
545                        public void
546                                messagesRemoved(MessageCountEvent countEvent) {
547                            retrievals.submit("UpdateFolder",
548                                () -> updateFolders(countEvent));
549                        }
550                    });
551                    return folder;
552                } catch (FolderClosedException e) {
553                    throw e;
554                } catch (MessagingException e) {
555                    logger.log(Level.FINE,
556                        "Cannot open folder: " + e.getMessage(), e);
557                }
558                return null;
559            }
560        }
561
562        @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
563            "PMD.GuardLogStatement" })
564        private void updateFolders(MessageCountEvent event) {
565            List<Message> newMsgs = new ArrayList<>();
566            if (event.getType() == MessageCountEvent.ADDED) {
567                newMsgs.addAll(Arrays.asList(event.getMessages()));
568            } else if (event.getType() != MessageCountEvent.REMOVED) {
569                return;
570            }
571            downPipeline().fire(
572                Event.onCompletion(
573                    new MailFoldersUpdated(
574                        new ArrayList<>(folderCache.values()),
575                        newMsgs),
576                    evt -> refreshWatches(evt)),
577                this);
578        }
579
580        /**
581         * Registers the folders from which messages have been received
582         * with the {@link IdleManager}.
583         *
584         * @param event the event
585         */
586        @SuppressWarnings("PMD.CloseResource")
587        private void refreshWatches(MailFoldersUpdated event) {
588            if (!state.isOpen()) {
589                return;
590            }
591            for (Folder folder : event.folders()) {
592                try {
593                    idleManager.watch(getFolder(folder.getFullName()));
594                } catch (MessagingException e) {
595                    logger.log(Level.WARNING, "Cannot watch folder.",
596                        e);
597                }
598            }
599            idleTimer.reschedule(maxIdleTime);
600        }
601    }
602
603    @Override
604    public String toString() {
605        return Components.objectName(this);
606    }
607
608}