001/* 002 * JGrapes Event Driven Framework 003 * Copyright (C) 2016-2018 Michael N. Lipp 004 * 005 * This program is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Affero General Public License as published by 007 * the Free Software Foundation; either version 3 of the License, or 008 * (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, but 011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License along 016 * with this program; if not, see <http://www.gnu.org/licenses/>. 017 */ 018 019package org.jgrapes.net; 020 021import java.io.IOException; 022import java.lang.management.ManagementFactory; 023import java.lang.ref.WeakReference; 024import java.net.InetSocketAddress; 025import java.net.SocketAddress; 026import java.net.StandardProtocolFamily; 027import java.net.UnixDomainSocketAddress; 028import java.nio.channels.SelectionKey; 029import java.nio.channels.ServerSocketChannel; 030import java.nio.channels.SocketChannel; 031import java.util.ArrayList; 032import java.util.Collections; 033import java.util.Comparator; 034import java.util.HashSet; 035import java.util.IntSummaryStatistics; 036import java.util.List; 037import java.util.Optional; 038import java.util.Set; 039import java.util.SortedMap; 040import java.util.TreeMap; 041import java.util.stream.Collectors; 042import javax.management.InstanceAlreadyExistsException; 043import javax.management.MBeanRegistrationException; 044import javax.management.MBeanServer; 045import javax.management.MalformedObjectNameException; 046import javax.management.NotCompliantMBeanException; 047import javax.management.ObjectName; 048import org.jgrapes.core.Channel; 049import org.jgrapes.core.Components; 050import org.jgrapes.core.Event; 051import org.jgrapes.core.Manager; 052import org.jgrapes.core.Self; 053import org.jgrapes.core.Subchannel; 054import org.jgrapes.core.annotation.Handler; 055import org.jgrapes.core.events.Error; 056import org.jgrapes.core.events.Start; 057import org.jgrapes.core.events.Stop; 058import org.jgrapes.io.NioHandler; 059import org.jgrapes.io.events.Close; 060import org.jgrapes.io.events.Closed; 061import org.jgrapes.io.events.IOError; 062import org.jgrapes.io.events.Input; 063import org.jgrapes.io.events.NioRegistration; 064import org.jgrapes.io.events.NioRegistration.Registration; 065import org.jgrapes.io.events.Opening; 066import org.jgrapes.io.events.Output; 067import org.jgrapes.io.events.Purge; 068import org.jgrapes.io.util.AvailabilityListener; 069import org.jgrapes.io.util.LinkedIOSubchannel; 070import org.jgrapes.io.util.PermitsPool; 071import org.jgrapes.net.events.Accepted; 072import org.jgrapes.net.events.Ready; 073import org.jgrapes.util.events.ConfigurationUpdate; 074 075/** 076 * Provides a socket server. The server binds to the given address. If the 077 * address is {@code null}, address and port are automatically assigned. 078 * The port may be overwritten by a configuration event 079 * (see {@link #onConfigurationUpdate(ConfigurationUpdate)}). 080 * 081 * For each established connection, the server creates a new 082 * {@link LinkedIOSubchannel}. The servers basic operation is to 083 * fire {@link Input} (and {@link Closed}) events on the 084 * appropriate subchannel in response to data received from the 085 * network and to handle {@link Output} (and {@link Close}) events 086 * on the subchannel and forward the information to the network 087 * connection. 088 * 089 * The server supports limiting the number of concurrent connections 090 * with a {@link PermitsPool}. If such a pool is set as connection 091 * limiter (see {@link #setConnectionLimiter(PermitsPool)}), a 092 * permit is acquired for each new connection attempt. If no more 093 * permits are available, the server sends a {@link Purge} event on 094 * each channel that is purgeable for at least the time span 095 * set with {@link #setMinimalPurgeableTime(long)}. Purgeability 096 * is derived from the end of record flag of {@link Output} events 097 * (see {@link #onOutput(Output, SocketChannelImpl)}. When using this feature, 098 * make sure that connections are either short lived or the application 099 * level components support the {@link Purge} event. Else, it may become 100 * impossible to establish new connections. 101 */ 102@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.ExcessivePublicCount", 103 "PMD.NcssCount", "PMD.EmptyCatchBlock", "PMD.AvoidDuplicateLiterals", 104 "PMD.ExcessiveClassLength", "PMD.CouplingBetweenObjects" }) 105public class SocketServer extends SocketConnectionManager 106 implements NioHandler { 107 108 private SocketAddress serverAddress; 109 private ServerSocketChannel serverSocketChannel; 110 private boolean closing; 111 private int backlog; 112 private PermitsPool connLimiter; 113 private Registration registration; 114 @SuppressWarnings("PMD.SingularField") 115 private Thread purger; 116 private long minimumPurgeableTime; 117 118 /** 119 * The purger thread. 120 */ 121 private class Purger extends Thread implements AvailabilityListener { 122 123 private boolean permitsAvailable = true; 124 125 /** 126 * Instantiates a new purger. 127 */ 128 public Purger() { 129 setName(Components.simpleObjectName(this)); 130 } 131 132 @Override 133 public void availabilityChanged(PermitsPool pool, boolean available) { 134 if (registration == null) { 135 return; 136 } 137 synchronized (this) { 138 permitsAvailable = available; 139 registration.updateInterested( 140 permitsAvailable ? SelectionKey.OP_ACCEPT : 0); 141 if (!permitsAvailable) { 142 this.notifyAll(); 143 } 144 } 145 } 146 147 @Override 148 @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops", 149 "PMD.DataflowAnomalyAnalysis", "PMD.CognitiveComplexity" }) 150 public void run() { 151 if (connLimiter == null) { 152 return; 153 } 154 try { 155 connLimiter.addListener(this); 156 while (serverSocketChannel.isOpen()) { 157 synchronized (this) { 158 while (permitsAvailable) { 159 wait(); 160 } 161 } 162 // Copy to avoid ConcurrentModificationException 163 List<SocketChannelImpl> candidates; 164 synchronized (channels) { 165 candidates = new ArrayList<>(channels); 166 } 167 long purgeableSince 168 = System.currentTimeMillis() - minimumPurgeableTime; 169 candidates = candidates.stream() 170 .filter(channel -> channel.isPurgeable() 171 && channel.purgeableSince() < purgeableSince) 172 .sorted(new Comparator<>() { 173 @Override 174 @SuppressWarnings("PMD.ShortVariable") 175 public int compare(SocketChannelImpl c1, 176 SocketChannelImpl c2) { 177 if (c1.purgeableSince() < c2 178 .purgeableSince()) { 179 return 1; 180 } 181 if (c1.purgeableSince() > c2 182 .purgeableSince()) { 183 return -1; 184 } 185 return 0; 186 } 187 }) 188 .collect(Collectors.toList()); 189 for (SocketChannelImpl channel : candidates) { 190 // Sorting may have taken time... 191 if (!channel.isPurgeable()) { 192 continue; 193 } 194 channel.downPipeline().fire(new Purge(), channel); 195 // Continue only as long as necessary 196 if (permitsAvailable) { 197 break; 198 } 199 } 200 sleep(1000); 201 } 202 } catch (InterruptedException e) { 203 // Fall through 204 } finally { 205 connLimiter.removeListener(this); 206 } 207 } 208 209 } 210 211 /** 212 * Creates a new server, using itself as component channel. 213 */ 214 public SocketServer() { 215 this(Channel.SELF); 216 } 217 218 /** 219 * Creates a new server using the given channel. 220 * 221 * @param componentChannel the component's channel 222 */ 223 public SocketServer(Channel componentChannel) { 224 super(componentChannel); 225 } 226 227 /** 228 * Sets the address to bind to. If none is set, the address and port 229 * are assigned automatically. 230 * 231 * @param serverAddress the address to bind to 232 * @return the socket server for easy chaining 233 */ 234 public SocketServer setServerAddress(SocketAddress serverAddress) { 235 this.serverAddress = serverAddress; 236 return this; 237 } 238 239 @Override 240 public SocketServer setBufferSize(int size) { 241 super.setBufferSize(size); 242 return this; 243 } 244 245 /** 246 * The component can be configured with events that include 247 * a path (see @link {@link ConfigurationUpdate#paths()}) 248 * that matches this components path (see {@link Manager#componentPath()}). 249 * 250 * The following properties are recognized: 251 * 252 * `hostname` 253 * : If given, is used as first parameter for 254 * {@link InetSocketAddress#InetSocketAddress(String, int)}. 255 * 256 * `port` 257 * : If given, is used as parameter for 258 * {@link InetSocketAddress#InetSocketAddress(String, int)} 259 * or {@link InetSocketAddress#InetSocketAddress(int)}, 260 * depending on whether a host name is specified. Defaults to "0". 261 * 262 * `backlog` 263 * : See {@link #setBacklog(int)}. 264 * 265 * `bufferSize` 266 * : See {@link #setBufferSize(int)}. 267 * 268 * `maxConnections` 269 * : Calls {@link #setConnectionLimiter} with a 270 * {@link PermitsPool} of the specified size. 271 * 272 * `minimalPurgeableTime` 273 * : See {@link #setMinimalPurgeableTime(long)}. 274 * 275 * @param event the event 276 */ 277 @Handler 278 @SuppressWarnings("PMD.ConfusingTernary") 279 public void onConfigurationUpdate(ConfigurationUpdate event) { 280 event.values(componentPath()).ifPresent(values -> { 281 String hostname = values.get("hostname"); 282 if (hostname != null) { 283 setServerAddress(new InetSocketAddress(hostname, 284 Integer.parseInt(values.getOrDefault("port", "0")))); 285 } else if (values.containsKey("port")) { 286 setServerAddress(new InetSocketAddress( 287 Integer.parseInt(values.get("port")))); 288 } 289 Optional.ofNullable(values.get("backlog")).ifPresent( 290 value -> setBacklog(Integer.parseInt(value))); 291 Optional.ofNullable(values.get("bufferSize")).ifPresent( 292 value -> setBufferSize(Integer.parseInt(value))); 293 Optional.ofNullable(values.get("maxConnections")) 294 .map(Integer::parseInt).map(PermitsPool::new) 295 .ifPresent(this::setConnectionLimiter); 296 Optional.ofNullable(values.get("minimalPurgeableTime")) 297 .map(Long::parseLong).ifPresent(this::setMinimalPurgeableTime); 298 }); 299 } 300 301 /** 302 * Returns the server address. Before starting, the address is the 303 * address set with {@link #setServerAddress(InetSocketAddress)}. After 304 * starting the address is obtained from the created socket. 305 * 306 * @return the serverAddress 307 */ 308 public SocketAddress serverAddress() { 309 try { 310 return serverSocketChannel == null ? serverAddress 311 : serverSocketChannel.getLocalAddress(); 312 } catch (IOException e) { 313 return serverAddress; 314 } 315 } 316 317 /** 318 * Sets the backlog size. 319 * 320 * @param backlog the backlog to set 321 * @return the socket server for easy chaining 322 */ 323 public SocketServer setBacklog(int backlog) { 324 this.backlog = backlog; 325 return this; 326 } 327 328 /** 329 * Return the configured backlog size. 330 * 331 * @return the backlog 332 */ 333 public int backlog() { 334 return backlog; 335 } 336 337 /** 338 * Sets a permit "pool". A new connection is created only if a permit 339 * can be obtained from the pool. 340 * 341 * A connection limiter must be set before starting the component. 342 * 343 * @param connectionLimiter the connection pool to set 344 * @return the socket server for easy chaining 345 */ 346 public SocketServer setConnectionLimiter(PermitsPool connectionLimiter) { 347 this.connLimiter = connectionLimiter; 348 return this; 349 } 350 351 /** 352 * Returns the connection limiter. 353 * 354 * @return the connection Limiter 355 */ 356 public PermitsPool getConnectionLimiter() { 357 return connLimiter; 358 } 359 360 /** 361 * Sets a minimal time that a connection must be purgeable (idle) 362 * before it may be purged. 363 * 364 * @param millis the millis 365 * @return the socket server 366 */ 367 public SocketServer setMinimalPurgeableTime(long millis) { 368 this.minimumPurgeableTime = millis; 369 return this; 370 } 371 372 /** 373 * Gets the minimal purgeable time. 374 * 375 * @return the minimal purgeable time 376 */ 377 public long getMinimalPurgeableTime() { 378 return minimumPurgeableTime; 379 } 380 381 /** 382 * Starts the server. 383 * 384 * @param event the start event 385 * @throws IOException if an I/O exception occurred 386 */ 387 @Handler 388 public void onStart(Start event) throws IOException { 389 closing = false; 390 if (serverAddress instanceof UnixDomainSocketAddress) { 391 serverSocketChannel 392 = ServerSocketChannel.open(StandardProtocolFamily.UNIX); 393 } else { 394 serverSocketChannel = ServerSocketChannel.open(); 395 } 396 serverSocketChannel.bind(serverAddress, backlog); 397 MBeanView.addServer(this); 398 fire(new NioRegistration(this, serverSocketChannel, 399 SelectionKey.OP_ACCEPT, this), Channel.BROADCAST); 400 } 401 402 /** 403 * Handles the successful channel registration. 404 * 405 * @param event the event 406 * @throws InterruptedException the interrupted exception 407 * @throws IOException Signals that an I/O exception has occurred. 408 */ 409 @Handler(channels = Self.class) 410 public void onRegistered(NioRegistration.Completed event) 411 throws InterruptedException, IOException { 412 NioHandler handler = event.event().handler(); 413 if (handler == this) { 414 if (event.event().get() == null) { 415 fire(new Error(event, 416 "Registration failed, no NioDispatcher?")); 417 return; 418 } 419 registration = event.event().get(); 420 purger = Thread.ofVirtual().start(new Purger()); 421 fire(new Ready(serverSocketChannel.getLocalAddress())); 422 return; 423 } 424 if (handler instanceof SocketChannelImpl channel 425 && channels.contains(channel)) { 426 var accepted = new Accepted(channel.nioChannel().getLocalAddress(), 427 channel.nioChannel().getRemoteAddress(), false, 428 Collections.emptyList()); 429 var registration = event.event().get(); 430 // (1) Opening, (2) Accepted, (3) process input 431 channel.downPipeline().fire(Event.onCompletion(new Opening<Void>(), 432 e -> { 433 channel.downPipeline().fire(accepted, channel); 434 channel.registrationComplete(registration); 435 }), channel); 436 } 437 } 438 439 /* 440 * (non-Javadoc) 441 * 442 * @see org.jgrapes.io.NioSelectable#handleOps(int) 443 */ 444 @Override 445 public void handleOps(int ops) { 446 if ((ops & SelectionKey.OP_ACCEPT) == 0 || closing) { 447 return; 448 } 449 synchronized (channels) { 450 if (connLimiter != null && !connLimiter.tryAcquire()) { 451 return; 452 } 453 try { 454 @SuppressWarnings("PMD.CloseResource") 455 SocketChannel socketChannel = serverSocketChannel.accept(); 456 if (socketChannel == null) { 457 // "False alarm" 458 if (connLimiter != null) { 459 connLimiter.release(); 460 } 461 return; 462 } 463 new SocketChannelImpl(null, socketChannel); 464 } catch (IOException e) { 465 fire(new IOError(null, e)); 466 } 467 } 468 } 469 470 @Override 471 protected boolean removeChannel(SocketChannelImpl channel) { 472 synchronized (channels) { 473 if (!channels.remove(channel)) { 474 // Closed already 475 return false; 476 } 477 // In case the server is shutting down 478 channels.notifyAll(); 479 } 480 if (connLimiter != null) { 481 connLimiter.release(); 482 } 483 return true; 484 } 485 486 /** 487 * Shuts down the server or one of the connections to the server. 488 * 489 * @param event the event 490 * @throws IOException if an I/O exception occurred 491 * @throws InterruptedException if the execution was interrupted 492 */ 493 @Handler 494 @SuppressWarnings("PMD.DataflowAnomalyAnalysis") 495 public void onClose(Close event) throws IOException, InterruptedException { 496 boolean closeServer = false; 497 for (Channel channel : event.channels()) { 498 if (channels.contains(channel)) { 499 ((SocketChannelImpl) channel).close(); 500 continue; 501 } 502 if (channel instanceof Subchannel) { 503 // Some subchannel that we're not interested in. 504 continue; 505 } 506 // Close event on "main" channel 507 closeServer = true; 508 } 509 if (!closeServer) { 510 // Only connection(s) were to be closed. 511 return; 512 } 513 if (!serverSocketChannel.isOpen()) { 514 // Closed already 515 fire(new Closed<Void>()); 516 return; 517 } 518 synchronized (channels) { 519 closing = true; 520 // Copy to avoid concurrent modification exception 521 Set<SocketChannelImpl> conns = new HashSet<>(channels); 522 for (SocketChannelImpl conn : conns) { 523 conn.close(); 524 } 525 while (!channels.isEmpty()) { 526 channels.wait(); 527 } 528 } 529 serverSocketChannel.close(); 530 purger.interrupt(); 531 closing = false; 532 fire(new Closed<Void>()); 533 } 534 535 /** 536 * Shuts down the server by firing a {@link Close} using the 537 * server as channel. Note that this automatically results 538 * in closing all open connections by the runtime system 539 * and thus in {@link Closed} events on all subchannels. 540 * 541 * @param event the event 542 * @throws InterruptedException 543 */ 544 @Handler(priority = -1000) 545 public void onStop(Stop event) throws InterruptedException { 546 if (closing || !serverSocketChannel.isOpen()) { 547 return; 548 } 549 newEventPipeline().fire(new Close(), this).get(); 550 } 551 552 /** 553 * The Interface of the SocketServer MXBean. 554 */ 555 public interface SocketServerMXBean { 556 557 /** 558 * The Class ChannelInfo. 559 */ 560 class ChannelInfo { 561 562 private final SocketChannelImpl channel; 563 564 /** 565 * Instantiates a new channel info. 566 * 567 * @param channel the channel 568 */ 569 public ChannelInfo(SocketChannelImpl channel) { 570 this.channel = channel; 571 } 572 573 /** 574 * Checks if is purgeable. 575 * 576 * @return true, if is purgeable 577 */ 578 public boolean isPurgeable() { 579 return channel.isPurgeable(); 580 } 581 582 /** 583 * Gets the downstream pool. 584 * 585 * @return the downstream pool 586 */ 587 public String getDownstreamPool() { 588 return channel.readBuffers().name(); 589 } 590 591 /** 592 * Gets the upstream pool. 593 * 594 * @return the upstream pool 595 */ 596 public String getUpstreamPool() { 597 return channel.byteBufferPool().name(); 598 } 599 } 600 601 /** 602 * Gets the component path. 603 * 604 * @return the component path 605 */ 606 String getComponentPath(); 607 608 /** 609 * Gets the channel count. 610 * 611 * @return the channel count 612 */ 613 int getChannelCount(); 614 615 /** 616 * Gets the channels. 617 * 618 * @return the channels 619 */ 620 SortedMap<String, ChannelInfo> getChannels(); 621 622 } 623 624 /** 625 * The Class SocketServerInfo. 626 */ 627 public static class SocketServerInfo implements SocketServerMXBean { 628 629 private static MBeanServer mbs 630 = ManagementFactory.getPlatformMBeanServer(); 631 632 private ObjectName mbeanName; 633 private final WeakReference<SocketServer> serverRef; 634 635 /** 636 * Instantiates a new socket server info. 637 * 638 * @param server the server 639 */ 640 @SuppressWarnings({ "PMD.EmptyCatchBlock", 641 "PMD.AvoidCatchingGenericException", 642 "PMD.ConstructorCallsOverridableMethod" }) 643 public SocketServerInfo(SocketServer server) { 644 serverRef = new WeakReference<>(server); 645 try { 646 String endPoint = ""; 647 if (server.serverAddress instanceof InetSocketAddress addr) { 648 endPoint = " (" + addr.getHostName() + ":" + addr.getPort() 649 + ")"; 650 } else if (server.serverAddress instanceof UnixDomainSocketAddress addr) { 651 endPoint = " (" + addr.getPath() + ")"; 652 } 653 mbeanName = new ObjectName("org.jgrapes.io:type=" 654 + SocketServer.class.getSimpleName() + ",name=" 655 + ObjectName 656 .quote(Components.objectName(server) + endPoint)); 657 } catch (MalformedObjectNameException e) { 658 // Should not happen 659 } 660 try { 661 mbs.unregisterMBean(mbeanName); 662 } catch (Exception e) { 663 // Just in case, should not work 664 } 665 try { 666 mbs.registerMBean(this, mbeanName); 667 } catch (InstanceAlreadyExistsException | MBeanRegistrationException 668 | NotCompliantMBeanException e) { 669 // Have to live with that 670 } 671 } 672 673 /** 674 * Server. 675 * 676 * @return the optional 677 */ 678 @SuppressWarnings({ "PMD.AvoidCatchingGenericException", 679 "PMD.EmptyCatchBlock" }) 680 public Optional<SocketServer> server() { 681 SocketServer server = serverRef.get(); 682 if (server == null) { 683 try { 684 mbs.unregisterMBean(mbeanName); 685 } catch (Exception e) { 686 // Should work. 687 } 688 } 689 return Optional.ofNullable(server); 690 } 691 692 @Override 693 public String getComponentPath() { 694 return server().map(mgr -> mgr.componentPath()).orElse("<removed>"); 695 } 696 697 @Override 698 public int getChannelCount() { 699 return server().map(server -> server.channels.size()).orElse(0); 700 } 701 702 @Override 703 @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") 704 public SortedMap<String, ChannelInfo> getChannels() { 705 return server().map(server -> { 706 SortedMap<String, ChannelInfo> result = new TreeMap<>(); 707 for (SocketChannelImpl channel : server.channels) { 708 result.put(channel.nioChannel().socket() 709 .getRemoteSocketAddress().toString(), 710 new ChannelInfo(channel)); 711 } 712 return result; 713 }).orElse(Collections.emptySortedMap()); 714 } 715 } 716 717 /** 718 * An MBean interface for getting information about the socket servers 719 * and established connections. 720 */ 721 public interface SocketServerSummaryMXBean { 722 723 /** 724 * Gets the connections per server statistics. 725 * 726 * @return the connections per server statistics 727 */ 728 IntSummaryStatistics getConnectionsPerServerStatistics(); 729 730 /** 731 * Gets the servers. 732 * 733 * @return the servers 734 */ 735 Set<SocketServerMXBean> getServers(); 736 } 737 738 /** 739 * The MBeanView. 740 */ 741 private static final class MBeanView implements SocketServerSummaryMXBean { 742 private static Set<SocketServerInfo> serverInfos = new HashSet<>(); 743 744 /** 745 * Adds the server to the reported servers. 746 * 747 * @param server the server 748 */ 749 public static void addServer(SocketServer server) { 750 synchronized (serverInfos) { 751 serverInfos.add(new SocketServerInfo(server)); 752 } 753 } 754 755 /** 756 * Returns the infos. 757 * 758 * @return the sets the 759 */ 760 private Set<SocketServerInfo> infos() { 761 Set<SocketServerInfo> expired = new HashSet<>(); 762 synchronized (serverInfos) { 763 for (SocketServerInfo serverInfo : serverInfos) { 764 if (!serverInfo.server().isPresent()) { 765 expired.add(serverInfo); 766 } 767 } 768 serverInfos.removeAll(expired); 769 } 770 return serverInfos; 771 } 772 773 @SuppressWarnings("unchecked") 774 @Override 775 public Set<SocketServerMXBean> getServers() { 776 return (Set<SocketServerMXBean>) (Object) infos(); 777 } 778 779 @Override 780 public IntSummaryStatistics getConnectionsPerServerStatistics() { 781 return infos().stream().map(info -> info.server().get()) 782 .filter(ref -> ref != null).collect( 783 Collectors.summarizingInt(srv -> srv.channels.size())); 784 } 785 } 786 787 static { 788 try { 789 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); 790 ObjectName mxbeanName = new ObjectName("org.jgrapes.io:type=" 791 + SocketServer.class.getSimpleName() + "s"); 792 mbs.registerMBean(new MBeanView(), mxbeanName); 793 } catch (MalformedObjectNameException | InstanceAlreadyExistsException 794 | MBeanRegistrationException | NotCompliantMBeanException e) { 795 // Does not happen 796 } 797 } 798 799}