001/*
002 * JGrapes Event Driven Framework
003 * Copyright (C) 2016-2018 Michael N. Lipp
004 * 
005 * This program is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Affero General Public License as published by 
007 * the Free Software Foundation; either version 3 of the License, or 
008 * (at your option) any later version.
009 * 
010 * This program is distributed in the hope that it will be useful, but 
011 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
012 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Affero General Public License along 
016 * with this program; if not, see <http://www.gnu.org/licenses/>.
017 */
018
019package org.jgrapes.io;
020
021import java.nio.ByteBuffer;
022import java.nio.CharBuffer;
023import org.jgrapes.core.Channel;
024import org.jgrapes.core.Component;
025import org.jgrapes.core.Components;
026import org.jgrapes.core.Event;
027import org.jgrapes.core.EventPipeline;
028import org.jgrapes.core.Subchannel;
029import org.jgrapes.io.util.ManagedBuffer;
030import org.jgrapes.io.util.ManagedBufferPool;
031
032/**
033 * Represents a subchannel for grouping input and output events related
034 * to an I/O resource such as an opened file or a network connection.
035 * 
036 * An I/O subchannel has an initiator that creates and manages the subchannel.
037 * Events fired by the initiator are said to flow downstream on the channel.
038 * Events fired by components in response are said to flow upstream.
039 * 
040 * Upstream and downstream events are usually handled by two different pipelines
041 * managed by the initiator. One pipeline, accessible only to the initiator,
042 * handles the downstream events. The other, made available as a property of the
043 * I/O subchannel (see {@link #responsePipeline()} and {@link #respond(Event)}), 
044 * handles the upstream events. Of course, any pipeline can be
045 * used to send events upstream to the initiator component. However, using
046 * arbitrary pipelines holds the risk that events aren't delivered in the
047 * intended order.
048 * 
049 * An I/O subchannel also provides associated buffer pools for byte buffers
050 * and character buffers. Buffers used in responses (upstream events)
051 * should be acquired from these pools only. The initiator should initialize 
052 * the pools in such a way that it suits its needs.
053 */
054public interface IOSubchannel extends Subchannel {
055
056    /**
057     * Gets the {@link EventPipeline} that can be used for events going back to
058     * the initiator of this connection. Consistently using this event pipeline
059     * for response events ensures that the events are written in proper
060     * sequence.
061     * 
062     * @return the event pipeline
063     */
064    EventPipeline responsePipeline();
065
066    /**
067     * Get the subchannel's byte buffer pool.
068     * 
069     * @return the buffer pool
070     */
071    ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer>
072            byteBufferPool();
073
074    /**
075     * Get the subchannel's char buffer pool.
076     * 
077     * @return the buffer pool
078     */
079    ManagedBufferPool<ManagedBuffer<CharBuffer>, CharBuffer>
080            charBufferPool();
081
082    /**
083     * Fires the given event on this subchannel using the subchannel's response
084     * pipeline. Effectively, {@code fire(someEvent)} is a shortcut for
085     * {@code getResponsePipeline.add(someEvent, this)}.
086     * 
087     * @param <T> the event's type
088     * @param event
089     *            the event to fire
090     * @return the event (for easy chaining)
091     */
092    default <T extends Event<?>> T respond(T event) {
093        return responsePipeline().fire(event, this);
094    }
095
096    /**
097     * Creates a new subchannel of the given component's channel with the
098     * given event pipeline and a buffer pool with two buffers sized 4096.
099     *
100     * @param component the component used to get the main channel
101     * @param responsePipeline the response pipeline
102     * @return the subchannel
103     */
104    static IOSubchannel create(
105            Component component, EventPipeline responsePipeline) {
106        return new DefaultIOSubchannel(component.channel(), responsePipeline);
107    }
108
109    /**
110     * A simple implementation of {@link IOSubchannel}.
111     */
112    class DefaultIOSubchannel extends Subchannel.DefaultSubchannel
113            implements IOSubchannel {
114        private final EventPipeline responsePipeline;
115        private ManagedBufferPool<ManagedBuffer<ByteBuffer>,
116                ByteBuffer> byteBufferPool;
117        private ManagedBufferPool<ManagedBuffer<CharBuffer>,
118                CharBuffer> charBufferPool;
119
120        /**
121         * Creates a new instance with the given main channel and response
122         * pipeline.  
123         * 
124         * @param mainChannel the main channel
125         * @param responsePipeline the response pipeline to use
126         * 
127         */
128        public DefaultIOSubchannel(
129                Channel mainChannel, EventPipeline responsePipeline) {
130            super(mainChannel);
131            this.responsePipeline = responsePipeline;
132        }
133
134        protected void setByteBufferPool(
135                ManagedBufferPool<ManagedBuffer<ByteBuffer>,
136                        ByteBuffer> bufferPool) {
137            this.byteBufferPool = bufferPool;
138        }
139
140        protected void setCharBufferPool(
141                ManagedBufferPool<ManagedBuffer<CharBuffer>,
142                        CharBuffer> bufferPool) {
143            this.charBufferPool = bufferPool;
144        }
145
146        /*
147         * (non-Javadoc)
148         * 
149         * @see org.jgrapes.io.IOSubchannel#responsePipeline()
150         */
151        @Override
152        public EventPipeline responsePipeline() {
153            return responsePipeline;
154        }
155
156        /**
157         * Returns the buffer pool set. If no buffer pool has been set, a
158         * buffer pool with with two buffers of size 4096 is created.
159         */
160        public ManagedBufferPool<ManagedBuffer<ByteBuffer>, ByteBuffer>
161                byteBufferPool() {
162            if (byteBufferPool == null) {
163                byteBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
164                    () -> {
165                        return ByteBuffer.allocate(4096);
166                    }, 2)
167                        .setName(Components.objectName(this)
168                            + ".upstream.byteBuffers");
169            }
170            return byteBufferPool;
171        }
172
173        /**
174         * Returns the buffer pool set. If no buffer pool has been set, a
175         * buffer pool with with two buffers of size 4096 is created.
176         */
177        public ManagedBufferPool<ManagedBuffer<CharBuffer>, CharBuffer>
178                charBufferPool() {
179            if (charBufferPool == null) {
180                charBufferPool = new ManagedBufferPool<>(ManagedBuffer::new,
181                    () -> {
182                        return CharBuffer.allocate(4096);
183                    }, 2)
184                        .setName(Components.objectName(this)
185                            + ".upstream.charBuffers");
186            }
187            return charBufferPool;
188        }
189
190    }
191}