Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Follow logs #431

Merged
merged 2 commits into from
May 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 214 additions & 21 deletions crates/engine/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,215 @@
use std::sync::{Arc, RwLock};
use wasi_common::pipe::{ReadPipe, WritePipe};

/// Input/Output stream redirects
#[derive(Clone)]
pub struct IoStreamRedirects {
/// Standard input redirect.
pub stdin: ReadPipe<std::io::Cursor<Vec<u8>>>,
/// Standard output redirect.
pub stdout: OutRedirect,
/// Standard error redirect.
pub stderr: OutRedirect,
}

/// Output redirect and lock.
#[derive(Clone)]
pub struct OutRedirect {
/// Output redirect.
pub out: WritePipe<Vec<u8>>,
/// Lock for writing.
pub lock: Arc<RwLock<Vec<u8>>>,
use std::{
collections::HashSet,
io::{LineWriter, Write},
sync::{Arc, RwLock, RwLockReadGuard},
};
use wasi_common::{
pipe::{ReadPipe, WritePipe},
WasiFile,
};

/// Which components should have their logs followed on stdout/stderr.
#[derive(Clone, Debug)]
pub enum FollowComponents {
/// No components should have their logs followed.
None,
/// Only the specified components should have their logs followed.
Named(HashSet<String>),
/// All components should have their logs followed.
All,
}

impl FollowComponents {
/// Whether a given component should have its logs followed on stdout/stderr.
pub fn should_follow(&self, component_id: &str) -> bool {
match self {
Self::None => false,
Self::All => true,
Self::Named(ids) => ids.contains(component_id),
}
}
}

/// The buffers in which Wasm module output has been saved.
pub trait OutputBuffers {
/// The buffer in which stdout has been saved.
fn stdout(&self) -> &[u8];
/// The buffer in which stderr has been saved.
fn stderr(&self) -> &[u8];
}

/// A set of redirected standard I/O streams with which
/// a Wasm module is to be run.
pub struct ModuleIoRedirects {
pub(crate) stdin: Box<dyn WasiFile>,
pub(crate) stdout: Box<dyn WasiFile>,
pub(crate) stderr: Box<dyn WasiFile>,
}

impl ModuleIoRedirects {
/// Constructs an instance from a set of WasiFile objects.
pub fn new(
stdin: Box<dyn WasiFile>,
stdout: Box<dyn WasiFile>,
stderr: Box<dyn WasiFile>,
) -> Self {
Self {
stdin,
stdout,
stderr,
}
}
}

/// The destinations to which redirected module output will be written.
/// Used for subsequently reading back the output.
pub struct RedirectReadHandles {
stdout: Arc<RwLock<WriteDestinations>>,
stderr: Arc<RwLock<WriteDestinations>>,
}

impl RedirectReadHandles {
/// Acquires a read lock for the in-memory output buffers.
pub fn read(&self) -> impl OutputBuffers + '_ {
RedirectReadHandlesLock {
stdout: self.stdout.read().unwrap(),
stderr: self.stderr.read().unwrap(),
}
}
}

struct RedirectReadHandlesLock<'a> {
stdout: RwLockReadGuard<'a, WriteDestinations>,
stderr: RwLockReadGuard<'a, WriteDestinations>,
}

impl<'a> OutputBuffers for RedirectReadHandlesLock<'a> {
fn stdout(&self) -> &[u8] {
self.stdout.buffer()
}
fn stderr(&self) -> &[u8] {
self.stderr.buffer()
}
}

/// Prepares WASI pipes which redirect a component's output to
/// memory buffers.
pub fn capture_io_to_memory(
follow_on_stdout: bool,
follow_on_stderr: bool,
) -> (ModuleIoRedirects, RedirectReadHandles) {
let stdout_follow = Follow::stdout(follow_on_stdout);
let stderr_follow = Follow::stderr(follow_on_stderr);

let stdin = ReadPipe::from(vec![]);

let (stdout_pipe, stdout_lock) = redirect_to_mem_buffer(stdout_follow);

let (stderr_pipe, stderr_lock) = redirect_to_mem_buffer(stderr_follow);

let redirects = ModuleIoRedirects {
stdin: Box::new(stdin),
stdout: Box::new(stdout_pipe),
stderr: Box::new(stderr_pipe),
};

let outputs = RedirectReadHandles {
stdout: stdout_lock,
stderr: stderr_lock,
};

(redirects, outputs)
}

/// Indicates whether a memory redirect should also pipe the output to
/// the console so it can be followed live.
pub enum Follow {
/// Do not pipe to console - only write to memory.
None,
/// Also pipe to stdout.
Stdout,
/// Also pipe to stderr.
Stderr,
}

