WTX
A collection of different transport implementations and related tools focused primarily on web technologies. Contains implementations of 6 IETF RFCs (6265, 6455, 7541, 7692, 8441, 9113), 2 formal specifications (gRPC, PostgreSQL) and several other original designs.
- Client API Framework
- Database Client
- Database Schema Manager
- gRPC Client/Server
- HTTP Client Framework
- HTTP Server Framework
- HTTP/2 Client/Server
- Pool Manager
- UI tools
- WebSocket Client/Server
- WebSocket over HTTP/2
Every feature is optional and must be set at compile time. Please see the corresponding documentation for further information.
Embedded devices with a working heap allocator can use this no_std crate.
Performance
Many things that generally improve performance are used in the project, to name a few:
- Manual vectorization: When an algorithm is known for processing large amounts of data, several experiments are performed to analyze the best way to split loops in order to allow the compiler to take advantage of SIMD instructions on x86 processors. A generic sketch of this loop-splitting idea follows this list.
- Memory allocation: Whenever possible, all heap allocations are called only once at the start of an instance creation and additionally, stack memory usage is preferably prioritized over heap memory.
- Fewer dependencies: No third-party dependency is injected by default. In other words, additional dependencies are up to the user through the selection of Cargo features, which decreases compilation times. For example, you can see the mere 16 dependencies required by the PostgreSQL client using cargo tree -e normal --features postgres.
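As a rough illustration of the loop-splitting idea mentioned above, processing data in fixed-size chunks often gives the compiler an inner loop that it can vectorize. This is a generic sketch, not code taken from wtx.

// Generic sketch, not wtx code: a fixed-size inner loop over `chunks_exact`
// is easier for the compiler to auto-vectorize than a single byte-by-byte loop.
fn sum_bytes(data: &[u8]) -> u64 {
  let mut total = 0u64;
  let mut chunks = data.chunks_exact(16);
  for chunk in &mut chunks {
    let mut partial = 0u64;
    for byte in chunk {
      partial += u64::from(*byte);
    }
    total += partial;
  }
  // Handle the tail that does not fit into a full chunk.
  for byte in chunks.remainder() {
    total += u64::from(*byte);
  }
  total
}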
Since memory is usually held at the instance level instead of being created and dropped on the fly, it is worth noting that memory usage can grow significantly depending on the use-case. If appropriate, try using a shared pool of resources or try limiting how much data can be exchanged between parties.
High-level benchmarks
Check out wtx-bench to see a variety of benchmarks, and feel free to point out any misunderstandings or misconfigurations.
There are mainly two things that impact performance: the chosen runtime and the number of pre-allocated bytes. Especially for servers that have to create a new instance for each handshake, pre-allocating a high number of bytes for short-lived or low-transfer connections can have a negative impact.
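As a hedged illustration, the HPACK buffer limits that appear in the HTTP/2 server examples later in this document are assumed here to be one of the knobs behind such pre-allocations; smaller values should reduce the per-connection footprint of short-lived connections. This is a sketch, not a tuning recommendation.

extern crate wtx;

use wtx::http2::Http2Params;

// Hedged sketch: `set_max_hpack_len` is called in the server examples below with a
// two-value tuple; the exact meaning of each value and its relation to pre-allocated
// bytes is an assumption here.
fn params_for_short_lived_connections() -> Http2Params {
  Http2Params::default().set_max_hpack_len((4 * 1024, 4 * 1024))
}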
Low-level benchmarks
Anything marked with #[bench] in the repository is considered a low-level benchmark in the sense that it measures very specific operations that generally serve as the basis for other parts.
Take a look at https://bencher.dev/perf/wtx to see all low-level benchmarks over different periods of time.
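For reference, a low-level benchmark in this sense is simply a function annotated with the nightly #[bench] attribute. The following is a generic sketch rather than an actual benchmark from the repository.

#![feature(test)]
extern crate test;

use test::Bencher;

// Generic sketch of a `#[bench]` function; run with `cargo bench` on a nightly
// toolchain. The real benchmarks measure wtx-specific routines.
#[bench]
fn sum_bytes(b: &mut Bencher) {
  let data = vec![0u8; 4096];
  b.iter(|| data.iter().map(|byte| u64::from(*byte)).sum::<u64>());
}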
Transport Layer Security (TLS)
When using a feature that requires a network connection, it is often necessary to perform encrypted communication, and since wtx is not hard-coded with a specific stream implementation, it is up to you to choose the best TLS provider. Some utilities like TokioRustlsConnector or TokioRustlsAcceptor are available to make things more convenient, but keep in mind that it is still necessary to activate a crate that provides certificates for client usage.
Examples
Demonstrations of different use-cases can be found in the wtx-instances directory as well as in the documentation.
Limitations
- Does not support systems with a pointer length of 16 bits.
- Expects the sum of the lengths of an arbitrary number of slices to never overflow; otherwise, the program will likely trigger an overflow that can possibly result in unexpected operations. On a 32-bit system, for example, such a scenario can be prevented by the absence of swap memory or through specific limiters like ulimit. A small illustrative sketch follows this list.
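A minimal sketch of how such a sum could be guarded in user code follows. The helper is hypothetical and not part of wtx.

// Hypothetical helper, not part of wtx: `checked_add` returns `None` instead of
// overflowing, which matters on 32-bit targets where `usize::MAX` is 2^32 - 1.
fn total_len(slices: &[&[u8]]) -> Option<usize> {
  slices.iter().try_fold(0usize, |acc, slice| acc.checked_add(slice.len()))
}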
Client API Framework
A flexible client API framework for writing asynchronous, fast, organizable, scalable and maintainable applications. Supports several data formats, transports and custom parameters.
Check out the wtx-apis project to see a collection of APIs based on wtx.
To use this functionality, it is necessary to activate the client-api-framework feature.
Objective
It is possible to directly decode responses using built-in methods provided by some transport implementations like reqwest or surf, but as complexity grows, the cost of maintaining large sets of endpoints with ad-hoc solutions usually becomes unsustainable. Based on this scenario, wtx comes into play to organize and centralize data flow in a well-defined manner to increase productivity and maintainability.
For API consumers, the calling convention of wtx endpoints is based on fluent interfaces, which makes the usage more pleasant and intuitive.
Moreover, the project may in the future create automatic bindings for other languages in order to avoid having duplicated API repositories.
Example
//! Illustrates how the `client-api-framework` feature facilitates the management and utilization
//! of large API endpoints for both HTTP and WebSocket requests.
//!
//! Contains one API called `GenericThrottlingApi` and its two endpoints: an HTTP JSON-RPC
//! `genericHttpRequest` and a WebSocket `genericWebSocketSubscription`.
//!
//! Everything that is not inside `main` should be constructed only once in your program.

extern crate serde;
extern crate tokio;
extern crate wtx;
extern crate wtx_macros;

use core::time::Duration;
use tokio::net::TcpStream;
use wtx::{
  client_api_framework::{
    misc::{Pair, RequestLimit, RequestThrottling},
    network::{transport::Transport, HttpParams, WsParams},
    Api,
  },
  data_transformation::dnsn::SerdeJson,
  http::client_framework::ClientFrameworkTokio,
  misc::{simple_seed, Uri, Xorshift64},
  web_socket::{WebSocketBuffer, WebSocketClient},
};

wtx::create_packages_aux_wrapper!();

#[derive(Debug)]
#[wtx_macros::api_params(pkgs_aux(PkgsAux), transport(http, ws))]
pub struct GenericThrottlingApi {
  pub rt: RequestThrottling,
}

impl Api for GenericThrottlingApi {
  type Error = wtx::Error;

  async fn before_sending(&mut self) -> Result<(), Self::Error> {
    self.rt.rc.update_params(&self.rt.rl).await?;
    Ok(())
  }
}

#[wtx_macros::pkg(
  api(crate::GenericThrottlingApi),
  data_format(json_rpc("genericHttpRequest")),
  transport(http)
)]
mod generic_http_request {
  #[pkg::aux]
  impl<A, DRSR> crate::HttpPkgsAux<A, DRSR> {}

  #[derive(Debug, serde::Serialize)]
  #[pkg::req_data]
  pub struct GenericHttpRequestReq(#[pkg::field(name = "generic_number")] i32);

  #[pkg::res_data]
  pub type GenericHttpRequestRes = (u8, u16, u32);
}

#[wtx_macros::pkg(
  api(crate::GenericThrottlingApi),
  data_format(json_rpc("genericWebSocketSubscription")),
  transport(ws)
)]
mod generic_web_socket_subscription {
  #[pkg::aux]
  impl<A, DRSR> crate::WsPkgsAux<A, DRSR> {}

  #[derive(Debug, serde::Serialize)]
  #[pkg::req_data]
  pub struct GenericWebSocketSubscriptionReq<'str> {
    generic_string: &'str str,
    #[serde(skip_serializing_if = "Option::is_none")]
    generic_number: Option<i32>,
  }

  #[pkg::res_data]
  pub type GenericWebSocketSubscriptionRes = u64;
}

#[tokio::main]
async fn main() -> wtx::Result<()> {
  async fn http_pair(
  ) -> Pair<PkgsAux<GenericThrottlingApi, SerdeJson, HttpParams>, ClientFrameworkTokio> {
    Pair::new(
      PkgsAux::from_minimum(
        GenericThrottlingApi {
          rt: RequestThrottling::from_rl(RequestLimit::new(5, Duration::from_secs(1))),
        },
        SerdeJson,
        HttpParams::from_uri("ws://generic_web_socket_uri.com".into()),
      ),
      ClientFrameworkTokio::tokio(1).build(),
    )
  }

  async fn web_socket_pair() -> wtx::Result<
    Pair<
      PkgsAux<GenericThrottlingApi, SerdeJson, WsParams>,
      WebSocketClient<(), TcpStream, WebSocketBuffer>,
    >,
  > {
    let uri = Uri::new("ws://generic_web_socket_uri.com");
    let web_socket = WebSocketClient::connect(
      (),
      [],
      false,
      Xorshift64::from(simple_seed()),
      TcpStream::connect(uri.hostname_with_implied_port()).await?,
      &uri,
      WebSocketBuffer::default(),
      |_| wtx::Result::Ok(()),
    )
    .await?;
    Ok(Pair::new(
      PkgsAux::from_minimum(
        GenericThrottlingApi {
          rt: RequestThrottling::from_rl(RequestLimit::new(40, Duration::from_secs(2))),
        },
        SerdeJson,
        WsParams::default(),
      ),
      web_socket,
    ))
  }

  let mut hp = http_pair().await;
  let _http_response_tuple = hp
    .trans
    .send_recv_decode_contained(
      &mut hp.pkgs_aux.generic_http_request().data(123).build(),
      &mut hp.pkgs_aux,
    )
    .await?
    .result?;
  let mut wsp = web_socket_pair().await?;
  let _web_socket_subscription_id = wsp
    .trans
    .send_recv_decode_contained(
      &mut wsp.pkgs_aux.generic_web_socket_subscription().data("Hello", None).build(),
      &mut wsp.pkgs_aux,
    )
    .await?
    .result?;
  Ok(())
}
Client Connection
Provides a set of functions that establish connections, execute queries and manage data transactions with different databases.
At the current time PostgreSQL is the only supported database. Implements https://www.postgresql.org/docs/16/protocol.html.
More benchmarks are available at https://github.com/diesel-rs/metrics.
To use this functionality, it is necessary to activate the postgres feature.
Example
//! Demonstrates different interactions with a PostgreSQL database.
//!
//! This snippet requires ~40 dependencies and has an optimized binary size of ~600K.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use wtx::database::{Executor as _, Record, Records, TransactionManager};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let uri = "postgres://USER:PASSWORD@localhost/DATABASE";
  let mut executor = wtx_instances::executor_postgres(&uri).await?;
  let mut tm = executor.transaction().await?;
  tm.executor()
    .execute("CREATE TABLE IF NOT EXISTS example(id INT, name VARCHAR)", |_| {})
    .await?;
  let _ = tm
    .executor()
    .execute_with_stmt("INSERT INTO example VALUES ($1, $2), ($3, $4)", (1u32, "one", 2u32, "two"))
    .await?;
  tm.commit().await?;
  let records = executor
    .fetch_many_with_stmt("SELECT id, name FROM example;", (), |_| Ok::<_, wtx::Error>(()))
    .await?;
  assert_eq!(records.get(0).as_ref().and_then(|record| record.decode("id").ok()), Some(1));
  assert_eq!(records.get(1).as_ref().and_then(|record| record.decode("name").ok()), Some("two"));
  Ok(())
}
Schema Management
Embedded and CLI workflows using raw SQL commands. A schema manager is a tool that allows developers to define, track and apply changes to database structures over time, ensuring consistency across different environments.
To use this functionality, it is necessary to activate the schema-manager feature.
CLI
# Example
cargo install --git https://github.com/c410-f3r/wtx --features schema-manager-dev wtx-ui
echo DATABASE_URI="postgres://USER:PASSWORD@localhost:5432/DATABASE" > .env
RUST_LOG=debug wtx-cli migrate
The CLI application expects a configuration file that contains a set of paths where each path is a directory with multiple migrations.
# wtx.toml
migration_groups = [
"migrations/1__initial",
"migrations/2__fancy_stuff"
]
Each provided migration and group must contain a unique version and a name, summarized by the following structure:
// Execution order of migrations is dictated by their numeric declaration order.
migrations
+-- 1__initial (Group)
    +-- 1__create_author.sql (Migration)
    +-- 2__create_post.sql (Migration)
+-- 2__fancy_stuff (Group)
    +-- 1__something_fancy.sql (Migration)
wtx.toml
The SQL file itself is composed of two parts, one for migrations (the -- wtx IN section) and another for rollbacks (the -- wtx OUT section).
-- wtx IN
CREATE TABLE author (
id INT NOT NULL PRIMARY KEY,
added TIMESTAMP NOT NULL,
birthdate DATE NOT NULL,
email VARCHAR(100) NOT NULL,
first_name VARCHAR(50) NOT NULL,
last_name VARCHAR(50) NOT NULL
);
-- wtx OUT
DROP TABLE author;
One cool thing about the expected file configuration is that it can also be divided into smaller pieces. For example, the above migration could be transformed into 1__author_up.sql and 1__author_down.sql.
-- 1__author_up.sql
CREATE TABLE author (
id INT NOT NULL PRIMARY KEY,
added TIMESTAMP NOT NULL,
birthdate DATE NOT NULL,
email VARCHAR(100) NOT NULL,
first_name VARCHAR(50) NOT NULL,
last_name VARCHAR(50) NOT NULL
);
-- 1__author_down.sql
DROP TABLE author;
migrations
+-- 1__some_group (Group)
    +-- 1__author (Migration directory)
        +-- 1__author_down.sql (Down migration)
        +-- 1__author_up.sql (Up migration)
        +-- 1__author.toml (Optional configuration)
wtx.toml
Library
The library gives freedom to arrange groups and uses some external crates, bringing ~10 additional dependencies into your application. If this overhead is not acceptable, then you probably should discard the library and use the CLI binary instead as part of a custom deployment strategy.
extern crate tokio;
extern crate wtx;

use std::path::Path;
use wtx::database::{schema_manager::Commands, DEFAULT_URI_VAR};
use wtx::misc::Vector;

#[tokio::main]
async fn main() {
  let mut commands = Commands::with_executor(());
  commands
    .migrate_from_dir(
      (&mut String::default(), &mut Vector::default()),
      Path::new("my_custom_migration_group_path"),
    )
    .await
    .unwrap();
}
Embedded migrations
To make deployment easier, the final binary of your application can embed all necessary migrations through the binary that is available in the wtx-ui crate.
#![allow(unused)]

extern crate wtx;

use wtx::database::schema_manager::Commands;
use wtx::misc::Vector;

// This is an example! The actual contents are filled by the `wtx-ui embed-migrations` binary call.
mod embedded_migrations {
  pub(crate) const GROUPS: wtx::database::schema_manager::EmbeddedMigrationsTy = &[];
}

async fn migrate() -> wtx::Result<()> {
  Commands::with_executor(())
    .migrate_from_groups((&mut String::new(), &mut Vector::new()), embedded_migrations::GROUPS)
    .await
}

fn main() {}
Conditional migrations
If one particular migration needs to be executed in a specific set of databases, then it is possible to use the -- wtx dbs parameter in a file.
-- wtx dbs mssql,postgres
-- wtx IN
CREATE SCHEMA foo;
-- wtx OUT
DROP SCHEMA foo;
Repeatable migrations
Repeatability can be specified with -- wtx repeatability SOME_VALUE, where SOME_VALUE can be either always (runs regardless of the checksum) or on-checksum-change (runs only when the checksum changes).
-- wtx dbs postgres
-- wtx repeatability always
-- wtx IN
CREATE OR REPLACE PROCEDURE something() LANGUAGE SQL AS $$ $$
-- wtx OUT
DROP PROCEDURE something();
Keep in mind that repeatable migrations might break subsequent operations, therefore you must know what you are doing. If desirable, they can be separated into dedicated groups.
migrations/1__initial_repeatable_migrations
migrations/2__normal_migrations
migrations/3__final_repeatable_migrations
Namespaces/Schemas
For supported databases, there is no direct user parameter that inserts migrations inside a single database schema but it is possible to specify the schema inside the SQL file and arrange the migration groups structure in a way that most suits you.
-- wtx IN
CREATE TABLE cool_department_schema.author (
id INT NOT NULL PRIMARY KEY,
full_name VARCHAR(50) NOT NULL
);
-- wtx OUT
DROP TABLE cool_department_schema.author;
gRPC
Basic implementation that currently only supports unary calls. gRPC is a high-performance remote procedure call framework developed by Google that enables efficient communication between distributed systems, particularly in microservices architectures.
wtx does not provide built-in deserialization or serialization utilities capable of manipulating protobuf files. Instead, users are free to choose any third-party crate that generates Rust bindings and implements the internal Deserialize and Serialize traits.
Due to the lack of an official parser, the definitions of a Service must be manually typed.
Independent benchmarks are available at https://github.com/LesnyRumcajs/grpc_bench.
To use this functionality, it is necessary to activate the grpc feature.
Client Example
//! gRPC client that uses the structure definitions found in the `wtx_instances::grpc_bindings`
//! module.
//!
//! This snippet requires ~40 dependencies and has an optimized binary size of ~700K.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use std::borrow::Cow;
use wtx::{
  data_transformation::dnsn::QuickProtobuf,
  grpc::Client,
  http::{client_framework::ClientFramework, ReqResBuffer, ReqResData},
};
use wtx_instances::grpc_bindings::wtx::{GenericRequest, GenericResponse};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let mut client = Client::new(ClientFramework::tokio(1).build(), QuickProtobuf);
  let mut rrb = ReqResBuffer::empty();
  rrb.uri.reset(format_args!("http://127.0.0.1:9000"))?;
  let res = client
    .send_unary_req(
      ("wtx", "GenericService", "generic_method"),
      GenericRequest {
        generic_request_field0: Cow::Borrowed(b"generic_request_value"),
        generic_request_field1: 123,
      },
      rrb,
    )
    .await?;
  let generic_response: GenericResponse = client.des_from_res_bytes(res.rrd.body())?;
  println!("{:?}", generic_response);
  Ok(())
}
Server Example
//! gRPC server that uses the structure definitions found in the `wtx_instances::grpc_bindings`
//! module.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use std::borrow::Cow;
use wtx::{
  data_transformation::dnsn::QuickProtobuf,
  grpc::{GrpcManager, GrpcMiddleware},
  http::{
    server_framework::{post, Router, ServerFrameworkBuilder, State},
    ReqResBuffer, StatusCode,
  },
  misc::{simple_seed, Xorshift64},
};
use wtx_instances::grpc_bindings::wtx::{GenericRequest, GenericResponse};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let router = Router::new(
    wtx::paths!(("wtx.GenericService/generic_method", post(wtx_generic_service_generic_method))),
    GrpcMiddleware,
  )?;
  ServerFrameworkBuilder::new(router)
    .with_req_aux(|| QuickProtobuf::default())
    .tokio_rustls(
      (wtx_instances::CERT, wtx_instances::KEY),
      &wtx_instances::host_from_args(),
      Xorshift64::from(simple_seed()),
      |error| eprintln!("{error}"),
      |_| Ok(()),
    )
    .await
}

async fn wtx_generic_service_generic_method(
  state: State<'_, (), GrpcManager<QuickProtobuf>, ReqResBuffer>,
) -> wtx::Result<StatusCode> {
  let _generic_request: GenericRequest =
    state.stream_aux.des_from_req_bytes(&state.req.rrd.body)?;
  state.req.rrd.clear();
  state.stream_aux.ser_to_res_bytes(
    &mut state.req.rrd.body,
    GenericResponse {
      generic_response_field0: Cow::Borrowed(b"generic_response_value"),
      generic_response_field1: 321,
    },
  )?;
  Ok(StatusCode::Ok)
}
HTTP/2
Implementation of RFC7541 and RFC9113. HTTP/2 is the second major version of the Hypertext Transfer Protocol, introduced in 2015 to improve web performance; it addresses limitations of HTTP/1.1 while maintaining backwards compatibility.
Passes the hpack-test-case and h2spec test suites. Due to official deprecation, prioritization is not supported, and due to the lack of third-party support, server-push is also not supported.
To use this functionality, it is necessary to activate the http2 feature.
Client Example
//! Fetches a URI using low-level HTTP/2 resources.
//!
//! This snippet requires ~25 dependencies and has an optimized binary size of ~700K.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use tokio::net::TcpStream;
use wtx::{
  http::{Method, ReqResBuffer, Request},
  http2::{Http2Buffer, Http2ErrorCode, Http2Params, Http2Tokio},
  misc::{from_utf8_basic, simple_seed, Uri, Xorshift64},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let uri = Uri::new("http://www.example.com");
  let (frame_reader, mut http2) = Http2Tokio::connect(
    Http2Buffer::new(Xorshift64::from(simple_seed())),
    Http2Params::default(),
    TcpStream::connect(uri.hostname_with_implied_port()).await?.into_split(),
  )
  .await?;
  let _jh = tokio::spawn(frame_reader);
  let rrb = ReqResBuffer::empty();
  let mut stream = http2.stream().await?;
  stream.send_req(Request::http2(Method::Get, b"Hello!"), &uri.to_ref()).await?;
  let (_, res_rrb) = stream.recv_res(rrb).await?;
  stream.common().clear(false).await?;
  println!("{}", from_utf8_basic(&res_rrb.body)?);
  http2.send_go_away(Http2ErrorCode::NoError).await;
  Ok(())
}
Server Example
//! HTTP/2 server that uses optioned parameters.

extern crate tokio;
extern crate tokio_rustls;
extern crate wtx;
extern crate wtx_instances;

use tokio::{io::WriteHalf, net::TcpStream};
use tokio_rustls::server::TlsStream;
use wtx::{
  http::{
    is_web_socket_handshake, AutoStream, ManualServerStreamTokio, OperationMode, OptionedServer,
    ReqResBuffer, Response, StatusCode,
  },
  http2::{Http2Buffer, Http2Params, WebSocketOverStream},
  misc::{simple_seed, TokioRustlsAcceptor, Vector, Xorshift64},
  web_socket::{Frame, OpCode},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  OptionedServer::http2_tokio(
    &wtx_instances::host_from_args(),
    auto,
    || {
      Ok((
        (),
        Http2Buffer::new(Xorshift64::from(simple_seed())),
        Http2Params::default()
          .set_enable_connect_protocol(true)
          .set_max_hpack_len((128 * 1024, 128 * 1024)),
      ))
    },
    |error| eprintln!("{error}"),
    manual,
    |_, protocol, req, _| {
      Ok((
        (),
        if is_web_socket_handshake(&mut req.rrd.headers, req.method, protocol) {
          OperationMode::Manual
        } else {
          OperationMode::Auto
        },
      ))
    },
    || Ok((Vector::new(), ReqResBuffer::empty())),
    (
      || {
        TokioRustlsAcceptor::without_client_auth()
          .http2()
          .build_with_cert_chain_and_priv_key(wtx_instances::CERT, wtx_instances::KEY)
      },
      |acceptor| acceptor.clone(),
      |acceptor, stream| async move { Ok(tokio::io::split(acceptor.accept(stream).await?)) },
    ),
  )
  .await
}

async fn auto(
  _: (),
  mut ha: AutoStream<(), Vector<u8>>,
) -> Result<Response<ReqResBuffer>, wtx::Error> {
  ha.req.rrd.clear();
  Ok(ha.req.into_response(StatusCode::Ok))
}

async fn manual(
  _: (),
  mut hm: ManualServerStreamTokio<(), Http2Buffer, Vector<u8>, WriteHalf<TlsStream<TcpStream>>>,
) -> Result<(), wtx::Error> {
  let rng = Xorshift64::from(simple_seed());
  hm.req.rrd.headers.clear();
  let mut wos = WebSocketOverStream::new(&hm.req.rrd.headers, false, rng, hm.stream).await?;
  loop {
    let mut frame = wos.read_frame(&mut hm.stream_aux).await?;
    match (frame.op_code(), frame.text_payload()) {
      (_, Some(elem)) => println!("{elem}"),
      (OpCode::Close, _) => break,
      _ => {}
    }
    wos.write_frame(&mut Frame::new_fin(OpCode::Text, frame.payload_mut())).await?;
  }
  wos.close().await?;
  Ok(())
}
HTTP Client Framework
High-level pool of HTTP clients that currently only supports HTTP/2. Allows multiple connections that can be referenced in concurrent scenarios.
To use this functionality, it is necessary to activate the http-client-framework feature.
Example
//! Fetches and prints the response body of a provided URI.
//!
//! This snippet requires ~25 dependencies and has an optimized binary size of ~700K.
//!
//! Currently, only HTTP/2 is supported.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use wtx::{
  http::{client_framework::ClientFramework, Method, ReqResBuffer},
  misc::{from_utf8_basic, Uri},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let uri = Uri::new("http://www.example.com");
  let buffer = ReqResBuffer::empty();
  let client = ClientFramework::tokio(1).build();
  let res = client.send(Method::Get, buffer, &uri.to_ref()).await?;
  println!("{}", from_utf8_basic(&res.rrd.body)?);
  Ok(())
}
HTTP Server Framework
A small and fast-to-compile framework that can interact with many built-in features:
- Databases
- JSON
- Middlewares
- Streaming
- URI router
- WebSocket
If dynamic or nested routes are needed, then please activate the matchit feature. Without it, only simple and flat routes will work.
To use this functionality, it is necessary to activate the http-server-framework feature.
Example
//! An HTTP server framework showcasing nested routes, middlewares, manual streams, dynamic routes,
//! PostgreSQL connections and JSON deserialization/serialization.
//!
//! Currently, only HTTP/2 is supported.
//!
//! This snippet requires ~50 dependencies and has an optimized binary size of ~900K.

extern crate serde;
extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use core::{fmt::Write, ops::ControlFlow};
use tokio::net::{tcp::OwnedWriteHalf, TcpStream};
use wtx::{
  database::{Executor, Record},
  http::{
    server_framework::{
      get, post, Middleware, PathOwned, Router, SerdeJson, ServerFrameworkBuilder, StateClean,
    },
    ManualStream, ReqResBuffer, Request, Response, StatusCode,
  },
  http2::{Http2Buffer, Http2DataTokio, Http2ErrorCode, ServerStream},
  misc::{simple_seed, Xorshift64},
  pool::{PostgresRM, SimplePoolTokio},
};

type Pool = SimplePoolTokio<PostgresRM<wtx::Error, TcpStream>>;

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let router = Router::paths(wtx::paths!(
    ("/db/{id}", get(db)),
    ("/json", post(json)),
    (
      "/say",
      Router::new(wtx::paths!(("/hello", get(hello)), ("/world", get(world))), CustomMiddleware)?,
    ),
    ("/stream", get(stream)),
  ))?;
  let rm = PostgresRM::tokio("postgres://USER:PASSWORD@localhost/DB_NAME".into());
  let pool = Pool::new(4, rm);
  ServerFrameworkBuilder::new(router)
    .with_req_aux(move || pool.clone())
    .tokio(
      &wtx_instances::host_from_args(),
      Xorshift64::from(simple_seed()),
      |error| eprintln!("{error:?}"),
      |_| Ok(()),
    )
    .await
}

#[derive(serde::Deserialize)]
struct DeserializeExample {
  _foo: i32,
  _bar: u64,
}

#[derive(serde::Serialize)]
struct SerializeExample {
  _baz: [u8; 4],
}

async fn db(
  state: StateClean<'_, (), Pool, ReqResBuffer>,
  PathOwned(id): PathOwned<u32>,
) -> wtx::Result<StatusCode> {
  let mut lock = state.stream_aux.get().await?;
  let record = lock.fetch_with_stmt("SELECT name FROM persons WHERE id = $1", (id,)).await?;
  let name = record.decode::<_, &str>(0)?;
  state.req.rrd.body.write_fmt(format_args!("Person of id `1` has name `{name}`"))?;
  Ok(StatusCode::Ok)
}

async fn hello() -> &'static str {
  "hello"
}

async fn json(_: SerdeJson<DeserializeExample>) -> wtx::Result<SerdeJson<SerializeExample>> {
  Ok(SerdeJson(SerializeExample { _baz: [1, 2, 3, 4] }))
}

async fn stream(
  mut manual_stream: ManualStream<
    (),
    ServerStream<Http2DataTokio<Http2Buffer, OwnedWriteHalf, false>>,
    Pool,
  >,
) -> wtx::Result<()> {
  manual_stream.stream.common().send_go_away(Http2ErrorCode::NoError).await;
  Ok(())
}

async fn world() -> &'static str {
  "world"
}

struct CustomMiddleware;

impl Middleware<(), wtx::Error, Pool> for CustomMiddleware {
  type Aux = ();

  #[inline]
  fn aux(&self) -> Self::Aux {
    ()
  }

  async fn req(
    &self,
    _: &mut (),
    _: &mut Self::Aux,
    _: &mut Request<ReqResBuffer>,
    _: &mut Pool,
  ) -> wtx::Result<ControlFlow<StatusCode, ()>> {
    println!("Inspecting request");
    Ok(ControlFlow::Continue(()))
  }

  async fn res(
    &self,
    _: &mut (),
    _: &mut Self::Aux,
    _: Response<&mut ReqResBuffer>,
    _: &mut Pool,
  ) -> wtx::Result<ControlFlow<StatusCode, ()>> {
    println!("Inspecting response");
    Ok(ControlFlow::Continue(()))
  }
}
Pool
An asynchronous pool of arbitrary objects where each element is dynamically created or re-created when invalid.
Can also be used for database connections, which is quite handy because it improves the performance of executing commands and reduces the use of hardware resources. A short database-oriented sketch appears after the example below.
To use this functionality, it is necessary to activate the pool feature.
Example
//! Minimal code that shows the creation of a management structure that always yields `123`.

extern crate tokio;
extern crate wtx;

use wtx::pool::{ResourceManager, SimplePoolTokio};

pub struct CustomManager;

impl ResourceManager for CustomManager {
  type CreateAux = ();
  type Error = wtx::Error;
  type RecycleAux = ();
  type Resource = i32;

  async fn create(&self, _: &Self::CreateAux) -> Result<Self::Resource, Self::Error> {
    Ok(123)
  }

  async fn is_invalid(&self, _: &Self::Resource) -> bool {
    false
  }

  async fn recycle(&self, _: &Self::RecycleAux, _: &mut Self::Resource) -> Result<(), Self::Error> {
    Ok(())
  }
}

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let pool = SimplePoolTokio::new(1, CustomManager);
  let resource = ***pool.get().await?;
  assert_eq!(resource, 123);
  Ok(())
}
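The following is a hedged sketch of the database use case mentioned earlier in this section. It only reuses the PostgresRM and SimplePoolTokio items that appear in the HTTP server framework example, and the connection URI is a placeholder.

extern crate tokio;
extern crate wtx;

use tokio::net::TcpStream;
use wtx::pool::{PostgresRM, SimplePoolTokio};

// Hedged sketch based on the HTTP server framework example: a small pool of
// PostgreSQL connections that can be cloned and shared across tasks.
type DbPool = SimplePoolTokio<PostgresRM<wtx::Error, TcpStream>>;

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let rm = PostgresRM::tokio("postgres://USER:PASSWORD@localhost/DB_NAME".into());
  let pool = DbPool::new(4, rm);
  // Each `get` call yields an existing healthy connection or creates a new one.
  let _connection = pool.get().await?;
  Ok(())
}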
UI tools
wtx-ui
is a standalone crate intended to allow interactions with the wtx
project through an user interface. At the current time only CLI interfaces are available.
- Embeds SQL migrations for schema-manager. Activation feature is called embed-migrations.
- Runs SQL migrations managed by schema-manager. Activation feature is called schema-manager or schema-manager-dev.
- Performs very basic WebSocket Client/Server operations. Activation feature is called web-socket.
- Makes requests to arbitrary URIs mimicking the interface of cURL. Activation feature is called http-client.
WebSocket
Implementation of RFC6455 and RFC7692. WebSocket is a communication protocol that enables full-duplex communication between a client (typically a web browser) and a server over a single TCP connection. Unlike traditional HTTP, which is request-response based, WebSocket allows real-time data exchange without the need for polling.
To use this functionality, it is necessary to activate the web-socket feature.
Autobahn Reports
Compression
The "permessage-deflate" extension is the only supported compression format and is backed by the zlib-rs
project that performs as well as zlib-ng
.
To get the most performance possible, try compiling your program with RUSTFLAGS='-C target-cpu=native' to allow zlib-rs to use more efficient SIMD instructions.
No masking
Although not officially endorsed, the no-masking parameter described at https://datatracker.ietf.org/doc/html/draft-damjanovic-websockets-nomasking-02 is supported to increase performance. If such a thing is not desirable, please make sure to check the handshake parameters to avoid accidental scenarios.
To make everything work as intended, both parties, client and server, need to implement this feature. For example, web browsers won't stop masking frames.
Client Example
//! WebSocket CLI client that enables real-time communication by allowing users to send and
//! receive messages through typing.
//!
//! This snippet requires ~35 dependencies and has an optimized binary size of ~550K.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use tokio::{
  io::{AsyncBufReadExt, BufReader},
  net::TcpStream,
};
use wtx::{
  misc::{simple_seed, Uri, Xorshift64},
  web_socket::{Frame, OpCode, WebSocketBuffer, WebSocketClient},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let uri = Uri::new("ws://www.example.com");
  let mut ws = WebSocketClient::connect(
    (),
    [],
    false,
    Xorshift64::from(simple_seed()),
    TcpStream::connect(uri.hostname_with_implied_port()).await?,
    &uri.to_ref(),
    WebSocketBuffer::default(),
    |_| wtx::Result::Ok(()),
  )
  .await?;
  let mut buffer = Vec::new();
  let mut reader = BufReader::new(tokio::io::stdin());
  loop {
    tokio::select! {
      frame_rslt = ws.read_frame() => {
        let frame = frame_rslt?;
        match (frame.op_code(), frame.text_payload()) {
          (_, Some(elem)) => println!("{elem}"),
          (OpCode::Close, _) => break,
          _ => {}
        }
      }
      read_rslt = reader.read_until(b'\n', &mut buffer) => {
        let _ = read_rslt?;
        ws.write_frame(&mut Frame::new_fin(OpCode::Text, &mut buffer)).await?;
      }
    }
  }
  Ok(())
}
Server Example
//! Serves requests using low-level WebSockets resources alongside self-made certificates.

extern crate tokio;
extern crate tokio_rustls;
extern crate wtx;
extern crate wtx_instances;

use tokio::net::TcpStream;
use tokio_rustls::server::TlsStream;
use wtx::{
  http::OptionedServer,
  misc::TokioRustlsAcceptor,
  web_socket::{OpCode, WebSocketBuffer, WebSocketServer},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  OptionedServer::web_socket_tokio(
    &wtx_instances::host_from_args(),
    None,
    || {},
    |error| eprintln!("{error}"),
    handle,
    (
      || {
        TokioRustlsAcceptor::without_client_auth()
          .build_with_cert_chain_and_priv_key(wtx_instances::CERT, wtx_instances::KEY)
      },
      |acceptor| acceptor.clone(),
      |acceptor, stream| async move { Ok(acceptor.accept(stream).await?) },
    ),
  )
  .await
}

async fn handle(
  mut ws: WebSocketServer<(), TlsStream<TcpStream>, &mut WebSocketBuffer>,
) -> wtx::Result<()> {
  let (mut common, mut reader, mut writer) = ws.parts();
  loop {
    let mut frame = reader.read_frame(&mut common).await?;
    match frame.op_code() {
      OpCode::Binary | OpCode::Text => {
        writer.write_frame(&mut common, &mut frame).await?;
      }
      OpCode::Close => break,
      _ => {}
    }
  }
  Ok(())
}
WebSocket over HTTP/2
At the current time only servers support the handshake procedure defined in RFC8441.
While HTTP/2 inherently supports full-duplex communication, web browsers typically don't expose this functionality directly to developers and that is why WebSocket tunneling over HTTP/2 is important.
- Servers can efficiently handle multiple concurrent streams within a single TCP connection
- Client applications can continue using existing WebSocket APIs without modification
For this particular scenario, the no-masking parameter defined in https://datatracker.ietf.org/doc/html/draft-damjanovic-websockets-nomasking-02 is also supported.
To use this functionality, it is necessary to activate the http2 and web-socket features.
Example
//! Low level HTTP/2 server that only accepts one WebSocket stream for demonstration purposes.
//!
//! It is worth noting that the optioned server in the `http2-server` example automatically
//! handles WebSocket connections.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use core::mem;
use tokio::net::TcpListener;
use wtx::{
  http::{is_web_socket_handshake, Headers, ReqResBuffer},
  http2::{Http2Buffer, Http2Params, Http2Tokio, WebSocketOverStream},
  misc::{simple_seed, Either, TokioRustlsAcceptor, Vector, Xorshift64},
  web_socket::{Frame, OpCode},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let listener = TcpListener::bind(&wtx_instances::host_from_args()).await?;
  let mut rng = Xorshift64::from(simple_seed());
  let (tcp_stream, _) = listener.accept().await?;
  let acceptor = TokioRustlsAcceptor::without_client_auth()
    .http2()
    .build_with_cert_chain_and_priv_key(wtx_instances::CERT, wtx_instances::KEY)?;
  let (frame_reader, mut http2) = Http2Tokio::accept(
    Http2Buffer::new(&mut rng),
    Http2Params::default()
      .set_enable_connect_protocol(true)
      .set_max_hpack_len((128 * 1024, 128 * 1024)),
    tokio::io::split(acceptor.accept(tcp_stream).await?),
  )
  .await?;
  tokio::spawn(frame_reader);
  let (mut stream, headers_opt) = match http2
    .stream(ReqResBuffer::empty(), |req, protocol| {
      let rslt = is_web_socket_handshake(&req.rrd.headers, req.method, protocol);
      rslt.then(|| mem::take(&mut req.rrd.headers))
    })
    .await?
  {
    Either::Left(_) => return Ok(()),
    Either::Right(elem) => elem,
  };
  let Some(_headers) = headers_opt else {
    return Ok(());
  };
  let mut buffer = Vector::new();
  let mut wos = WebSocketOverStream::new(&Headers::new(), false, rng, &mut stream).await?;
  loop {
    let mut frame = wos.read_frame(&mut buffer).await?;
    match (frame.op_code(), frame.text_payload()) {
      (_, Some(elem)) => println!("{elem}"),
      (OpCode::Close, _) => break,
      _ => {}
    }
    wos.write_frame(&mut Frame::new_fin(OpCode::Text, frame.payload_mut())).await?;
  }
  wos.close().await?;
  stream.common().clear(false).await?;
  Ok(())
}