001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2023 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public 
013 * License for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License 
016 * along with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io.util;
020
021import java.util.HashSet;
022import java.util.Set;
023import java.util.concurrent.ExecutorService;
024import org.jgrapes.core.Channel;
025import org.jgrapes.core.Component;
026import org.jgrapes.core.Components;
027import org.jgrapes.core.Event;
028import org.jgrapes.core.EventPipeline;
029import org.jgrapes.core.Manager;
030import org.jgrapes.core.Subchannel;
031import org.jgrapes.core.annotation.Handler;
032import org.jgrapes.core.annotation.HandlerDefinition.ChannelReplacements;
033import org.jgrapes.core.events.Stop;
034import org.jgrapes.io.events.Close;
035
036/**
037 * A base class for components that manage {@link Subchannel}s representing
038 * some kind of connection to a server or service.
039 *
040 * @param <C> the type of the managed connections
041 */
042public abstract class ConnectionManager<
043        C extends ConnectionManager<C>.Connection>
044        extends Component {
045
046    protected final Set<C> connections = new HashSet<>();
047    private ExecutorService executorService;
048
049    /**
050     * Creates a new component base with its channel set to
051     * itself.
052     */
053    public ConnectionManager() {
054        super();
055    }
056
057    /**
058     * Creates a new component base with its channel set to the given 
059     * channel. As a special case {@link Channel#SELF} can be
060     * passed to the constructor to make the component use itself
061     * as channel. The special value is necessary as you 
062     * obviously cannot pass an object to be constructed to its 
063     * constructor.
064     *
065     * @param componentChannel the channel that the component's
066     * handlers listen on by default and that 
067     * {@link Manager#fire(Event, Channel...)} sends the event to
068     */
069    public ConnectionManager(Channel componentChannel,
070            ChannelReplacements channelReplacements) {
071        super(componentChannel, channelReplacements);
072    }
073
074    /**
075     * Creates a new component base like 
076     * {@link Component#Component(Channel)} but with channel mappings 
077     * for {@link Handler} annotations.
078     *
079     * @param componentChannel the channel that the component's
080     * handlers listen on by default and that 
081     * {@link Manager#fire(Event, Channel...)} sends the event to
082     */
083    public ConnectionManager(Channel componentChannel) {
084        super(componentChannel);
085    }
086
087    /**
088     * If connections are event generators, register the component as
089     * generator upon the creation of the first connection and unregister
090     * it when closing the last one.  
091     *
092     * @return true, if connections generate
093     */
094    protected abstract boolean connectionsGenerate();
095
096    /**
097     * Sets an executor service to be used by the downstream event 
098     * pipelines. Setting this to an executor service with a limited 
099     * number of threads allows to control the maximum load caused 
100     * by events generated by this component.
101     * 
102     * @param executorService the executorService to set
103     * @return the connection manager for easy chaining
104     * @see Manager#newEventPipeline(ExecutorService)
105     */
106    public ConnectionManager<C>
107            setExecutorService(ExecutorService executorService) {
108        this.executorService = executorService;
109        return this;
110    }
111
112    /**
113     * Returns the executor service.
114     *
115     * @return the executorService
116     */
117    public ExecutorService executorService() {
118        if (executorService == null) {
119            return Components.defaultExecutorService();
120        }
121        return executorService;
122    }
123
124    /**
125     * Closes all connections.
126     * 
127     * @param event the event
128     */
129    @Handler
130    public void onStop(Stop event) {
131        while (true) {
132            C connection;
133            synchronized (connections) {
134                var itr = connections.iterator();
135                if (!itr.hasNext()) {
136                    return;
137                }
138                connection = itr.next();
139            }
140            connection.close();
141        }
142    }
143
144    /**
145     * Closes the given connection.
146     *
147     * @param event the event
148     * @param connection the connection
149     */
150    @Handler
151    public void onClose(Close event, C connection) {
152        synchronized (this) {
153            if (connections.contains(connection)) {
154                connection.close();
155            }
156        }
157    }
158
159    /**
160     * The base class for the connections managed by this component.
161     */
162    public class Connection extends Subchannel.DefaultSubchannel {
163
164        private final EventPipeline downPipeline;
165
166        /**
167         * @param mainChannel
168         */
169        @SuppressWarnings("unchecked")
170        public Connection(Channel mainChannel) {
171            super(mainChannel);
172            synchronized (ConnectionManager.this) {
173                if (connections.isEmpty() && connectionsGenerate()) {
174                    registerAsGenerator();
175                }
176                connections.add((C) this);
177            }
178            if (executorService == null) {
179                downPipeline = newEventPipeline();
180            } else {
181                downPipeline = newEventPipeline(executorService);
182            }
183        }
184
185        /**
186         * Gets the down pipeline.
187         *
188         * @return the downPipeline
189         */
190        public EventPipeline downPipeline() {
191            return downPipeline;
192        }
193
194        /**
195         * Closes the connection. If the last connection is closed
196         * and the component is a generator (see 
197         * {@link ConnectionManager#connectionsGenerate()), the component
198         * is unregistered as generator.
199         */
200        public void close() {
201            synchronized (this) {
202                connections.remove(this);
203                if (connections.isEmpty() && connectionsGenerate()) {
204                    unregisterAsGenerator();
205                }
206            }
207        }
208
209    }
210}