001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.net;
020
021import java.io.IOException;
022import java.net.SocketAddress;
023import java.nio.ByteBuffer;
024import java.nio.channels.SelectionKey;
025import java.nio.channels.SocketChannel;
026import java.util.ArrayDeque;
027import java.util.HashSet;
028import java.util.Optional;
029import java.util.Queue;
030import java.util.Set;
031import java.util.concurrent.ExecutorService;
032import org.jgrapes.core.Channel;
033import org.jgrapes.core.Component;
034import org.jgrapes.core.Components;
035import org.jgrapes.core.EventPipeline;
036import org.jgrapes.core.Manager;
037import org.jgrapes.core.Subchannel;
038import org.jgrapes.core.annotation.Handler;
039import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel;
040import org.jgrapes.io.NioHandler;
041import org.jgrapes.io.events.Closed;
042import org.jgrapes.io.events.HalfClosed;
043import org.jgrapes.io.events.Input;
044import org.jgrapes.io.events.NioRegistration;
045import org.jgrapes.io.events.NioRegistration.Registration;
046import org.jgrapes.io.events.OpenTcpConnection;
047import org.jgrapes.io.events.Output;
048import org.jgrapes.io.util.ManagedBuffer;
049import org.jgrapes.io.util.ManagedBufferPool;
050
051/**
052 * Provides a base class for the {@link TcpServer} and the {@link TcpConnector}.
053 */
054@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.ExcessivePublicCount",
055    "PMD.NcssCount", "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals",
056    "PMD.ExcessiveClassLength" })
057public abstract class TcpConnectionManager extends Component {
058
    /**
     * The configured size of the send and receive buffers. The value
     * is evaluated whenever a new connection is created.
     */
    private int bufferSize = 32_768;
    /**
     * The channels currently registered with this manager. Removal
     * of a channel (see {@code removeChannel}) synchronizes on this set.
     */
    protected final Set<TcpChannelImpl> channels = new HashSet<>();
    /**
     * The executor service passed to the pipelines that process the
     * data from the network. May be {@code null}, in which case
     * a pipeline without a dedicated executor service is created.
     */
    private ExecutorService executorService;
062
    /**
     * Creates a new connection manager using the given channel.
     * The channel is simply passed on to the super class.
     * 
     * @param componentChannel the component's channel
     */
    public TcpConnectionManager(Channel componentChannel) {
        super(componentChannel);
    }
071
    /**
     * Sets the buffer size for the send and receive buffers.
     * If no size is set, a default value of 32768 will be used.
     * 
     * The value is not validated here. When the buffers for a 
     * connection are created, a size below 1500 is replaced by 1500.
     * 
     * @param bufferSize the size to use for the send and receive buffers
     * @return the TCP connection manager for easy chaining
     */
    public TcpConnectionManager setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
        return this;
    }
083
    /**
     * Returns the configured buffer size, i.e. the value passed to
     * {@link #setBufferSize(int)} or the default of 32768.
     *
     * @return the bufferSize
     */
    public int bufferSize() {
        return bufferSize;
    }
092
    /**
     * Sets an executor service to be used by the event pipelines
     * that process the data from the network. Setting this
     * to an executor service with a limited number of threads
     * makes it possible to control the maximum load from the network.
     * 
     * The value is evaluated when a connection is created. If no
     * executor service has been set (or it is {@code null}), the
     * pipeline is created without a dedicated executor service.
     * 
     * @param executorService the executorService to set
     * @return the TCP connection manager for easy chaining
     * @see Manager#newEventPipeline(ExecutorService)
     */
    public TcpConnectionManager
            setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
        return this;
    }
108
    /**
     * Returns the executor service set with 
     * {@link #setExecutorService(ExecutorService)}.
     *
     * @return the executorService, {@code null} if none has been set
     */
    public ExecutorService executorService() {
        return executorService;
    }
