WTX

A collection of different transport implementations and related tools focused primarily on web technologies. Contains the implementations of 5 IETF RFCs (RFC6265, RFC6455, RFC7541, RFC7692, RFC9113), 2 formal specifications (gRPC, PostgreSQL) and several other invented ideas.

  1. Client API Framework
  2. Database Client
  3. Database Schema Manager
  4. gRPC Client/Server
  5. HTTP Client Framework
  6. HTTP Server Framework
  7. HTTP2 Client/Server
  8. Pool Manager
  9. UI tools
  10. WebSocket Client/Server

Embedded devices with a working heap allocator can use this no_std crate.

Performance

Many things that generally improve performance are used in the project, to name a few:

  1. Manual vectorization: When an algorithm is known to process large amounts of data, several experiments are performed to find the best way to split loops so that the compiler can take advantage of SIMD instructions in x86 processors.
  2. Memory allocation: Whenever possible, heap allocations are performed only once, when an instance is created. Additionally, stack memory is preferred over heap memory.
  3. Fewer dependencies: No third-party dependency is injected by default. In other words, additional dependencies are up to the user through the selection of Cargo features, which decreases compilation times. For example, you can see the mere 16 dependencies required by the PostgreSQL client using cargo tree -e normal --features postgres.

Since memory is usually held at the instance level instead of being created and dropped on the fly, it is worth noting that its usage can grow significantly depending on the use-case. If appropriate, try using a shared pool of resources or try limiting how much data can be exchanged between parties.
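
The trade-off described above can be illustrated with a minimal sketch that uses only the standard library. `Connection`, `with_capacity` and `process` are hypothetical names and not part of wtx. The sketch only shows that a buffer allocated once at construction is reused across calls, and that it stays large after a big payload.

```rust
/// Hypothetical instance that owns a buffer allocated once at creation.
struct Connection {
  buffer: Vec<u8>,
}

impl Connection {
  /// Performs the only up-front heap allocation of this instance.
  fn with_capacity(cap: usize) -> Self {
    Self { buffer: Vec::with_capacity(cap) }
  }

  /// Copies `payload` into the reused buffer and returns the number of stored bytes.
  ///
  /// `clear` keeps the allocated capacity, so no new allocation happens as long as
  /// `payload` fits. Larger payloads make the buffer grow and it does not shrink by itself.
  fn process(&mut self, payload: &[u8]) -> usize {
    self.buffer.clear();
    self.buffer.extend_from_slice(payload);
    self.buffer.len()
  }
}

fn main() {
  let mut conn = Connection::with_capacity(1024);
  let initial = conn.buffer.capacity();
  for _ in 0..3 {
    let _ = conn.process(b"hello");
  }
  // The capacity is untouched because every payload fits in the initial allocation.
  assert_eq!(conn.buffer.capacity(), initial);
  // A large message grows the buffer and the memory stays held by the instance.
  let _ = conn.process(&[0; 4096]);
  assert!(conn.buffer.capacity() >= 4096);
}
```

Choosing the initial capacity is therefore a balance: too small and the buffer reallocates, too large and every instance holds memory it may never use.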

High-level benchmarks

Check out wtx-bench to see a variety of benchmarks, or feel free to point out any misunderstandings or misconfigurations.

WebSocket Benchmark

There are mainly two things that impact performance: the chosen runtime and the number of pre-allocated bytes. Especially for servers that have to create a new instance for each handshake, pre-allocating a high number of bytes for short-lived or low-transfer connections can have a negative impact.

PostgreSQL Benchmark

Low-level benchmarks

Anything marked with #[bench] in the repository is considered a low-level benchmark in the sense that it measures a very specific operation that generally serves as the basis for other parts.

Take a look at https://bencher.dev/perf/wtx to see all low-level benchmarks over different periods of time.

Examples

Demonstrations of different use-cases can be found in the wtx-instances directory as well as in the documentation.

Limitations

Does not support systems with 16-bit memory addresses and expects that the sizes of 8 allocated chunks of memory can be added without overflowing. Otherwise, certain arithmetic operations involving usize will overflow, potentially resulting in unexpected behavior.

For example, in a 32-bit system you can allocate a maximum of 2^29 bytes of memory for at most 8 elements. Such a limit should hold in practice on systems with little swap memory, due to the likely triggering of the OOM killer, or it can be enforced through specific limiters like ulimit.
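
The arithmetic behind this limitation can be sketched with `u32` standing in for the `usize` of a 32-bit target. `total_len` is a hypothetical helper and not part of wtx. It only illustrates how the sum of eight chunk sizes either fits or overflows.

```rust
/// Sums the sizes of eight chunks, returning `None` if the total does not fit in `u32`.
///
/// `u32` stands in for the `usize` of a 32-bit target.
fn total_len(chunks: [u32; 8]) -> Option<u32> {
  chunks.iter().try_fold(0u32, |acc, len| acc.checked_add(*len))
}

fn main() {
  // Eight chunks of 2^28 bytes add up to 2^31, which fits.
  assert_eq!(total_len([1 << 28; 8]), Some(1 << 31));
  // Eight chunks of 2^30 bytes add up to 2^33, which overflows a 32-bit integer.
  assert_eq!(total_len([1 << 30; 8]), None);
}
```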

Possible future features

Client API Framework

A flexible client API framework for writing asynchronous, fast, organizable, scalable and maintainable applications. Supports several data formats, transports and custom parameters.

Activation feature is called client-api-framework. Check out the wtx-apis project to see a collection of APIs based on wtx.

Objective

It is possible to directly decode responses using built-in methods provided by some transport implementations like reqwest or surf, but as complexity grows, the cost of maintaining large sets of endpoints with ad-hoc solutions usually becomes unsustainable. In this scenario, wtx comes into play to organize and centralize data flow in a well-defined manner to increase productivity and maintainability.

For API consumers, the calling convention of wtx endpoints is based on fluent interfaces, which makes usage more pleasant and intuitive.

Moreover, the project may in the future create automatic bindings for other languages in order to avoid having duplicated API repositories.

Example

//! Illustrates how the `client-api-framework` feature facilitates the management and utilization
//! of large API endpoints for both HTTP and WebSocket requests.
//!
//! Contains one API called `GenericThrottlingApi` and its two endpoints: an HTTP JSON-RPC
//! `genericHttpRequest` and a WebSocket `genericWebSocketSubscription`.
//!
//! Everything that is not inside `main` should be constructed only once in your program.

extern crate serde;
extern crate tokio;
extern crate wtx;
extern crate wtx_macros;

use core::time::Duration;
use tokio::net::TcpStream;
use wtx::{
  client_api_framework::{
    misc::{Pair, RequestLimit, RequestThrottling},
    network::{transport::Transport, HttpParams, WsParams},
    Api,
  },
  data_transformation::dnsn::SerdeJson,
  http::client_framework::ClientFrameworkTokio,
  misc::{NoStdRng, Uri},
  web_socket::{FrameBufferVec, HeadersBuffer, WebSocketBuffer, WebSocketClient},
};

wtx::create_packages_aux_wrapper!();

#[derive(Debug)]
#[wtx_macros::api_params(pkgs_aux(PkgsAux), transport(http, ws))]
pub struct GenericThrottlingApi {
  pub rt: RequestThrottling,
}

impl Api for GenericThrottlingApi {
  type Error = wtx::Error;

  async fn before_sending(&mut self) -> Result<(), Self::Error> {
    self.rt.rc.update_params(&self.rt.rl).await?;
    Ok(())
  }
}

#[wtx_macros::pkg(
  api(crate::GenericThrottlingApi),
  data_format(json_rpc("genericHttpRequest")),
  transport(http)
)]
mod generic_http_request {
  #[pkg::aux]
  impl<A, DRSR> crate::HttpPkgsAux<A, DRSR> {}

  #[derive(Debug, serde::Serialize)]
  #[pkg::req_data]
  pub struct GenericHttpRequestReq(#[pkg::field(name = "generic_number")] i32);

  #[pkg::res_data]
  pub type GenericHttpRequestRes = (u8, u16, u32);
}

#[wtx_macros::pkg(
  api(crate::GenericThrottlingApi),
  data_format(json_rpc("genericWebSocketSubscription")),
  transport(ws)
)]
mod generic_web_socket_subscription {
  #[pkg::aux]
  impl<A, DRSR> crate::WsPkgsAux<A, DRSR> {}

  #[derive(Debug, serde::Serialize)]
  #[pkg::req_data]
  pub struct GenericWebSocketSubscriptionReq<'str> {
    generic_string: &'str str,
    #[serde(skip_serializing_if = "Option::is_none")]
    generic_number: Option<i32>,
  }

  #[pkg::res_data]
  pub type GenericWebSocketSubscriptionRes = u64;
}

async fn http_pair(
) -> Pair<PkgsAux<GenericThrottlingApi, SerdeJson, HttpParams>, ClientFrameworkTokio> {
  Pair::new(
    PkgsAux::from_minimum(
      GenericThrottlingApi {
        rt: RequestThrottling::from_rl(RequestLimit::new(5, Duration::from_secs(1))),
      },
      SerdeJson,
      HttpParams::from_uri("http://generic_http_uri.com".into()),
    ),
    ClientFrameworkTokio::tokio(1).build(),
  )
}

async fn web_socket_pair() -> wtx::Result<
  Pair<
    PkgsAux<GenericThrottlingApi, SerdeJson, WsParams>,
    (FrameBufferVec, WebSocketClient<(), NoStdRng, TcpStream, WebSocketBuffer>),
  >,
> {
  let mut fb = FrameBufferVec::default();
  let uri = Uri::new("ws://generic_web_socket_uri.com");
  let web_socket = WebSocketClient::connect(
    (),
    &mut fb,
    [],
    &mut HeadersBuffer::default(),
    NoStdRng::default(),
    TcpStream::connect(uri.hostname_with_implied_port()).await?,
    &uri,
    WebSocketBuffer::default(),
  )
  .await?
  .1;
  Ok(Pair::new(
    PkgsAux::from_minimum(
      GenericThrottlingApi {
        rt: RequestThrottling::from_rl(RequestLimit::new(40, Duration::from_secs(2))),
      },
      SerdeJson,
      WsParams::default(),
    ),
    (fb, web_socket),
  ))
}

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let mut hp = http_pair().await;
  let _http_response_tuple = hp
    .trans
    .send_recv_decode_contained(
      &mut hp.pkgs_aux.generic_http_request().data(123).build(),
      &mut hp.pkgs_aux,
    )
    .await?
    .result?;

  let mut wsp = web_socket_pair().await?;
  let _web_socket_subscription_id = wsp
    .trans
    .send_recv_decode_contained(
      &mut wsp.pkgs_aux.generic_web_socket_subscription().data("Hello", None).build(),
      &mut wsp.pkgs_aux,
    )
    .await?
    .result?;
  Ok(())
}

Client Connection

PostgreSQL is currently the only supported database. Implements https://www.postgresql.org/docs/16/protocol.html.

Activation feature is called postgres.

PostgreSQL Benchmark

Example

//! Demonstrates different interactions with a PostgreSQL database.
//!
//! This snippet requires ~40 dependencies and has an optimized binary size of ~600K.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use wtx::database::{Executor as _, Record, Records, TransactionManager};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let uri = "postgres://USER:PASSWORD@localhost/DATABASE";
  let mut executor = wtx_instances::executor(&uri).await?;
  let mut tm = executor.transaction().await?;
  tm.executor().execute("CREATE TABLE IF NOT EXISTS example(id INT, name VARCHAR)", |_| {}).await?;
  let _ = tm
    .executor()
    .execute_with_stmt("INSERT INTO example VALUES ($1, $2), ($3, $4)", (1u32, "one", 2u32, "two"))
    .await?;
  tm.commit().await?;
  let records = executor
    .fetch_many_with_stmt("SELECT id, name FROM example;", (), |_| Ok::<_, wtx::Error>(()))
    .await?;
  assert_eq!(records.get(0).as_ref().and_then(|record| record.decode("id").ok()), Some(1));
  assert_eq!(records.get(1).as_ref().and_then(|record| record.decode("name").ok()), Some("two"));
  Ok(())
}

Schema Management

Embedded and CLI workflows using raw SQL commands.

Activation feature is called schema-manager.

CLI

# Example

cargo install --git https://github.com/c410-f3r/wtx --features schema-manager-dev wtx-ui
echo DATABASE_URI="postgres://USER:PASSWORD@localhost:5432/DATABASE" > .env
RUST_LOG=debug wtx-ui migrate

The CLI application expects a configuration file that contains a set of paths where each path is a directory with multiple migrations.

# wtx.toml

migration_groups = [
  "migrations/1__initial",
  "migrations/2__fancy_stuff"
]

Each provided migration and group must contain a unique version and a name, as summarized by the following structure:

// Execution order of migrations is dictated by their numeric declaration order.

migrations
+-- 1__initial (Group)
    +-- 1__create_author.sql (Migration)
    +-- 2__create_post.sql (Migration)
+-- 2__fancy_stuff (Group)
    +-- 1__something_fancy.sql (Migration)
wtx.toml
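
The naming convention above can be sketched as a small parser that uses only the standard library. `parse_entry` is a hypothetical helper and not part of wtx, and stripping the `.sql` extension is an assumption made for illustration. The sketch shows why ordering has to follow the numeric version rather than the raw file name.

```rust
/// Splits an entry such as `1__create_author.sql` into its version and name.
///
/// Returns `None` if the `__` separator is missing or the version is not a number.
fn parse_entry(entry: &str) -> Option<(u32, &str)> {
  let (version, rest) = entry.split_once("__")?;
  let name = rest.strip_suffix(".sql").unwrap_or(rest);
  Some((version.parse::<u32>().ok()?, name))
}

fn main() {
  let mut files = vec!["10__add_index.sql", "2__create_post.sql", "1__create_author.sql"];
  // Sorting by the parsed number avoids the lexicographic trap where "10" comes before "2".
  files.sort_by_key(|file| parse_entry(file).map(|(version, _)| version));
  assert_eq!(files, ["1__create_author.sql", "2__create_post.sql", "10__add_index.sql"]);
  // Groups follow the same convention, only without a file extension.
  assert_eq!(parse_entry("1__initial"), Some((1, "initial")));
  assert_eq!(parse_entry("missing_separator.sql"), None);
}
```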

The SQL file itself is composed of two parts: one for migrations (-- wtx IN section) and another for rollbacks (-- wtx OUT section).

-- wtx IN

CREATE TABLE author (
  id INT NOT NULL PRIMARY KEY,
  added TIMESTAMP NOT NULL,
  birthdate DATE NOT NULL,
  email VARCHAR(100) NOT NULL,
  first_name VARCHAR(50) NOT NULL,
  last_name VARCHAR(50) NOT NULL
);

-- wtx OUT

DROP TABLE author;
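
To make the two-part layout concrete, here is a minimal sketch of how such a file could be split into its parts. `split_sections` is a hypothetical helper and not the parser used by wtx. It assumes each marker appears exactly once and that the IN section comes first.

```rust
/// Splits the contents of a migration file into its `-- wtx IN` and `-- wtx OUT` parts.
///
/// Returns `None` if either marker is missing or if they appear in the wrong order.
fn split_sections(sql: &str) -> Option<(&str, &str)> {
  let (_, after_in) = sql.split_once("-- wtx IN")?;
  let (up, down) = after_in.split_once("-- wtx OUT")?;
  Some((up.trim(), down.trim()))
}

fn main() {
  let file = "-- wtx IN\n\nCREATE TABLE author (id INT);\n\n-- wtx OUT\n\nDROP TABLE author;\n";
  let (up, down) = split_sections(file).unwrap();
  assert_eq!(up, "CREATE TABLE author (id INT);");
  assert_eq!(down, "DROP TABLE author;");
}
```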

The expected file layout can also be divided into smaller pieces. For example, the above migration could be split into 1__author_up.sql and 1__author_down.sql.

-- 1__author_up.sql

CREATE TABLE author (
  id INT NOT NULL PRIMARY KEY,
  added TIMESTAMP NOT NULL,
  birthdate DATE NOT NULL,
  email VARCHAR(100) NOT NULL,
  first_name VARCHAR(50) NOT NULL,
  last_name VARCHAR(50) NOT NULL
);

-- 1__author_down.sql

DROP TABLE author;

migrations
+-- 1__some_group (Group)
    +-- 1__author (Migration directory)
        +-- 1__author_down.sql (Down migration)
        +-- 1__author_up.sql (Up migration)
        +-- 1__author.toml (Optional configuration)
wtx.toml

Library

The library gives freedom to arrange groups and uses some external crates, bringing ~10 additional dependencies into your application. If this overhead is not acceptable, then you probably should discard the library and use the CLI binary instead as part of a custom deployment strategy.

extern crate tokio;
extern crate wtx;

use std::path::Path;
use wtx::database::schema_manager::Commands;
use wtx::misc::Vector;

#[tokio::main]
async fn main() {
  let mut commands = Commands::with_executor(());
  commands
    .migrate_from_dir(
      (&mut String::default(), &mut Vector::default()),
      Path::new("my_custom_migration_group_path"),
    )
    .await
    .unwrap();
}

Embedded migrations

To make deployment easier, the final binary of your application can embed all necessary migrations through the binary that is available in the wtx-ui crate.

extern crate wtx;

// This is an example! The actual contents are filled by the `wtx-ui embed-migrations` binary call.
mod embedded_migrations {
  pub(crate) const GROUPS: wtx::database::schema_manager::EmbeddedMigrationsTy = &[];
}

use wtx::database::schema_manager::Commands;
use wtx::misc::Vector;

async fn migrate() -> wtx::Result<()> {
  Commands::with_executor(())
    .migrate_from_groups((&mut String::new(), &mut Vector::new()), embedded_migrations::GROUPS)
    .await
}

Conditional migrations

If one particular migration needs to be executed in a specific set of databases, then it is possible to use the -- wtx dbs parameter in a file.

-- wtx dbs mssql,postgres

-- wtx IN

CREATE SCHEMA foo;

-- wtx OUT

DROP SCHEMA foo;
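
The effect of the `-- wtx dbs` parameter can be sketched as a simple filter. `applies_to` is a hypothetical helper and not part of wtx. Treating a file without the parameter as applicable to every database is an assumption of this sketch.

```rust
/// Returns whether a migration file should run against the database named `db`.
///
/// A file without a `-- wtx dbs` line is treated as applicable to every database,
/// which is an assumption of this sketch.
fn applies_to(sql: &str, db: &str) -> bool {
  let dbs = sql.lines().find_map(|line| line.trim().strip_prefix("-- wtx dbs"));
  match dbs {
    Some(list) => list.split(',').any(|name| name.trim() == db),
    None => true,
  }
}

fn main() {
  let file = "-- wtx dbs mssql,postgres\n\n-- wtx IN\n\nCREATE SCHEMA foo;\n";
  assert!(applies_to(file, "postgres"));
  assert!(applies_to(file, "mssql"));
  assert!(!applies_to(file, "mysql"));
}
```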

Repeatable migrations

Repeatability can be specified with -- wtx repeatability SOME_VALUE where SOME_VALUE can be either always (runs regardless of the checksum) or on-checksum-change (runs only when the checksum changes).

-- wtx dbs postgres
-- wtx repeatability always

-- wtx IN

CREATE OR REPLACE PROCEDURE something() LANGUAGE SQL AS $$ $$;

-- wtx OUT

DROP PROCEDURE something();

Keep in mind that repeatable migrations might break subsequent operations, so you must know what you are doing. If desirable, they can be separated into dedicated groups.

migrations/1__initial_repeatable_migrations
migrations/2__normal_migrations
migrations/3__final_repeatable_migrations
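
The two repeatability values can be summarized with a small decision function. All names below (`Repeatability`, `parse_repeatability`, `should_rerun`) and the use of a `u64` checksum are assumptions made for illustration, not the types used by wtx.

```rust
/// Repeatability modes accepted by the `-- wtx repeatability` parameter.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Repeatability {
  Always,
  OnChecksumChange,
}

/// Parses the value that follows `-- wtx repeatability`.
fn parse_repeatability(value: &str) -> Option<Repeatability> {
  match value {
    "always" => Some(Repeatability::Always),
    "on-checksum-change" => Some(Repeatability::OnChecksumChange),
    _ => None,
  }
}

/// Decides whether an already applied migration should run again.
///
/// `stored` is the checksum recorded when the migration last ran and `current` is the
/// checksum of the file as it is now. Non-repeatable migrations (`None`) never run twice.
fn should_rerun(rep: Option<Repeatability>, stored: u64, current: u64) -> bool {
  match rep {
    Some(Repeatability::Always) => true,
    Some(Repeatability::OnChecksumChange) => stored != current,
    None => false,
  }
}

fn main() {
  let always = parse_repeatability("always");
  let on_change = parse_repeatability("on-checksum-change");
  assert!(should_rerun(always, 1, 1));
  assert!(!should_rerun(on_change, 1, 1));
  assert!(should_rerun(on_change, 1, 2));
  assert!(!should_rerun(None, 1, 2));
}
```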

Namespaces/Schemas

For supported databases, there is no direct user parameter that inserts migrations inside a single database schema, but it is possible to specify the schema inside the SQL file and arrange the structure of the migration groups in the way that suits you best.

-- wtx IN

CREATE TABLE cool_department_schema.author (
  id INT NOT NULL PRIMARY KEY,
  full_name VARCHAR(50) NOT NULL
);

-- wtx OUT

DROP TABLE cool_department_schema.author;

gRPC

Basic implementation that currently supports only unary calls.

wtx does not provide built-in deserialization or serialization utilities capable of manipulating protobuf files. Instead, users are free to choose any third-party crate that generates Rust bindings and implements the internal Deserialize and Serialize traits.

Due to the lack of an official parser, the definitions of a Service must be manually typed.

Activation feature is called grpc.

Client Example

//! gRPC client that uses the structure definitions found in the `wtx_instances::grpc_bindings`
//! module.
//!
//! This snippet requires ~40 dependencies and has an optimized binary size of ~700K.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use std::borrow::Cow;
use wtx::{
  data_transformation::dnsn::QuickProtobuf,
  grpc::Client,
  http::{client_framework::ClientFramework, ReqResBuffer, ReqResData},
};
use wtx_instances::grpc_bindings::wtx::{GenericRequest, GenericResponse};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let mut client = Client::new(ClientFramework::tokio(1).build(), QuickProtobuf);
  let mut rrb = ReqResBuffer::default();
  rrb.uri.reset(format_args!("http://127.0.0.1:9000"))?;
  let res = client
    .send_unary_req(
      ("wtx", "GenericService", "generic_method"),
      GenericRequest {
        generic_request_field0: Cow::Borrowed(b"generic_request_value"),
        generic_request_field1: 123,
      },
      rrb,
    )
    .await?;
  let generic_response: GenericResponse = client.des_from_res_bytes(res.rrd.body())?;
  println!("{:?}", generic_response);
  Ok(())
}

Server Example

//! gRPC server that uses the structure definitions found in the `wtx_instances::grpc_bindings`
//! module.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use std::borrow::Cow;
use wtx::{
  data_transformation::dnsn::QuickProtobuf,
  grpc::{GrpcManager, GrpcResMiddleware},
  http::{
    server_framework::{post, Router, ServerFrameworkBuilder, State},
    ReqResBuffer, StatusCode,
  },
};
use wtx_instances::grpc_bindings::wtx::{GenericRequest, GenericResponse};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let router = Router::new(
    wtx::paths!(("wtx.GenericService/generic_method", post(wtx_generic_service_generic_method))),
    (),
    GrpcResMiddleware,
  )?;
  ServerFrameworkBuilder::new(router)
    .with_req_aux(|| QuickProtobuf::default())
    .listen_tls(
      (wtx_instances::CERT, wtx_instances::KEY),
      &wtx_instances::host_from_args(),
      |error| eprintln!("{error}"),
    )
    .await
}

async fn wtx_generic_service_generic_method(
  state: State<'_, (), GrpcManager<QuickProtobuf>, ReqResBuffer>,
) -> wtx::Result<StatusCode> {
  let _generic_request: GenericRequest = state.ra.des_from_req_bytes(&state.req.rrd.data)?;
  state.req.rrd.clear();
  state.ra.ser_to_res_bytes(
    &mut state.req.rrd.data,
    GenericResponse {
      generic_response_field0: Cow::Borrowed(b"generic_response_value"),
      generic_response_field1: 321,
    },
  )?;
  Ok(StatusCode::Ok)
}

HTTP/2

Implementation of RFC7541 (HPACK) and RFC9113 (HTTP/2). In other words, low-level HTTP/2 primitives.

Passes the hpack-test-case and the h2spec test suites. Due to official deprecation, server push and prioritization are not supported.

Activation feature is called http2.

Client Example

//! Fetches a URI using low-level HTTP/2 resources.
//!
//! This snippet requires ~25 dependencies and has an optimized binary size of ~700K.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use tokio::net::TcpStream;
use wtx::{
  http::{Method, ReqResBuffer, Request},
  http2::{Http2Buffer, Http2ErrorCode, Http2Params, Http2Tokio},
  misc::{from_utf8_basic, NoStdRng, Uri},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let uri = Uri::new("http://www.example.com");
  let (frame_reader, mut http2) = Http2Tokio::connect(
    Http2Buffer::new(NoStdRng::default()),
    Http2Params::default(),
    TcpStream::connect(uri.hostname_with_implied_port()).await?.into_split(),
  )
  .await?;
  let _jh = tokio::spawn(frame_reader);
  let rrb = ReqResBuffer::default();
  let mut stream = http2.stream().await?;
  stream.send_req(Request::http2(Method::Get, b"Hello!"), &uri.to_ref()).await?;
  let (res_rrb, opt) = stream.recv_res(rrb).await?;
  let _status_code = opt.unwrap();
  println!("{}", from_utf8_basic(&res_rrb.data)?);
  http2.send_go_away(Http2ErrorCode::NoError).await;
  Ok(())
}

Server Example

//! Serves requests using low-level HTTP/2 resources alongside self-made certificates.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use wtx::{
  http::{LowLevelServer, ReqResBuffer, Request, Response, StatusCode},
  http2::{Http2Buffer, Http2Params},
  misc::{StdRng, TokioRustlsAcceptor},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  LowLevelServer::tokio_http2(
    &wtx_instances::host_from_args(),
    || Ok(((), Http2Buffer::new(StdRng::default()), Http2Params::default())),
    |error| eprintln!("{error}"),
    handle,
    || Ok(((), ReqResBuffer::default())),
    (
      || {
        TokioRustlsAcceptor::without_client_auth()
          .build_with_cert_chain_and_priv_key(wtx_instances::CERT, wtx_instances::KEY)
      },
      |acceptor| acceptor.clone(),
      |acceptor, stream| async move { Ok(tokio::io::split(acceptor.accept(stream).await?)) },
    ),
  )
  .await
}

async fn handle(
  _: (),
  _: (),
  mut req: Request<ReqResBuffer>,
) -> Result<Response<ReqResBuffer>, wtx::Error> {
  req.rrd.clear();
  Ok(req.into_response(StatusCode::Ok))
}

HTTP Client Framework

High-level pool of HTTP clients that currently only supports HTTP/2. Allows multiple connections that can be referenced in concurrent scenarios.

Activation feature is called http-client-framework.

Example

//! Fetches and prints the response body of a provided URI.
//!
//! This snippet requires ~25 dependencies and has an optimized binary size of ~700K.
//!
//! Currently, only HTTP/2 is supported.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use wtx::{
  http::{client_framework::ClientFramework, Method, ReqResBuffer},
  misc::{from_utf8_basic, Uri},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let uri = Uri::new("http://www.example.com");
  let buffer = ReqResBuffer::default();
  let client = ClientFramework::tokio(1).build();
  let res = client.send(Method::Get, buffer, &uri.to_ref()).await?;
  println!("{}", from_utf8_basic(&res.rrd.data)?);
  Ok(())
}

HTTP Server Framework

A small, fast-to-compile framework that can interact with many built-in features like PostgreSQL connections.

Activation feature is called http-server-framework.

HTTP/2 Benchmarks

Example

//! An HTTP server framework showcasing nested routes, request middlewares, response
//! middlewares, dynamic routes, PostgreSQL connections and JSON deserialization/serialization.
//!
//! Currently, only HTTP/2 is supported.
//!
//! This snippet requires ~50 dependencies and has an optimized binary size of ~900K.

extern crate serde;
extern crate serde_json;
extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use core::fmt::Write;
use tokio::net::TcpStream;
use wtx::{
  database::{Executor, Record},
  http::{
    server_framework::{
      get, post, PathOwned, Router, SerdeJson, ServerFrameworkBuilder, StateClean,
    },
    ReqResBuffer, Request, Response, StatusCode,
  },
  misc::FnFutWrapper,
  pool::{PostgresRM, SimplePoolTokio},
};

type Pool = SimplePoolTokio<PostgresRM<wtx::Error, TcpStream>>;

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let router = Router::paths(wtx::paths!(
    ("/db/{id}", get(db)),
    ("/json", post(json)),
    (
      "/say",
      Router::new(
        wtx::paths!(("/hello", get(hello)), ("/world", get(world))),
        FnFutWrapper::from(request_middleware),
        FnFutWrapper::from(response_middleware),
      )?,
    ),
  ))?;
  let rm = PostgresRM::tokio("postgres://USER:PASSWORD@localhost/DB_NAME".into());
  let pool = Pool::new(4, rm);
  ServerFrameworkBuilder::new(router)
    .with_req_aux(move || pool.clone())
    .listen(&wtx_instances::host_from_args(), |error| eprintln!("{error:?}"))
    .await
}

#[derive(serde::Deserialize)]
struct DeserializeExample {
  _foo: i32,
  _bar: u64,
}

#[derive(serde::Serialize)]
struct SerializeExample {
  _baz: [u8; 4],
}

async fn db(
  state: StateClean<'_, (), Pool, ReqResBuffer>,
  PathOwned(id): PathOwned<u32>,
) -> wtx::Result<StatusCode> {
  let mut lock = state.ra.get().await?;
  let record = lock.fetch_with_stmt("SELECT name FROM persons WHERE id = $1", (id,)).await?;
  let name = record.decode::<_, &str>(0)?;
  state.req.rrd.data.write_fmt(format_args!("Person of id `{id}` has name `{name}`"))?;
  Ok(StatusCode::Ok)
}

async fn hello() -> &'static str {
  "hello"
}

async fn json(_: SerdeJson<DeserializeExample>) -> wtx::Result<SerdeJson<SerializeExample>> {
  Ok(SerdeJson(SerializeExample { _baz: [1, 2, 3, 4] }))
}

async fn request_middleware(
  _: &mut (),
  _: &mut Pool,
  _: &mut Request<ReqResBuffer>,
) -> wtx::Result<()> {
  println!("Before response");
  Ok(())
}

async fn response_middleware(
  _: &mut (),
  _: &mut Pool,
  _: Response<&mut ReqResBuffer>,
) -> wtx::Result<()> {
  println!("After response");
  Ok(())
}

async fn world() -> &'static str {
  "world"
}

Pool

An asynchronous pool of arbitrary objects where each element is dynamically created or re-created when invalid.

Can also be used for database connections, which is quite handy because it improves the performance of executing commands and reduces the use of hardware resources.

Activation feature is called pool.

Example

//! Minimal code that shows the creation of a management structure that always yields `123`.

extern crate tokio;
extern crate wtx;

use wtx::pool::{ResourceManager, SimplePoolTokio};

pub struct CustomManager;

impl ResourceManager for CustomManager {
  type CreateAux = ();
  type Error = wtx::Error;
  type RecycleAux = ();
  type Resource = i32;

  async fn create(&self, _: &Self::CreateAux) -> Result<Self::Resource, Self::Error> {
    Ok(123)
  }

  async fn is_invalid(&self, _: &Self::Resource) -> bool {
    false
  }

  async fn recycle(&self, _: &Self::RecycleAux, _: &mut Self::Resource) -> Result<(), Self::Error> {
    Ok(())
  }
}

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let pool = SimplePoolTokio::new(1, CustomManager);
  let resource = ***pool.get().await?;
  assert_eq!(resource, 123);
  Ok(())
}

UI tools

wtx-ui is a standalone crate intended to allow interactions with the wtx project through a user interface. Currently, only command-line interfaces are available.

  • Embeds SQL migrations for schema-manager. Activation feature is called embed-migrations.
  • Runs SQL migrations managed by schema-manager. Activation feature is called schema-manager or schema-manager-dev.
  • Performs very basic WebSocket Client/Server operations. Activation feature is called web-socket.
  • Makes requests to arbitrary URIs mimicking the interface of cURL. Activation feature is called http-client.

WebSocket

Implementation of RFC6455 and RFC7692.

Activation feature is called web-socket.

WebSocket Benchmark

Autobahn Reports

  1. fuzzingclient
  2. fuzzingserver

Compression

The "permessage-deflate" extension is the only supported compression format and is backed by the zlib-rs project, which performs as well as zlib-ng.

To get the most performance possible, try compiling your program with RUSTFLAGS='-C target-cpu=native' to allow zlib-rs to use more efficient SIMD instructions.

Client Example

//! WebSocket CLI client that enables real-time communication by allowing users to send and
//! receive messages through typing.
//!
//! This snippet requires ~35 dependencies and has an optimized binary size of ~550K.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use tokio::{
  io::{AsyncBufReadExt, BufReader},
  net::TcpStream,
};
use wtx::{
  misc::{StdRng, Uri},
  web_socket::{
    FrameBufferVec, FrameMutVec, HeadersBuffer, OpCode, WebSocketBuffer, WebSocketClient,
  },
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let uri = Uri::new("ws://www.example.com");
  let fb = &mut FrameBufferVec::default();
  let (_, mut ws) = WebSocketClient::connect(
    (),
    fb,
    [],
    &mut HeadersBuffer::default(),
    StdRng::default(),
    TcpStream::connect(uri.hostname_with_implied_port()).await?,
    &uri.to_ref(),
    WebSocketBuffer::default(),
  )
  .await?;
  let mut buffer = String::new();
  let mut reader = BufReader::new(tokio::io::stdin());
  loop {
    tokio::select! {
      frame_rslt = ws.read_frame(fb) => {
        let frame = frame_rslt?;
        match (frame.op_code(), frame.text_payload()) {
          (_, Some(elem)) => println!("{elem}"),
          (OpCode::Close, _) => break,
          _ => {}
        }
      }
      read_rslt = reader.read_line(&mut buffer) => {
        let _ = read_rslt?;
        ws.write_frame(&mut FrameMutVec::new_fin(fb, OpCode::Text, buffer.as_bytes())?).await?;
        // `read_line` appends to the buffer, so clear it before reading the next line.
        buffer.clear();
      }
    }
  }
  Ok(())
}

Server Example

//! Serves requests using low-level WebSocket resources alongside self-made certificates.

extern crate tokio;
extern crate tokio_rustls;
extern crate wtx;
extern crate wtx_instances;

use tokio::net::TcpStream;
use tokio_rustls::server::TlsStream;
use wtx::{
  http::LowLevelServer,
  misc::{StdRng, TokioRustlsAcceptor},
  web_socket::{FrameBufferVec, OpCode, WebSocketBuffer, WebSocketServer},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  LowLevelServer::tokio_web_socket(
    &wtx_instances::host_from_args(),
    None,
    || {},
    |error| eprintln!("{error}"),
    handle,
    (
      || {
        TokioRustlsAcceptor::without_client_auth()
          .build_with_cert_chain_and_priv_key(wtx_instances::CERT, wtx_instances::KEY)
      },
      |acceptor| acceptor.clone(),
      |acceptor, stream| async move { Ok(acceptor.accept(stream).await?) },
    ),
  )
  .await
}

async fn handle(
  fb: &mut FrameBufferVec,
  mut ws: WebSocketServer<(), StdRng, TlsStream<TcpStream>, &mut WebSocketBuffer>,
) -> wtx::Result<()> {
  loop {
    let mut frame = ws.read_frame(fb).await?;
    match frame.op_code() {
      OpCode::Binary | OpCode::Text => {
        ws.write_frame(&mut frame).await?;
      }
      OpCode::Close => break,
      _ => {}
    }
  }
  Ok(())
}