001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2023 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.net;
020
021import java.io.IOException;
022import java.net.SocketAddress;
023import java.nio.ByteBuffer;
024import java.nio.channels.SelectionKey;
025import java.nio.channels.SocketChannel;
026import java.util.ArrayDeque;
027import java.util.HashSet;
028import java.util.Optional;
029import java.util.Queue;
030import java.util.Set;
031import java.util.concurrent.ExecutorService;
032import org.jgrapes.core.Channel;
033import org.jgrapes.core.Component;
034import org.jgrapes.core.Components;
035import org.jgrapes.core.EventPipeline;
036import org.jgrapes.core.Manager;
037import org.jgrapes.core.Subchannel;
038import org.jgrapes.core.annotation.Handler;
039import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel;
040import org.jgrapes.io.NioHandler;
041import org.jgrapes.io.events.Closed;
042import org.jgrapes.io.events.HalfClosed;
043import org.jgrapes.io.events.Input;
044import org.jgrapes.io.events.NioRegistration;
045import org.jgrapes.io.events.NioRegistration.Registration;
046import org.jgrapes.io.events.OpenSocketConnection;
047import org.jgrapes.io.events.Output;
048import org.jgrapes.io.util.ManagedBuffer;
049import org.jgrapes.io.util.ManagedBufferPool;
050
051/**
052 * Provides a base class for the {@link SocketServer} and the 
053 * {@link SocketConnector}.
054 */
055@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.ExcessivePublicCount",
056    "PMD.NcssCount", "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals",
057    "PMD.ExcessiveClassLength" })
058public abstract class SocketConnectionManager extends Component {
059
    // Requested size of the per-connection buffers. The connection
    // constructor uses at least 1500 bytes regardless of this value.
    private int bufferSize = 32_768;
    // The registered connections. removeChannel accesses the set
    // while holding the set's monitor.
    protected final Set<SocketChannelImpl> channels = new HashSet<>();
    // Optional executor service for the downstream pipelines. If null,
    // the pipelines are created without an explicit executor service.
    private ExecutorService executorService;
063
064    /**
065     * Creates a new server using the given channel.
066     * 
067     * @param componentChannel the component's channel
068     */
069    public SocketConnectionManager(Channel componentChannel) {
070        super(componentChannel);
071    }
072
073    /**
074     * Sets the buffer size for the send an receive buffers.
075     * If no size is set, a default value of 32768 will be used.
076     * 
077     * @param bufferSize the size to use for the send and receive buffers
078     * @return the socket connection manager for easy chaining
079     */
080    public SocketConnectionManager setBufferSize(int bufferSize) {
081        this.bufferSize = bufferSize;
082        return this;
083    }
084
085    /**
086     * Return the configured buffer size.
087     *
088     * @return the bufferSize
089     */
090    public int bufferSize() {
091        return bufferSize;
092    }
093
094    /**
095     * Sets an executor service to be used by the event pipelines
096     * that process the data from the network. Setting this
097     * to an executor service with a limited number of threads
098     * allows to control the maximum load from the network.
099     * 
100     * @param executorService the executorService to set
101     * @return the socket connection manager for easy chaining
102     * @see Manager#newEventPipeline(ExecutorService)
103     */
104    public SocketConnectionManager
105            setExecutorService(ExecutorService executorService) {
106        this.executorService = executorService;
107        return this;
108    }
109
110    /**
111     * Returns the executor service.
112     *
113     * @return the executorService
114     */
115    public ExecutorService executorService() {
116        return executorService;
117    }
118
119    /**
120     * Writes the data passed in the event. 
121     * 
122     * The end of record flag is used to determine if a channel is 
123     * eligible for purging. If the flag is set and all output has 
124     * been processed, the channel is purgeable until input is 
125     * received or another output event causes the state to be 
126     * reevaluated. 
127     *
128     * @param event the event
129     * @param channel the channel
130     * @throws InterruptedException the interrupted exception
131     */
132    @Handler
133    public void onOutput(Output<ByteBuffer> event,
134            SocketChannelImpl channel) throws InterruptedException {
135        if (channels.contains(channel)) {
136            channel.write(event);
137        }
138    }
139
140    /**
141     * Removes the channel from the set of registered channels.
142     *
143     * @param channel the channel
144     * @return true, if channel was registered
145     */
146    protected boolean removeChannel(SocketChannelImpl channel) {
147        synchronized (channels) {
148            return channels.remove(channel);
149        }
150    }
151
152    /*
153     * (non-Javadoc)
154     * 
155     * @see java.lang.Object#toString()
156     */
157    @Override
158    public String toString() {
159        return Components.objectName(this);
160    }
161
162    /**
163     * The close state.
164     */
165    private enum ConnectionState {
166        OPEN, DELAYED_EVENT, DELAYED_REQUEST, HALF_CLOSED, CLOSED
167    }
168
169    /**
170     * The purgeable state.
171     */
172    private enum PurgeableState {
173        NO, PENDING, YES
174    }
175
    /**
     * The internal representation of a connection. 
     */
182    @SuppressWarnings("PMD.GodClass")
183    protected class SocketChannelImpl
184            extends DefaultIOSubchannel implements NioHandler, SocketIOChannel {
185
        // The event passed to the constructor, may be null 
        // (openEvent() wraps it with Optional.ofNullable).
        private final OpenSocketConnection openEvent;
        // The underlying nio channel. Also used as monitor by the
        // code that handles closing.
        private final SocketChannel nioChannel;
        // Addresses copied from the nio channel by the constructor.
        private final SocketAddress localAddress;
        private final SocketAddress remoteAddress;
        // The pipeline used to fire the Input and Closed events.
        private final EventPipeline downPipeline;
        // The pool that provides the buffers for reading from the 
        // nio channel.
        private final ManagedBufferPool<ManagedBuffer<ByteBuffer>,
                ByteBuffer> readBuffers;
        // Set by registrationComplete.
        private Registration registration;
        // The operations passed to registration.updateInterested.
        private int selectionKeys;
        // Views on buffers that haven't been written completely yet.
        // Also used as monitor by the code that handles writing.
        private final Queue<
                ManagedBuffer<ByteBuffer>.ByteBufferView> pendingWrites
                    = new ArrayDeque<>();
        private ConnectionState connState = ConnectionState.OPEN;
        private PurgeableState purgeable = PurgeableState.NO;
        // Value of System.currentTimeMillis() recorded by write() when
        // the connection becomes purgeable.
        private long becamePurgeableAt;
201
202        /**
203         * @param nioChannel the channel
204         * @throws IOException if an I/O error occurred
205         */
206        public SocketChannelImpl(OpenSocketConnection openEvent,
207                SocketChannel nioChannel) throws IOException {
208            super(channel(), newEventPipeline());
209            this.openEvent = openEvent;
210            this.nioChannel = nioChannel;
211            // Copy, because they are only available while channel is open.
212            localAddress = nioChannel.getLocalAddress();
213            remoteAddress = nioChannel.getRemoteAddress();
214            if (executorService == null) {
215                downPipeline = newEventPipeline();
216            } else {
217                downPipeline = newEventPipeline(executorService);
218            }
219            String channelName
220                = Components.objectName(SocketConnectionManager.this)
221                    + "." + Components.objectName(this);
222
223            // Prepare write buffers
224            int writeBufferSize = bufferSize < 1500 ? 1500 : bufferSize;
225            setByteBufferPool(new ManagedBufferPool<>(ManagedBuffer::new,
226                () -> {
227                    return ByteBuffer.allocate(writeBufferSize);
228                }, 2)
229                    .setName(channelName + ".upstream.buffers"));
230
231            // Prepare read buffers
232            int readBufferSize = bufferSize < 1500 ? 1500 : bufferSize;
233            readBuffers = new ManagedBufferPool<>(ManagedBuffer::new,
234                () -> {
235                    return ByteBuffer.allocate(readBufferSize);
236                }, 2)
237                    .setName(channelName + ".downstream.buffers");
238
239            // Register with dispatcher
240            nioChannel.configureBlocking(false);
241            SocketConnectionManager.this.fire(
242                new NioRegistration(this, nioChannel, 0,
243                    SocketConnectionManager.this),
244                Channel.BROADCAST);
245        }
246
247        /**
248         * Returns the event that caused this connection to be opened.
249         * 
250         * May be `null` if the channel was created in response to a
251         * client connecting to the server.
252         * 
253         * @return the event
254         */
255        public Optional<OpenSocketConnection> openEvent() {
256            return Optional.ofNullable(openEvent);
257        }
258
259        /**
260         * Gets the nio channel.
261         *
262         * @return the nioChannel
263         */
264        public SocketChannel nioChannel() {
265            return nioChannel;
266        }
267
268        @Override
269        public SocketAddress localAddress() {
270            return localAddress;
271        }
272
273        @Override
274        public SocketAddress remoteAddress() {
275            return remoteAddress;
276        }
277
278        /**
279         * Gets the read buffers.
280         *
281         * @return the readBuffers
282         */
283        public ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer>
284                readBuffers() {
285            return readBuffers;
286        }
287
288        /**
289         * Gets the down pipeline.
290         *
291         * @return the downPipeline
292         */
293        public EventPipeline downPipeline() {
294            return downPipeline;
295        }
296
297        /**
298         * Invoked when registration has completed.
299         *
300         * @param registration the registration (result from the
301         * {@link NioRegistration} event)
302         */
303        public void registrationComplete(Registration registration) {
304            this.registration = registration;
305            selectionKeys |= SelectionKey.OP_READ;
306            registration.updateInterested(selectionKeys);
307        }
308
309        /**
310         * Checks if is purgeable.
311         *
312         * @return true, if is purgeable
313         */
314        public boolean isPurgeable() {
315            return purgeable == PurgeableState.YES;
316        }
317
318        /**
319         * Gets the the time when the connection became purgeable.
320         *
321         * @return the time
322         */
323        public long purgeableSince() {
324            return becamePurgeableAt;
325        }
326
327        /**
328         * Write the data on this channel.
329         * 
330         * @param event the event
331         */
332        public void write(Output<ByteBuffer> event)
333                throws InterruptedException {
334            synchronized (pendingWrites) {
335                if (!nioChannel.isOpen()) {
336                    return;
337                }
338                ManagedBuffer<ByteBuffer>.ByteBufferView reader
339                    = event.buffer().newByteBufferView();
340                if (!pendingWrites.isEmpty()) {
341                    reader.managedBuffer().lockBuffer();
342                    purgeable = event.isEndOfRecord() ? PurgeableState.PENDING
343                        : PurgeableState.NO;
344                    pendingWrites.add(reader);
345                    return;
346                }
347                try {
348                    nioChannel.write(reader.get());
349                } catch (IOException e) {
350                    forceClose(e);
351                    return;
352                }
353                if (!reader.get().hasRemaining()) {
354                    if (event.isEndOfRecord()) {
355                        becamePurgeableAt = System.currentTimeMillis();
356                        purgeable = PurgeableState.YES;
357                    } else {
358                        purgeable = PurgeableState.NO;
359                    }
360                    return;
361                }
362                reader.managedBuffer().lockBuffer();
363                purgeable = event.isEndOfRecord() ? PurgeableState.PENDING
364                    : PurgeableState.NO;
365                pendingWrites.add(reader);
366                selectionKeys |= SelectionKey.OP_WRITE;
367                registration.updateInterested(selectionKeys);
368            }
369        }
370
371        @Override
372        public void handleOps(int ops) throws InterruptedException {
373            if ((ops & SelectionKey.OP_READ) != 0) {
374                handleReadOp();
375            }
376            if ((ops & SelectionKey.OP_WRITE) != 0) {
377                handleWriteOp();
378            }
379        }
380
        /**
         * Gets a buffer from the pool and reads available data into it.
         * Sends the result as event. 
         * 
         * If no data has been read, the buffer is simply released again.
         * If the other end signals the end of input, this either 
         * completes a close that has been initiated by this end (state
         * `HALF_CLOSED`) or reading is stopped and the handling of the 
         * close initiated by the other end is submitted to the 
         * down pipeline.
         * 
         * I/O errors are not propagated, they cause the connection to be
         * closed by `forceClose`.
         * 
         * @throws InterruptedException declared by invoked methods
         * (e.g. `forceClose`)
         */
        @SuppressWarnings("PMD.EmptyCatchBlock")
        private void handleReadOp() throws InterruptedException {
            ManagedBuffer<ByteBuffer> buffer;
            buffer = readBuffers.acquire();
            try {
                int bytes = buffer.fillFromChannel(nioChannel);
                if (bytes == 0) {
                    // Nothing read, return the buffer
                    buffer.unlockBuffer();
                    return;
                }
                if (bytes > 0) {
                    // Input received, connection is in use again
                    purgeable = PurgeableState.NO;
                    downPipeline.fire(Input.fromSink(buffer, false), this);
                    return;
                }
            } catch (IOException e) {
                // Buffer already unlocked by fillFromChannel
                forceClose(e);
                return;
            }
            // EOF (-1) from other end
            buffer.unlockBuffer();
            synchronized (nioChannel) {
                if (connState == ConnectionState.HALF_CLOSED) {
                    // Other end confirms our close, complete close
                    try {
                        nioChannel.close();
                    } catch (IOException e) {
                        // Ignored for close
                    }
                    connState = ConnectionState.CLOSED;
                    downPipeline.fire(new Closed<Void>(), this);
                    return;
                }
            }
            // Other end initiates close
            // Stop getting read ops, there's nothing more to read.
            selectionKeys &= ~SelectionKey.OP_READ;
            registration.updateInterested(selectionKeys);
            downPipeline.submit("SendHalfClosed", () -> {
                try {
                    // Inform downstream and wait until everything has settled.
                    newEventPipeline().fire(new HalfClosed(), this).get();
                    // All settled.
                    removeChannel(this);
                    downPipeline.fire(new Closed<Void>(), this);
                    // Close our end if everything has been written.
                    // Monitors are acquired in the order pendingWrites, 
                    // nioChannel.
                    synchronized (pendingWrites) {
                        synchronized (nioChannel) {
                            try {
                                if (!pendingWrites.isEmpty()) {
                                    // Pending writes, delay close
                                    // (completed by handleWriteOp)
                                    connState = ConnectionState.DELAYED_REQUEST;
                                    return;
                                }
                                // Nothing left to do, close
                                nioChannel.close();
                                connState = ConnectionState.CLOSED;
                            } catch (IOException e) {
                                // Ignored for close
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    // Nothing to do about this
                    // NOTE(review): the thread's interrupt status is not 
                    // restored here -- confirm that this is intended.
                }
            });
        }
455
456        /**
457         * Checks if there is still data to be written. This may be
458         * a left over in an incompletely written buffer or a complete
459         * pending buffer. 
460         * 
461         * @throws IOException
462         * @throws InterruptedException 
463         */
464        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
465            "PMD.EmptyCatchBlock", "PMD.AvoidBranchingStatementAsLastInLoop",
466            "PMD.CognitiveComplexity" })
467        private void handleWriteOp() throws InterruptedException {
468            while (true) {
469                ManagedBuffer<ByteBuffer>.ByteBufferView head;
470                synchronized (pendingWrites) {
471                    if (pendingWrites.isEmpty()) {
472                        // Nothing left to write, stop getting ops
473                        selectionKeys &= ~SelectionKey.OP_WRITE;
474                        registration.updateInterested(selectionKeys);
475                        // Was the connection closed while we were writing?
476                        if (connState == ConnectionState.DELAYED_REQUEST
477                            || connState == ConnectionState.DELAYED_EVENT) {
478                            synchronized (nioChannel) {
479                                try {
480                                    if (connState == ConnectionState.DELAYED_REQUEST) {
481                                        // Delayed close request from other end,
482                                        // complete
483                                        nioChannel.close();
484                                        connState = ConnectionState.CLOSED;
485                                    }
486                                    if (connState == ConnectionState.DELAYED_EVENT) {
487                                        // Delayed close from this end, initiate
488                                        nioChannel.shutdownOutput();
489                                        connState = ConnectionState.HALF_CLOSED;
490                                    }
491                                } catch (IOException e) {
492                                    // Ignored for close
493                                }
494                            }
495                        } else {
496                            if (purgeable == PurgeableState.PENDING) {
497                                purgeable = PurgeableState.YES;
498                            }
499                        }
500                        break; // Nothing left to do
501                    }
502                    head = pendingWrites.peek();
503                    if (!head.get().hasRemaining()) {
504                        // Nothing left in head buffer, try next
505                        head.managedBuffer().unlockBuffer();
506                        pendingWrites.remove();
507                        continue;
508                    }
509                }
510                try {
511                    nioChannel.write(head.get()); // write...
512                } catch (IOException e) {
513                    forceClose(e);
514                    return;
515                }
516                break; // ... and wait for next op
517            }
518        }
519
520        /**
521         * Closes this channel.
522         * 
523         * @throws IOException if an error occurs
524         * @throws InterruptedException if the execution was interrupted 
525         */
526        public void close() throws IOException, InterruptedException {
527            if (!removeChannel(this)) {
528                return;
529            }
530            synchronized (pendingWrites) {
531                if (!pendingWrites.isEmpty()) {
532                    // Pending writes, delay close until done
533                    connState = ConnectionState.DELAYED_EVENT;
534                    return;
535                }
536                // Nothing left to do, proceed
537                synchronized (nioChannel) {
538                    if (nioChannel.isOpen()) {
539                        // Initiate close, must be confirmed by other end
540                        nioChannel.shutdownOutput();
541                        connState = ConnectionState.HALF_CLOSED;
542                    }
543                }
544            }
545        }
546
547        @SuppressWarnings("PMD.EmptyCatchBlock")
548        private void forceClose(Throwable error) throws InterruptedException {
549            try {
550                nioChannel.close();
551                connState = ConnectionState.CLOSED;
552            } catch (IOException e) {
553                // Closed only to make sure, any failure can be ignored.
554            }
555            if (removeChannel(this)) {
556                var evt = new Closed<Void>(error);
557                downPipeline.fire(evt, this);
558            }
559        }
560
561        /*
562         * (non-Javadoc)
563         * 
564         * @see org.jgrapes.io.IOSubchannel.DefaultSubchannel#toString()
565         */
566        @SuppressWarnings("PMD.CommentRequired")
567        public String toString() {
568            return Subchannel.toString(this);
569        }
570    }
571
572}