117
118    /**
119     * Writes the data passed in the event. 
120     * 
121     * The end of record flag is used to determine if a channel is 
122     * eligible for purging. If the flag is set and all output has 
123     * been processed, the channel is purgeable until input is 
124     * received or another output event causes the state to be 
125     * reevaluated. 
126     *
127     * @param event the event
128     * @param channel the channel
129     * @throws InterruptedException the interrupted exception
130     */
131    @Handler
132    public void onOutput(Output<ByteBuffer> event,
133            TcpChannelImpl channel) throws InterruptedException {
134        if (channels.contains(channel)) {
135            channel.write(event);
136        }
137    }
138
139    /**
140     * Removes the channel from the set of registered channels.
141     *
142     * @param channel the channel
143     * @return true, if channel was registered
144     */
145    protected boolean removeChannel(TcpChannelImpl channel) {
146        synchronized (channels) {
147            return channels.remove(channel);
148        }
149    }
150
    /**
     * Returns the name of this component as provided by 
     * {@code Components.objectName}.
     * 
     * @return the name
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return Components.objectName(this);
    }
160
    /**
     * The close state of a connection.
     */
    private enum ConnectionState {
        // OPEN: the initial state of a connection.
        // DELAYED_EVENT: close() has been invoked on this end while 
        // writes were pending; the output is shut down when all pending
        // writes have been processed.
        // DELAYED_REQUEST: the other end has signaled EOF while writes
        // were pending; the nio channel is closed when all pending writes 
        // have been processed.
        // HALF_CLOSED: this end has shut down its output and waits for
        // the EOF from the other end.
        // CLOSED: the nio channel has been closed.
        OPEN, DELAYED_EVENT, DELAYED_REQUEST, HALF_CLOSED, CLOSED
    }
167
    /**
     * The purgeable state of a connection.
     */
    private enum PurgeableState {
        // NO: the initial state; also set when input is received or when
        // output is written that is not marked as end of record.
        // PENDING: output marked as end of record has been queued, but
        // has not been written completely yet.
        // YES: output marked as end of record has been written completely.
        NO, PENDING, YES
    }
174
    /**
     * The internal representation of a connection. 
     */
