diff --git a/Cargo.lock b/Cargo.lock index 226ac2de..d9cebb66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -420,6 +420,13 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "complex-component-server" +version = "0.1.0" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -1094,6 +1101,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.29" @@ -2743,6 +2756,15 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +dependencies = [ + "getrandom", +] + [[package]] name = "valuable" version = "0.1.0" @@ -3732,7 +3754,7 @@ dependencies = [ [[package]] name = "wrpc-runtime-wasmtime" -version = "0.19.2" +version = "0.19.3" dependencies = [ "anyhow", "bytes", @@ -3740,6 +3762,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "uuid", "wasm-tokio", "wasmtime", "wasmtime-wasi", @@ -3750,7 +3773,7 @@ dependencies = [ [[package]] name = "wrpc-transport" -version = "0.26.5" +version = "0.26.6" dependencies = [ "anyhow", "bytes", @@ -3801,12 +3824,13 @@ dependencies = [ [[package]] name = "wrpc-wasmtime-nats-cli" -version = "0.3.2" +version = "0.3.3" dependencies = [ "anyhow", "async-nats-wrpc", "clap", "futures", + "humantime", "reqwest", "tokio", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index d361d2e1..bd1b3791 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,6 +101,7 @@ bytes = { version = "1", default-features = false } clap = { version = "4", default-features = false } futures = { version = "0.3", default-features = false } heck = { version = "0.5", default-features = false } +humantime = { version = "2.1", default-features = false } pin-project-lite = { version = "0.2", default-features = false } prettyplease = { version = "0.2.20", default-features = false } proc-macro2 = { version = "1", default-features = false } @@ -120,6 +121,7 @@ tokio-stream = { version = "0.1", default-features = false } tokio-util = { version = "0.7", default-features = false } tracing = { version = "0.1", default-features = false } tracing-subscriber = { version = "0.3", default-features = false } +uuid = { version = "1", default-features = false } url = { version = "2", default-features = false } wasm-tokio = { version = "0.5.16", default-features = false } wasmcloud-component-adapters = { version = "0.9", default-features = false } @@ -136,8 +138,8 @@ wit-component = { version = "0.212", default-features = false } wit-parser = { version = "0.212", default-features = false } wrpc-cli = { version = "0.2", path = "./crates/cli", default-features = false } wrpc-introspect = { version = "0.1", default-features = false, path = "./crates/introspect" } -wrpc-runtime-wasmtime = { version = "0.19.2", path = "./crates/runtime-wasmtime", default-features = false } -wrpc-transport = { version = "0.26.5", path = "./crates/transport", default-features = false } +wrpc-runtime-wasmtime = { version = "0.19.3", path = "./crates/runtime-wasmtime", default-features = false } +wrpc-transport = { version = "0.26.6", path = "./crates/transport", default-features = false } wrpc-transport-nats = { version = "0.22.2", path = "./crates/transport-nats", default-features = false } wrpc-transport-quic = { version = "0.1", path = "./crates/transport-quic", default-features = false } -wrpc-wasmtime-nats-cli = { version = "0.3.2", path = "./crates/wasmtime-nats-cli", default-features = false } +wrpc-wasmtime-nats-cli = { version = "0.3.3", path = "./crates/wasmtime-nats-cli", default-features = false } diff --git a/crates/runtime-wasmtime/Cargo.toml b/crates/runtime-wasmtime/Cargo.toml index 4c02ef3b..632379a8 100644 --- a/crates/runtime-wasmtime/Cargo.toml +++ b/crates/runtime-wasmtime/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wrpc-runtime-wasmtime" -version = "0.19.2" +version = "0.19.3" description = "wRPC wasmtime integration" authors.workspace = true @@ -17,6 +17,7 @@ futures = { workspace = true, features = ["alloc"] } tokio = { workspace = true, features = ["macros"] } tokio-util = { workspace = true, features = ["codec", "compat"] } tracing = { workspace = true, features = ["attributes"] } +uuid = { workspace = true, features = ["std", "v7"] } wasm-tokio = { workspace = true } wasmtime = { workspace = true } wasmtime-wasi = { workspace = true } diff --git a/crates/runtime-wasmtime/src/lib.rs b/crates/runtime-wasmtime/src/lib.rs index 47a8bf6d..14de11cb 100644 --- a/crates/runtime-wasmtime/src/lib.rs +++ b/crates/runtime-wasmtime/src/lib.rs @@ -4,8 +4,9 @@ use core::future::Future; use core::iter::zip; use core::ops::{BitOrAssign, Shl}; use core::pin::{pin, Pin}; +use core::time::Duration; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use anyhow::{bail, Context as _}; @@ -14,28 +15,48 @@ use futures::future::try_join_all; use futures::stream::FuturesUnordered; use futures::{Stream, TryStreamExt as _}; use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _}; +use tokio::sync::Mutex; +use tokio::time::Instant; use tokio::try_join; use tokio_util::codec::{Encoder, FramedRead}; use tokio_util::compat::FuturesAsyncReadCompatExt as _; -use tracing::{debug, trace}; -use tracing::{instrument, warn}; +use tracing::{debug, error, instrument, trace, warn, Instrument as _, Span}; +use uuid::Uuid; use wasm_tokio::cm::AsyncReadValue as _; use wasm_tokio::{ AsyncReadCore as _, AsyncReadLeb128 as _, AsyncReadUtf8 as _, CoreNameEncoder, CoreVecEncoderBytes, Leb128Encoder, Utf8Codec, }; use wasmtime::component::types::{self, Case, Field}; -use wasmtime::component::{Func, InstancePre, LinkerInstance, ResourceType, Type, Val}; +use wasmtime::component::{ + Func, Instance, InstancePre, LinkerInstance, ResourceAny, ResourceType, Type, Val, +}; use wasmtime::{AsContextMut, Engine, StoreContextMut}; use wasmtime_wasi::pipe::AsyncReadStream; use wasmtime_wasi::{InputStream, StreamError, WasiView}; -use wrpc_transport::{Index as _, Invoke, ListDecoderU8}; +use wrpc_transport::{Index as _, Invoke, InvokeExt as _, ListDecoderU8}; + +// this returns the RPC name for a wasmtime function name. +// Unfortunately, the [`types::ComponentFunc`] does not include the kind information and we want to +// avoid (re-)parsing the WIT here. +fn rpc_func_name(name: &str) -> &str { + if let Some(name) = name.strip_prefix("[constructor]") { + name + } else if let Some(name) = name.strip_prefix("[static]") { + name + } else if let Some(name) = name.strip_prefix("[method]") { + name + } else { + name + } +} pub struct RemoteResource(pub Bytes); pub struct ValEncoder<'a, T, W> { pub store: StoreContextMut<'a, T>, pub ty: &'a Type, + pub resources: &'a [ResourceType], pub deferred: Option< Box Pin> + Send>> + Send>, >, @@ -43,10 +64,15 @@ pub struct ValEncoder<'a, T, W> { impl ValEncoder<'_, T, W> { #[must_use] - pub fn new<'a>(store: StoreContextMut<'a, T>, ty: &'a Type) -> ValEncoder<'a, T, W> { + pub fn new<'a>( + store: StoreContextMut<'a, T>, + ty: &'a Type, + resources: &'a [ResourceType], + ) -> ValEncoder<'a, T, W> { ValEncoder { store, ty, + resources, deferred: None, } } @@ -55,6 +81,7 @@ impl ValEncoder<'_, T, W> { ValEncoder { store: self.store.as_context_mut(), ty, + resources: self.resources, deferred: None, } } @@ -118,12 +145,13 @@ where impl Encoder<&Val> for ValEncoder<'_, T, W> where - T: WasiView, + T: WasiView + WrpcView, W: AsyncWrite + wrpc_transport::Index + Sync + Send + 'static, { type Error = wasmtime::Error; #[allow(clippy::too_many_lines)] + #[instrument(level = "trace", skip(self))] fn encode(&mut self, v: &Val, dst: &mut BytesMut) -> Result<(), Self::Error> { match (v, self.ty) { (Val::Bool(v), Type::Bool) => { @@ -536,6 +564,23 @@ where .encode(buf, dst) .context("failed to encode resource handle") } + } else if self.resources.contains(ty) { + let id = Uuid::now_v7(); + CoreVecEncoderBytes + .encode(id.to_bytes_le().as_slice(), dst) + .context("failed to encode resource handle")?; + trace!(?id, "store shared resource"); + if self + .store + .data_mut() + .shared_resources() + .0 + .insert(id, *resource) + .is_some() + { + error!(?id, "duplicate resource ID generated"); + } + Ok(()) } else { bail!("encoding host resources not supported yet") } @@ -557,12 +602,13 @@ async fn read_flags(n: usize, r: &mut (impl AsyncRead + Unpin)) -> std::io::Resu pub async fn read_value( store: &mut impl AsContextMut, r: &mut Pin<&mut R>, + resources: &[ResourceType], val: &mut Val, ty: &Type, path: &[usize], ) -> std::io::Result<()> where - T: WasiView, + T: WasiView + WrpcView, R: AsyncRead + wrpc_transport::Index + Send + Unpin + 'static, { match ty { @@ -642,7 +688,7 @@ where let mut v = Val::Bool(false); path.push(i); trace!(i, "reading list element value"); - Box::pin(read_value(store, r, &mut v, &ty, &path)).await?; + Box::pin(read_value(store, r, resources, &mut v, &ty, &path)).await?; path.pop(); vs.push(v); } @@ -657,7 +703,7 @@ where let mut v = Val::Bool(false); path.push(i); trace!(i, "reading struct field value"); - Box::pin(read_value(store, r, &mut v, &ty, &path)).await?; + Box::pin(read_value(store, r, resources, &mut v, &ty, &path)).await?; path.pop(); vs.push((name.to_string(), v)); } @@ -672,7 +718,7 @@ where let mut v = Val::Bool(false); path.push(i); trace!(i, "reading tuple element value"); - Box::pin(read_value(store, r, &mut v, &ty, &path)).await?; + Box::pin(read_value(store, r, resources, &mut v, &ty, &path)).await?; path.pop(); vs.push(v); } @@ -694,7 +740,7 @@ where if let Some(ty) = ty { let mut v = Val::Bool(false); trace!(variant = name, "reading nested variant value"); - Box::pin(read_value(store, r, &mut v, &ty, path)).await?; + Box::pin(read_value(store, r, resources, &mut v, &ty, path)).await?; *val = Val::Variant(name, Some(Box::new(v))); } else { *val = Val::Variant(name, None); @@ -720,7 +766,7 @@ where if ok { let mut v = Val::Bool(false); trace!("reading nested `option::some` value"); - Box::pin(read_value(store, r, &mut v, &ty.ty(), path)).await?; + Box::pin(read_value(store, r, resources, &mut v, &ty.ty(), path)).await?; *val = Val::Option(Some(Box::new(v))); } else { *val = Val::Option(None); @@ -733,7 +779,7 @@ where if let Some(ty) = ty.ok() { let mut v = Val::Bool(false); trace!("reading nested `result::ok` value"); - Box::pin(read_value(store, r, &mut v, &ty, path)).await?; + Box::pin(read_value(store, r, resources, &mut v, &ty, path)).await?; *val = Val::Result(Ok(Some(Box::new(v)))); } else { *val = Val::Result(Ok(None)); @@ -741,7 +787,7 @@ where } else if let Some(ty) = ty.err() { let mut v = Val::Bool(false); trace!("reading nested `result::err` value"); - Box::pin(read_value(store, r, &mut v, &ty, path)).await?; + Box::pin(read_value(store, r, resources, &mut v, &ty, path)).await?; *val = Val::Result(Err(Some(Box::new(v)))); } else { *val = Val::Result(Err(None)); @@ -821,6 +867,41 @@ where .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; *val = Val::Resource(v); Ok(()) + } else if resources.contains(ty) { + let mut store = store.as_context_mut(); + let mut id = uuid::Bytes::default(); + debug_assert_eq!(id.len(), 16); + let n = r.read_u8_leb128().await?; + if usize::from(n) != id.len() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!( + "invalid guest resource handle length {n}, expected {}", + id.len() + ), + )); + } + let n = r.read_exact(&mut id).await?; + if n != id.len() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!( + "invalid amount of guest resource handle bytes read {n}, expected {}", + id.len() + ), + )); + } + + let id = Uuid::from_bytes_le(id); + trace!(?id, "lookup shared resource"); + let resource = store + .data_mut() + .shared_resources() + .0 + .get(&id) + .ok_or_else(|| std::io::Error::from(std::io::ErrorKind::NotFound))?; + *val = Val::Resource(*resource); + Ok(()) } else { let mut store = store.as_context_mut(); let n = r.read_u32_leb128().await?; @@ -842,31 +923,48 @@ where } } -pub trait WrpcView: Send { - fn client(&self) -> &C; +/// A table of shared resources exported by the component +#[derive(Debug, Default)] +pub struct SharedResourceTable(HashMap); + +pub trait WrpcView: Send { + type Invoke: Invoke; + + /// Returns an [Invoke] implementation used to satisfy polyfilled imports + fn client(&self) -> &Self::Invoke; + + /// Returns a table of shared exported resources + fn shared_resources(&mut self) -> &mut SharedResourceTable; + + /// Optional invocation timeout, component will trap if invocation is not finished within the + /// returned [Duration]. If this method returns [None], then no timeout will be used. + fn timeout(&self) -> Option { + None + } } /// Polyfill [`types::ComponentItem`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`] #[instrument(level = "trace", skip_all)] -pub fn link_item( +pub fn link_item( engine: &Engine, linker: &mut LinkerInstance, + resources: impl Into>, ty: types::ComponentItem, instance: impl Into>, name: impl Into>, - cx: C::Context, + cx: ::Context, ) -> wasmtime::Result<()> where - V: WrpcView + WasiView, - C: Invoke, - C::Context: Clone + 'static, + V: WasiView + WrpcView, + ::Context: Clone + 'static, { let instance = instance.into(); + let resources = resources.into(); match ty { types::ComponentItem::ComponentFunc(ty) => { let name = name.into(); debug!(?instance, ?name, "linking function"); - link_function(linker, ty, instance, name, cx)?; + link_function(linker, Arc::clone(&resources), ty, instance, name, cx)?; } types::ComponentItem::CoreFunc(_) => { bail!("polyfilling core functions not supported yet") @@ -875,7 +973,15 @@ where types::ComponentItem::Component(ty) => { for (name, ty) in ty.imports(engine) { debug!(?instance, name, "linking component item"); - link_item(engine, linker, ty, "", name, cx.clone())?; + link_item( + engine, + linker, + Arc::clone(&resources), + ty, + "", + name, + cx.clone(), + )?; } } types::ComponentItem::ComponentInstance(ty) => { @@ -884,11 +990,12 @@ where .instance(&name) .with_context(|| format!("failed to instantiate `{name}` in the linker"))?; debug!(?instance, ?name, "linking instance"); - link_instance(engine, &mut linker, ty, name, cx)?; + link_instance(engine, &mut linker, resources, ty, name, cx)?; } types::ComponentItem::Type(_) => {} types::ComponentItem::Resource(_) => { let name = name.into(); + debug!(?instance, ?name, "linking resource"); linker.resource(&name, ResourceType::host::(), |_, _| Ok(()))?; } } @@ -897,67 +1004,87 @@ where /// Polyfill [`types::ComponentInstance`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`] #[instrument(level = "trace", skip_all)] -pub fn link_instance( +pub fn link_instance( engine: &Engine, linker: &mut LinkerInstance, + resources: impl Into>, ty: types::ComponentInstance, name: impl Into>, - cx: C::Context, + cx: ::Context, ) -> wasmtime::Result<()> where - V: WrpcView + WasiView, - C: Invoke, - C::Context: Clone + 'static, + V: WrpcView + WasiView, + ::Context: Clone + 'static, { let instance = name.into(); + let resources = resources.into(); for (name, ty) in ty.exports(engine) { debug!(name, "linking instance item"); - link_item(engine, linker, ty, Arc::clone(&instance), name, cx.clone())?; + link_item( + engine, + linker, + Arc::clone(&resources), + ty, + Arc::clone(&instance), + name, + cx.clone(), + )?; } Ok(()) } /// Polyfill [`types::ComponentFunc`] in a [`LinkerInstance`] using [`wrpc_transport::Invoke`] #[instrument(level = "trace", skip_all)] -pub fn link_function( +pub fn link_function( linker: &mut LinkerInstance, + resources: impl Into>, ty: types::ComponentFunc, instance: impl Into>, name: impl Into>, - cx: C::Context, + cx: ::Context, ) -> wasmtime::Result<()> where - V: WrpcView + WasiView, - C: Invoke, - C::Context: Clone + 'static, + V: WrpcView + WasiView, + ::Context: Clone + 'static, { + let span = Span::current(); let instance = instance.into(); let name = name.into(); + let resources = resources.into(); linker.func_new_async(&Arc::clone(&name), move |mut store, params, results| { let cx = cx.clone(); let ty = ty.clone(); let instance = Arc::clone(&instance); let name = Arc::clone(&name); - Box::new(async move { - let mut buf = BytesMut::default(); - let mut deferred = vec![]; - for (v, ref ty) in zip(params, ty.params()) { - let mut enc = ValEncoder::new(store.as_context_mut(), ty); - enc.encode(v, &mut buf) - .context("failed to encode parameter")?; - deferred.push(enc.deferred); - } - let (outgoing, incoming) = store - .data() - .client() + let resources = Arc::clone(&resources); + Box::new( + async move { + let mut buf = BytesMut::default(); + let mut deferred = vec![]; + for (v, ref ty) in zip(params, ty.params()) { + let mut enc = ValEncoder::new(store.as_context_mut(), ty, &resources); + enc.encode(v, &mut buf) + .context("failed to encode parameter")?; + deferred.push(enc.deferred); + } + let clt = store.data().client(); + let timeout = store.data().timeout(); + let buf = buf.freeze(); // TODO: set paths - .invoke(cx, &instance, &name, buf.freeze(), &[[]]) - .await + let paths = &[[]; 0]; + let rpc_name = rpc_func_name(&name); + let start = Instant::now(); + let (outgoing, incoming) = if let Some(timeout) = timeout { + clt.timeout(timeout) + .invoke(cx, &instance, rpc_name, buf, paths) + .await + } else { + clt.invoke(cx, &instance, rpc_name, buf, paths).await + } .with_context(|| { format!("failed to invoke `{instance}.{name}` polyfill via wRPC") })?; - try_join!( - async { + let tx = async { try_join_all( zip(0.., deferred) .filter_map(|(i, f)| f.map(|f| (outgoing.index(&[i]), f))) @@ -972,19 +1099,38 @@ where .shutdown() .await .context("failed to shutdown outgoing stream") - }, - async { + }; + let rx = async { let mut incoming = pin!(incoming); for (i, (v, ref ty)) in zip(results, ty.results()).enumerate() { - read_value(&mut store, &mut incoming, v, ty, &[i]) + read_value(&mut store, &mut incoming, &resources, v, ty, &[i]) .await .with_context(|| format!("failed to decode return value {i}"))?; } Ok(()) - }, - )?; - Ok(()) - }) + }; + if let Some(timeout) = timeout { + let timeout = + timeout.saturating_sub(Instant::now().saturating_duration_since(start)); + try_join!( + async { + tokio::time::timeout(timeout, tx) + .await + .context("data transmission timed out")? + }, + async { + tokio::time::timeout(timeout, rx) + .await + .context("data receipt timed out")? + }, + )?; + } else { + try_join!(tx, rx)?; + } + Ok(()) + } + .instrument(span.clone()), + ) }) } @@ -995,17 +1141,18 @@ pub async fn call( params_ty: impl ExactSizeIterator, results_ty: impl ExactSizeIterator, func: Func, + guest_resources: &[ResourceType], ) -> anyhow::Result<()> where I: AsyncRead + wrpc_transport::Index + Send + Sync + Unpin + 'static, O: AsyncWrite + wrpc_transport::Index + Send + Sync + Unpin + 'static, C: AsContextMut, - C::Data: WasiView, + C::Data: WasiView + WrpcView, { let mut params = vec![Val::Bool(false); params_ty.len()]; let mut rx = pin!(rx); for (i, (v, ty)) in zip(&mut params, params_ty).enumerate() { - read_value(&mut store, &mut rx, v, ty, &[i]) + read_value(&mut store, &mut rx, guest_resources, v, ty, &[i]) .await .with_context(|| format!("failed to decode parameter value {i}"))?; } @@ -1016,7 +1163,7 @@ where let mut buf = BytesMut::default(); let mut deferred = vec![]; for (i, (ref v, ty)) in zip(results, results_ty).enumerate() { - let mut enc = ValEncoder::new(store.as_context_mut(), ty); + let mut enc = ValEncoder::new(store.as_context_mut(), ty, guest_resources); enc.encode(v, &mut buf) .with_context(|| format!("failed to encode result value {i}"))?; deferred.push(enc.deferred); @@ -1037,11 +1184,60 @@ where }), ) .await?; + func.post_return_async(&mut store) + .await + .context("failed to perform post-return cleanup")?; Ok(()) } +/// Recursively iterates the component item type and collects all exported resource types +#[instrument(level = "trace", skip_all)] +pub fn collect_item_resources( + engine: &Engine, + ty: types::ComponentItem, + resources: &mut impl Extend, +) { + match ty { + types::ComponentItem::ComponentFunc(_) + | types::ComponentItem::CoreFunc(_) + | types::ComponentItem::Module(_) + | types::ComponentItem::Type(_) => {} + types::ComponentItem::Component(ty) => collect_component_resources(engine, &ty, resources), + types::ComponentItem::ComponentInstance(ty) => { + collect_instance_resources(engine, &ty, resources); + } + types::ComponentItem::Resource(ty) => resources.extend([ty]), + } +} + +/// Recursively iterates the component type and collects all exported resource types +#[instrument(level = "trace", skip_all)] +pub fn collect_instance_resources( + engine: &Engine, + ty: &types::ComponentInstance, + resources: &mut impl Extend, +) { + for (_, ty) in ty.exports(engine) { + collect_item_resources(engine, ty, resources); + } +} + +/// Recursively iterates the component type and collects all exported resource types +#[instrument(level = "trace", skip_all)] +pub fn collect_component_resources( + engine: &Engine, + ty: &types::Component, + resources: &mut impl Extend, +) { + for (_, ty) in ty.exports(engine) { + collect_item_resources(engine, ty, resources); + } +} + pub trait ServeExt: wrpc_transport::Serve { - /// Serve [`types::ComponentFunc`] from an [`InstancePre`] instantiating it on each call + /// Serve [`types::ComponentFunc`] from an [`InstancePre`] instantiating it on each call. + /// This serving method does not support guest-exported resources. + #[instrument(level = "trace", skip(self, store, instance_pre))] fn serve_function( &self, store: impl Fn() -> wasmtime::Store + Send + 'static, @@ -1061,12 +1257,13 @@ pub trait ServeExt: wrpc_transport::Serve { >, > + Send where - T: WasiView + 'static, + T: WasiView + WrpcView + 'static, { + let span = Span::current(); async move { debug!(instance = instance_name, name, "serving function export"); // TODO: set paths - let invocations = self.serve(instance_name, name, []).await?; + let invocations = self.serve(instance_name, rpc_func_name(name), []).await?; let instance_name = Arc::::from(instance_name); let name = Arc::::from(name); let params_ty: Arc<[_]> = ty.params().collect(); @@ -1077,40 +1274,117 @@ pub trait ServeExt: wrpc_transport::Serve { let name = Arc::clone(&name); let params_ty = Arc::clone(¶ms_ty); let results_ty = Arc::clone(&results_ty); + let mut store = store(); ( cx, - Box::pin(async move { - let instance = instance_pre - .instantiate_async(&mut store) - .await - .context("failed to instantiate component")?; - let func = { - let mut instance = instance.exports(&mut store); - if instance_name.is_empty() { - instance.root() - } else { - instance.instance(&instance_name).with_context(|| { - format!("instance export `{instance_name}` not found") - })? + Box::pin( + async move { + let instance = instance_pre + .instantiate_async(&mut store) + .await + .context("failed to instantiate component")?; + let func = { + let mut exports = instance.exports(&mut store); + if instance_name.is_empty() { + exports.root() + } else { + exports.instance(&instance_name).with_context(|| { + format!("instance export `{instance_name}` not found") + })? + } + .func(&name) } - .func(&name) - .with_context(|| format!("function export`{name}` not found"))? - }; - call( - &mut store, - rx, - tx, - params_ty.iter(), - results_ty.iter(), - func, - ) - .await?; - func.post_return_async(&mut store) + .with_context(|| format!("function export`{name}` not found"))?; + call( + &mut store, + rx, + tx, + params_ty.iter(), + results_ty.iter(), + func, + &[], + ) .await - .context("failed to perform post-return cleanup")?; - Ok(()) - }) as Pin + Send + 'static>>, + } + .instrument(span.clone()), + ) as Pin + Send + 'static>>, + ) + })) + } + } + + /// Like [`Self::serve_function`], but with a shared `store` instance. + /// This is required to allow for serving functions, which operate on guest-exported resources. + #[instrument(level = "trace", skip(self, store, instance, guest_resources))] + fn serve_function_shared( + &self, + store: Arc>>, + instance: Instance, + guest_resources: impl Into>, + ty: types::ComponentFunc, + instance_name: &str, + name: &str, + ) -> impl Future< + Output = anyhow::Result< + impl Stream< + Item = anyhow::Result<( + Self::Context, + Pin> + Send + 'static>>, + )>, + > + Send + + 'static, + >, + > + Send + where + T: WasiView + WrpcView + 'static, + { + let span = Span::current(); + let guest_resources = guest_resources.into(); + async move { + let func = { + let mut store = store.lock().await; + let mut exports = instance.exports(&mut *store); + if instance_name.is_empty() { + exports.root() + } else { + exports + .instance(instance_name) + .with_context(|| format!("instance export `{instance_name}` not found"))? + } + .func(name) + } + .with_context(|| format!("function export`{name}` not found"))?; + debug!(instance = instance_name, name, "serving function export"); + // TODO: set paths + let invocations = self.serve(instance_name, rpc_func_name(name), []).await?; + let params_ty: Arc<[_]> = ty.params().collect(); + let results_ty: Arc<[_]> = ty.results().collect(); + let guest_resources = Arc::clone(&guest_resources); + Ok(invocations.map_ok(move |(cx, tx, rx)| { + let params_ty = Arc::clone(¶ms_ty); + let results_ty = Arc::clone(&results_ty); + let guest_resources = Arc::clone(&guest_resources); + let store = Arc::clone(&store); + ( + cx, + Box::pin( + async move { + let mut store = store.lock().await; + call( + &mut *store, + rx, + tx, + params_ty.iter(), + results_ty.iter(), + func, + &guest_resources, + ) + .await?; + Ok(()) + } + .instrument(span.clone()), + ) as Pin + Send + 'static>>, ) })) } diff --git a/crates/transport/Cargo.toml b/crates/transport/Cargo.toml index b798adb8..640f8295 100644 --- a/crates/transport/Cargo.toml +++ b/crates/transport/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wrpc-transport" -version = "0.26.5" +version = "0.26.6" description = "wRPC core transport functionality" authors.workspace = true diff --git a/crates/transport/src/invoke.rs b/crates/transport/src/invoke.rs index 1d8fc04a..13a5b5b9 100644 --- a/crates/transport/src/invoke.rs +++ b/crates/transport/src/invoke.rs @@ -75,6 +75,7 @@ pub trait Invoke: Send + Sync { P: AsRef<[Option]> + Send + Sync; } +#[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct Timeout<'a, T: ?Sized> { pub inner: &'a T, pub timeout: Duration, @@ -85,6 +86,7 @@ impl Invoke for Timeout<'_, T> { type Outgoing = T::Outgoing; type Incoming = T::Incoming; + #[instrument(level = "trace", skip(self, cx, params, paths))] async fn invoke

