/*
 * JGrapes Event Driven Framework
 * Copyright (C) 2016-2018 Michael N. Lipp
 * 
 * This program is free software; you can redistribute it and/or modify it 
 * under the terms of the GNU Affero General Public License as published by 
 * the Free Software Foundation; either version 3 of the License, or 
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful, but 
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
 * for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License along 
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jgrapes.net;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.Components;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.Manager;
import org.jgrapes.core.Subchannel;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.io.IOSubchannel.DefaultIOSubchannel;
import org.jgrapes.io.NioHandler;
import org.jgrapes.io.events.Closed;
import org.jgrapes.io.events.HalfClosed;
import org.jgrapes.io.events.Input;
import org.jgrapes.io.events.NioRegistration;
import org.jgrapes.io.events.NioRegistration.Registration;
import org.jgrapes.io.events.OpenTcpConnection;
import org.jgrapes.io.events.Output;
import org.jgrapes.io.util.ManagedBuffer;
import org.jgrapes.io.util.ManagedBufferPool;

/**
 * Provides a base class for the {@link TcpServer} and the {@link TcpConnector}.
 */
@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.ExcessivePublicCount",
    "PMD.NcssCount", "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals",
    "PMD.ExcessiveClassLength" })
public abstract class TcpConnectionManager extends Component {

    private int bufferSize = 32_768;
    /**
     * The currently registered channels. This is a plain {@link HashSet};
     * accesses in this class are made while holding the set's monitor
     * (`synchronized (channels)`).
     */
    protected final Set<TcpChannelImpl> channels = new HashSet<>();
    private ExecutorService executorService;

    /**
     * Creates a new connection manager using the given channel.
     * 
     * @param componentChannel the component's channel
     */
    public TcpConnectionManager(Channel componentChannel) {
        super(componentChannel);
    }

    /**
     * Sets the buffer size for the send and receive buffers.
     * If no size is set, a default value of 32768 will be used.
     * Values below 1500 are raised to 1500 when the buffers of a
     * connection are allocated.
     * 
     * @param bufferSize the size to use for the send and receive buffers
     * @return the TCP connection manager for easy chaining
     */
    public TcpConnectionManager setBufferSize(int bufferSize) {
        this.bufferSize = bufferSize;
        return this;
    }

    /**
     * Return the configured buffer size.
     *
     * @return the bufferSize
     */
    public int bufferSize() {
        return bufferSize;
    }

