001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io;
020
021import java.nio.ByteBuffer;
022import java.nio.CharBuffer;
023
024import org.jgrapes.core.Channel;
025import org.jgrapes.core.Component;
026import org.jgrapes.core.Components;
027import org.jgrapes.core.Event;
028import org.jgrapes.core.EventPipeline;
029import org.jgrapes.core.Subchannel;
030import org.jgrapes.io.util.ManagedBuffer;
031import org.jgrapes.io.util.ManagedBufferPool;
032
033/**
034 * Represents a subchannel for grouping input and output events related
035 * to an I/O resource such as an opened file or a network connection.
036 * 
037 * An I/O subchannel has an initiator that creates and manages the subchannel.
038 * Events fired by the initiator are said to flow downstream on the channel.
039 * Events fired by components in response are said to flow upstream.
040 * 
041 * Upstream and downstream events are usually handled by two different pipelines
042 * managed by the initiator. One pipeline, accessible only to the initiator,
043 * handles the downstream events. The other, made available as a property of the
044 * I/O subchannel (see {@link #responsePipeline()} and {@link #respond(Event)}), 
045 * handles the upstream events. Of course, any pipeline can be
046 * used to send events upstream to the initiator component. However, using
047 * arbitrary pipelines holds the risk that events aren't delivered in the
048 * intended order.
049 * 
050 * An I/O subchannel also provides associated buffer pools for byte buffers
051 * and character buffers. Buffers used in responses (upstream events)
052 * should be acquired from these pools only. The initiator should initialize 
053 * the pools in such a way that it suits its needs.
054 */
055public interface IOSubchannel extends Subchannel {
056
057    /**
058     * Gets the {@link EventPipeline} that can be used for events going back to
059     * the initiator of this connection. Consistently using this event pipeline
060     * for response events ensures that the events are written in proper
061     * sequence.
062     * 
063     * @return the event pipeline
064     */
065    EventPipeline responsePipeline();
066
067    /**
068     * Get the subchannel's byte buffer pool.
069     * 
070     * @return the buffer pool
071     */
072    ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer>
073            byteBufferPool();
074
075    /**
076     * Get the subchannel's char buffer pool.
077     * 
078     * @return the buffer pool
079     */
080    ManagedBufferPool<ManagedBuffer<CharBuffer>, CharBuffer>
081            charBufferPool();
082
083    /**
084     * Fires the given event on this subchannel using the subchannel's response
085     * pipeline. Effectively, {@code fire(someEvent)} is a shortcut for
086     * {@code getResponsePipeline.add(someEvent, this)}.
087     * 
088     * @param <T> the event's type
089     * @param event
090     *            the event to fire
091     * @return the event (for easy chaining)
092     */
093    default <T extends Event<?>> T respond(T event) {
094        return responsePipeline().fire(event, this);
095    }
096
097    /**
098     * Creates a new subchannel of the given component's channel with the
099     * given event pipeline and a buffer pool with two buffers sized 4096.
100     *
101     * @param component the component used to get the main channel
102     * @param responsePipeline the response pipeline
103     * @return the subchannel
104     */
105    static IOSubchannel create(
106            Component component, EventPipeline responsePipeline) {
107        return new DefaultIOSubchannel(component.channel(), responsePipeline);
108    }
109
110    /**
111     * A simple implementation of {@link IOSubchannel}.
112     */
113    class DefaultIOSubchannel extends Subchannel.DefaultSubchannel
114            implements IOSubchannel {
115        private final EventPipeline responsePipeline;
116        private ManagedBufferPool<ManagedBuffer<ByteBuffer>,
117                ByteBuffer> byteBufferPool;
118        private ManagedBufferPool<ManagedBuffer<CharBuffer>,
119                CharBuffer> charBufferPool;
120
121        /**
122         * Creates a new instance with the given main channel and response
123         * pipeline.  
124         * 
125         * @param mainChannel the main channel
126         * @param responsePipeline the response pipeline to use
127         * 
128         */
129        public DefaultIOSubchannel(
130                Channel mainChannel, EventPipeline responsePipeline) {
131            super(mainChannel);
132            this.responsePipeline = responsePipeline;
133        }
134
135        protected void setByteBufferPool(
136                ManagedBufferPool<ManagedBuffer<ByteBuffer>,
137                        ByteBuffer> bufferPool) {
138            this.byteBufferPool = bufferPool;
139        }
140
141        protected void setCharBufferPool(
142                ManagedBufferPool<ManagedBuffer<CharBuffer>,
143                        CharBuffer> bufferPool) {
144            this.charBufferPool = bufferPool;
145        }
146
147        /*
148         * (non-Javadoc)
149         * 
150         * @see org.jgrapes.io.IOSubchannel#responsePipeline()
151         */
152        @Override
153        public EventPipeline responsePipeline() {
154            return responsePipeline;
155        }
156
157        /**
158         * Returns the buffer pool set. If no buffer pool has been set, a
159         * buffer pool with with two buffers of size 4096 is created.
160         */
161        public ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer>
162                byteBufferPool() {
163            if (byteBufferPool == null) {
164                byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
165                    () -> {
166                        return ByteBuffer.allocate(4096);
167                    }, 2)
168                        .setName(Components.objectName(this)
169                            + ".upstream.byteBuffers");
170            }
171            return byteBufferPool;
172        }
173
174        /**
175         * Returns the buffer pool set. If no buffer pool has been set, a
176         * buffer pool with with two buffers of size 4096 is created.
177         */
178        public ManagedBufferPool<ManagedBuffer<CharBuffer>, CharBuffer>
179                charBufferPool() {
180            if (charBufferPool == null) {
181                charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
182                    () -> {
183                        return CharBuffer.allocate(4096);
184                    }, 2)
185                        .setName(Components.objectName(this)
186                            + ".upstream.charBuffers");
187            }
188            return charBufferPool;
189        }
190
191    }
192}