/*
 * This file is part of the JDrupes non-blocking HTTP Codec
 * Copyright (C) 2016, 2017  Michael N. Lipp
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
 * License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License along
 * with this program; if not, see <http://www.gnu.org/licenses/>.
 */

package org.jdrupes.httpcodec.util;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * An {@link OutputStream} that is backed by a {@link ByteBuffer} assigned to
 * the stream. If the buffer becomes full, one or more buffers are allocated as
 * intermediate storage. Their content is copied to the next assigned buffer(s).
 * <p>
 * While writing to this stream, {@link #remaining()} should be checked
 * regularly and the production of data should be suspended if possible when no
 * more space is left to avoid the usage of intermediate storage.
 * <p>
 * Implementation note: every overflow buffer keeps its <em>mark</em> at the
 * first byte that has not yet been copied to an assigned buffer and its
 * <em>position</em> at the end of the data written so far. The bytes between
 * mark and position are therefore the pending content of that buffer.
 */
public class ByteBufferOutputStream extends OutputStream {

    /** Lower bound for the overflow buffer size if no size has been set. */
    private static final int MIN_DEFAULT_OVERFLOW_SIZE = 4096;

    /** The buffer provided by the user of this stream (may be null). */
    private ByteBuffer assignedBuffer = null;
    /** Overflow buffers with pending data, oldest first. */
    private final Queue<ByteBuffer> overflows = new ArrayDeque<>();
    /** The buffer that receives the next write (assigned or overflow). */
    private ByteBuffer current = null;
    /** The configured overflow buffer size; values <= 0 mean "not set". */
    private int overflowBufferSize = 0;
    private long bytesWritten = 0;

    /**
     * Creates a new instance with an unset overflow buffer size.
     */
    public ByteBufferOutputStream() {
        super();
    }

    /**
     * Creates a new instance with the given overflow buffer size.
     *
     * @param overflowBufferSize
     *            the overflow buffer size to use
     */
    public ByteBufferOutputStream(int overflowBufferSize) {
        super();
        this.overflowBufferSize = overflowBufferSize;
    }

    /**
     * Returns the configured size of the buffers that will be allocated as
     * overflow buffers.
     *
     * @return the configured allocation size for the overflow buffers
     */
    public int overflowBufferSize() {
        return overflowBufferSize;
    }

    /**
     * The size of the buffers that are allocated if the assigned buffer
     * overflows. If not set (zero or negative), buffers are allocated with
     * one fourth of the capacity of the assigned buffer, but at least with
     * 4096 bytes (which is also the size used if no buffer has been
     * assigned yet).
     *
     * @param overflowBufferSize
     *            the size
     */
    public void setOverflowBufferSize(int overflowBufferSize) {
        this.overflowBufferSize = overflowBufferSize;
    }

    /**
     * Clear any buffered data and prepares the buffer for reuse.
     */
    public void clear() {
        assignedBuffer = null;
        current = null;
        bytesWritten = 0;
        overflows.clear();
    }

    /**
     * Assign a new buffer to this output stream. If the previously used buffer
     * had become full and intermediate storage was allocated, the data from the
     * intermediate storage is copied to the new buffer first. Then, the new
     * buffer is used for all subsequent write operations.
     * <p>
     * If the new buffer cannot take all pending data, it is filled completely
     * and subsequent writes continue to go to intermediate storage.
     *
     * @param buffer
     *            the buffer to use, or {@code null} to detach the currently
     *            assigned buffer
     */
    public void assignBuffer(ByteBuffer buffer) {
        if (buffer == null) {
            // Only forget "current" if it is the buffer being detached;
            // an overflow buffer must continue to receive data.
            if (current == assignedBuffer) {
                current = null;
            }
            assignedBuffer = null;
            return;
        }
        assignedBuffer = buffer;
        // Move any overflow to the new buffer
        while (!overflows.isEmpty()) {
            ByteBuffer head = overflows.peek();
            // Do a "flip with position to mark"
            int writePos = head.position(); // Save position
            head.reset();
            head.limit(writePos);
            if (!ByteBufferUtils.putAsMuchAsPossible(assignedBuffer, head)) {
                // Cannot transfer everything, done what's possible.
                // Restore the overflow buffer to "write mode".
                head.mark(); // new position for next put
                head.limit(head.capacity());
                head.position(writePos);
                return;
            }
            overflows.remove();
        }
        current = assignedBuffer;
    }

    /**
     * Allocates a new overflow buffer, appends it to the queue of overflow
     * buffers and makes it the target of subsequent writes. A configured
     * size that is not positive is treated as "not set" instead of being
     * passed on to {@link ByteBuffer#allocate(int)}.
     */
    private void allocateOverflowBuffer() {
        int size;
        if (overflowBufferSize > 0) {
            size = overflowBufferSize;
        } else {
            int quarter = (assignedBuffer == null)
                ? 0 : assignedBuffer.capacity() / 4;
            size = Math.max(MIN_DEFAULT_OVERFLOW_SIZE, quarter);
        }
        current = ByteBuffer.allocate(size);
        current.mark(); // Nothing transferred yet, pending data starts here
        overflows.add(current);
    }

    /*
     * (non-Javadoc)
     *
     * @see java.io.OutputStream#write(int)
     */
    @Override
    public void write(int data) throws IOException {
        if (current == null || current.remaining() == 0) {
            allocateOverflowBuffer();
        }
        current.put((byte) data);
        bytesWritten += 1;
    }

    /*
     * (non-Javadoc)
     *
     * @see java.io.OutputStream#write(byte[], int, int)
     */
    @Override
    public void write(byte[] data, int offset, int length)
            throws IOException {
        // Validate before touching any state, so that an invalid request
        // neither writes a part of the data nor changes the byte count.
        if (offset < 0 || length < 0 || length > data.length - offset) {
            throw new IndexOutOfBoundsException("offset=" + offset
                + ", length=" + length + ", array length=" + data.length);
        }
        if (length == 0) {
            return; // Don't allocate storage for nothing
        }
        if (current == null) {
            allocateOverflowBuffer();
        }
        bytesWritten += length;
        while (true) {
            if (current.remaining() >= length) {
                current.put(data, offset, length);
                return;
            }
            if (current.remaining() > 0) {
                // Fill up the current buffer, the rest goes to the next one
                int processed = current.remaining();
                current.put(data, offset, processed);
                offset += processed;
                length -= processed;
            }
            allocateOverflowBuffer();
        }
    }

    /**
     * Copies the data from the given buffer to this output stream.
     *
     * @param data the buffer
     */
    public void write(ByteBuffer data) {
        if (!data.hasRemaining()) {
            return; // Don't allocate storage for nothing
        }
        if (current == null) {
            allocateOverflowBuffer();
        }
        bytesWritten += data.remaining();
        // The helper is used here on the assumption that it reports true
        // once the source has been transferred completely.
        while (!ByteBufferUtils.putAsMuchAsPossible(current, data)) {
            allocateOverflowBuffer();
        }
    }

    /**
     * Copies length bytes from the given buffer to this output stream. If
     * the buffer holds no more than length bytes, all remaining bytes are
     * copied. The limit of the given buffer is the same before and after
     * the invocation.
     *
     * @param data the buffer
     * @param length the number of bytes to copy
     * @throws IllegalArgumentException if length is negative
     */
    public void write(ByteBuffer data, int length) {
        if (length < 0) {
            // A negative length would move the limit (and thus the
            // position) of the caller's buffer backwards.
            throw new IllegalArgumentException(
                "length must not be negative: " + length);
        }
        if (data.remaining() <= length) {
            write(data);
            return;
        }
        int savedLimit = data.limit();
        data.limit(data.position() + length);
        try {
            write(data);
        } finally {
            data.limit(savedLimit);
        }
    }

    /**
     * Returns the number of bytes remaining in the assigned buffer. A negative
     * value indicates that the assigned buffer is full and one or more overflow
     * buffers are being used. The absolute value of the negative number is
     * the number of bytes in the overflow buffer(s).
     *
     * @return the bytes remaining or buffered (negative value)
     */
    public long remaining() {
        if (overflows.isEmpty()) {
            if (assignedBuffer == null) {
                return 0;
            }
            return assignedBuffer.remaining();
        }
        long sum = 0;
        for (ByteBuffer b : overflows) {
            // The mark cannot be queried directly, so jump to it
            // temporarily in order to find out where the pending data starts.
            int curPos = b.position(); // Save position
            b.reset();
            sum += curPos - b.position();
            b.position(curPos);
        }
        return -sum;
    }

    /**
     * Does not have any effect. May be called for consistent usage of the
     * output stream. In particular, data that is still held in intermediate
     * storage is neither discarded nor reported.
     *
     * @throws IOException
     *             not thrown by this implementation, declared for
     *             compatibility with {@link OutputStream#close()}
     * @see java.io.OutputStream#close()
     */
    @Override
    public void close() throws IOException {
        super.close();
    }

    /**
     * The number of bytes written to this output stream.
     *
     * @return the number of bytes written
     */
    public long bytesWritten() {
        return bytesWritten;
    }
}