001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2022,2023 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public 
013 * License for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License 
016 * along with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.mail;
020
021import jakarta.mail.AuthenticationFailedException;
022import jakarta.mail.Authenticator;
023import jakarta.mail.Folder;
024import jakarta.mail.FolderClosedException;
025import jakarta.mail.Message;
026import jakarta.mail.MessagingException;
027import jakarta.mail.NoSuchProviderException;
028import jakarta.mail.PasswordAuthentication;
029import jakarta.mail.Session;
030import jakarta.mail.Store;
031import jakarta.mail.event.ConnectionEvent;
032import jakarta.mail.event.ConnectionListener;
033import jakarta.mail.event.MessageCountAdapter;
034import jakarta.mail.event.MessageCountEvent;
035import jakarta.mail.event.StoreEvent;
036import jakarta.mail.event.StoreListener;
037import java.io.IOException;
038import java.time.Duration;
039import java.util.ArrayList;
040import java.util.Arrays;
041import java.util.HashMap;
042import java.util.HashSet;
043import java.util.List;
044import java.util.Map;
045import java.util.Optional;
046import java.util.Properties;
047import java.util.Set;
048import java.util.function.Consumer;
049import java.util.logging.Level;
050import org.eclipse.angus.mail.imap.IMAPFolder;
051import org.eclipse.angus.mail.imap.IdleManager;
052import org.jgrapes.core.Channel;
053import org.jgrapes.core.Components;
054import org.jgrapes.core.Components.Timer;
055import org.jgrapes.core.Event;
056import org.jgrapes.core.EventPipeline;
057import org.jgrapes.core.Subchannel;
058import org.jgrapes.core.annotation.Handler;
059import org.jgrapes.io.events.Closed;
060import org.jgrapes.io.events.ConnectError;
061import org.jgrapes.io.events.IOError;
062import org.jgrapes.io.events.Opening;
063import org.jgrapes.mail.events.MailFoldersUpdated;
064import org.jgrapes.mail.events.MailMonitorOpened;
065import org.jgrapes.mail.events.OpenMailMonitor;
066import org.jgrapes.mail.events.UpdateMailFolders;
067import org.jgrapes.util.Password;
068
069/**
070 * A component that opens mail stores and monitors mail folders for 
071 * mails. After establishing a connection to a store and selected 
072 * folders (see {@link #onOpenMailMonitor(OpenMailMonitor, Channel)}), 
073 * the existing and all subsequently arriving mails will be sent 
074 * downstream using {@link MailFoldersUpdated} events.
075 * 
076 * This implementation uses the {@link IdleManager}. The 
077 * {@link IdleManager} works only if its {@link IdleManager#watch}
078 * method is invoked (for a folder) after any operation on that folder. 
079 * Note that operations such as e.g. setting the deleted flag of 
080 * a message is also an operation on a folder.
081 * 
082 * Folders are updated in response to an {@link UpdateMailFolders} event 
083 * or when the store signals the arrival of new messages. Information 
084 * about the folders is delivered by a {@link MailFoldersUpdated} event. 
085 * Folders may be freely used while handling the event, because the
086 * folders will be re-registered with the {@link IdleManager}
087 * when the {@link MailFoldersUpdated} event completes.
088 * Any usage of folders independent of handling the events mentioned
089 * will result in a loss of the monitor function.
090 * 
091 * If required, the monitor function may be reestablished any time
092 * by firing a {@link UpdateMailFolders} event for the folders used.
093 */
094@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
095    "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports",
096    "PMD.CouplingBetweenObjects" })
097public class MailMonitor extends MailConnectionManager<
098        MailMonitor.MonitorChannel, OpenMailMonitor> {
099
100    private Duration maxIdleTime = Duration.ofMinutes(25);
101    private static IdleManager idleManager;
102    private final EventPipeline retrievals = newEventPipeline();
103
104    /**
105     * Creates a new server using the given channel.
106     * 
107     * @param componentChannel the component's channel
108     */
109    public MailMonitor(Channel componentChannel) {
110        super(componentChannel);
111    }
112
113    @Override
114    protected boolean connectionsGenerate() {
115        return true;
116    }
117
118    /**
119     * Sets the maximum idle time. A running {@link IMAPFolder#idle()}
120     * is terminated and renewed after this time. Defaults to 25 minutes.
121     *
122     * @param maxIdleTime the new max idle time
123     */
124    public MailMonitor setMaxIdleTime(Duration maxIdleTime) {
125        this.maxIdleTime = maxIdleTime;
126        return this;
127    }
128
129    /**
130     * Returns the max idle time.
131     *
132     * @return the duration
133     */
134    public Duration maxIdleTime() {
135        return maxIdleTime;
136    }
137
138    /**
139     * Configure the component. Currently, only max idle time
140     * is supported.
141     *
142     * @param values the values
143     */
144    @Override
145    protected void configureComponent(Map<String, String> values) {
146        Optional.ofNullable(values.get("maxIdleTime"))
147            .map(Integer::parseInt).map(Duration::ofSeconds)
148            .ifPresent(this::setMaxIdleTime);
149    }
150
151    /**
152     * Open a store as specified by the event and monitor the folders
153     * (also specified by the event). Information about all existing 
154     * and all subsequently arriving mails will be signaled downstream 
155     * using {@link MailFoldersUpdated} events.
156     *
157     * @param event the event
158     * @param channel the channel
159     */
160    @Handler
161    public void onOpenMailMonitor(OpenMailMonitor event, Channel channel) {
162        Properties sessionProps = new Properties(mailProps);
163        sessionProps.putAll(event.mailProperties());
164        sessionProps.put("mail.imap.usesocketchannels", true);
165        Session session = Session.getInstance(sessionProps,
166            // Workaround for class loading problem in OSGi with j.m. 2.1.
167            // Authenticator's classpath allows accessing provider's service.
168            // See https://github.com/eclipse-ee4j/mail/issues/631
169            new Authenticator() {
170                @Override
171                @SuppressWarnings("PMD.StringInstantiation")
172                protected PasswordAuthentication
173                        getPasswordAuthentication() {
174                    return new PasswordAuthentication(
175                        sessionProps.getProperty("mail.user"),
176                        new String(event.password().or(() -> password())
177                            .map(Password::password).orElse(new char[0])));
178                }
179            });
180
181        try {
182            synchronized (MailMonitor.class) {
183                // Cannot be created earlier, need session.
184                if (idleManager == null) {
185                    idleManager = new IdleManager(session,
186                        Components.defaultExecutorService());
187                }
188            }
189            new MonitorChannel(event, channel, session.getStore(),
190                sessionProps.getProperty("mail.user"),
191                event.password().or(this::password).orElse(null));
192        } catch (NoSuchProviderException e) {
193            fire(new ConnectError(event, "Cannot create store.", e));
194        } catch (IOException e) {
195            fire(new IOError(event, "Cannot create resource.", e));
196        }
197    }
198
199    /**
200     * Retrieves the folders specified in the event.
201     *
202     * @param event the event
203     * @param channel the channel
204     */
205    @Handler
206    public void onUpdateFolders(UpdateMailFolders event, MailChannel channel) {
207        if (!connections.contains(channel)) {
208            return;
209        }
210        // This can take very long.
211        retrievals
212            .submit(() -> ((MonitorChannel) channel).onUpdateFolders(event));
213    }
214
215    /**
216     * The Enum ChannelState.
217     */
218    @SuppressWarnings("PMD.FieldNamingConventions")
219    private enum ChannelState {
220        Opening {
221            @Override
222            public boolean isOpening() {
223                return true;
224            }
225        },
226        Open {
227            @Override
228            public boolean isOpen() {
229                return true;
230            }
231        },
232        Reopening {
233            @Override
234            public boolean isOpening() {
235                return true;
236            }
237        },
238        Reopened {
239            @Override
240            public boolean isOpen() {
241                return true;
242            }
243        },
244        Closing,
245        Closed;
246
247        /**
248         * Checks if is open.
249         *
250         * @return true, if is open
251         */
252        public boolean isOpen() {
253            return false;
254        }
255
256        /**
257         * Checks if is opening.
258         *
259         * @return true, if is opening
260         */
261        public boolean isOpening() {
262            return false;
263        }
264    }
265
266    /**
267     * The specific implementation of the {@link MailChannel}.
268     */
269    protected class MonitorChannel extends
270            MailConnectionManager<MailMonitor.MonitorChannel,
271                    OpenMailMonitor>.AbstractMailChannel
272            implements ConnectionListener, StoreListener {
273
274        private final EventPipeline requestPipeline;
275        private ChannelState state = ChannelState.Opening;
276        private final Store store;
277        private final String user;
278        private final Password password;
279        private final String[] subscribed;
280        @SuppressWarnings("PMD.UseConcurrentHashMap")
281        private final Map<String, Folder> folderCache = new HashMap<>();
282        private final Timer idleTimer;
283
284        /**
285         * Instantiates a new monitor channel.
286         *
287         * @param event the event that triggered the creation
288         * @param mainChannel the main channel (of this {@link Subchannel})
289         * @param store the store
290         * @param user the user
291         * @param password the password
292         */
293        public MonitorChannel(OpenMailMonitor event, Channel mainChannel,
294                Store store, String user, Password password) {
295            super(event, mainChannel);
296            this.store = store;
297            this.user = user;
298            this.password = password;
299            this.subscribed = event.folderNames();
300            requestPipeline = event.processedBy().get();
301            store.addConnectionListener(this);
302            store.addStoreListener(this);
303            idleTimer = Components.schedule(t -> {
304                requestPipeline.fire(new UpdateMailFolders(), this);
305            }, maxIdleTime);
306            connect(
307                t -> downPipeline().fire(new ConnectError(event, t),
308                    mainChannel));
309        }
310
311        /**
312         * Attempt connections until connected. Attempts are stopped
313         * if it is the first time that the connection is to be
314         * established and the error indicates that the connection
315         * will never succeed (e.g. due to an authentication
316         * problem).
317         *
318         * @param onOpenFailed the on open failed
319         */
320        private void connect(Consumer<Throwable> onOpenFailed) {
321            synchronized (this) {
322                if (state.isOpen()) {
323                    return;
324                }
325                activeEventPipeline().executorService().submit(() -> {
326                    while (state.isOpening()) {
327                        try {
328                            attemptConnect(onOpenFailed);
329                        } catch (InterruptedException e) {
330                            break;
331                        }
332                    }
333                });
334            }
335        }
336
337        /**
338         * Single connection attempt.
339         *
340         * @param onOpenFailed the on open failed
341         * @throws InterruptedException the interrupted exception
342         */
343        @SuppressWarnings({ "PMD.AvoidInstanceofChecksInCatchClause",
344            "PMD.StringInstantiation" })
345        private void attemptConnect(Consumer<Throwable> onOpenFailed)
346                throws InterruptedException {
347            try {
348                store.connect(user, new String(password.password()));
349                synchronized (this) {
350                    if (state == ChannelState.Opening) {
351                        state = ChannelState.Open;
352                    } else {
353                        state = ChannelState.Reopened;
354                    }
355                }
356            } catch (MessagingException e) {
357                synchronized (this) {
358                    if (state == ChannelState.Opening
359                        && (e instanceof AuthenticationFailedException
360                            || e instanceof NoSuchProviderException)) {
361                        logger.log(Level.WARNING,
362                            "Connecting to store failed, closing.", e);
363                        state = ChannelState.Closed;
364                        super.close();
365                        if (onOpenFailed != null) {
366                            onOpenFailed.accept(e);
367                        }
368                        return;
369                    }
370                }
371                logger.log(Level.WARNING,
372                    "(Re)connecting to store failed, retrying.", e);
373                Thread.sleep(5000);
374            }
375        }
376
377        /**
378         * Close the connection to the store.
379         */
380        @Override
381        public void close() {
382            synchronized (this) {
383                if (state == ChannelState.Closing
384                    || state == ChannelState.Closed) {
385                    return;
386                }
387                state = ChannelState.Closing;
388            }
389
390            idleTimer.cancel();
391            try {
392                // Initiate close, callback will inform downstream components.
393                store.close();
394            } catch (MessagingException e) {
395                // According to the documentation, the listeners should
396                // be invoked nevertheless.
397                logger.log(Level.WARNING, "Cannot close connection properly.",
398                    e);
399            }
400        }
401
402        /**
403         * Callback from store.connect is the connection is successful.
404         *
405         * @param event the event
406         */
407        @Override
408        @SuppressWarnings({ "PMD.GuardLogStatement",
409            "PMD.AvoidDuplicateLiterals" })
410        public void opened(ConnectionEvent event) {
411            folderCache.clear();
412            if (state == ChannelState.Reopened) {
413                // This is a re-open, only retrieve messages.
414                requestPipeline.fire(new UpdateMailFolders(), this);
415                return;
416            }
417            // (1) Opening, (2) Opened, (3) start retrieving mails
418            downPipeline().fire(Event.onCompletion(new Opening<Void>(),
419                o -> downPipeline().fire(
420                    Event.onCompletion(
421                        new MailMonitorOpened(openEvent(), store),
422                        p -> requestPipeline
423                            .fire(new UpdateMailFolders(), this)),
424                    this)),
425                this);
426        }
427
428        /**
429         * According to the documentation,
430         * {@link ConnectionEvent#DISCONNECTED} is currently not
431         * used. It's implemented nevertheless and called explicitly.
432         *
433         * @param event the event or `null` if called explicitly
434         */
435        @Override
436        public void disconnected(ConnectionEvent event) {
437            synchronized (this) {
438                folderCache.clear();
439                if (state.isOpen()) {
440                    state = ChannelState.Reopening;
441                    connect(null);
442                }
443            }
444        }
445
446        /**
447         * Callback that indicates the connection close,
448         * can be called any time by jakarta mail.
449         * 
450         * Whether closing is intended (callback after a call to 
451         * {@link #close}) can be checked by looking at the state. 
452         *
453         * @param event the event
454         */
455        @Override
456        public void closed(ConnectionEvent event) {
457            // Ignore if already closed.
458            if (state == ChannelState.Closed) {
459                return;
460            }
461
462            // Handle involuntary close by reopening.
463            if (state != ChannelState.Closing) {
464                disconnected(event);
465                return;
466            }
467
468            // Cleanup and remove channel.
469            synchronized (this) {
470                state = ChannelState.Closed;
471                folderCache.clear();
472            }
473            downPipeline().fire(new Closed<Void>(), this);
474            super.close();
475        }
476
477        @Override
478        public void notification(StoreEvent event) {
479            if (event.getMessage().contains("SocketException")) {
480                logger.fine(() -> "Problem with store: " + event.getMessage());
481                if (store.isConnected()) {
482                    logger.fine(() -> "Updating folders to resume");
483                    requestPipeline.fire(new UpdateMailFolders(), this);
484                    return;
485                }
486                logger.fine(() -> "Reconnecting to resume");
487                disconnected(null);
488            }
489        }
490
491        /**
492         * Retrieve the new messages from the folders specified in the
493         * event.
494         * 
495         * @param event
496         */
497        @SuppressWarnings({ "PMD.CognitiveComplexity",
498            "PMD.AvoidInstantiatingObjectsInLoops",
499            "PMD.AvoidDuplicateLiterals" })
500        public void onUpdateFolders(UpdateMailFolders event) {
501            List<Folder> folders = new ArrayList<>();
502            List<Message> newMsgs = new ArrayList<>();
503            if (store.isConnected()) {
504                Set<String> folderNames
505                    = new HashSet<>(Arrays.asList(subscribed));
506                if (event.folderNames().length > 0) {
507                    folderNames.retainAll(Arrays.asList(event.folderNames()));
508                }
509                try {
510                    for (var folderName : folderNames) {
511                        @SuppressWarnings("PMD.CloseResource")
512                        Folder folder = getFolder(folderName);
513                        if (folder == null) {
514                            continue;
515                        }
516                        folders.add(folder);
517                    }
518                } catch (FolderClosedException e) {
519                    disconnected(null);
520                }
521            } else {
522                disconnected(null);
523            }
524            event.setResult(folders);
525            Event.onCompletion(event, e -> downPipeline().fire(Event
526                .onCompletion(new MailFoldersUpdated(folders, newMsgs),
527                    this::refreshWatches),
528                this));
529        }
530
531        @SuppressWarnings({ "PMD.GuardLogStatement",
532            "PMD.AvoidRethrowingException", "PMD.CloseResource" })
533        private Folder getFolder(String folderName)
534                throws FolderClosedException {
535            synchronized (folderCache) {
536                Folder folder = folderCache.get(folderName);
537                if (folder != null) {
538                    return folder;
539                }
540                try {
541                    folder = store.getFolder(folderName);
542                    if (folder == null || !folder.exists()) {
543                        logger.fine(() -> "No folder \"" + folderName
544                            + "\" in store " + store);
545                        return null;
546                    }
547                    folder.open(Folder.READ_WRITE);
548                    folderCache.put(folderName, folder);
549                    // Add MessageCountListener to listen for new messages.
550                    folder.addMessageCountListener(new MessageCountAdapter() {
551                        @Override
552                        public void
553                                messagesAdded(MessageCountEvent countEvent) {
554                            retrievals.submit("UpdateFolder",
555                                () -> updateFolders(countEvent));
556                        }
557
558                        @Override
559                        public void
560                                messagesRemoved(MessageCountEvent countEvent) {
561                            retrievals.submit("UpdateFolder",
562                                () -> updateFolders(countEvent));
563                        }
564                    });
565                    return folder;
566                } catch (FolderClosedException e) {
567                    throw e;
568                } catch (MessagingException e) {
569                    logger.log(Level.FINE,
570                        "Cannot open folder: " + e.getMessage(), e);
571                }
572                return null;
573            }
574        }
575
576        @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
577            "PMD.GuardLogStatement" })
578        private void updateFolders(MessageCountEvent event) {
579            List<Message> newMsgs = new ArrayList<>();
580            if (event.getType() == MessageCountEvent.ADDED) {
581                newMsgs.addAll(Arrays.asList(event.getMessages()));
582            } else if (event.getType() != MessageCountEvent.REMOVED) {
583                return;
584            }
585            downPipeline().fire(
586                Event.onCompletion(
587                    new MailFoldersUpdated(
588                        new ArrayList<>(folderCache.values()), newMsgs),
589                    this::refreshWatches),
590                this);
591        }
592
593        /**
594         * Registers the folders from which messages have been received
595         * with the {@link IdleManager}.
596         *
597         * @param event the event
598         */
599        @SuppressWarnings({ "PMD.CloseResource", "PMD.UnusedPrivateMethod" })
600        private void refreshWatches(MailFoldersUpdated event) {
601            if (!state.isOpen()) {
602                return;
603            }
604            for (Folder folder : event.folders()) {
605                try {
606                    idleManager.watch(getFolder(folder.getFullName()));
607                } catch (MessagingException e) {
608                    logger.log(Level.WARNING, "Cannot watch folder.",
609                        e);
610                }
611            }
612            idleTimer.reschedule(maxIdleTime);
613        }
614    }
615
616    @Override
617    public String toString() {
618        return Components.objectName(this);
619    }
620
621}