( &self, cx: Self::Context, @@ -105,6 +107,36 @@ impl Invoke for Timeout<'_, T> { } } +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct TimeoutOwned { + pub inner: T, + pub timeout: Duration, +} + +impl Invoke for TimeoutOwned { + type Context = T::Context; + type Outgoing = T::Outgoing; + type Incoming = T::Incoming; + + #[instrument(level = "trace", skip(self, cx, params, paths))] + async fn invoke

( + &self, + cx: Self::Context, + instance: &str, + func: &str, + params: Bytes, + paths: impl AsRef<[P]> + Send, + ) -> anyhow::Result<(Self::Outgoing, Self::Incoming)> + where + P: AsRef<[Option]> + Send + Sync, + { + self.inner + .timeout(self.timeout) + .invoke(cx, instance, func, params, paths) + .await + } +} + pub trait InvokeExt: Invoke { /// Invoke function `func` on instance `instance` using typed `Params` and `Results` #[instrument(level = "trace", skip(self, cx, params, paths))] @@ -250,12 +282,25 @@ pub trait InvokeExt: Invoke { } } + /// Returns a [`Timeout`], wrapping [Self] with an implementation of [Invoke], which will + /// error, if call to [Self::invoke] does not return within a supplied `timeout` fn timeout(&self, timeout: Duration) -> Timeout<'_, Self> { Timeout { inner: self, timeout, } } + + /// This is like [`InvokeExt::timeout`], but moves [Self] and returns corresponding [`TimeoutOwned`] + fn timeout_owned(self, timeout: Duration) -> TimeoutOwned + where + Self: Sized, + { + TimeoutOwned { + inner: self, + timeout, + } + } } impl InvokeExt for T {} diff --git a/crates/wasmtime-nats-cli/Cargo.toml b/crates/wasmtime-nats-cli/Cargo.toml index c7cce375..a8020316 100644 --- a/crates/wasmtime-nats-cli/Cargo.toml +++ b/crates/wasmtime-nats-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wrpc-wasmtime-nats-cli" -version = "0.3.2" +version = "0.3.3" description = "wRPC Wasmtime NATS CLI" authors.workspace = true @@ -23,6 +23,7 @@ clap = { workspace = true, features = [ "usage", ] } futures = { workspace = true } +humantime = { workspace = true } reqwest = { workspace = true } tokio = { workspace = true, features = ["fs"] } tokio-util = { workspace = true, features = ["codec"] } diff --git a/crates/wasmtime-nats-cli/src/lib.rs b/crates/wasmtime-nats-cli/src/lib.rs index d1e85788..f0c1166e 100644 --- a/crates/wasmtime-nats-cli/src/lib.rs +++ b/crates/wasmtime-nats-cli/src/lib.rs @@ -1,23 +1,32 @@ +#![allow(clippy::type_complexity)] + use core::pin::pin; +use core::time::Duration; + use std::sync::Arc; use anyhow::{anyhow, bail, Context as _}; use clap::Parser; use futures::StreamExt as _; use tokio::fs; +use tokio::sync::Mutex; use tokio::task::JoinSet; -use tracing::{error, info, instrument, trace, warn, Instrument as _}; +use tracing::{error, info, instrument, warn, Instrument as _, Span}; use url::Url; use wasmcloud_component_adapters::{ WASI_PREVIEW1_COMMAND_COMPONENT_ADAPTER, WASI_PREVIEW1_REACTOR_COMPONENT_ADAPTER, }; -use wasmtime::component::{types, Component, InstancePre, Linker}; -use wasmtime::Store; +use wasmtime::component::{types, Component, InstancePre, Linker, ResourceType}; +use wasmtime::{Engine, Store}; use wasmtime_wasi::{ResourceTable, WasiCtxBuilder}; use wasmtime_wasi::{WasiCtx, WasiView}; -use wrpc_runtime_wasmtime::{link_instance, ServeExt as _, WrpcView}; +use wrpc_runtime_wasmtime::{ + collect_component_resources, link_item, ServeExt as _, SharedResourceTable, WrpcView, +}; use wrpc_transport::Invoke; +const DEFAULT_TIMEOUT: &str = "10s"; + #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] enum Command { @@ -32,6 +41,10 @@ struct RunArgs { #[arg(short, long, default_value = wrpc_cli::nats::DEFAULT_URL)] nats: String, + /// Invocation timeout + #[arg(long, default_value = DEFAULT_TIMEOUT)] + timeout: humantime::Duration, + /// Target prefix to send invocations to target: String, @@ -46,6 +59,10 @@ struct ServeArgs { #[arg(short, long, default_value = wrpc_cli::nats::DEFAULT_URL)] nats: String, + /// Invocation timeout + #[arg(long, default_value = DEFAULT_TIMEOUT)] + timeout: humantime::Duration, + /// NATS queue group to use #[arg(short, long)] group: Option, @@ -69,12 +86,24 @@ pub struct Ctx { pub table: ResourceTable, pub wasi: WasiCtx, pub wrpc: C, + pub shared_resources: SharedResourceTable, + pub timeout: Duration, } -impl WrpcView for Ctx { - fn client(&self) -> &C { +impl WrpcView for Ctx { + type Invoke = C; + + fn client(&self) -> &Self::Invoke { &self.wrpc } + + fn shared_resources(&mut self) -> &mut SharedResourceTable { + &mut self.shared_resources + } + + fn timeout(&self) -> Option { + Some(self.timeout) + } } impl WasiView for Ctx { @@ -86,17 +115,17 @@ impl WasiView for Ctx { } } -#[instrument(level = "trace", skip(cx))] +#[instrument(level = "trace", skip(adapter, cx))] async fn instantiate_pre( adapter: &[u8], cx: C::Context, workload: &str, -) -> anyhow::Result<(InstancePre>, wasmtime::Engine)> +) -> anyhow::Result<(InstancePre>, Engine, Arc<[ResourceType]>)> where C: Invoke, C::Context: Clone + 'static, { - let engine = wasmtime::Engine::new( + let engine = Engine::new( wasmtime::Config::new() .async_support(true) .wasm_component_model(true), @@ -145,30 +174,18 @@ where wasm }; - let component = Component::new(&engine, &wasm).context("failed to compile component")?; + let component = Component::new(&engine, wasm).context("failed to compile component")?; let mut linker = Linker::>::new(&engine); wasmtime_wasi::add_to_linker_async(&mut linker).context("failed to link WASI")?; - let (resolve, world) = - match wit_component::decode(&wasm).context("failed to decode WIT component")? { - wit_component::DecodedWasm::Component(resolve, world) => (resolve, world), - wit_component::DecodedWasm::WitPackages(..) => { - bail!("binary-encoded WIT packages not currently supported") - } - }; - - let wit_parser::World { imports, .. } = resolve - .worlds - .iter() - .find_map(|(id, w)| (id == world).then_some(w)) - .context("component world missing")?; - let ty = component.component_type(); - for (wk, _) in imports { - let instance_name = resolve.name_world_key(wk); + let mut resources = Vec::new(); + collect_component_resources(&engine, &ty, &mut resources); + let resources = Arc::from(resources); + for (name, item) in ty.imports(&engine) { // Avoid polyfilling instances, for which static bindings are linked - match instance_name.as_ref() { + match name { "wasi:cli/environment@0.2.0" | "wasi:cli/exit@0.2.0" | "wasi:cli/stderr@0.2.0" @@ -198,29 +215,15 @@ where | "wasi:sockets/udp@0.2.0" => continue, _ => {} } - let Some(types::ComponentItem::ComponentInstance(instance)) = - ty.get_import(&engine, &instance_name) - else { - trace!( - instance_name, - "component does not import the parsed instance" - ); - continue; - }; - - let mut linker = linker.root(); - let mut linker = match linker.instance(&instance_name) { - Ok(linker) => linker, - Err(err) => { - error!( - ?err, - ?instance_name, - "failed to instantiate interface from root" - ); - continue; - } - }; - if let Err(err) = link_instance(&engine, &mut linker, instance, instance_name, cx.clone()) { + if let Err(err) = link_item( + &engine, + &mut linker.root(), + Arc::clone(&resources), + item, + "", + name, + cx.clone(), + ) { error!(?err, "failed to polyfill instance"); } } @@ -228,10 +231,15 @@ where let pre = linker .instantiate_pre(&component) .context("failed to pre-instantiate component")?; - Ok((pre, engine)) + Ok((pre, engine, resources)) } -fn new_store(engine: &wasmtime::Engine, wrpc: C, arg0: &str) -> wasmtime::Store> { +fn new_store( + engine: &Engine, + wrpc: C, + arg0: &str, + timeout: Duration, +) -> wasmtime::Store> { Store::new( engine, Ctx { @@ -242,7 +250,9 @@ fn new_store(engine: &wasmtime::Engine, wrpc: C, arg0: &str) -> wasmt .args(&[arg0]) .build(), table: ResourceTable::new(), + shared_resources: SharedResourceTable::default(), wrpc, + timeout, }, ) } @@ -251,6 +261,7 @@ fn new_store(engine: &wasmtime::Engine, wrpc: C, arg0: &str) -> wasmt pub async fn handle_run( RunArgs { nats, + timeout, target, ref workload, }: RunArgs, @@ -259,12 +270,13 @@ pub async fn handle_run( .await .context("failed to connect to NATS")?; - let (pre, engine) = + let (pre, engine, _) = instantiate_pre(WASI_PREVIEW1_COMMAND_COMPONENT_ADAPTER, None, workload).await?; let mut store = new_store( &engine, wrpc_transport_nats::Client::new(nats, target, None), "command.wasm", + *timeout, ); let (cmd, _) = wasmtime_wasi::bindings::Command::instantiate_pre(&mut store, &pre) .await @@ -276,28 +288,159 @@ pub async fn handle_run( .map_err(|()| anyhow!("component failed")) } -#[instrument(level = "trace", ret)] -pub async fn handle_serve( - ServeArgs { - nats, - prefix, - target, - group, - ref workload, - }: ServeArgs, +#[instrument(level = "trace", skip_all, ret)] +pub async fn serve_shared( + handlers: &mut JoinSet<()>, + srv: wrpc_transport_nats::Client, + mut store: wasmtime::Store>, + pre: InstancePre>, + guest_resources: Arc<[ResourceType]>, ) -> anyhow::Result<()> { - let nats = wrpc_cli::nats::connect(nats) + let span = Span::current(); + let instance = pre + .instantiate_async(&mut store) .await - .context("failed to connect to NATS")?; - let nats = Arc::new(nats); - - let (pre, engine) = - instantiate_pre(WASI_PREVIEW1_REACTOR_COMPONENT_ADAPTER, None, workload).await?; - - let mut handlers = JoinSet::new(); - let clt = wrpc_transport_nats::Client::new(Arc::clone(&nats), target, None); - let srv = wrpc_transport_nats::Client::new(nats, prefix, group.map(Arc::from)); + .context("failed to instantiate component")?; + let engine = store.engine().clone(); + let store = Arc::new(Mutex::new(store)); for (name, ty) in pre.component().component_type().exports(&engine) { + match (name, ty) { + (name, types::ComponentItem::ComponentFunc(ty)) => { + info!(?name, "serving root function"); + let invocations = srv + .serve_function_shared( + Arc::clone(&store), + instance, + Arc::clone(&guest_resources), + ty, + "", + name, + ) + .await?; + handlers.spawn( + async move { + let mut invocations = pin!(invocations); + while let Some(invocation) = invocations.next().await { + match invocation { + Ok((headers, fut)) => { + info!(?headers, "serving root function invocation"); + if let Err(err) = fut.await { + warn!( + ?headers, + ?err, + "failed to serve root function invocation" + ); + } else { + info!("successfully served root function invocation"); + } + } + Err(err) => { + error!(?err, "failed to accept root function invocation"); + } + } + } + } + .instrument(span.clone()), + ); + } + (_, types::ComponentItem::CoreFunc(_)) => { + warn!(name, "serving root core function exports not supported yet"); + } + (_, types::ComponentItem::Module(_)) => { + warn!(name, "serving root module exports not supported yet"); + } + (_, types::ComponentItem::Component(_)) => { + warn!(name, "serving root component exports not supported yet"); + } + (instance_name, types::ComponentItem::ComponentInstance(ty)) => { + for (name, ty) in ty.exports(&engine) { + match ty { + types::ComponentItem::ComponentFunc(ty) => { + info!(?name, "serving instance function"); + let invocations = srv + .serve_function_shared( + Arc::clone(&store), + instance, + Arc::clone(&guest_resources), + ty, + instance_name, + name, + ) + .await?; + handlers.spawn(async move { + let mut invocations = pin!(invocations); + while let Some(invocation) = invocations.next().await { + match invocation { + Ok((headers, fut)) => { + info!(?headers, "serving instance function invocation"); + if let Err(err) = fut.await { + warn!( + ?headers, + ?err, + "failed to serve instance function invocation" + ); + } else { + info!( + "successfully served instance function invocation" + ); + } + } + Err(err) => { + error!( + ?err, + "failed to accept instance function invocation" + ); + } + } + } + } + .instrument(span.clone())); + } + types::ComponentItem::CoreFunc(_) => { + warn!( + instance_name, + name, "serving instance core function exports not supported yet" + ); + } + types::ComponentItem::Module(_) => { + warn!( + instance_name, + name, "serving instance module exports not supported yet" + ); + } + types::ComponentItem::Component(_) => { + warn!( + instance_name, + name, "serving instance component exports not supported yet" + ); + } + types::ComponentItem::ComponentInstance(_) => { + warn!( + instance_name, + name, "serving nested instance exports not supported yet" + ); + } + types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {} + } + } + } + (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {} + } + } + Ok(()) +} + +#[instrument(level = "trace", skip_all, ret)] +pub async fn serve_stateless( + handlers: &mut JoinSet<()>, + srv: wrpc_transport_nats::Client, + clt: wrpc_transport_nats::Client, + pre: InstancePre>, + engine: &Engine, + timeout: Duration, +) -> anyhow::Result<()> { + let span = Span::current(); + for (name, ty) in pre.component().component_type().exports(engine) { match (name, ty) { (name, types::ComponentItem::ComponentFunc(ty)) => { let clt = clt.clone(); @@ -305,48 +448,50 @@ pub async fn handle_serve( info!(?name, "serving root function"); let invocations = srv .serve_function( - move || new_store(&engine, clt.clone(), "reactor.wasm"), + move || new_store(&engine, clt.clone(), "reactor.wasm", timeout), pre.clone(), ty, "", name, ) .await?; - handlers.spawn(async move { - let mut invocations = pin!(invocations); - while let Some(invocation) = invocations.next().await { - match invocation { - Ok((headers, fut)) => { - info!(?headers, "serving root function invocation"); - if let Err(err) = fut.await { - warn!( - ?headers, - ?err, - "failed to serve root function invocation" - ); - } else { - info!("successfully served root function invocation") + handlers.spawn( + async move { + let mut invocations = pin!(invocations); + while let Some(invocation) = invocations.next().await { + match invocation { + Ok((headers, fut)) => { + info!(?headers, "serving root function invocation"); + if let Err(err) = fut.await { + warn!( + ?headers, + ?err, + "failed to serve root function invocation" + ); + } else { + info!("successfully served root function invocation"); + } + } + Err(err) => { + error!(?err, "failed to accept root function invocation"); } - } - Err(err) => { - error!(?err, "failed to accept root function invocation"); } } } - ().in_current_span() - }); + .instrument(span.clone()), + ); } (_, types::ComponentItem::CoreFunc(_)) => { - bail!("serving root core function exports not supported yet") + warn!(name, "serving root core function exports not supported yet"); } (_, types::ComponentItem::Module(_)) => { - bail!("serving root module exports not supported yet"); + warn!(name, "serving root module exports not supported yet"); } (_, types::ComponentItem::Component(_)) => { - bail!("serving root component exports not supported yet"); + warn!(name, "serving root component exports not supported yet"); } (instance_name, types::ComponentItem::ComponentInstance(ty)) => { - for (name, ty) in ty.exports(&engine) { + for (name, ty) in ty.exports(engine) { match ty { types::ComponentItem::ComponentFunc(ty) => { let clt = clt.clone(); @@ -354,7 +499,9 @@ pub async fn handle_serve( info!(?name, "serving instance function"); let invocations = srv .serve_function( - move || new_store(&engine, clt.clone(), "reactor.wasm"), + move || { + new_store(&engine, clt.clone(), "reactor.wasm", timeout) + }, pre.clone(), ty, instance_name, @@ -376,7 +523,7 @@ pub async fn handle_serve( } else { info!( "successfully served instance function invocation" - ) + ); } } Err(err) => { @@ -387,34 +534,76 @@ pub async fn handle_serve( } } } - ().in_current_span() - }); + }.instrument(span.clone())); } types::ComponentItem::CoreFunc(_) => { - bail!("serving instance core function exports not supported yet") + warn!( + instance_name, + name, "serving instance core function exports not supported yet" + ); } types::ComponentItem::Module(_) => { - bail!("serving instance module exports not supported yet") + warn!( + instance_name, + name, "serving instance module exports not supported yet" + ); } types::ComponentItem::Component(_) => { - bail!("serving instance component exports not supported yet") + warn!( + instance_name, + name, "serving instance component exports not supported yet" + ); } types::ComponentItem::ComponentInstance(_) => { - bail!("serving nested instance exports not supported yet") - } - types::ComponentItem::Type(_) => {} - types::ComponentItem::Resource(_) => { - bail!("serving instance resource exports not supported yet") + warn!( + instance_name, + name, "serving nested instance exports not supported yet" + ); } + types::ComponentItem::Type(_) | types::ComponentItem::Resource(_) => {} } } } - (_, types::ComponentItem::Type(_)) => {} - (_, types::ComponentItem::Resource(_)) => { - bail!("serving root resource exports not supported yet") - } + (_, types::ComponentItem::Type(_) | types::ComponentItem::Resource(_)) => {} } } + Ok(()) +} + +#[instrument(level = "trace", ret)] +pub async fn handle_serve( + ServeArgs { + nats, + timeout, + prefix, + target, + group, + ref workload, + }: ServeArgs, +) -> anyhow::Result<()> { + let nats = wrpc_cli::nats::connect(nats) + .await + .context("failed to connect to NATS")?; + let nats = Arc::new(nats); + + let (pre, engine, guest_resources) = + instantiate_pre(WASI_PREVIEW1_REACTOR_COMPONENT_ADAPTER, None, workload).await?; + + let clt = wrpc_transport_nats::Client::new(Arc::clone(&nats), target, None); + let srv = wrpc_transport_nats::Client::new(nats, prefix, group.map(Arc::from)); + let mut handlers = JoinSet::new(); + if guest_resources.is_empty() { + serve_stateless(&mut handlers, srv, clt, pre, &engine, *timeout).await?; + } else { + serve_shared( + &mut handlers, + srv, + new_store(&engine, clt, "reactor.wasm", *timeout), + pre, + guest_resources, + ) + .await?; + } while let Some(res) = handlers.join_next().await { if let Err(err) = res { error!(?err, "handler failed"); diff --git a/examples/rust/complex-component-client/src/main.rs b/examples/rust/complex-component-client/src/main.rs index 5bff41de..b94571b8 100644 --- a/examples/rust/complex-component-client/src/main.rs +++ b/examples/rust/complex-component-client/src/main.rs @@ -11,6 +11,7 @@ mod bindings { fn main() { let resource = resources::Foo::new(); - println!("foo.bar: {}", resource.bar()); println!("resources.bar: {}", resources::bar(&resource)); + println!("foo.bar: {}", resource.bar()); + println!("foo.foo: {}", resources::Foo::foo(resource)); } diff --git a/examples/rust/complex-component-server/Cargo.toml b/examples/rust/complex-component-server/Cargo.toml new file mode 100644 index 00000000..1e81e407 --- /dev/null +++ b/examples/rust/complex-component-server/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "complex-component-server" +version = "0.1.0" + +authors.workspace = true +categories.workspace = true +edition.workspace = true +homepage.workspace = true +license.workspace = true +repository.workspace = true + +[lib] +crate-type = ["cdylib"] + +[dependencies] +wit-bindgen = { workspace = true, features = ["realloc", "macros"] } diff --git a/examples/rust/complex-component-server/src/lib.rs b/examples/rust/complex-component-server/src/lib.rs new file mode 100644 index 00000000..48604f5e --- /dev/null +++ b/examples/rust/complex-component-server/src/lib.rs @@ -0,0 +1,40 @@ +use bindings::exports::wrpc_examples::complex::resources::FooBorrow; + +#[allow(clippy::missing_safety_doc)] +mod bindings { + use crate::Handler; + + wit_bindgen::generate!({ + with: { + "wrpc-examples:complex/resources": generate + } + }); + + export!(Handler); +} + +pub struct Handler; + +pub struct Foo; + +impl bindings::exports::wrpc_examples::complex::resources::GuestFoo for Foo { + fn new() -> Self { + Self + } + + fn foo(_: bindings::exports::wrpc_examples::complex::resources::Foo) -> String { + "foo".to_string() + } + + fn bar(&self) -> String { + "bar".to_string() + } +} + +impl bindings::exports::wrpc_examples::complex::resources::Guest for Handler { + type Foo = Foo; + + fn bar(_: FooBorrow<'_>) -> String { + "bar".to_string() + } +} diff --git a/examples/rust/complex-component-server/wit/deps.lock b/examples/rust/complex-component-server/wit/deps.lock new file mode 100644 index 00000000..389d6c59 --- /dev/null +++ b/examples/rust/complex-component-server/wit/deps.lock @@ -0,0 +1,4 @@ +[complex] +path = "../../../wit/complex" +sha256 = "d47aff89928fe5bb30ddc79a4bae79a5af6fa075e8694953567544fefa507048" +sha512 = "2de2d6fccb3bde7f9c163890993b53191cc399fb16e28742dbd25f449fc1d8a2e0766a101d0cf4c252be07e2aa3c892daba876b009a0bd33e3c5dd5f74f790da" diff --git a/examples/rust/complex-component-server/wit/deps.toml b/examples/rust/complex-component-server/wit/deps.toml new file mode 100644 index 00000000..c23b5845 --- /dev/null +++ b/examples/rust/complex-component-server/wit/deps.toml @@ -0,0 +1 @@ +complex = "../../../wit/complex" diff --git a/examples/rust/complex-component-server/wit/deps/complex/complex.wit b/examples/rust/complex-component-server/wit/deps/complex/complex.wit new file mode 100644 index 00000000..4464ddac --- /dev/null +++ b/examples/rust/complex-component-server/wit/deps/complex/complex.wit @@ -0,0 +1,19 @@ +package wrpc-examples:complex; + +interface resources { + resource foo { + constructor(); + foo: static func(v: foo) -> string; + bar: func() -> string; + } + + bar: func(v: borrow) -> string; +} + +world client { + import resources; +} + +world server { + export resources; +} diff --git a/examples/rust/complex-component-server/wit/world.wit b/examples/rust/complex-component-server/wit/world.wit new file mode 100644 index 00000000..5a4e9a86 --- /dev/null +++ b/examples/rust/complex-component-server/wit/world.wit @@ -0,0 +1,5 @@ +package wrpc-examples:complex-component-server; + +world server { + include wrpc-examples:complex/server; +}