001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2023 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.net; 020 021import java.io.IOException; 022import java.net.SocketAddress; 023import java.nio.ByteBuffer; 024import java.nio.channels.SelectionKey; 025import java.nio.channels.SocketChannel; 026import java.util.ArrayDeque; 027import java.util.HashSet; 028import java.util.Optional; 029import java.util.Queue; 030import java.util.Set; 031import java.util.concurrent.ExecutorService; 032import org.jgrapes.core.Channel; 033import org.jgrapes.core.Component; 034import org.jgrapes.core.Components; 035import org.jgrapes.core.EventPipeline; 036import org.jgrapes.core.Manager; 037import org.jgrapes.core.Subchannel; 038import org.jgrapes.core.annotation.Handler; 039import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel; 040import org.jgrapes.io.NioHandler; 041import org.jgrapes.io.events.Closed; 042import org.jgrapes.io.events.HalfClosed; 043import org.jgrapes.io.events.Input; 044import org.jgrapes.io.events.NioRegistration; 045import org.jgrapes.io.events.NioRegistration.Registration; 046import org.jgrapes.io.events.OpenSocketConnection; 047import org.jgrapes.io.events.Output; 048import org.jgrapes.io.util.ManagedBuffer; 049import org.jgrapes.io.util.ManagedBufferPool; 050 051/** 052 * Provides a base class for the {@link SocketServer} and the 053 * {@link SocketConnector}. 054 */ 055@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.ExcessivePublicCount", 056 "PMD.NcssCount", "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals", 057 "PMD.ExcessiveClassLength" }) 058public abstract class SocketConnectionManager extends Component { 059 060 private int bufferSize = 32_768; 061 protected final Set<SocketChannelImpl> channels = new HashSet<>(); 062 private ExecutorService executorService; 063 064 /** 065 * Creates a new server using the given channel. 066 * 067 * @param componentChannel the component's channel 068 */ 069 public SocketConnectionManager(Channel componentChannel) { 070 super(componentChannel); 071 } 072 073 /** 074 * Sets the buffer size for the send an receive buffers. 075 * If no size is set, a default value of 32768 will be used. 076 * 077 * @param bufferSize the size to use for the send and receive buffers 078 * @return the socket connection manager for easy chaining 079 */ 080 public SocketConnectionManager setBufferSize(int bufferSize) { 081 this.bufferSize = bufferSize; 082 return this; 083 } 084 085 /** 086 * Return the configured buffer size. 087 * 088 * @return the bufferSize 089 */ 090 public int bufferSize() { 091 return bufferSize; 092 } 093 094 /** 095 * Sets an executor service to be used by the event pipelines 096 * that process the data from the network. Setting this 097 * to an executor service with a limited number of threads 098 * allows to control the maximum load from the network. 099 * 100 * @param executorService the executorService to set 101 * @return the socket connection manager for easy chaining 102 * @see Manager#newEventPipeline(ExecutorService) 103 */ 104 public SocketConnectionManager 105 setExecutorService(ExecutorService executorService) { 106 this.executorService = executorService; 107 return this; 108 } 109 110 /** 111 * Returns the executor service. 112 * 113 * @return the executorService 114 */ 115 public ExecutorService executorService() { 116 return executorService; 117 } 118 119 /** 120 * Writes the data passed in the event. 121 * 122 * The end of record flag is used to determine if a channel is 123 * eligible for purging. If the flag is set and all output has 124 * been processed, the channel is purgeable until input is 125 * received or another output event causes the state to be 126 * reevaluated. 127 * 128 * @param event the event 129 * @param channel the channel 130 * @throws InterruptedException the interrupted exception 131 */ 132 @Handler 133 public void onOutput(Output<ByteBuffer> event, 134 SocketChannelImpl channel) throws InterruptedException { 135 if (channels.contains(channel)) { 136 channel.write(event); 137 } 138 } 139 140 /** 141 * Removes the channel from the set of registered channels. 142 * 143 * @param channel the channel 144 * @return true, if channel was registered 145 */ 146 protected boolean removeChannel(SocketChannelImpl channel) { 147 synchronized (channels) { 148 return channels.remove(channel); 149 } 150 } 151 152 /* 153 * (non-Javadoc) 154 * 155 * @see java.lang.Object#toString() 156 */ 157 @Override 158 public String toString() { 159 return Components.objectName(this); 160 } 161 162 /** 163 * The close state. 164 */ 165 private enum ConnectionState { 166 OPEN, DELAYED_EVENT, DELAYED_REQUEST, HALF_CLOSED, CLOSED 167 } 168 169 /** 170 * The purgeable state. 171 */ 172 private enum PurgeableState { 173 NO, PENDING, YES 174 } 175 176 /** 177 * The internal representation of a connection. 178 */ 179 /** 180 * 181 */ 182 @SuppressWarnings("PMD.GodClass") 183 protected class SocketChannelImpl 184 extends DefaultIOSubchannel implements NioHandler, SocketIOChannel { 185 186 private final OpenSocketConnection openEvent; 187 private final SocketChannel nioChannel; 188 private final SocketAddress localAddress; 189 private final SocketAddress remoteAddress; 190 private final EventPipeline downPipeline; 191 private final ManagedBufferPool<ManagedBuffer<ByteBuffer>, 192 ByteBuffer> readBuffers; 193 private Registration registration; 194 private int selectionKeys; 195 private final Queue< 196 ManagedBuffer<ByteBuffer>.ByteBufferView> pendingWrites 197 = new ArrayDeque<>(); 198 private ConnectionState connState = ConnectionState.OPEN; 199 private PurgeableState purgeable = PurgeableState.NO; 200 private long becamePurgeableAt; 201 202 /** 203 * @param nioChannel the channel 204 * @throws IOException if an I/O error occurred 205 */ 206 public SocketChannelImpl(OpenSocketConnection openEvent, 207 SocketChannel nioChannel) throws IOException { 208 super(channel(), newEventPipeline()); 209 this.openEvent = openEvent; 210 this.nioChannel = nioChannel; 211 // Copy, because they are only available while channel is open. 212 localAddress = nioChannel.getLocalAddress(); 213 remoteAddress = nioChannel.getRemoteAddress(); 214 if (executorService == null) { 215 downPipeline = newEventPipeline(); 216 } else { 217 downPipeline = newEventPipeline(executorService); 218 } 219 String channelName 220 = Components.objectName(SocketConnectionManager.this) 221 + "." + Components.objectName(this); 222 223 // Prepare write buffers 224 int writeBufferSize = bufferSize < 1500 ? 1500 : bufferSize; 225 setByteBufferPool(new ManagedBufferPool<>(ManagedBuffer::new, 226 () -> { 227 return ByteBuffer.allocate(writeBufferSize); 228 }, 2) 229 .setName(channelName + ".upstream.buffers")); 230 231 // Prepare read buffers 232 int readBufferSize = bufferSize < 1500 ? 1500 : bufferSize; 233 readBuffers = new ManagedBufferPool<>(ManagedBuffer::new, 234 () -> { 235 return ByteBuffer.allocate(readBufferSize); 236 }, 2) 237 .setName(channelName + ".downstream.buffers"); 238 239 // Register with dispatcher 240 nioChannel.configureBlocking(false); 241 SocketConnectionManager.this.fire( 242 new NioRegistration(this, nioChannel, 0, 243 SocketConnectionManager.this), 244 Channel.BROADCAST); 245 } 246 247 /** 248 * Returns the event that caused this connection to be opened. 249 * 250 * May be `null` if the channel was created in response to a 251 * client connecting to the server. 252 * 253 * @return the event 254 */ 255 public Optional<OpenSocketConnection> openEvent() { 256 return Optional.ofNullable(openEvent); 257 } 258 259 /** 260 * Gets the nio channel. 261 * 262 * @return the nioChannel 263 */ 264 public SocketChannel nioChannel() { 265 return nioChannel; 266 } 267 268 @Override 269 public SocketAddress localAddress() { 270 return localAddress; 271 } 272 273 @Override 274 public SocketAddress remoteAddress() { 275 return remoteAddress; 276 } 277 278 /** 279 * Gets the read buffers. 280 * 281 * @return the readBuffers 282 */ 283 public ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer> 284 readBuffers() { 285 return readBuffers; 286 } 287 288 /** 289 * Gets the down pipeline. 290 * 291 * @return the downPipeline 292 */ 293 public EventPipeline downPipeline() { 294 return downPipeline; 295 } 296 297 /** 298 * Invoked when registration has completed. 299 * 300 * @param registration the registration (result from the 301 * {@link NioRegistration} event) 302 */ 303 public void registrationComplete(Registration registration) { 304 this.registration = registration; 305 selectionKeys |= SelectionKey.OP_READ; 306 registration.updateInterested(selectionKeys); 307 } 308 309 /** 310 * Checks if is purgeable. 311 * 312 * @return true, if is purgeable 313 */ 314 public boolean isPurgeable() { 315 return purgeable == PurgeableState.YES; 316 } 317 318 /** 319 * Gets the the time when the connection became purgeable. 320 * 321 * @return the time 322 */ 323 public long purgeableSince() { 324 return becamePurgeableAt; 325 } 326 327 /** 328 * Write the data on this channel. 329 * 330 * @param event the event 331 */ 332 public void write(Output<ByteBuffer> event) 333 throws InterruptedException { 334 synchronized (pendingWrites) { 335 if (!nioChannel.isOpen()) { 336 return; 337 } 338 ManagedBuffer<ByteBuffer>.ByteBufferView reader 339 = event.buffer().newByteBufferView(); 340 if (!pendingWrites.isEmpty()) { 341 reader.managedBuffer().lockBuffer(); 342 purgeable = event.isEndOfRecord() ? PurgeableState.PENDING 343 : PurgeableState.NO; 344 pendingWrites.add(reader); 345 return; 346 } 347 try { 348 nioChannel.write(reader.get()); 349 } catch (IOException e) { 350 forceClose(e); 351 return; 352 } 353 if (!reader.get().hasRemaining()) { 354 if (event.isEndOfRecord()) { 355 becamePurgeableAt = System.currentTimeMillis(); 356 purgeable = PurgeableState.YES; 357 } else { 358 purgeable = PurgeableState.NO; 359 } 360 return; 361 } 362 reader.managedBuffer().lockBuffer(); 363 purgeable = event.isEndOfRecord() ? PurgeableState.PENDING 364 : PurgeableState.NO; 365 pendingWrites.add(reader); 366 selectionKeys |= SelectionKey.OP_WRITE; 367 registration.updateInterested(selectionKeys); 368 } 369 } 370 371 @Override 372 public void handleOps(int ops) throws InterruptedException { 373 if ((ops & SelectionKey.OP_READ) != 0) { 374 handleReadOp(); 375 } 376 if ((ops & SelectionKey.OP_WRITE) != 0) { 377 handleWriteOp(); 378 } 379 } 380 381 /** 382 * Gets a buffer from the pool and reads available data into it. 383 * Sends the result as event. 384 * 385 * @throws InterruptedException 386 * @throws IOException 387 */ 388 @SuppressWarnings("PMD.EmptyCatchBlock") 389 private void handleReadOp() throws InterruptedException { 390 ManagedBuffer<ByteBuffer> buffer; 391 buffer = readBuffers.acquire(); 392 try { 393 int bytes = buffer.fillFromChannel(nioChannel); 394 if (bytes == 0) { 395 buffer.unlockBuffer(); 396 return; 397 } 398 if (bytes > 0) { 399 purgeable = PurgeableState.NO; 400 downPipeline.fire(Input.fromSink(buffer, false), this); 401 return; 402 } 403 } catch (IOException e) { 404 // Buffer already unlocked by fillFromChannel 405 forceClose(e); 406 return; 407 } 408 // EOF (-1) from other end 409 buffer.unlockBuffer(); 410 synchronized (nioChannel) { 411 if (connState == ConnectionState.HALF_CLOSED) { 412 // Other end confirms our close, complete close 413 try { 414 nioChannel.close(); 415 } catch (IOException e) { 416 // Ignored for close 417 } 418 connState = ConnectionState.CLOSED; 419 downPipeline.fire(new Closed<Void>(), this); 420 return; 421 } 422 } 423 // Other end initiates close 424 selectionKeys &= ~SelectionKey.OP_READ; 425 registration.updateInterested(selectionKeys); 426 downPipeline.submit("SendHalfClosed", () -> { 427 try { 428 // Inform downstream and wait until everything has settled. 429 newEventPipeline().fire(new HalfClosed(), this).get(); 430 // All settled. 431 removeChannel(this); 432 downPipeline.fire(new Closed<Void>(), this); 433 // Close our end if everything has been written. 434 synchronized (pendingWrites) { 435 synchronized (nioChannel) { 436 try { 437 if (!pendingWrites.isEmpty()) { 438 // Pending writes, delay close 439 connState = ConnectionState.DELAYED_REQUEST; 440 return; 441 } 442 // Nothing left to do, close 443 nioChannel.close(); 444 connState = ConnectionState.CLOSED; 445 } catch (IOException e) { 446 // Ignored for close 447 } 448 } 449 } 450 } catch (InterruptedException e) { 451 // Nothing to do about this 452 } 453 }); 454 } 455 456 /** 457 * Checks if there is still data to be written. This may be 458 * a left over in an incompletely written buffer or a complete 459 * pending buffer. 460 * 461 * @throws IOException 462 * @throws InterruptedException 463 */ 464 @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", 465 "PMD.EmptyCatchBlock", "PMD.AvoidBranchingStatementAsLastInLoop", 466 "PMD.CognitiveComplexity" }) 467 private void handleWriteOp() throws InterruptedException { 468 while (true) { 469 ManagedBuffer<ByteBuffer>.ByteBufferView head; 470 synchronized (pendingWrites) { 471 if (pendingWrites.isEmpty()) { 472 // Nothing left to write, stop getting ops 473 selectionKeys &= ~SelectionKey.OP_WRITE; 474 registration.updateInterested(selectionKeys); 475 // Was the connection closed while we were writing? 476 if (connState == ConnectionState.DELAYED_REQUEST 477 || connState == ConnectionState.DELAYED_EVENT) { 478 synchronized (nioChannel) { 479 try { 480 if (connState == ConnectionState.DELAYED_REQUEST) { 481 // Delayed close request from other end, 482 // complete 483 nioChannel.close(); 484 connState = ConnectionState.CLOSED; 485 } 486 if (connState == ConnectionState.DELAYED_EVENT) { 487 // Delayed close from this end, initiate 488 nioChannel.shutdownOutput(); 489 connState = ConnectionState.HALF_CLOSED; 490 } 491 } catch (IOException e) { 492 // Ignored for close 493 } 494 } 495 } else { 496 if (purgeable == PurgeableState.PENDING) { 497 purgeable = PurgeableState.YES; 498 } 499 } 500 break; // Nothing left to do 501 } 502 head = pendingWrites.peek(); 503 if (!head.get().hasRemaining()) { 504 // Nothing left in head buffer, try next 505 head.managedBuffer().unlockBuffer(); 506 pendingWrites.remove(); 507 continue; 508 } 509 } 510 try { 511 nioChannel.write(head.get()); // write... 512 } catch (IOException e) { 513 forceClose(e); 514 return; 515 } 516 break; // ... and wait for next op 517 } 518 } 519 520 /** 521 * Closes this channel. 522 * 523 * @throws IOException if an error occurs 524 * @throws InterruptedException if the execution was interrupted 525 */ 526 public void close() throws IOException, InterruptedException { 527 if (!removeChannel(this)) { 528 return; 529 } 530 synchronized (pendingWrites) { 531 if (!pendingWrites.isEmpty()) { 532 // Pending writes, delay close until done 533 connState = ConnectionState.DELAYED_EVENT; 534 return; 535 } 536 // Nothing left to do, proceed 537 synchronized (nioChannel) { 538 if (nioChannel.isOpen()) { 539 // Initiate close, must be confirmed by other end 540 nioChannel.shutdownOutput(); 541 connState = ConnectionState.HALF_CLOSED; 542 } 543 } 544 } 545 } 546 547 @SuppressWarnings("PMD.EmptyCatchBlock") 548 private void forceClose(Throwable error) throws InterruptedException { 549 try { 550 nioChannel.close(); 551 connState = ConnectionState.CLOSED; 552 } catch (IOException e) { 553 // Closed only to make sure, any failure can be ignored. 554 } 555 if (removeChannel(this)) { 556 var evt = new Closed<Void>(error); 557 downPipeline.fire(evt, this); 558 } 559 } 560 561 /* 562 * (non-Javadoc) 563 * 564 * @see org.jgrapes.io.IOSubchannel.DefaultSubchannel#toString() 565 */ 566 @SuppressWarnings("PMD.CommentRequired") 567 public String toString() { 568 return Subchannel.toString(this); 569 } 570 } 571 572}