181    @SuppressWarnings("PMD.GodClass")
182    protected class TcpChannelImpl
183            extends DefaultIOSubchannel implements NioHandler, TcpChannel {
184
        // The event that caused the connection to be opened, may be null
        // (see openEvent()).
        private final OpenTcpConnection openEvent;
        // The underlying channel; also used as monitor when the
        // connection state is changed in the course of closing.
        private final SocketChannel nioChannel;
        // Copies of the addresses, obtained when this object is created.
        private final SocketAddress localAddress;
        private final SocketAddress remoteAddress;
        // The pipeline used to fire the Input and Closed events.
        private final EventPipeline downPipeline;
        // The pool that provides the buffers for reading from the channel.
        private final ManagedBufferPool<ManagedBuffer<ByteBuffer>,
                ByteBuffer> readBuffers;
        // Set by registrationComplete(), used to update the interest set.
        private Registration registration;
        // The operations (SelectionKey.OP_*) that we are interested in.
        private int selectionKeys;
        // Views on the buffers that have not been written (completely)
        // yet; also used as monitor for the write related state.
        private final Queue<
                ManagedBuffer<ByteBuffer>.ByteBufferView> pendingWrites
                    = new ArrayDeque<>();
        private ConnectionState connState = ConnectionState.OPEN;
        private PurgeableState purgeable = PurgeableState.NO;
        // Timestamp (System.currentTimeMillis()) reported 
        // by purgeableSince().
        private long becamePurgeableAt;
200
201        /**
202         * @param nioChannel the channel
203         * @throws IOException if an I/O error occured
204         */
205        public TcpChannelImpl(OpenTcpConnection openEvent,
206                SocketChannel nioChannel) throws IOException {
207            super(channel(), newEventPipeline());
208            this.openEvent = openEvent;
209            this.nioChannel = nioChannel;
210            // Copy, because they are only available while channel is open.
211            localAddress = nioChannel.getLocalAddress();
212            remoteAddress = nioChannel.getRemoteAddress();
213            if (executorService == null) {
214                downPipeline = newEventPipeline();
215            } else {
216                downPipeline = newEventPipeline(executorService);
217            }
218            String channelName
219                = Components.objectName(TcpConnectionManager.this)
220                    + "." + Components.objectName(this);
221
222            // Prepare write buffers
223            int writeBufferSize = bufferSize < 1500 ? 1500 : bufferSize;
224            setByteBufferPool(new ManagedBufferPool<>(ManagedBuffer::new,
225                () -> {
226                    return ByteBuffer.allocate(writeBufferSize);
227                }, 2)
228                    .setName(channelName + ".upstream.buffers"));
229
230            // Prepare read buffers
231            int readBufferSize = bufferSize < 1500 ? 1500 : bufferSize;
232            readBuffers = new ManagedBufferPool<>(ManagedBuffer::new,
233                () -> {
234                    return ByteBuffer.allocate(readBufferSize);
235                }, 2)
236                    .setName(channelName + ".downstream.buffers");
237
238            // Register with dispatcher
239            nioChannel.configureBlocking(false);
240            TcpConnectionManager.this.fire(
241                new NioRegistration(this, nioChannel, 0,
242                    TcpConnectionManager.this),
243                Channel.BROADCAST);
244        }
245
        /**
         * Returns the event that caused this connection to be opened.
         * 
         * The result is empty if no such event was passed to the 
         * constructor (the existing documentation names a channel that 
         * was created in response to a client connecting to the server
         * as such a case).
         * 
         * @return the event, wrapped in an {@link Optional}
         */
        public Optional<OpenTcpConnection> openEvent() {
            return Optional.ofNullable(openEvent);
        }
257
        /**
         * Gets the nio channel that was passed to the constructor.
         *
         * @return the nioChannel
         */
        public SocketChannel nioChannel() {
            return nioChannel;
        }
266
        /**
         * Returns the local address of the connection. The value is 
         * the copy obtained from the nio channel when this object 
         * was created.
         *
         * @return the address
         */
        @Override
        public SocketAddress localAddress() {
            return localAddress;
        }
271
        /**
         * Returns the remote address of the connection. The value is 
         * the copy obtained from the nio channel when this object 
         * was created.
         *
         * @return the address
         */
        @Override
        public SocketAddress remoteAddress() {
            return remoteAddress;
        }
276
        /**
         * Gets the pool that provides the buffers used for reading 
         * from the nio channel.
         *
         * @return the readBuffers
         */
        public ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer>
                readBuffers() {
            return readBuffers;
        }
286
        /**
         * Gets the pipeline that this channel uses for firing the events 
         * that result from network activity (such as {@link Input} 
         * and {@link Closed}).
         *
         * @return the downPipeline
         */
        public EventPipeline downPipeline() {
            return downPipeline;
        }
295
296        /**
297         * Invoked when registration has completed.
298         *
299         * @param registration the registration (result from the
300         * {@link NioRegistration} event)
301         */
302        public void registrationComplete(Registration registration) {
303            this.registration = registration;
304            selectionKeys |= SelectionKey.OP_READ;
305            registration.updateInterested(selectionKeys);
306        }
307
        /**
         * Checks if this channel is purgeable, i.e. if its purgeable 
         * state is {@code YES}. A pending state does not count.
         *
         * @return true, if is purgeable
         */
        public boolean isPurgeable() {
            return purgeable == PurgeableState.YES;
        }
316
        /**
         * Gets the time when the connection became purgeable.
         * The value is obtained from {@link System#currentTimeMillis()}.
         * It is only meaningful while {@link #isPurgeable()} 
         * returns {@code true}.
         *
         * @return the time
         */
        public long purgeableSince() {
            return becamePurgeableAt;
        }
325
326        /**
327         * Write the data on this channel.
328         * 
329         * @param event the event
330         */
331        public void write(Output<ByteBuffer> event)
332                throws InterruptedException {
333            synchronized (pendingWrites) {
334                if (!nioChannel.isOpen()) {
335                    return;
336                }
337                ManagedBuffer<ByteBuffer>.ByteBufferView reader
338                    = event.buffer().newByteBufferView();
339                if (!pendingWrites.isEmpty()) {
340                    reader.managedBuffer().lockBuffer();
341                    purgeable = event.isEndOfRecord() ? PurgeableState.PENDING
342                        : PurgeableState.NO;
343                    pendingWrites.add(reader);
344                    return;
345                }
346                try {
347                    nioChannel.write(reader.get());
348                } catch (IOException e) {
349                    forceClose(e);
350                    return;
351                }
352                if (!reader.get().hasRemaining()) {
353                    if (event.isEndOfRecord()) {
354                        becamePurgeableAt = System.currentTimeMillis();
355                        purgeable = PurgeableState.YES;
356                    } else {
357                        purgeable = PurgeableState.NO;
358                    }
359                    return;
360                }
361                reader.managedBuffer().lockBuffer();
362                purgeable = event.isEndOfRecord() ? PurgeableState.PENDING
363                    : PurgeableState.NO;
364                pendingWrites.add(reader);
365                selectionKeys |= SelectionKey.OP_WRITE;
366                registration.updateInterested(selectionKeys);
367            }
368        }
369
370        @Override
371        public void handleOps(int ops) throws InterruptedException {
372            if ((ops & SelectionKey.OP_READ) != 0) {
373                handleReadOp();
374            }
375            if ((ops & SelectionKey.OP_WRITE) != 0) {
376                handleWriteOp();
377            }
378        }
379
        /**
         * Gets a buffer from the pool and reads available data into it.
         * 
         * If data has been read, it is sent downstream as {@link Input} 
         * event and the channel is no longer purgeable. If no data is
         * available, the buffer is simply released. An I/O error forces 
         * the channel to be closed.
         * 
         * If the other end signals EOF, the result depends on the
         * connection state. In state {@code HALF_CLOSED} (this end has 
         * already shut down its output) the nio channel is closed and 
         * a {@link Closed} event is fired. Else, reading is stopped
         * and a task is submitted to the down pipeline that fires a 
         * {@link HalfClosed} event, waits for its completion, removes
         * the channel, fires the {@link Closed} event and closes the 
         * nio channel, unless writes are pending. In the latter case, 
         * closing is left to the handling of the write operations.
         * 
         * @throws InterruptedException if the execution was interrupted
         */
        @SuppressWarnings("PMD.EmptyCatchBlock")
        private void handleReadOp() throws InterruptedException {
            ManagedBuffer<ByteBuffer> buffer;
            buffer = readBuffers.acquire();
            try {
                int bytes = buffer.fillFromChannel(nioChannel);
                if (bytes == 0) {
                    // Nothing read, give the buffer back
                    buffer.unlockBuffer();
                    return;
                }
                if (bytes > 0) {
                    // Input resets the purgeable state
                    purgeable = PurgeableState.NO;
                    downPipeline.fire(Input.fromSink(buffer, false), this);
                    return;
                }
            } catch (IOException e) {
                // Buffer already unlocked by fillFromChannel
                forceClose(e);
                return;
            }
            // EOF (-1) from other end
            buffer.unlockBuffer();
            synchronized (nioChannel) {
                if (connState == ConnectionState.HALF_CLOSED) {
                    // Other end confirms our close, complete close
                    try {
                        nioChannel.close();
                    } catch (IOException e) {
                        // Ignored for close
                    }
                    connState = ConnectionState.CLOSED;
                    downPipeline.fire(new Closed<Void>(), this);
                    return;
                }
            }
            // Other end initiates close
            selectionKeys &= ~SelectionKey.OP_READ;
            registration.updateInterested(selectionKeys);
            downPipeline.submit("SendHalfClosed", () -> {
                try {
                    // Inform downstream and wait until everything has settled.
                    newEventPipeline().fire(new HalfClosed(), this).get();
                    // All settled.
                    removeChannel(this);
                    downPipeline.fire(new Closed<Void>(), this);
                    // Close our end if everything has been written.
                    // Locks are obtained in the order pendingWrites, 
                    // nioChannel (as everywhere else in this class).
                    synchronized (pendingWrites) {
                        synchronized (nioChannel) {
                            try {
                                if (!pendingWrites.isEmpty()) {
                                    // Pending writes, delay close
                                    connState = ConnectionState.DELAYED_REQUEST;
                                    return;
                                }
                                // Nothing left to do, close
                                nioChannel.close();
                                connState = ConnectionState.CLOSED;
                            } catch (IOException e) {
                                // Ignored for close
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    // Nothing to do about this
                    // NOTE(review): the interrupt status is not restored 
                    // here -- confirm that this is intended.
                }
            });
        }
454
455        /**
456         * Checks if there is still data to be written. This may be
457         * a left over in an incompletely written buffer or a complete
458         * pending buffer. 
459         * 
460         * @throws IOException
461         * @throws InterruptedException 
462         */
463        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
464            "PMD.EmptyCatchBlock", "PMD.AvoidBranchingStatementAsLastInLoop",
465            "PMD.CognitiveComplexity" })
466        private void handleWriteOp() throws InterruptedException {
467            while (true) {
468                ManagedBuffer<ByteBuffer>.ByteBufferView head;
469                synchronized (pendingWrites) {
470                    if (pendingWrites.isEmpty()) {
471                        // Nothing left to write, stop getting ops
472                        selectionKeys &= ~SelectionKey.OP_WRITE;
473                        registration.updateInterested(selectionKeys);
474                        // Was the connection closed while we were writing?
475                        if (connState == ConnectionState.DELAYED_REQUEST
476                            || connState == ConnectionState.DELAYED_EVENT) {
477                            synchronized (nioChannel) {
478                                try {
479                                    if (connState == ConnectionState.DELAYED_REQUEST) {
480                                        // Delayed close request from other end,
481                                        // complete
482                                        nioChannel.close();
483                                        connState = ConnectionState.CLOSED;
484                                    }
485                                    if (connState == ConnectionState.DELAYED_EVENT) {
486                                        // Delayed close from this end, initiate
487                                        nioChannel.shutdownOutput();
488                                        connState = ConnectionState.HALF_CLOSED;
489                                    }
490                                } catch (IOException e) {
491                                    // Ignored for close
492                                }
493                            }
494                        } else {
495                            if (purgeable == PurgeableState.PENDING) {
496                                purgeable = PurgeableState.YES;
497                            }
498                        }
499                        break; // Nothing left to do
500                    }
501                    head = pendingWrites.peek();
502                    if (!head.get().hasRemaining()) {
503                        // Nothing left in head buffer, try next
504                        head.managedBuffer().unlockBuffer();
505                        pendingWrites.remove();
506                        continue;
507                    }
508                }
509                try {
510                    nioChannel.write(head.get()); // write...
511                } catch (IOException e) {
512                    forceClose(e);
513                    return;
514                }
515                break; // ... and wait for next op
516            }
517        }
518
519        /**
520         * Closes this channel.
521         * 
522         * @throws IOException if an error occurs
523         * @throws InterruptedException if the execution was interrupted 
524         */
525        public void close() throws IOException, InterruptedException {
526            if (!removeChannel(this)) {
527                return;
528            }
529            synchronized (pendingWrites) {
530                if (!pendingWrites.isEmpty()) {
531                    // Pending writes, delay close until done
532                    connState = ConnectionState.DELAYED_EVENT;
533                    return;
534                }
535                // Nothing left to do, proceed
536                synchronized (nioChannel) {
537                    if (nioChannel.isOpen()) {
538                        // Initiate close, must be confirmed by other end
539                        nioChannel.shutdownOutput();
540                        connState = ConnectionState.HALF_CLOSED;
541                    }
542                }
543            }
544        }
545
546        @SuppressWarnings("PMD.EmptyCatchBlock")
547        private void forceClose(Throwable error) throws InterruptedException {
548            try {
549                nioChannel.close();
550                connState = ConnectionState.CLOSED;
551            } catch (IOException e) {
552                // Closed only to make sure, any failure can be ignored.
553            }
554            if (removeChannel(this)) {
555                var evt = new Closed<Void>(error);
556                downPipeline.fire(evt, this);
557            }
558        }
559
560        /*
561         * (non-Javadoc)
562         * 
563         * @see org.jgrapes.io.IOSubchannel.DefaultSubchannel#toString()
564         */
565        @SuppressWarnings("PMD.CommentRequired")
566        public String toString() {
567            return Subchannel.toString(this);
568        }
569    }
570
571}