001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2022,2023 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public 
013 * License for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License 
016 * along with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.mail;
020
021import jakarta.mail.AuthenticationFailedException;
022import jakarta.mail.Authenticator;
023import jakarta.mail.Folder;
024import jakarta.mail.FolderClosedException;
025import jakarta.mail.Message;
026import jakarta.mail.MessagingException;
027import jakarta.mail.NoSuchProviderException;
028import jakarta.mail.PasswordAuthentication;
029import jakarta.mail.Session;
030import jakarta.mail.Store;
031import jakarta.mail.event.ConnectionEvent;
032import jakarta.mail.event.ConnectionListener;
033import jakarta.mail.event.MessageCountAdapter;
034import jakarta.mail.event.MessageCountEvent;
035import jakarta.mail.event.StoreEvent;
036import jakarta.mail.event.StoreListener;
037import java.io.IOException;
038import java.time.Duration;
039import java.util.ArrayList;
040import java.util.Arrays;
041import java.util.HashMap;
042import java.util.HashSet;
043import java.util.List;
044import java.util.Map;
045import java.util.Optional;
046import java.util.Properties;
047import java.util.Set;
048import java.util.function.Consumer;
049import java.util.logging.Level;
050import org.eclipse.angus.mail.imap.IMAPFolder;
051import org.eclipse.angus.mail.imap.IdleManager;
052import org.jgrapes.core.Channel;
053import org.jgrapes.core.Components;
054import org.jgrapes.core.Components.Timer;
055import org.jgrapes.core.Event;
056import org.jgrapes.core.EventPipeline;
057import org.jgrapes.core.Subchannel;
058import org.jgrapes.core.annotation.Handler;
059import org.jgrapes.io.events.Closed;
060import org.jgrapes.io.events.ConnectError;
061import org.jgrapes.io.events.IOError;
062import org.jgrapes.io.events.Opening;
063import org.jgrapes.mail.events.MailFoldersUpdated;
064import org.jgrapes.mail.events.MailMonitorOpened;
065import org.jgrapes.mail.events.OpenMailMonitor;
066import org.jgrapes.mail.events.UpdateMailFolders;
067import org.jgrapes.util.Password;
068
069/**
070 * A component that opens mail stores and monitors mail folders for 
071 * mails. After establishing a connection to a store and selected 
072 * folders (see {@link #onOpenMailMonitor(OpenMailMonitor, Channel)}), 
073 * the existing and all subsequently arriving mails will be sent 
074 * downstream using {@link MailFoldersUpdated} events.
075 * 
076 * This implementation uses the {@link IdleManager}. The 
077 * {@link IdleManager} works only if its {@link IdleManager#watch}
078 * method is invoked (for a folder) after any operation on that folder. 
 * Note that operations such as setting the deleted flag of 
 * a message are also operations on a folder.
081 * 
082 * Folders are updated in response to an {@link UpdateMailFolders} event 
083 * or when the store signals the arrival of new messages. Information 
084 * about the folders is delivered by a {@link MailFoldersUpdated} event. 
085 * Folders may be freely used while handling the event, because the
086 * folders will be re-registered with the {@link IdleManager}
087 * when the {@link MailFoldersUpdated} event completes.
088 * Any usage of folders independent of handling the events mentioned
089 * will result in a loss of the monitor function.
090 * 
 * If required, the monitor function may be reestablished any time
 * by firing an {@link UpdateMailFolders} event for the folders used.
093 */
094@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
095    "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
096public class MailMonitor extends MailConnectionManager<
097        MailMonitor.MonitorChannel, OpenMailMonitor> {
098
    /** Delay used for the idle timer that triggers a folder update. */
    private Duration maxIdleTime = Duration.ofMinutes(25);
    /**
     * Shared by all instances. Created lazily (guarded by the class lock)
     * in {@link #onOpenMailMonitor(OpenMailMonitor, Channel)}, using the
     * session of the first open request.
     */
    private static IdleManager idleManager;
    /** Pipeline used for executing the (possibly slow) folder updates. */
    private final EventPipeline retrievals = newEventPipeline();
102
    /**
     * Creates a new mail monitor using the given channel.
     * 
     * @param componentChannel the component's channel
     */
    public MailMonitor(Channel componentChannel) {
        super(componentChannel);
    }
111
    /**
     * Always returns {@code true}.
     * 
     * NOTE(review): presumably this tells the base class that the
     * connections (channels) of this component generate events on
     * their own (here: from the mail store's callbacks) -- confirm
     * against {@link MailConnectionManager}.
     *
     * @return {@code true}
     */
    @Override
    protected boolean connectionsGenerate() {
        return true;
    }
116
    /**
     * Sets the maximum idle time. A running {@link IMAPFolder#idle()}
     * is terminated and renewed after this time. Defaults to 25 minutes.
     *
     * @param maxIdleTime the new max idle time
     * @return the mail monitor for easy chaining
     */
    public MailMonitor setMaxIdleTime(Duration maxIdleTime) {
        this.maxIdleTime = maxIdleTime;
        return this;
    }
127
    /**
     * Returns the max idle time, i.e. the value set with
     * {@link #setMaxIdleTime(Duration)} or the default of 25 minutes.
     *
     * @return the duration
     */
    public Duration maxIdleTime() {
        return maxIdleTime;
    }
136
137    /**
138     * Configure the component. Currently, only max idle time
139     * is supported.
140     *
141     * @param values the values
142     */
143    @Override
144    protected void configureComponent(Map<String, String> values) {
145        Optional.ofNullable(values.get("maxIdleTime"))
146            .map(Integer::parseInt).map(Duration::ofSeconds)
147            .ifPresent(d -> setMaxIdleTime(d));
148    }
149
150    /**
151     * Open a store as specified by the event and monitor the folders
152     * (also specified by the event). Information about all existing 
153     * and all subsequently arriving mails will be signaled downstream 
154     * using {@link MailFoldersUpdated} events.
155     *
156     * @param event the event
157     * @param channel the channel
158     */
159    @Handler
160    public void onOpenMailMonitor(OpenMailMonitor event, Channel channel) {
161        Properties sessionProps = new Properties(mailProps);
162        sessionProps.putAll(event.mailProperties());
163        sessionProps.put("mail.imap.usesocketchannels", true);
164        Session session = Session.getInstance(sessionProps,
165            // Workaround for class loading problem in OSGi with j.m. 2.1.
166            // Authenticator's classpath allows accessing provider's service.
167            // See https://github.com/eclipse-ee4j/mail/issues/631
168            new Authenticator() {
169                @Override
170                protected PasswordAuthentication
171                        getPasswordAuthentication() {
172                    return new PasswordAuthentication(
173                        sessionProps.getProperty("mail.user"),
174                        new String(event.password().or(() -> password())
175                            .map(Password::password).orElse(new char[0])));
176                }
177            });
178
179        try {
180            synchronized (MailMonitor.class) {
181                // Cannot be created earlier, need session.
182                if (idleManager == null) {
183                    idleManager = new IdleManager(session,
184                        Components.defaultExecutorService());
185                }
186            }
187            new MonitorChannel(event, channel, session.getStore(),
188                sessionProps.getProperty("mail.user"),
189                event.password().or(this::password).orElse(null));
190        } catch (NoSuchProviderException e) {
191            fire(new ConnectError(event, "Cannot create store.", e));
192        } catch (IOException e) {
193            fire(new IOError(event, "Cannot create resource.", e));
194        }
195    }
196
197    /**
198     * Retrieves the folders specified in the event.
199     *
200     * @param event the event
201     * @param channel the channel
202     */
203    @Handler
204    public void onUpdateFolders(UpdateMailFolders event, MailChannel channel) {
205        if (!connections.contains(channel)) {
206            return;
207        }
208        // This can take very long.
209        retrievals
210            .submit(() -> ((MonitorChannel) channel).onUpdateFolders(event));
211    }
212
213    /**
214     * The Enum ChannelState.
215     */
216    @SuppressWarnings("PMD.FieldNamingConventions")
217    private enum ChannelState {
218        Opening {
219            @Override
220            public boolean isOpening() {
221                return true;
222            }
223        },
224        Open {
225            @Override
226            public boolean isOpen() {
227                return true;
228            }
229        },
230        Reopening {
231            @Override
232            public boolean isOpening() {
233                return true;
234            }
235        },
236        Reopened {
237            @Override
238            public boolean isOpen() {
239                return true;
240            }
241        },
242        Closing,
243        Closed;
244
245        /**
246         * Checks if is open.
247         *
248         * @return true, if is open
249         */
250        public boolean isOpen() {
251            return false;
252        }
253
254        /**
255         * Checks if is opening.
256         *
257         * @return true, if is opening
258         */
259        public boolean isOpening() {
260            return false;
261        }
262    }
263
264    /**
265     * The specific implementation of the {@link MailChannel}.
266     */
267    protected class MonitorChannel extends
268            MailConnectionManager<MailMonitor.MonitorChannel,
269                    OpenMailMonitor>.AbstractMailChannel
270            implements ConnectionListener, StoreListener {
271
        /** The pipeline used for firing {@link UpdateMailFolders} events. */
        private final EventPipeline requestPipeline;
        /** The connection state, see {@link ChannelState}. */
        private ChannelState state = ChannelState.Opening;
        /** The monitored store. */
        private final Store store;
        /** The user name passed to the store when connecting. */
        private final String user;
        /** The password passed to the store when connecting. */
        private final Password password;
        /** The names of the monitored folders (from the open event). */
        private final String[] subscribed;
        /** The folders that have been opened, by name. */
        @SuppressWarnings("PMD.UseConcurrentHashMap")
        private final Map<String, Folder> folderCache = new HashMap<>();
        /** Timer that triggers a folder update after the max idle time. */
        private final Timer idleTimer;
281
        /**
         * Instantiates a new monitor channel. Registers the channel as
         * listener with the store, schedules the idle timer and starts
         * connecting to the store.
         *
         * @param event the event that triggered the creation
         * @param mainChannel the main channel (of this {@link Subchannel})
         * @param store the store
         * @param user the user
         * @param password the password
         */
        public MonitorChannel(OpenMailMonitor event, Channel mainChannel,
                Store store, String user, Password password) {
            super(event, mainChannel);
            this.store = store;
            this.user = user;
            this.password = password;
            this.subscribed = event.folderNames();
            // Update requests are fired on the pipeline that processed
            // the open event.
            requestPipeline = event.processedBy().get();
            store.addConnectionListener(this);
            store.addStoreListener(this);
            // Request a folder update when the max idle time has elapsed.
            // The timer is rescheduled whenever the watches are refreshed.
            idleTimer = Components.schedule(t -> {
                requestPipeline.fire(new UpdateMailFolders(), this);
            }, maxIdleTime);
            // If the initial connection attempt fails for good, report
            // this downstream on the main channel.
            connect(
                t -> downPipeline().fire(new ConnectError(event, t),
                    mainChannel));
        }
308
309        /**
310         * Attempt connections until connected. Attempts are stopped
311         * if it is the first time that the connection is to be
312         * established and the error indicates that the connection
313         * will never succeed (e.g. due to an authentication
314         * problem).
315         *
316         * @param onOpenFailed the on open failed
317         */
318        private void connect(Consumer<Throwable> onOpenFailed) {
319            synchronized (this) {
320                if (state.isOpen()) {
321                    return;
322                }
323                activeEventPipeline().executorService().submit(() -> {
324                    while (state.isOpening()) {
325                        try {
326                            attemptConnect(onOpenFailed);
327                        } catch (InterruptedException e) {
328                            break;
329                        }
330                    }
331                });
332            }
333        }
334
335        /**
336         * Single connection attempt.
337         *
338         * @param onOpenFailed the on open failed
339         * @throws InterruptedException the interrupted exception
340         */
341        @SuppressWarnings("PMD.AvoidInstanceofChecksInCatchClause")
342        private void attemptConnect(Consumer<Throwable> onOpenFailed)
343                throws InterruptedException {
344            try {
345                store.connect(user, new String(password.password()));
346                synchronized (this) {
347                    if (state == ChannelState.Opening) {
348                        state = ChannelState.Open;
349                    } else {
350                        state = ChannelState.Reopened;
351                        return;
352                    }
353                }
354            } catch (MessagingException e) {
355                synchronized (this) {
356                    if (state == ChannelState.Opening
357                        && (e instanceof AuthenticationFailedException
358                            || e instanceof NoSuchProviderException)) {
359                        logger.log(Level.WARNING,
360                            "Connecting to store failed, closing.", e);
361                        state = ChannelState.Closed;
362                        super.close();
363                        if (onOpenFailed != null) {
364                            onOpenFailed.accept(e);
365                        }
366                        return;
367                    }
368                }
369                logger.log(Level.WARNING,
370                    "(Re)connecting to store failed, retrying.", e);
371                Thread.sleep(5000);
372            }
373        }
374
375        /**
376         * Close the connection to the store.
377         */
378        @Override
379        public void close() {
380            synchronized (this) {
381                if (state == ChannelState.Closing
382                    || state == ChannelState.Closed) {
383                    return;
384                }
385                state = ChannelState.Closing;
386            }
387
388            idleTimer.cancel();
389            try {
390                // Initiate close, callback will inform downstream components.
391                store.close();
392            } catch (MessagingException e) {
393                // According to the documentation, the listeners should
394                // be invoked nevertheless.
395                logger.log(Level.WARNING, "Cannot close connection properly.",
396                    e);
397            }
398        }
399
        /**
         * Callback from store.connect if the connection is successful.
         * Clears the folder cache. After a re-open, only an update of
         * the folders is requested. Else, {@link Opening} and
         * {@link MailMonitorOpened} are fired downstream (one after the
         * other) before the update of the folders is requested.
         *
         * @param event the event
         */
        @Override
        @SuppressWarnings({ "PMD.GuardLogStatement",
            "PMD.AvoidDuplicateLiterals" })
        public void opened(ConnectionEvent event) {
            // Folders from a previous connection cannot be used any more.
            folderCache.clear();
            if (state == ChannelState.Reopened) {
                // This is a re-open, only retrieve messages.
                requestPipeline.fire(new UpdateMailFolders(), this);
                return;
            }
            // (1) Opening, (2) Opened, (3) start retrieving mails
            // Each step is started when the previous event has completed.
            downPipeline().fire(Event.onCompletion(new Opening<Void>(),
                o -> downPipeline().fire(
                    Event.onCompletion(
                        new MailMonitorOpened(openEvent(), store),
                        p -> requestPipeline
                            .fire(new UpdateMailFolders(), this)),
                    this)),
                this);
        }
425
426        /**
427         * According to the documentation,
428         * {@link ConnectionEvent#DISCONNECTED} is currently not
429         * used. It's implemented nevertheless and called explicitly.
430         *
431         * @param event the event or `null` if called explicitly
432         */
433        @Override
434        public void disconnected(ConnectionEvent event) {
435            synchronized (this) {
436                folderCache.clear();
437                if (state.isOpen()) {
438                    state = ChannelState.Reopening;
439                    connect(null);
440                }
441            }
442        }
443
        /**
         * Callback that indicates the connection close,
         * can be called any time by jakarta mail.
         * 
         * Whether closing is intended (callback after a call to 
         * {@link #close}) can be checked by looking at the state.
         * An unintended close is handled like a disconnect. An intended
         * close marks the channel as closed, fires {@link Closed}
         * downstream and closes the channel.
         *
         * @param event the event
         */
        @Override
        public void closed(ConnectionEvent event) {
            // Ignore if already closed.
            if (state == ChannelState.Closed) {
                return;
            }

            // Handle involuntary close by reopening.
            if (state != ChannelState.Closing) {
                disconnected(event);
                return;
            }

            // Cleanup and remove channel.
            synchronized (this) {
                state = ChannelState.Closed;
                folderCache.clear();
            }
            downPipeline().fire(new Closed<Void>(), this);
            super.close();
        }
474
475        @Override
476        public void notification(StoreEvent event) {
477            if (event.getMessage().contains("SocketException")) {
478                logger.fine(() -> "Problem with store: " + event.getMessage());
479                if (store.isConnected()) {
480                    logger.fine(() -> "Updating folders to resume");
481                    requestPipeline.fire(new UpdateMailFolders(), this);
482                    return;
483                }
484                logger.fine(() -> "Reconnecting to resume");
485                disconnected(null);
486            }
487        }
488
489        /**
490         * Retrieve the new messages from the folders specified in the
491         * event.
492         * 
493         * @param event
494         */
495        @SuppressWarnings({ "PMD.CognitiveComplexity",
496            "PMD.AvoidInstantiatingObjectsInLoops",
497            "PMD.AvoidDuplicateLiterals" })
498        public void onUpdateFolders(UpdateMailFolders event) {
499            List<Folder> folders = new ArrayList<>();
500            List<Message> newMsgs = new ArrayList<>();
501            if (store.isConnected()) {
502                Set<String> folderNames
503                    = new HashSet<>(Arrays.asList(subscribed));
504                if (event.folderNames().length > 0) {
505                    folderNames.retainAll(Arrays.asList(event.folderNames()));
506                }
507                try {
508                    for (var folderName : folderNames) {
509                        @SuppressWarnings("PMD.CloseResource")
510                        Folder folder = getFolder(folderName);
511                        if (folder == null) {
512                            continue;
513                        }
514                        folders.add(folder);
515                    }
516                } catch (FolderClosedException e) {
517                    disconnected(null);
518                }
519            } else {
520                disconnected(null);
521            }
522            event.setResult(folders);
523            Event.onCompletion(event, e -> downPipeline().fire(Event
524                .onCompletion(new MailFoldersUpdated(folders, newMsgs),
525                    evt -> refreshWatches(evt)),
526                this));
527        }
528
529        @SuppressWarnings({ "PMD.GuardLogStatement",
530            "PMD.AvoidRethrowingException", "PMD.CloseResource" })
531        private Folder getFolder(String folderName)
532                throws FolderClosedException {
533            synchronized (folderCache) {
534                Folder folder = folderCache.get(folderName);
535                if (folder != null) {
536                    return folder;
537                }
538                try {
539                    folder = store.getFolder(folderName);
540                    if (folder == null || !folder.exists()) {
541                        logger.fine(() -> "No folder \"" + folderName
542                            + "\" in store " + store);
543                        return null;
544                    }
545                    folder.open(Folder.READ_WRITE);
546                    folderCache.put(folderName, folder);
547                    // Add MessageCountListener to listen for new messages.
548                    folder.addMessageCountListener(new MessageCountAdapter() {
549                        @Override
550                        public void
551                                messagesAdded(MessageCountEvent countEvent) {
552                            retrievals.submit("UpdateFolder",
553                                () -> updateFolders(countEvent));
554                        }
555
556                        @Override
557                        public void
558                                messagesRemoved(MessageCountEvent countEvent) {
559                            retrievals.submit("UpdateFolder",
560                                () -> updateFolders(countEvent));
561                        }
562                    });
563                    return folder;
564                } catch (FolderClosedException e) {
565                    throw e;
566                } catch (MessagingException e) {
567                    logger.log(Level.FINE,
568                        "Cannot open folder: " + e.getMessage(), e);
569                }
570                return null;
571            }
572        }
573
574        @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
575            "PMD.GuardLogStatement" })
576        private void updateFolders(MessageCountEvent event) {
577            List<Message> newMsgs = new ArrayList<>();
578            if (event.getType() == MessageCountEvent.ADDED) {
579                newMsgs.addAll(Arrays.asList(event.getMessages()));
580            } else if (event.getType() != MessageCountEvent.REMOVED) {
581                return;
582            }
583            downPipeline().fire(
584                Event.onCompletion(
585                    new MailFoldersUpdated(
586                        new ArrayList<>(folderCache.values()),
587                        newMsgs),
588                    evt -> refreshWatches(evt)),
589                this);
590        }
591
592        /**
593         * Registers the folders from which messages have been received
594         * with the {@link IdleManager}.
595         *
596         * @param event the event
597         */
598        @SuppressWarnings("PMD.CloseResource")
599        private void refreshWatches(MailFoldersUpdated event) {
600            if (!state.isOpen()) {
601                return;
602            }
603            for (Folder folder : event.folders()) {
604                try {
605                    idleManager.watch(getFolder(folder.getFullName()));
606                } catch (MessagingException e) {
607                    logger.log(Level.WARNING, "Cannot watch folder.",
608                        e);
609                }
610            }
611            idleTimer.reschedule(maxIdleTime);
612        }
613    }
614
    /**
     * Returns the object name as provided by
     * {@link Components#objectName(Object)}.
     *
     * @return the string
     */
    @Override
    public String toString() {
        return Components.objectName(this);
    }
619
620}