Implementation changes to BufWriter #78551

Closed · wants to merge 15 commits
101 changes: 70 additions & 31 deletions library/std/src/io/buffered/bufwriter.rs
@@ -291,55 +291,94 @@ impl<W: Write> BufWriter<W> {
 #[stable(feature = "rust1", since = "1.0.0")]
 impl<W: Write> Write for BufWriter<W> {
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        // We assume that callers of `write` prefer to avoid split writes where
+        // possible, so we pre-flush the buffer rather than doing a partial
+        // write to fill it.
         if self.buf.len() + buf.len() > self.buf.capacity() {
             self.flush_buf()?;
         }
-        // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919
         if buf.len() >= self.buf.capacity() {
-            self.panicked = true;
-            let r = self.get_mut().write(buf);
-            self.panicked = false;
-            r
+            self.get_mut().write(buf)
         } else {
             self.buf.extend_from_slice(buf);
             Ok(buf.len())
         }
     }
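
For reference, the two paths through `write` are easy to observe with the stable `BufWriter` API. The standalone sketch below is not part of this PR; the capacity of 8 and the `Vec` sink are arbitrary demonstration choices. It shows a small write landing in the buffer and an oversized write bypassing it after a pre-flush, which is the behavior both before and after this change:

use std::io::{BufWriter, Write};

fn main() -> std::io::Result<()> {
    let mut w = BufWriter::with_capacity(8, Vec::new());

    // Small write: fits in the buffer; nothing reaches the inner Vec yet.
    let n = w.write(b"abc")?;
    assert_eq!(n, 3);
    assert_eq!(w.buffer(), &b"abc"[..]);
    assert!(w.get_ref().is_empty());

    // Oversized write: the buffer is pre-flushed, then the payload goes
    // straight to the inner writer, bypassing the buffer entirely.
    let n = w.write(b"0123456789")?;
    assert_eq!(n, 10);
    assert!(w.buffer().is_empty());
    assert_eq!(w.get_ref().as_slice(), &b"abc0123456789"[..]);
    Ok(())
}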

     fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
-        // Normally, `write_all` just calls `write` in a loop. We can do better
-        // by calling `self.get_mut().write_all()` directly, which avoids
-        // round trips through the buffer in the event of a series of partial
-        // writes in some circumstances.
-        if self.buf.len() + buf.len() > self.buf.capacity() {
+        // Unlike with `write`, we assume that a caller of `write_all` is
+        // interested in minimizing system calls even if the buffer is split.
+        // This method tries to fill up the buffer as much as possible before
+        // flushing, whereas `write` prefers not to split incoming bufs.
+
+        // Bypass the buffer if the incoming write is larger than the whole
+        // buffer.
+        if buf.len() >= self.capacity() {
             self.flush_buf()?;
+            return self.get_mut().write_all(buf);
         }
-        // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919
-        if buf.len() >= self.buf.capacity() {
-            self.panicked = true;
-            let r = self.get_mut().write_all(buf);
-            self.panicked = false;
-            r
-        } else {
-            self.buf.extend_from_slice(buf);
-            Ok(())
+
+        // In order to reduce net writes in aggregate, we buffer as much as
+        // possible, then forward, then buffer the rest.
+        let amt_buffered = self.write_to_buf(buf);
+        if amt_buffered < buf.len() {
+            self.flush_buf()?;
+            // At this point, because we know that buf.len() < self.buf.capacity(),
+            // we know that this will succeed in its entirety.
+            self.buf.extend_from_slice(&buf[amt_buffered..]);
         }
+        Ok(())
     }
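
Note that the new code paths lean on a `write_to_buf` helper whose definition falls outside the lines shown here. Judging only from how it is used, it buffers as many bytes as fit and returns the count; the free-function sketch below is a guess at that behavior, and its name, signature, and body are assumptions rather than the PR's actual code:

// Hypothetical sketch of the write_to_buf helper used above: copy as much
// of `incoming` as fits in the Vec's spare capacity and report how many
// bytes were taken. (The PR's real helper lives outside this excerpt.)
fn write_to_buf(buf: &mut Vec<u8>, incoming: &[u8]) -> usize {
    let available = buf.capacity() - buf.len();
    let amt = available.min(incoming.len());
    buf.extend_from_slice(&incoming[..amt]);
    amt
}

fn main() {
    let mut buf = Vec::with_capacity(8);
    buf.extend_from_slice(b"abcde");
    // Only 3 of the 6 incoming bytes fit in the remaining capacity.
    assert_eq!(write_to_buf(&mut buf, b"fghijk"), 3);
    assert_eq!(buf, b"abcdefgh");
}

With a helper like this, `write_all` fills whatever room is left, flushes once, and buffers the remainder, so a mid-sized write costs at most one call to the underlying writer.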

     fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
-        let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
-        if self.buf.len() + total_len > self.buf.capacity() {
-            self.flush_buf()?;
-        }
-        // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919
-        if total_len >= self.buf.capacity() {
-            self.panicked = true;
-            let r = self.get_mut().write_vectored(bufs);
-            self.panicked = false;
-            r
+        if self.get_ref().is_write_vectored() {
+            let total_len: usize = bufs.iter().map(|buf| buf.len()).sum();
+
+            if total_len + self.buffer().len() > self.capacity() {
+                self.flush_buf()?;
+            }
+            if total_len >= self.buf.capacity() {
+                self.get_mut().write_vectored(bufs)
+            } else {
+                // Correctness note: we've already verified that none of these
+                // will overflow the buffer, because total_len < capacity.
+                bufs.iter().for_each(|buf| self.buf.extend_from_slice(buf));
+                Ok(total_len)
+            }
         } else {
-            bufs.iter().for_each(|b| self.buf.extend_from_slice(b));
-            Ok(total_len)
+            // Because the inner writer doesn't have native vectored write
+            // support, we take care of buffering together the individual
+            // incoming bufs, even if the *total* length is larger than our
+            // buffer. We only want to skip our buffer if an *individual* write
+            // exceeds our buffer capacity.
+            let mut total_buffered = 0;
+
+            for buf in bufs {
+                if total_buffered == 0 {
+                    if buf.len() + self.buffer().len() > self.capacity() {
+                        // If an individual write would overflow our remaining
+                        // capacity and we haven't buffered anything yet,
+                        // pre-flush before buffering (same as with a regular
+                        // write()).
+                        self.flush_buf()?;
+                    }
+
+                    if buf.len() > self.capacity() {
+                        // If an individual buffer exceeds our *total* capacity
+                        // and we haven't buffered anything yet, just forward
+                        // it to the underlying device.
+                        return self.get_mut().write(buf);
+                    }
+                }
+
+                // Buffer as much as possible until we reach full capacity.
+                total_buffered += self.write_to_buf(buf);
+                if self.buffer().len() == self.capacity() {
+                    break;
+                }
+            }
+
+            Ok(total_buffered)
         }
     }
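
To exercise the new fallback branch, the sketch below uses `PlainSink`, a hypothetical writer that leaves `is_write_vectored()` at its default of `false`. The asserted values reflect this PR's proposed semantics, not necessarily what the standard library does today:

use std::io::{self, BufWriter, IoSlice, Write};

// Hypothetical writer with no native vectored-write support: it does not
// override is_write_vectored(), so BufWriter takes the coalescing
// fallback path added above.
struct PlainSink(Vec<u8>);

impl Write for PlainSink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let mut w = BufWriter::with_capacity(8, PlainSink(Vec::new()));
    let parts = [IoSlice::new(b"abcd"), IoSlice::new(b"efgh"), IoSlice::new(b"ijkl")];

    // The total (12 bytes) exceeds the capacity (8), but no single slice
    // does, so the slices are coalesced into the buffer rather than each
    // costing a separate underlying write: the first two fill the buffer
    // exactly and the call reports 8 bytes consumed.
    let n = w.write_vectored(&parts)?;
    assert_eq!(n, 8);
    assert_eq!(w.buffer(), &b"abcdefgh"[..]);
    Ok(())
}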
