001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2023 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.net;
020
021import java.io.IOException;
022import java.net.SocketAddress;
023import java.nio.ByteBuffer;
024import java.nio.channels.SelectionKey;
025import java.nio.channels.SocketChannel;
026import java.util.ArrayDeque;
027import java.util.HashSet;
028import java.util.Optional;
029import java.util.Queue;
030import java.util.Set;
031import java.util.concurrent.ExecutorService;
032import org.jgrapes.core.Channel;
033import org.jgrapes.core.Component;
034import org.jgrapes.core.Components;
035import org.jgrapes.core.EventPipeline;
036import org.jgrapes.core.Manager;
037import org.jgrapes.core.Subchannel;
038import org.jgrapes.core.annotation.Handler;
039import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel;
040import org.jgrapes.io.NioHandler;
041import org.jgrapes.io.events.Closed;
042import org.jgrapes.io.events.HalfClosed;
043import org.jgrapes.io.events.Input;
044import org.jgrapes.io.events.NioRegistration;
045import org.jgrapes.io.events.NioRegistration.Registration;
046import org.jgrapes.io.events.OpenSocketConnection;
047import org.jgrapes.io.events.Output;
048import org.jgrapes.io.util.ManagedBuffer;
049import org.jgrapes.io.util.ManagedBufferPool;
050
051/**
052 * Provides a base class for the {@link SocketServer} and the 
053 * {@link SocketConnector}.
054 */
055@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.ExcessivePublicCount",
056    "PMD.NcssCount", "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals",
057    "PMD.ExcessiveClassLength" })
058public abstract class SocketConnectionManager extends Component {
059
    /** The size used for the send and receive buffers (default 32768). */
    private int bufferSize = 32_768;
    /**
     * The known channels. {@link #removeChannel} synchronizes on this
     * set when modifying it.
     */
    protected final Set<SocketChannelImpl> channels = new HashSet<>();
    /**
     * The executor service passed to the down pipelines of new
     * connections; {@code null} if none has been set.
     */
    private ExecutorService executorService;
063
    /**
     * Creates a new connection manager using the given channel
     * as the component's channel.
     * 
     * @param componentChannel the component's channel
     */
    public SocketConnectionManager(Channel componentChannel) {
        super(componentChannel);
    }
072
    /**
     * Sets the buffer size for the send and receive buffers.
     * If no size is set, a default value of 32768 will be used.
     * The value is read when a connection is created, so it only
     * affects connections created after the call.
     * 
     * @param bufferSize the size to use for the send and receive buffers
     * @return the socket connection manager for easy chaining
     */
    public SocketConnectionManager setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
        return this;
    }
084
    /**
     * Returns the configured buffer size, i.e. the value set with
     * {@link #setBufferSize(int)} or the default of 32768.
     *
     * @return the buffer size
     */
    public int bufferSize() {
        return bufferSize;
    }
093
    /**
     * Sets an executor service to be used by the event pipelines
     * that process the data from the network. Setting this
     * to an executor service with a limited number of threads
     * allows to control the maximum load from the network.
     * 
     * If no executor service is set, the pipeline for a new connection
     * is created with the no-argument {@code newEventPipeline()}.
     * 
     * @param executorService the executor service to set
     * @return the socket connection manager for easy chaining
     * @see Manager#newEventPipeline(ExecutorService)
     */
    public SocketConnectionManager
            setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
        return this;
    }
109
    /**
     * Returns the executor service set with
     * {@link #setExecutorService(ExecutorService)}.
     *
     * @return the executor service or {@code null} if none has been set
     */
    public ExecutorService executorService() {
        return executorService;
    }