    /**
     * Sets an executor service to be used by the event pipelines
     * that process the data from the network. Setting this
     * to an executor service with a limited number of threads
     * allows to control the maximum load from the network.
     * 
     * @param executorService the executorService to set
     * @return the TCP connection manager for easy chaining
     * @see Manager#newEventPipeline(ExecutorService)
     */
    public TcpConnectionManager
            setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
        return this;
    }

    /**
     * Returns the executor service.
     *
     * @return the executorService, `null` if none has been set
     */
    public ExecutorService executorService() {
        return executorService;
    }

    /**
     * Writes the data passed in the event. Events for channels that are
     * not (or no longer) registered with this manager are ignored.
     * 
     * The end of record flag is used to determine if a channel is
     * eligible for purging. If the flag is set and all output has
     * been processed, the channel is purgeable until input is 
     * received or another output event causes the state to be 
     * reevaluated. 
     *
     * @param event the event
     * @param channel the channel
     * @throws InterruptedException the interrupted exception
     */
    @Handler
    public void onOutput(Output<ByteBuffer> event,
            TcpChannelImpl channel) throws InterruptedException {
        // Look the channel up under the same monitor that guards
        // removals, but do the actual write outside of it so that
        // the set is never locked during I/O.
        boolean registered;
        synchronized (channels) {
            registered = channels.contains(channel);
        }
        if (registered) {
            channel.write(event);
        }
    }

    /**
     * Removes the channel from the set of registered channels.
     *
     * @param channel the channel
     * @return true, if channel was registered
     */
    protected boolean removeChannel(TcpChannelImpl channel) {
        synchronized (channels) {
            return channels.remove(channel);
        }
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return Components.objectName(this);
    }

    /**
     * The close state of a connection.
     * 
     * * `OPEN`: initial state.
     * * `DELAYED_EVENT`: {@link TcpChannelImpl#close()} was invoked while
     *   writes were pending; the output is shut down once they are done.
     * * `DELAYED_REQUEST`: the other end closed while writes were
     *   pending; the channel is closed once they are done.
     * * `HALF_CLOSED`: our output has been shut down, waiting for the
     *   EOF from the other end.
     * * `CLOSED`: the nio channel has been closed.
     */
    private enum ConnectionState {
        OPEN, DELAYED_EVENT, DELAYED_REQUEST, HALF_CLOSED, CLOSED
    }

    /**
     * The purgeable state.
     * 
     * * `NO`: not purgeable.
     * * `PENDING`: an end of record has been queued but not yet
     *   completely written.
     * * `YES`: the last output was an end of record and has been
     *   written completely.
     */
    private enum PurgeableState {
        NO, PENDING, YES
    }

    /**
     * The internal representation of a connection.
     */
    @SuppressWarnings("PMD.GodClass")
    protected class TcpChannelImpl
            extends DefaultIOSubchannel implements NioHandler, TcpChannel {

        private final OpenTcpConnection openEvent;
        private final SocketChannel nioChannel;
        private final SocketAddress localAddress;
        private final SocketAddress remoteAddress;
        private final EventPipeline downPipeline;
        private final ManagedBufferPool<ManagedBuffer<ByteBuffer>,
                ByteBuffer> readBuffers;
        private Registration registration;
        private int selectionKeys;
        private final Queue<
                ManagedBuffer<ByteBuffer>.ByteBufferView> pendingWrites
                    = new ArrayDeque<>();
        private ConnectionState connState = ConnectionState.OPEN;
        private PurgeableState purgeable = PurgeableState.NO;
        private long becamePurgeableAt;

        /**
         * Creates the channel, allocates its buffer pools, switches the
         * nio channel to non-blocking mode and fires the
         * {@link NioRegistration} event.
         * 
         * @param openEvent the event that caused the connection to be
         * opened, may be `null` (see {@link #openEvent()})
         * @param nioChannel the channel
         * @throws IOException if an I/O error occurred
         */
        public TcpChannelImpl(OpenTcpConnection openEvent,
                SocketChannel nioChannel) throws IOException {
            super(channel(), newEventPipeline());
            this.openEvent = openEvent;
            this.nioChannel = nioChannel;
            // Copy, because they are only available while channel is open.
            localAddress = nioChannel.getLocalAddress();
            remoteAddress = nioChannel.getRemoteAddress();
            if (executorService == null) {
                downPipeline = newEventPipeline();
            } else {
                downPipeline = newEventPipeline(executorService);
            }
            String channelName
                = Components.objectName(TcpConnectionManager.this)
                    + "." + Components.objectName(this);

            // Prepare write buffers (never smaller than 1500 bytes)
            int writeBufferSize = Math.max(bufferSize, 1500);
            setByteBufferPool(new ManagedBufferPool<>(ManagedBuffer::new,
                () -> ByteBuffer.allocate(writeBufferSize), 2)
                    .setName(channelName + ".upstream.buffers"));

            // Prepare read buffers (never smaller than 1500 bytes)
            int readBufferSize = Math.max(bufferSize, 1500);
            readBuffers = new ManagedBufferPool<>(ManagedBuffer::new,
                () -> ByteBuffer.allocate(readBufferSize), 2)
                    .setName(channelName + ".downstream.buffers");

            // Register with dispatcher
            nioChannel.configureBlocking(false);
            TcpConnectionManager.this.fire(
                new NioRegistration(this, nioChannel, 0,
                    TcpConnectionManager.this),
                Channel.BROADCAST);
        }

        /**
         * Returns the event that caused this connection to be opened.
         * 
         * May be empty if the channel was created in response to a
         * client connecting to the server.
         * 
         * @return the event
         */
        public Optional<OpenTcpConnection> openEvent() {
            return Optional.ofNullable(openEvent);
        }

        /**
         * Gets the nio channel.
         *
         * @return the nioChannel
         */
        public SocketChannel nioChannel() {
            return nioChannel;
        }

        @Override
        public SocketAddress localAddress() {
            return localAddress;
        }

        @Override
        public SocketAddress remoteAddress() {
            return remoteAddress;
        }

        /**
         * Gets the read buffers.
         *
         * @return the readBuffers
         */
        public ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer>
                readBuffers() {
            return readBuffers;
        }

        /**
         * Gets the down pipeline.
         *
         * @return the downPipeline
         */
        public EventPipeline downPipeline() {
            return downPipeline;
        }

        /**
         * Invoked when registration has completed. Stores the
         * registration and adds `OP_READ` to the interest set.
         *
         * @param registration the registration (result from the
         * {@link NioRegistration} event)
         */
        public void registrationComplete(Registration registration) {
            this.registration = registration;
            selectionKeys |= SelectionKey.OP_READ;
            registration.updateInterested(selectionKeys);
        }

        /**
         * Checks if is purgeable.
         *
         * @return true, if is purgeable
         */
        public boolean isPurgeable() {
            return purgeable == PurgeableState.YES;
        }

        /**
         * Gets the time when the connection became purgeable.
         *
         * @return the time (as provided by
         * {@link System#currentTimeMillis()})
         */
        public long purgeableSince() {
            return becamePurgeableAt;
        }

        /**
         * Write the data on this channel. If earlier data is still
         * pending, the buffer is queued behind it. Else an immediate
         * write is attempted and only the unwritten remainder (if any)
         * is queued and `OP_WRITE` is added to the interest set.
         *
         * @param event the event
         * @throws InterruptedException if the execution was interrupted
         */
        public void write(Output<ByteBuffer> event)
                throws InterruptedException {
            synchronized (pendingWrites) {
                if (!nioChannel.isOpen()) {
                    return;
                }
                ManagedBuffer<ByteBuffer>.ByteBufferView reader
                    = event.buffer().newByteBufferView();
                if (!pendingWrites.isEmpty()) {
                    // Keep the order, queue behind the pending data
                    reader.managedBuffer().lockBuffer();
                    purgeable = event.isEndOfRecord() ? PurgeableState.PENDING
                        : PurgeableState.NO;
                    pendingWrites.add(reader);
                    return;
                }
                try {
                    nioChannel.write(reader.get());
                } catch (IOException e) {
                    forceClose(e);
                    return;
                }
                if (!reader.get().hasRemaining()) {
                    // Everything written
                    if (event.isEndOfRecord()) {
                        becamePurgeableAt = System.currentTimeMillis();
                        purgeable = PurgeableState.YES;
                    } else {
                        purgeable = PurgeableState.NO;
                    }
                    return;
                }
                // Partial write, keep the buffer until the rest is written
                reader.managedBuffer().lockBuffer();
                purgeable = event.isEndOfRecord() ? PurgeableState.PENDING
                    : PurgeableState.NO;
                pendingWrites.add(reader);
                selectionKeys |= SelectionKey.OP_WRITE;
                registration.updateInterested(selectionKeys);
            }
        }

        @Override
        public void handleOps(int ops) throws InterruptedException {
            if ((ops & SelectionKey.OP_READ) != 0) {
                handleReadOp();
            }
            if ((ops & SelectionKey.OP_WRITE) != 0) {
                handleWriteOp();
            }
        }

        /**
         * Gets a buffer from the pool and reads available data into it.
         * Sends the result as event. On EOF, either completes a close
         * initiated by this end or starts the handling of a close 
         * initiated by the other end.
         * 
         * @throws InterruptedException if the execution was interrupted
         */
        @SuppressWarnings("PMD.EmptyCatchBlock")
        private void handleReadOp() throws InterruptedException {
            ManagedBuffer<ByteBuffer> buffer;
            buffer = readBuffers.acquire();
            try {
                int bytes = buffer.fillFromChannel(nioChannel);
                if (bytes == 0) {
                    buffer.unlockBuffer();
                    return;
                }
                if (bytes > 0) {
                    purgeable = PurgeableState.NO;
                    downPipeline.fire(Input.fromSink(buffer, false), this);
                    return;
                }
            } catch (IOException e) {
                // Buffer already unlocked by fillFromChannel
                forceClose(e);
                return;
            }
            // EOF (-1) from other end
            buffer.unlockBuffer();
            synchronized (nioChannel) {
                if (connState == ConnectionState.HALF_CLOSED) {
                    // Other end confirms our close, complete close
                    try {
                        nioChannel.close();
                    } catch (IOException e) {
                        // Ignored for close
                    }
                    connState = ConnectionState.CLOSED;
                    downPipeline.fire(new Closed<Void>(), this);
                    return;
                }
            }
            // Other end initiates close
            selectionKeys &= ~SelectionKey.OP_READ;
            registration.updateInterested(selectionKeys);
            downPipeline.submit("SendHalfClosed", () -> {
                try {
                    // Inform downstream and wait until everything has settled.
                    newEventPipeline().fire(new HalfClosed(), this).get();
                    // All settled.
                    removeChannel(this);
                    downPipeline.fire(new Closed<Void>(), this);
                    // Close our end if everything has been written.
                    synchronized (pendingWrites) {
                        synchronized (nioChannel) {
                            try {
                                if (!pendingWrites.isEmpty()) {
                                    // Pending writes, delay close
                                    connState = ConnectionState.DELAYED_REQUEST;
                                    return;
                                }
                                // Nothing left to do, close
                                nioChannel.close();
                                connState = ConnectionState.CLOSED;
                            } catch (IOException e) {
                                // Ignored for close
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    // Nothing to do about this
                }
            });
        }

        /**
         * Checks if there is still data to be written. This may be
         * a left over in an incompletely written buffer or a complete
         * pending buffer. If nothing is left, `OP_WRITE` is removed
         * from the interest set and a delayed close (if any) is
         * performed.
         * 
         * @throws InterruptedException if the execution was interrupted
         */
        @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
            "PMD.EmptyCatchBlock", "PMD.AvoidBranchingStatementAsLastInLoop",
            "PMD.CognitiveComplexity" })
        private void handleWriteOp() throws InterruptedException {
            while (true) {
                ManagedBuffer<ByteBuffer>.ByteBufferView head;
                synchronized (pendingWrites) {
                    if (pendingWrites.isEmpty()) {
                        // Nothing left to write, stop getting ops
                        selectionKeys &= ~SelectionKey.OP_WRITE;
                        registration.updateInterested(selectionKeys);
                        // Was the connection closed while we were writing?
                        if (connState == ConnectionState.DELAYED_REQUEST
                            || connState == ConnectionState.DELAYED_EVENT) {
                            synchronized (nioChannel) {
                                try {
                                    if (connState == ConnectionState.DELAYED_REQUEST) {
                                        // Delayed close request from other end,
                                        // complete
                                        nioChannel.close();
                                        connState = ConnectionState.CLOSED;
                                    }
                                    if (connState == ConnectionState.DELAYED_EVENT) {
                                        // Delayed close from this end, initiate
                                        nioChannel.shutdownOutput();
                                        connState = ConnectionState.HALF_CLOSED;
                                    }
                                } catch (IOException e) {
                                    // Ignored for close
                                }
                            }
                        } else {
                            if (purgeable == PurgeableState.PENDING) {
                                // The end of record has now been written
                                // completely. Record the time, as is done
                                // for the immediate case in write(), else
                                // purgeableSince() reports a stale value.
                                becamePurgeableAt = System.currentTimeMillis();
                                purgeable = PurgeableState.YES;
                            }
                        }
                        break; // Nothing left to do
                    }
                    head = pendingWrites.peek();
                    if (!head.get().hasRemaining()) {
                        // Nothing left in head buffer, try next
                        head.managedBuffer().unlockBuffer();
                        pendingWrites.remove();
                        continue;
                    }
                }
                try {
                    nioChannel.write(head.get()); // write...
                } catch (IOException e) {
                    forceClose(e);
                    return;
                }
                break; // ... and wait for next op
            }
        }

        /**
         * Closes this channel. Does nothing if the channel is not
         * registered any more. If writes are pending, the close is
         * delayed until they have completed. Else the output is shut
         * down and the close is completed when the other end confirms.
         *
         * @throws IOException if an error occurs
         * @throws InterruptedException if the execution was interrupted
         */
        public void close() throws IOException, InterruptedException {
            if (!removeChannel(this)) {
                return;
            }
            synchronized (pendingWrites) {
                if (!pendingWrites.isEmpty()) {
                    // Pending writes, delay close until done
                    connState = ConnectionState.DELAYED_EVENT;
                    return;
                }
                // Nothing left to do, proceed
                synchronized (nioChannel) {
                    if (nioChannel.isOpen()) {
                        // Initiate close, must be confirmed by other end
                        nioChannel.shutdownOutput();
                        connState = ConnectionState.HALF_CLOSED;
                    }
                }
            }
        }

        /**
         * Closes the nio channel after an I/O error and, if the channel
         * was still registered, fires a {@link Closed} event that
         * carries the error.
         *
         * @param error the cause
         * @throws InterruptedException if the execution was interrupted
         */
        @SuppressWarnings("PMD.EmptyCatchBlock")
        private void forceClose(Throwable error) throws InterruptedException {
            try {
                nioChannel.close();
            } catch (IOException e) {
                // Closed only to make sure, any failure can be ignored.
            } finally {
                // The channel counts as closed even if close() failed
                // (same as for the regular close in handleReadOp).
                connState = ConnectionState.CLOSED;
            }
            if (removeChannel(this)) {
                var evt = new Closed<Void>(error);
                downPipeline.fire(evt, this);
            }
        }

        /*
         * (non-Javadoc)
         * 
         * @see org.jgrapes.io.IOSubchannel.DefaultSubchannel#toString()
         */
        @Override
        @SuppressWarnings("PMD.CommentRequired")
        public String toString() {
            return Subchannel.toString(this);
        }
    }

}