001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.net;
020
021import java.io.IOException;
022import java.net.SocketAddress;
023import java.nio.ByteBuffer;
024import java.nio.channels.SelectionKey;
025import java.nio.channels.SocketChannel;
026import java.util.ArrayDeque;
027import java.util.HashSet;
028import java.util.Queue;
029import java.util.Set;
030import java.util.concurrent.ExecutorService;
031
032import org.jgrapes.core.Channel;
033import org.jgrapes.core.Component;
034import org.jgrapes.core.Components;
035import org.jgrapes.core.EventPipeline;
036import org.jgrapes.core.Manager;
037import org.jgrapes.core.Subchannel;
038import org.jgrapes.core.annotation.Handler;
039import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel;
040import org.jgrapes.io.NioHandler;
041import org.jgrapes.io.events.Closed;
042import org.jgrapes.io.events.HalfClosed;
043import org.jgrapes.io.events.Input;
044import org.jgrapes.io.events.NioRegistration;
045import org.jgrapes.io.events.NioRegistration.Registration;
046import org.jgrapes.io.events.Output;
047import org.jgrapes.io.util.ManagedBuffer;
048import org.jgrapes.io.util.ManagedBufferPool;
049
050/**
051 * Provides a base class for the {@link TcpServer} and the {@link TcpConnector}.
052 */
053@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.ExcessivePublicCount",
054    "PMD.NcssCount", "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals",
055    "PMD.ExcessiveClassLength" })
056public abstract class TcpConnectionManager extends Component {
057
    /**
     * The size used for the send and receive buffers of new connections.
     * The value 0 causes the socket's own buffer sizes to be used.
     */
    private int bufferSize;
    /**
     * The currently registered channels. The set is not thread safe,
     * {@link #removeChannel} synchronizes on it while modifying it.
     */
    protected final Set<TcpChannelImpl> channels = new HashSet<>();
    /**
     * The executor service used for the pipelines that process the
     * data from the network. If {@code null}, the pipelines are created
     * without an explicit executor service.
     */
    private ExecutorService executorService;
061
    /**
     * Creates a new connection manager using the given channel.
     * 
     * @param componentChannel the component's channel
     */
    public TcpConnectionManager(Channel componentChannel) {
        super(componentChannel);
    }
070
071    /**
072     * Sets the buffer size for the send an receive buffers.
073     * If no size is set, the system defaults will be used.
074     * 
075     * @param bufferSize the size to use for the send and receive buffers
076     * @return the TCP connection manager for easy chaining
077     */
078    public TcpConnectionManager setBufferSize(int bufferSize) {
079        this.bufferSize = bufferSize;
080        return this;
081    }
082
083    /**
084     * Return the configured buffer size.
085     *
086     * @return the bufferSize
087     */
088    public int bufferSize() {
089        return bufferSize;
090    }
091
092    /**
093     * Sets an executor service to be used by the event pipelines
094     * that process the data from the network. Setting this
095     * to an executor service with a limited number of threads
096     * allows to control the maximum load from the network.
097     * 
098     * @param executorService the executorService to set
099     * @return the TCP connection manager for easy chaining
100     * @see Manager#newEventPipeline(ExecutorService)
101     */
102    public TcpConnectionManager
103            setExecutorService(ExecutorService executorService) {
104        this.executorService = executorService;
105        return this;
106    }
107
108    /**
109     * Returns the executor service.
110     *
111     * @return the executorService
112     */
113    public ExecutorService executorService() {
114        return executorService;
115    }
116
117    /**
118     * Writes the data passed in the event. 
119     * 
120     * The end of record flag is used to determine if a channel is 
121     * eligible for purging. If the flag is set and all output has 
122     * been processed, the channel is purgeable until input is 
123     * received or another output event causes the state to be 
124     * reevaluated. 
125     *
126     * @param event the event
127     * @param channel the channel
128     * @throws InterruptedException the interrupted exception
129     */
130    @Handler
131    public void onOutput(Output<ByteBuffer> event,
132            TcpChannelImpl channel) throws InterruptedException {
133        if (channels.contains(channel)) {
134            channel.write(event);
135        }
136    }
137
138    /**
139     * Removes the channel from the set of registered channels.
140     *
141     * @param channel the channel
142     * @return true, if channel was registered
143     */
144    protected boolean removeChannel(TcpChannelImpl channel) {
145        synchronized (channels) {
146            return channels.remove(channel);
147        }
148    }
149
150    /*
151     * (non-Javadoc)
152     * 
153     * @see java.lang.Object#toString()
154     */
155    @Override
156    public String toString() {
157        return Components.objectName(this);
158    }
159
    /**
     * The close state of a connection.
     * 
     * <ul>
     * <li>{@code OPEN}: the initial state of a connection.</li>
     * <li>{@code DELAYED_EVENT}: {@code close()} has been invoked on
     * this end while writes were still pending; the output is shut 
     * down once the pending writes have been processed.</li>
     * <li>{@code DELAYED_REQUEST}: the other end has initiated the 
     * close while writes were still pending; the channel is closed
     * once the pending writes have been processed.</li>
     * <li>{@code HALF_CLOSED}: the output has been shut down by this 
     * end; an end of input from the other end completes the close.</li>
     * <li>{@code CLOSED}: the nio channel has been closed.</li>
     * </ul>
     */
    private enum ConnectionState {
        OPEN, DELAYED_EVENT, DELAYED_REQUEST, HALF_CLOSED, CLOSED
    }
166
    /**
     * The purgeable state of a connection.
     * 
     * <ul>
     * <li>{@code NO}: the initial state; also set when input has been
     * received or when output without the end of record flag has been 
     * written.</li>
     * <li>{@code PENDING}: output with the end of record flag set has 
     * been queued but not been written completely yet.</li>
     * <li>{@code YES}: output with the end of record flag set has been 
     * written completely.</li>
     * </ul>
     */
    private enum PurgeableState {
        NO, PENDING, YES
    }
173
174    /**
175     * The internal representation of a connection. 
176     */
177    protected class TcpChannelImpl
178            extends DefaultIOSubchannel implements NioHandler, TcpChannel {
179
        /** The nio channel used for the connection. */
        private final SocketChannel nioChannel;
        /** The local address, copied from the nio channel on creation. */
        private final SocketAddress localAddress;
        /** The remote address, copied from the nio channel on creation. */
        private final SocketAddress remoteAddress;
        /** The pipeline used for firing the events caused by input. */
        private EventPipeline downPipeline;
        /** The pool with the buffers that are filled from the network. */
        private final ManagedBufferPool<ManagedBuffer<ByteBuffer>,
                ByteBuffer> readBuffers;
        /** The registration, available after registration has completed. */
        private Registration registration;
        /** The operations that the selector is asked to report. */
        private int selectionKeys;
        /**
         * Views on buffers that have not been written completely yet. 
         * The queue is also used as lock for the write related state.
         */
        private final Queue<
                ManagedBuffer<ByteBuffer>.ByteBufferView> pendingWrites
                    = new ArrayDeque<>();
        /** The current close state of the connection. */
        private ConnectionState connState = ConnectionState.OPEN;
        /** The current purgeable state of the connection. */
        private PurgeableState purgeable = PurgeableState.NO;
        /** Time in milliseconds recorded when becoming purgeable. */
        private long becamePurgeableAt;
194
195        /**
196         * @param nioChannel the channel
197         * @throws IOException if an I/O error occured
198         */
199        public TcpChannelImpl(SocketChannel nioChannel) throws IOException {
200            super(channel(), newEventPipeline());
201            this.nioChannel = nioChannel;
202            // Copy, because they are only available while channel is open.
203            localAddress = nioChannel.getLocalAddress();
204            remoteAddress = nioChannel.getRemoteAddress();
205            if (executorService == null) {
206                downPipeline = newEventPipeline();
207            } else {
208                downPipeline = newEventPipeline(executorService);
209            }
210            String channelName
211                = Components.objectName(TcpConnectionManager.this)
212                    + "." + Components.objectName(this);
213
214            // Prepare write buffers
215            int writeBufferSize = bufferSize == 0
216                ? nioChannel.socket().getSendBufferSize()
217                : bufferSize;
218            setByteBufferPool(new ManagedBufferPool<>(ManagedBuffer::new,
219                () -> {
220                    return ByteBuffer.allocate(writeBufferSize);
221                }, 2)
222                    .setName(channelName + ".upstream.buffers"));
223
224            // Prepare read buffers
225            int readBufferSize = bufferSize == 0
226                ? nioChannel.socket().getReceiveBufferSize()
227                : bufferSize;
228            readBuffers = new ManagedBufferPool<>(ManagedBuffer::new,
229                () -> {
230                    return ByteBuffer.allocate(readBufferSize);
231                }, 2)
232                    .setName(channelName + ".downstream.buffers");
233
234            // Register with dispatcher
235            nioChannel.configureBlocking(false);
236            TcpConnectionManager.this.fire(
237                new NioRegistration(this, nioChannel, 0,
238                    TcpConnectionManager.this),
239                Channel.BROADCAST);
240        }
241
242        /**
243         * Gets the nio channel.
244         *
245         * @return the nioChannel
246         */
247        public SocketChannel nioChannel() {
248            return nioChannel;
249        }
250
251        @Override
252        public SocketAddress localAddress() {
253            return localAddress;
254        }
255
256        @Override
257        public SocketAddress remoteAddress() {
258            return remoteAddress;
259        }
260
261        /**
262         * Gets the read buffers.
263         *
264         * @return the readBuffers
265         */
266        public ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer>
267                readBuffers() {
268            return readBuffers;
269        }
270
271        /**
272         * Gets the down pipeline.
273         *
274         * @return the downPipeline
275         */
276        public EventPipeline downPipeline() {
277            return downPipeline;
278        }
279
280        /**
281         * Invoked when registration has completed.
282         * 
283         * @param event the completed event
284         * @throws InterruptedException if the execution was interrupted
285         * @throws IOException if an I/O error occurred
286         */
287        public void registrationComplete(NioRegistration event)
288                throws InterruptedException, IOException {
289            registration = event.get();
290            selectionKeys |= SelectionKey.OP_READ;
291            registration.updateInterested(selectionKeys);
292        }
293
294        /**
295         * Checks if is purgeable.
296         *
297         * @return true, if is purgeable
298         */
299        public boolean isPurgeable() {
300            return purgeable == PurgeableState.YES;
301        }
302
303        /**
304         * Gets the the time when the connection became purgeable.
305         *
306         * @return the time
307         */
308        public long purgeableSince() {
309            return becamePurgeableAt;
310        }
311
312        /**
313         * Write the data on this channel.
314         * 
315         * @param event the event
316         */
317        public void write(Output<ByteBuffer> event)
318                throws InterruptedException {
319            synchronized (pendingWrites) {
320                if (!nioChannel.isOpen()) {
321                    return;
322                }
323                ManagedBuffer<ByteBuffer>.ByteBufferView reader
324                    = event.buffer().newByteBufferView();
325                if (!pendingWrites.isEmpty()) {
326                    reader.managedBuffer().lockBuffer();
327                    purgeable = event.isEndOfRecord() ? PurgeableState.PENDING
328                        : PurgeableState.NO;
329                    pendingWrites.add(reader);
330                    return;
331                }
332                try {
333                    nioChannel.write(reader.get());
334                } catch (IOException e) {
335                    forceClose(e);
336                    return;
337                }
338                if (!reader.get().hasRemaining()) {
339                    if (event.isEndOfRecord()) {
340                        becamePurgeableAt = System.currentTimeMillis();
341                        purgeable = PurgeableState.YES;
342                    } else {
343                        purgeable = PurgeableState.NO;
344                    }
345                    return;
346                }
347                reader.managedBuffer().lockBuffer();
348                purgeable = event.isEndOfRecord() ? PurgeableState.PENDING
349                    : PurgeableState.NO;
350                pendingWrites.add(reader);
351                selectionKeys |= SelectionKey.OP_WRITE;
352                registration.updateInterested(selectionKeys);
353            }
354        }
355
356        @Override
357        public void handleOps(int ops) throws InterruptedException {
358            if ((ops & SelectionKey.OP_READ) != 0) {
359                handleReadOp();
360            }
361            if ((ops & SelectionKey.OP_WRITE) != 0) {
362                handleWriteOp();
363            }
364        }
365
366        /**
367         * Gets a buffer from the pool and reads available data into it.
368         * Sends the result as event. 
369         * 
370         * @throws InterruptedException
371         * @throws IOException
372         */
373        @SuppressWarnings("PMD.EmptyCatchBlock")
374        private void handleReadOp() throws InterruptedException {
375            ManagedBuffer<ByteBuffer> buffer;
376            buffer = readBuffers.acquire();
377            try {
378                int bytes = buffer.fillFromChannel(nioChannel);
379                if (bytes == 0) {
380                    buffer.unlockBuffer();
381                    return;
382                }
383                if (bytes > 0) {
384                    purgeable = PurgeableState.NO;
385                    downPipeline.fire(Input.fromSink(buffer, false), this);
386                    return;
387                }
388            } catch (IOException e) {
389                // Buffer already unlocked by fillFromChannel
390                forceClose(e);
391                return;
392            }
393            // EOF (-1) from other end
394            buffer.unlockBuffer();
395            synchronized (nioChannel) {
396                if (connState == ConnectionState.HALF_CLOSED) {
397                    // Other end confirms our close, complete close
398                    try {
399                        nioChannel.close();
400                    } catch (IOException e) {
401                        // Ignored for close
402                    }
403                    connState = ConnectionState.CLOSED;
404                    downPipeline.fire(new Closed(), this);
405                    return;
406                }
407            }
408            // Other end initiates close
409            selectionKeys &= ~SelectionKey.OP_READ;
410            registration.updateInterested(selectionKeys);
411            downPipeline.submit("SendHalfClosed", () -> {
412                try {
413                    // Inform downstream and wait until everything has settled.
414                    newEventPipeline().fire(new HalfClosed(), this).get();
415                    // All settled.
416                    removeChannel(this);
417                    downPipeline.fire(new Closed(), this);
418                    // Close our end if everything has been written.
419                    synchronized (pendingWrites) {
420                        synchronized (nioChannel) {
421                            try {
422                                if (!pendingWrites.isEmpty()) {
423                                    // Pending writes, delay close
424                                    connState = ConnectionState.DELAYED_REQUEST;
425                                    return;
426                                }
427                                // Nothing left to do, close
428                                nioChannel.close();
429                                connState = ConnectionState.CLOSED;
430                            } catch (IOException e) {
431                                // Ignored for close
432                            }
433                        }
434                    }
435                } catch (InterruptedException e) {
436                    // Nothing to do about this
437                }
438            });
439        }
440
441        /**
442         * Checks if there is still data to be written. This may be
443         * a left over in an incompletely written buffer or a complete
444         * pending buffer. 
445         * 
446         * @throws IOException
447         * @throws InterruptedException 
448         */
449        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
450            "PMD.EmptyCatchBlock",
451            "PMD.AvoidBranchingStatementAsLastInLoop" })
452        private void handleWriteOp() throws InterruptedException {
453            while (true) {
454                ManagedBuffer<ByteBuffer>.ByteBufferView head = null;
455                synchronized (pendingWrites) {
456                    if (pendingWrites.isEmpty()) {
457                        // Nothing left to write, stop getting ops
458                        selectionKeys &= ~SelectionKey.OP_WRITE;
459                        registration.updateInterested(selectionKeys);
460                        // Was the connection closed while we were writing?
461                        if (connState == ConnectionState.DELAYED_REQUEST
462                            || connState == ConnectionState.DELAYED_EVENT) {
463                            synchronized (nioChannel) {
464                                try {
465                                    if (connState == ConnectionState.DELAYED_REQUEST) {
466                                        // Delayed close request from other end,
467                                        // complete
468                                        nioChannel.close();
469                                        connState = ConnectionState.CLOSED;
470                                    }
471                                    if (connState == ConnectionState.DELAYED_EVENT) {
472                                        // Delayed close from this end, initiate
473                                        nioChannel.shutdownOutput();
474                                        connState = ConnectionState.HALF_CLOSED;
475                                    }
476                                } catch (IOException e) {
477                                    // Ignored for close
478                                }
479                            }
480                        } else {
481                            if (purgeable == PurgeableState.PENDING) {
482                                purgeable = PurgeableState.YES;
483                            }
484                        }
485                        break; // Nothing left to do
486                    }
487                    head = pendingWrites.peek();
488                    if (!head.get().hasRemaining()) {
489                        // Nothing left in head buffer, try next
490                        head.managedBuffer().unlockBuffer();
491                        pendingWrites.remove();
492                        continue;
493                    }
494                }
495                try {
496                    nioChannel.write(head.get()); // write...
497                } catch (IOException e) {
498                    forceClose(e);
499                    return;
500                }
501                break; // ... and wait for next op
502            }
503        }
504
505        /**
506         * Closes this channel.
507         * 
508         * @throws IOException if an error occurs
509         * @throws InterruptedException if the execution was interrupted 
510         */
511        public void close() throws IOException, InterruptedException {
512            if (!removeChannel(this)) {
513                return;
514            }
515            synchronized (pendingWrites) {
516                if (!pendingWrites.isEmpty()) {
517                    // Pending writes, delay close until done
518                    connState = ConnectionState.DELAYED_EVENT;
519                    return;
520                }
521                // Nothing left to do, proceed
522                synchronized (nioChannel) {
523                    if (nioChannel.isOpen()) {
524                        // Initiate close, must be confirmed by other end
525                        nioChannel.shutdownOutput();
526                        connState = ConnectionState.HALF_CLOSED;
527                    }
528                }
529            }
530        }
531
532        @SuppressWarnings("PMD.EmptyCatchBlock")
533        private void forceClose(Throwable error) throws InterruptedException {
534            try {
535                nioChannel.close();
536                connState = ConnectionState.CLOSED;
537            } catch (IOException e) {
538                // Closed only to make sure, any failure can be ignored.
539            }
540            if (removeChannel(this)) {
541                Closed evt = new Closed(error);
542                downPipeline.fire(evt, this);
543            }
544        }
545
546        /*
547         * (non-Javadoc)
548         * 
549         * @see org.jgrapes.io.IOSubchannel.DefaultSubchannel#toString()
550         */
551        @SuppressWarnings("PMD.CommentRequired")
552        public String toString() {
553            return Subchannel.toString(this);
554        }
555    }
556
557}