118
119    /**
120     * Writes the data passed in the event. 
121     * 
122     * The end of record flag is used to determine if a channel is 
123     * eligible for purging. If the flag is set and all output has 
124     * been processed, the channel is purgeable until input is 
125     * received or another output event causes the state to be 
126     * reevaluated. 
127     *
128     * @param event the event
129     * @param channel the channel
130     * @throws InterruptedException the interrupted exception
131     */
132    @Handler
133    public void onOutput(Output<ByteBuffer> event,
134            SocketChannelImpl channel) throws InterruptedException {
135        if (channels.contains(channel)) {
136            channel.write(event);
137        }
138    }
139
140    /**
141     * Removes the channel from the set of registered channels.
142     *
143     * @param channel the channel
144     * @return true, if channel was registered
145     */
146    protected boolean removeChannel(SocketChannelImpl channel) {
147        synchronized (channels) {
148            return channels.remove(channel);
149        }
150    }
151
    /**
     * Returns the result of {@code Components.objectName(this)}.
     *
     * @return the object name of this component
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return Components.objectName(this);
    }
161
    /**
     * The close state of a connection.
     * 
     * * `OPEN`: the initial state.
     * * `DELAYED_EVENT`: close was requested by this end while writes
     *   were pending; the output is shut down when the pending
     *   writes have been processed.
     * * `DELAYED_REQUEST`: the other end has closed its side while
     *   writes were pending; the channel is closed when the pending 
     *   writes have been processed.
     * * `HALF_CLOSED`: the output has been shut down by this end,
     *   the close is completed when end of input is detected.
     * * `CLOSED`: the nio channel has been closed.
     */
    private enum ConnectionState {
        OPEN, DELAYED_EVENT, DELAYED_REQUEST, HALF_CLOSED, CLOSED
    }
168
    /**
     * The purgeable state of a connection.
     * 
     * * `NO`: the initial state; also set when input is received or
     *   output without the end of record flag is written.
     * * `PENDING`: output with the end of record flag has been
     *   queued but not yet been written completely.
     * * `YES`: output with the end of record flag has been written
     *   completely.
     */
    @SuppressWarnings("PMD.ShortVariable")
    private enum PurgeableState {
        NO, PENDING, YES
    }
176
    /**
     * The internal representation of a connection. 
     */