impl Follow {
pub(crate) fn writer(&self) -> Box<dyn Write + Send + Sync> {
match self {
Self::None => Box::new(DiscardingWriter),
Self::Stdout => Box::new(LineWriter::new(std::io::stdout())),
Self::Stderr => Box::new(LineWriter::new(std::io::stderr())),
}
}

/// Follow on stdout if so specified.
pub fn stdout(follow_on_stdout: bool) -> Self {
if follow_on_stdout {
Self::Stdout
} else {
Self::None
}
}

/// Follow on stderr if so specified.
pub fn stderr(follow_on_stderr: bool) -> Self {
if follow_on_stderr {
Self::Stderr
} else {
Self::None
}
}
}

/// Prepares a WASI pipe which writes to a memory buffer, optionally
/// copying to the specified output stream.
pub fn redirect_to_mem_buffer(
follow: Follow,
) -> (WritePipe<WriteDestinations>, Arc<RwLock<WriteDestinations>>) {
let immediate = follow.writer();

let buffer: Vec<u8> = vec![];
let std_dests = WriteDestinations { buffer, immediate };
let lock = Arc::new(RwLock::new(std_dests));
let std_pipe = WritePipe::from_shared(lock.clone());

(std_pipe, lock)
}

/// The destinations to which a component writes an output stream.
pub struct WriteDestinations {
buffer: Vec<u8>,
immediate: Box<dyn Write + Send + Sync>,
}

impl WriteDestinations {
/// The memory buffer to which a component writes an output stream.
pub fn buffer(&self) -> &[u8] {
&self.buffer
}
}

impl Write for WriteDestinations {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let written = self.buffer.write(buf)?;
self.immediate.write_all(&buf[0..written])?;
Ok(written)
}

fn flush(&mut self) -> std::io::Result<()> {
self.buffer.flush()?;
self.immediate.flush()?;
Ok(())
}
}

struct DiscardingWriter;

impl Write for DiscardingWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
Ok(buf.len())
}

fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
21 changes: 9 additions & 12 deletions crates/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub mod io;

use anyhow::{bail, Context, Result};
use host_component::{HostComponent, HostComponents, HostComponentsState};
use io::IoStreamRedirects;
use io::{ModuleIoRedirects, OutputBuffers};
use spin_config::{host_component::ComponentConfig, Resolver};
use spin_manifest::{Application, CoreComponent, DirectoryMount, ModuleSource};
use std::{collections::HashMap, io::Write, path::PathBuf, sync::Arc};
Expand Down Expand Up @@ -219,7 +219,7 @@ impl<T: Default> ExecutionContext<T> {
&self,
component: &str,
data: Option<T>,
io: Option<IoStreamRedirects>,
io: Option<ModuleIoRedirects>,
env: Option<HashMap<String, String>>,
args: Option<Vec<String>>,
) -> Result<(Store<RuntimeContext<T>>, Instance)> {
Expand All @@ -238,7 +238,7 @@ impl<T: Default> ExecutionContext<T> {
/// Save logs for a given component in the log directory on the host
pub fn save_output_to_logs(
&self,
io_redirects: IoStreamRedirects,
io_redirects: impl OutputBuffers,
component: &str,
save_stdout: bool,
save_stderr: bool,
Expand Down Expand Up @@ -274,18 +274,18 @@ impl<T: Default> ExecutionContext<T> {
.append(true)
.create(true)
.open(stdout_filename)?;
let contents = io_redirects.stdout.lock.read().unwrap();
file.write_all(&contents)?;
let contents = io_redirects.stdout();
file.write_all(contents)?;
}

if save_stderr {
let contents = io_redirects.stderr.lock.read().unwrap();
let mut file = std::fs::OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(stderr_filename)?;
file.write_all(&contents)?;
let contents = io_redirects.stderr();
file.write_all(contents)?;
}

Ok(())
Expand All @@ -295,7 +295,7 @@ impl<T: Default> ExecutionContext<T> {
&self,
component: &Component<T>,
data: Option<T>,
io: Option<IoStreamRedirects>,
io: Option<ModuleIoRedirects>,
env: Option<HashMap<String, String>>,
args: Option<Vec<String>>,
) -> Result<Store<RuntimeContext<T>>> {
Expand All @@ -307,10 +307,7 @@ impl<T: Default> ExecutionContext<T> {
.envs(&env)?;
match io {
Some(r) => {
wasi_ctx = wasi_ctx
.stderr(Box::new(r.stderr.out))
.stdout(Box::new(r.stdout.out))
.stdin(Box::new(r.stdin));
wasi_ctx = wasi_ctx.stderr(r.stderr).stdout(r.stdout).stdin(r.stdin);
}
None => wasi_ctx = wasi_ctx.inherit_stdio(),
};
Expand Down
Loading