183    @SuppressWarnings("PMD.GodClass")
184    protected class SocketChannelImpl
185            extends DefaultIOSubchannel implements NioHandler, SocketIOChannel {
186
        /** The event that caused the connection to be opened, may be null. */
        private final OpenSocketConnection openEvent;
        /** The underlying nio channel; also used as monitor for connState. */
        private final SocketChannel nioChannel;
        /** Copy of the nio channel's local address, taken on creation. */
        private final SocketAddress localAddress;
        /** Copy of the nio channel's remote address, taken on creation. */
        private final SocketAddress remoteAddress;
        /** The pipeline used to fire the events resulting from network I/O. */
        private final EventPipeline downPipeline;
        /** The pool of buffers that are filled from the nio channel. */
        private final ManagedBufferPool<ManagedBuffer<ByteBuffer>,
                ByteBuffer> readBuffers;
        /** The registration, available after registrationComplete. */
        private Registration registration;
        /** The SelectionKey.OP_* bits passed to the registration. */
        private int selectionKeys;
        /**
         * The buffers (views) with data that has not been written
         * completely yet; also used as monitor for the write state.
         */
        private final Queue<
                ManagedBuffer<ByteBuffer>.ByteBufferView> pendingWrites
                    = new ArrayDeque<>();
        /** The close state of the connection. */
        private ConnectionState connState = ConnectionState.OPEN;
        /** The purgeable state of the connection. */
        private PurgeableState purgeable = PurgeableState.NO;
        /** The System.currentTimeMillis() value reported by purgeableSince. */
        private long becamePurgeableAt;
202
203        /**
204         * @param nioChannel the channel
205         * @throws IOException if an I/O error occurred
206         */
207        public SocketChannelImpl(OpenSocketConnection openEvent,
208                SocketChannel nioChannel) throws IOException {
209            super(channel(), newEventPipeline());
210            this.openEvent = openEvent;
211            this.nioChannel = nioChannel;
212            // Copy, because they are only available while channel is open.
213            localAddress = nioChannel.getLocalAddress();
214            remoteAddress = nioChannel.getRemoteAddress();
215            if (executorService == null) {
216                downPipeline = newEventPipeline();
217            } else {
218                downPipeline = newEventPipeline(executorService);
219            }
220            String channelName
221                = Components.objectName(SocketConnectionManager.this)
222                    + "." + Components.objectName(this);
223
224            // Prepare write buffers
225            int writeBufferSize = bufferSize < 1500 ? 1500 : bufferSize;
226            setByteBufferPool(new ManagedBufferPool<>(ManagedBuffer::new,
227                () -> {
228                    return ByteBuffer.allocate(writeBufferSize);
229                }, 2)
230                    .setName(channelName + ".upstream.buffers"));
231
232            // Prepare read buffers
233            int readBufferSize = bufferSize < 1500 ? 1500 : bufferSize;
234            readBuffers = new ManagedBufferPool<>(ManagedBuffer::new,
235                () -> {
236                    return ByteBuffer.allocate(readBufferSize);
237                }, 2)
238                    .setName(channelName + ".downstream.buffers");
239
240            // Ready to use
241            channels.add(this);
242
243            // Register with dispatcher
244            nioChannel.configureBlocking(false);
245            SocketConnectionManager.this.fire(
246                new NioRegistration(this, nioChannel, 0,
247                    SocketConnectionManager.this),
248                Channel.BROADCAST);
249        }
250
        /**
         * Returns the event that caused this connection to be opened.
         * 
         * The result is empty if no such event was passed to the 
         * constructor, e.g. because the channel was created in response 
         * to a client connecting to the server.
         * 
         * @return the event, wrapped in an {@link Optional}
         */
        public Optional<OpenSocketConnection> openEvent() {
            return Optional.ofNullable(openEvent);
        }
262
        /**
         * Returns the nio channel that was passed to the constructor.
         *
         * @return the nio channel
         */
        public SocketChannel nioChannel() {
            return nioChannel;
        }
271
        /**
         * Returns the local address as obtained from the nio channel
         * when this object was created.
         *
         * @return the local address
         */
        @Override
        public SocketAddress localAddress() {
            return localAddress;
        }
276
        /**
         * Returns the remote address as obtained from the nio channel
         * when this object was created.
         *
         * @return the remote address
         */
        @Override
        public SocketAddress remoteAddress() {
            return remoteAddress;
        }
281
        /**
         * Returns the pool that provides the buffers which are filled
         * with the data read from the nio channel.
         *
         * @return the pool with the read buffers
         */
        public ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer>
                readBuffers() {
            return readBuffers;
        }
291
        /**
         * Returns the pipeline that is used to fire the events
         * resulting from network I/O (such as {@link Input} and 
         * {@link Closed}) for this connection.
         *
         * @return the down pipeline
         */
        public EventPipeline downPipeline() {
            return downPipeline;
        }
300
        /**
         * Invoked when registration has completed. Stores the 
         * registration and adds {@link SelectionKey#OP_READ} to the 
         * operations that this channel is interested in.
         *
         * @param registration the registration (result from the
         * {@link NioRegistration} event)
         */
        public void registrationComplete(Registration registration) {
            this.registration = registration;
            selectionKeys |= SelectionKey.OP_READ;
            registration.updateInterested(selectionKeys);
        }
312
        /**
         * Checks if the connection is purgeable, i.e. if its purgeable
         * state is {@code YES}. A pending state does not count.
         *
         * @return true, if the connection is purgeable
         */
        public boolean isPurgeable() {
            return purgeable == PurgeableState.YES;
        }
321
        /**
         * Gets the time when the connection became purgeable. The
         * value is obtained from {@link System#currentTimeMillis()}.
         * It is only meaningful while {@link #isPurgeable()} 
         * returns {@code true}.
         *
         * @return the time
         */
        public long purgeableSince() {
            return becamePurgeableAt;
        }
330
331        /**
332         * Write the data on this channel.
333         * 
334         * @param event the event
335         */
336        public void write(Output<ByteBuffer> event)
337                throws InterruptedException {
338            synchronized (pendingWrites) {
339                if (!nioChannel.isOpen()) {
340                    return;
341                }
342                ManagedBuffer<ByteBuffer>.ByteBufferView reader
343                    = event.buffer().newByteBufferView();
344                if (!pendingWrites.isEmpty()) {
345                    reader.managedBuffer().lockBuffer();
346                    purgeable = event.isEndOfRecord() ? PurgeableState.PENDING
347                        : PurgeableState.NO;
348                    pendingWrites.add(reader);
349                    return;
350                }
351                try {
352                    nioChannel.write(reader.get());
353                } catch (IOException e) {
354                    forceClose(e);
355                    return;
356                }
357                if (!reader.get().hasRemaining()) {
358                    if (event.isEndOfRecord()) {
359                        becamePurgeableAt = System.currentTimeMillis();
360                        purgeable = PurgeableState.YES;
361                    } else {
362                        purgeable = PurgeableState.NO;
363                    }
364                    return;
365                }
366                reader.managedBuffer().lockBuffer();
367                purgeable = event.isEndOfRecord() ? PurgeableState.PENDING
368                    : PurgeableState.NO;
369                pendingWrites.add(reader);
370                selectionKeys |= SelectionKey.OP_WRITE;
371                registration.updateInterested(selectionKeys);
372            }
373        }
374
375        @Override
376        public void handleOps(int ops) throws InterruptedException {
377            if ((ops & SelectionKey.OP_READ) != 0) {
378                handleReadOp();
379            }
380            if ((ops & SelectionKey.OP_WRITE) != 0) {
381                handleWriteOp();
382            }
383        }
384
        /**
         * Gets a buffer from the pool and reads available data into it.
         * Sends the result as {@link Input} event on the down pipeline.
         * 
         * An I/O error while reading results in a forced close. If the
         * end of input is detected, the close is either completed (if
         * this end has already shut down its output) or handled as 
         * close initiated by the other end: a {@link HalfClosed} event 
         * is fired, followed by a {@link Closed} event, and the nio 
         * channel is closed as soon as no writes are pending.
         * 
         * @throws InterruptedException if the execution was interrupted
         */
        @SuppressWarnings("PMD.EmptyCatchBlock")
        private void handleReadOp() throws InterruptedException {
            ManagedBuffer<ByteBuffer> buffer;
            buffer = readBuffers.acquire();
            try {
                int bytes = buffer.fillFromChannel(nioChannel);
                if (bytes == 0) {
                    // Nothing read, return the unused buffer
                    buffer.unlockBuffer();
                    return;
                }
                if (bytes > 0) {
                    // Input received, connection is in use again
                    purgeable = PurgeableState.NO;
                    downPipeline.fire(Input.fromSink(buffer, false), this);
                    return;
                }
            } catch (IOException e) {
                // Buffer already unlocked by fillFromChannel
                forceClose(e);
                return;
            }
            // EOF (-1) from other end
            buffer.unlockBuffer();
            synchronized (nioChannel) {
                if (connState == ConnectionState.HALF_CLOSED) {
                    // Other end confirms our close, complete close
                    try {
                        nioChannel.close();
                    } catch (IOException e) {
                        // Ignored for close
                    }
                    connState = ConnectionState.CLOSED;
                    downPipeline.fire(new Closed<Void>(), this);
                    return;
                }
            }
            // Other end initiates close. There is nothing more to read,
            // so stop getting read ops.
            selectionKeys &= ~SelectionKey.OP_READ;
            registration.updateInterested(selectionKeys);
            downPipeline.submit("SendHalfClosed", () -> {
                try {
                    // Inform downstream and wait until everything has settled.
                    newEventPipeline().fire(new HalfClosed(), this).get();
                    // All settled.
                    removeChannel(this);
                    downPipeline.fire(new Closed<Void>(), this);
                    // Close our end if everything has been written.
                    synchronized (pendingWrites) {
                        synchronized (nioChannel) {
                            try {
                                if (!pendingWrites.isEmpty()) {
                                    // Pending writes, delay close
                                    // (completed by handleWriteOp)
                                    connState = ConnectionState.DELAYED_REQUEST;
                                    return;
                                }
                                // Nothing left to do, close
                                nioChannel.close();
                                connState = ConnectionState.CLOSED;
                            } catch (IOException e) {
                                // Ignored for close
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    // Nothing to do about this
                }
            });
        }
459
460        /**
461         * Checks if there is still data to be written. This may be
462         * a left over in an incompletely written buffer or a complete
463         * pending buffer. 
464         * 
465         * @throws IOException
466         * @throws InterruptedException 
467         */
468        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
469            "PMD.EmptyCatchBlock", "PMD.AvoidBranchingStatementAsLastInLoop",
470            "PMD.CognitiveComplexity" })
471        private void handleWriteOp() throws InterruptedException {
472            while (true) {
473                ManagedBuffer<ByteBuffer>.ByteBufferView head;
474                synchronized (pendingWrites) {
475                    if (pendingWrites.isEmpty()) {
476                        // Nothing left to write, stop getting ops
477                        selectionKeys &= ~SelectionKey.OP_WRITE;
478                        registration.updateInterested(selectionKeys);
479                        // Was the connection closed while we were writing?
480                        if (connState == ConnectionState.DELAYED_REQUEST
481                            || connState == ConnectionState.DELAYED_EVENT) {
482                            synchronized (nioChannel) {
483                                try {
484                                    if (connState == ConnectionState.DELAYED_REQUEST) {
485                                        // Delayed close request from other end,
486                                        // complete
487                                        nioChannel.close();
488                                        connState = ConnectionState.CLOSED;
489                                    }
490                                    if (connState == ConnectionState.DELAYED_EVENT) {
491                                        // Delayed close from this end, initiate
492                                        nioChannel.shutdownOutput();
493                                        connState = ConnectionState.HALF_CLOSED;
494                                    }
495                                } catch (IOException e) {
496                                    // Ignored for close
497                                }
498                            }
499                        } else {
500                            if (purgeable == PurgeableState.PENDING) {
501                                purgeable = PurgeableState.YES;
502                            }
503                        }
504                        break; // Nothing left to do
505                    }
506                    head = pendingWrites.peek();
507                    if (!head.get().hasRemaining()) {
508                        // Nothing left in head buffer, try next
509                        head.managedBuffer().unlockBuffer();
510                        pendingWrites.remove();
511                        continue;
512                    }
513                }
514                try {
515                    nioChannel.write(head.get()); // write...
516                } catch (IOException e) {
517                    forceClose(e);
518                    return;
519                }
520                break; // ... and wait for next op
521            }
522        }
523
524        /**
525         * Closes this channel.
526         * 
527         * @throws IOException if an error occurs
528         * @throws InterruptedException if the execution was interrupted 
529         */
530        public void close() throws IOException, InterruptedException {
531            if (!removeChannel(this)) {
532                return;
533            }
534            synchronized (pendingWrites) {
535                if (!pendingWrites.isEmpty()) {
536                    // Pending writes, delay close until done
537                    connState = ConnectionState.DELAYED_EVENT;
538                    return;
539                }
540                // Nothing left to do, proceed
541                synchronized (nioChannel) {
542                    if (nioChannel.isOpen()) {
543                        // Initiate close, must be confirmed by other end
544                        nioChannel.shutdownOutput();
545                        connState = ConnectionState.HALF_CLOSED;
546                    }
547                }
548            }
549        }
550
551        @SuppressWarnings("PMD.EmptyCatchBlock")
552        private void forceClose(Throwable error) throws InterruptedException {
553            try {
554                nioChannel.close();
555                connState = ConnectionState.CLOSED;
556            } catch (IOException e) {
557                // Closed only to make sure, any failure can be ignored.
558            }
559            if (removeChannel(this)) {
560                var evt = new Closed<Void>(error);
561                downPipeline.fire(evt, this);
562            }
563        }
564
        /*
         * Returns the result of Subchannel.toString(this).
         * 
         * (non-Javadoc)
         * 
         * @see org.jgrapes.io.IOSubchannel.DefaultSubchannel#toString()
         */
        @Override
        @SuppressWarnings("PMD.CommentRequired")
        public String toString() {
            return Subchannel.toString(this);
        }
575    }
576
577}