WTX

CI crates.io Documentation License Rustc

A collection of different transport implementations and related tools focused primarily on web technologies. Features the in-house development of 6 IETF RFCs (6265, 6455, 7541, 7692, 8441, 9113), 2 formal specifications (gRPC, PostgreSQL) and several other invented ideas.

  1. Client API Framework
  2. Database Client
  3. Database Schema Manager
  4. gRPC Client/Server
  5. HTTP Client Framework
  6. HTTP Server Framework
  7. HTTP/2 Client/Server
  8. Pool Manager
  9. UI tools
  10. WebSocket Client/Server
  11. WebSocket over HTTP/2

Every feature is optional and must be set at compile time. Please see the intended documentation for further information.

Embedded devices with a working heap allocator can use this no_std crate.

Performance

Many things that generally improve performance are used in the project, to name a few:

  1. Manual vectorization: When an algorithm is known for processing large amounts of data, several experiments are performed to analyze the best way to split loops in order to allow the compiler to take advantage of SIMD instructions in x86 processors.
  2. Memory allocation: Whenever possible, all heap allocations are called only once at the start of an instance creation and additionally, stack memory usage is preferably prioritized over heap memory.
  3. Fewer dependencies: No third-party is injected by default. In other words, additional dependencies are up to the user through the selection of Cargo features, which decreases compilation times. For example, you can see the mere 16 dependencies required by the PostgreSQL client using cargo tree -e normal --features postgres.

Since memory are usually held at the instance level instead of being created and dropped on the fly, it is worth noting that its usage can growth significantly depending on the use-case. If appropriated, try using a shared pool of resources or try limiting how much data can be exchanged between parties.

High-level benchmarks

Checkout wtx-bench to see a variety of benchmarks or feel free to point any misunderstandings or misconfigurations.

WebSocket Benchmark

There are mainly 2 things that impact performance, the chosen runtime and the number of pre-allocated bytes. Specially for servers that have to create a new instance for each handshake, pre-allocating a high number of bytes for short-lived or low-transfer connections can have a negative impact.

PostgreSQL Benchmark

Low-level benchmarks

Anything marked with #[bench] in the repository is considered a low-level benchmark in the sense that they measure very specific operations that generally serve as the basis for other parts.

Take a look at https://bencher.dev/perf/wtx to see all low-level benchmarks over different periods of time.

Transport Layer Security (TLS)

When using a feature that requires network connection, it is often necessary to perform encrypted communication and since wtx is not hard-coded with a specific stream implementation, it is up to you to choose the best TLS provider.

Some utilities like TokioRustlsConnector or TokioRustlsAcceptor are available to make things more convenient but keep in mind that it is still necessary to activate a crate that provides certificates for client usage.

Examples

Demonstrations of different use-cases can be found in the wtx-instances directory as well as in the documentation.

Limitations

  • Does not support systems with a pointer length of 16 bits.

  • Expects the infallible sum of the lengths of an arbitrary number of slices, otherwise the program will likely trigger an overflow that can possibly result in unexpected operations. For example, in a 32bit system such a scenario should be viable without swap memory or through specific limiters like ulimit.

Client API Framework

A flexible client API framework for writing asynchronous, fast, organizable, scalable and maintainable applications. Supports several data formats, transports and custom parameters.

Checkout the wtx-apis project to see a collection of APIs based on wtx.

To use this functionality, it is necessary to activate the client-api-framework feature.

Objective

It is possible to directly decode responses using built-in methods provided by some transport implementations like reqwest or surf but as complexity grows, the cost of maintaining large sets of endpoints with ad-hoc solutions usually becomes unsustainable. Based on this scenario, wtx comes into play to organize and centralize data flow in a well-defined manner to increase productivity and maintainability.

For API consumers, the calling convention of wtx endpoints is based on fluent interfaces which makes the usage more pleasant and intuitive.

Moreover, the project may in the future create automatic bindings for other languages in order to avoid having duplicated API repositories.

Example

//! Illustrates how the `client-api-framework` feature facilitates the management and utilization
//! of large API endpoints for both HTTP and WebSocket requests.
//!
//! Contains one API called `GenericThrottlingApi` and its two endpoints: an HTTP JSON-RPC
//! `genericHttpRequest` and an WebSocket `genericWebSocketSubscription`.
//!
//! Everything that is not inside `main` should be constructed only once in your program.

extern crate serde;
extern crate tokio;
extern crate wtx;
extern crate wtx_macros;

use core::time::Duration;
use tokio::net::TcpStream;
use wtx::{
  client_api_framework::{
    misc::{Pair, RequestLimit, RequestThrottling},
    network::{transport::SendingRecievingTransport, HttpParams, WsParams},
    Api,
  },
  data_transformation::dnsn::SerdeJson,
  http::client_framework::ClientFrameworkTokio,
  misc::{simple_seed, Uri, Xorshift64},
  web_socket::{WebSocket, WebSocketBuffer},
};

wtx::create_packages_aux_wrapper!();

#[derive(Debug)]
#[wtx_macros::api_params(pkgs_aux(PkgsAux), transport(http, ws))]
pub struct GenericThrottlingApi {
  pub rt: RequestThrottling,
}

impl Api for GenericThrottlingApi {
  type Error = wtx::Error;

  async fn before_sending(&mut self) -> Result<(), Self::Error> {
    self.rt.rc.update_params(&self.rt.rl).await?;
    Ok(())
  }
}

#[wtx_macros::pkg(
  api(crate::GenericThrottlingApi),
  data_format(json_rpc("genericHttpRequest")),
  transport(http)
)]
mod generic_http_request {
  #[pkg::aux]
  impl<A, DRSR> crate::HttpPkgsAux<A, DRSR> {}

  #[derive(Debug, serde::Serialize)]
  #[pkg::req_data]
  pub struct GenericHttpRequestReq(#[pkg::field(name = "generic_number")] i32);

  #[pkg::res_data]
  pub type GenericHttpRequestRes = (u8, u16, u32);
}

#[wtx_macros::pkg(
  api(crate::GenericThrottlingApi),
  data_format(json_rpc("genericWebSocketSubscription")),
  transport(ws)
)]
mod generic_web_socket_subscription {
  #[pkg::aux]
  impl<A, DRSR> crate::WsPkgsAux<A, DRSR> {}

  #[derive(Debug, serde::Serialize)]
  #[pkg::req_data]
  pub struct GenericWebSocketSubscriptionReq<'str> {
    generic_string: &'str str,
    #[serde(skip_serializing_if = "Option::is_none")]
    generic_number: Option<i32>,
  }

  #[pkg::res_data]
  pub type GenericWebSocketSubscriptionRes = u64;
}

async fn http_pair(
) -> Pair<PkgsAux<GenericThrottlingApi, SerdeJson, HttpParams>, ClientFrameworkTokio> {
  Pair::new(
    PkgsAux::from_minimum(
      GenericThrottlingApi {
        rt: RequestThrottling::from_rl(RequestLimit::new(5, Duration::from_secs(1))),
      },
      SerdeJson,
      HttpParams::from_uri("ws://generic_web_socket_uri.com".into()),
    ),
    ClientFrameworkTokio::tokio(1).build(),
  )
}

async fn web_socket_pair() -> wtx::Result<
  Pair<
    PkgsAux<GenericThrottlingApi, SerdeJson, WsParams>,
    WebSocket<(), TcpStream, WebSocketBuffer, true>,
  >,
> {
  let uri = Uri::new("ws://generic_web_socket_uri.com");
  let web_socket = WebSocket::connect(
    (),
    [],
    false,
    Xorshift64::from(simple_seed()),
    TcpStream::connect(uri.hostname_with_implied_port()).await?,
    &uri,
    WebSocketBuffer::default(),
    |_| wtx::Result::Ok(()),
  )
  .await?;
  Ok(Pair::new(
    PkgsAux::from_minimum(
      GenericThrottlingApi {
        rt: RequestThrottling::from_rl(RequestLimit::new(40, Duration::from_secs(2))),
      },
      SerdeJson,
      WsParams::default(),
    ),
    web_socket,
  ))
}

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let mut hp = http_pair().await;
  let _http_response_tuple = hp
    .trans
    .send_recv_decode_contained(
      &mut hp.pkgs_aux.generic_http_request().data(123).build(),
      &mut hp.pkgs_aux,
    )
    .await?
    .result?;

  let mut wsp = web_socket_pair().await?;
  let _web_socket_subscription_id = wsp
    .trans
    .send_recv_decode_contained(
      &mut wsp.pkgs_aux.generic_web_socket_subscription().data("Hello", None).build(),
      &mut wsp.pkgs_aux,
    )
    .await?
    .result?;
  Ok(())
}

Client Connection

Provides a set of functions that establish connections, execute queries and manage data transactions with different databases.

At the current time PostgreSQL is the only supported database. Implements https://www.postgresql.org/docs/16/protocol.html.

More benchmarks are available at https://github.com/diesel-rs/metrics.

To use this functionality, it is necessary to activate the postgres feature.

PostgreSQL Benchmark

Example

//! Demonstrates different interactions with a PostgreSQL database.
//!
//! This snippet requires ~40 dependencies and has an optimized binary size of ~600K.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use wtx::database::{Executor as _, Record, Records, TransactionManager};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let uri = "postgres://USER:PASSWORD@localhost/DATABASE";
  let mut executor = wtx_instances::executor_postgres(&uri).await?;
  let mut tm = executor.transaction().await?;
  tm.executor().execute("CREATE TABLE IF NOT EXISTS example(id INT, name VARCHAR)", |_| {}).await?;
  let _ = tm
    .executor()
    .execute_with_stmt("INSERT INTO foo VALUES ($1, $2), ($3, $4)", (1u32, "one", 2u32, "two"))
    .await?;
  tm.commit().await?;
  let records = executor
    .fetch_many_with_stmt("SELECT id, name FROM example;", (), |_| Ok::<_, wtx::Error>(()))
    .await?;
  assert_eq!(records.get(0).as_ref().and_then(|record| record.decode("id").ok()), Some(1));
  assert_eq!(records.get(1).as_ref().and_then(|record| record.decode("name").ok()), Some("two"));
  Ok(())
}

Schema Management

Embedded and CLI workflows using raw SQL commands. A schema manager is a tool thats allows developers to define, track and apply changes to database structures over time, ensuring consistency across different environments.

To use this functionality, it is necessary to activate the schema-manager feature.

CLI

# Example

cargo install --git https://github.com/c410-f3r/wtx --features schema-manager-dev wtx-ui
echo DATABASE_URI="postgres://USER:PASSWORD@localhost:5432/DATABASE" > .env
RUST_LOG=debug wtx-cli migrate

The CLI application expects a configuration file that contains a set of paths where each path is a directory with multiple migrations.

# wtx.toml

migration_groups = [
  "migrations/1__initial",
  "migrations/2__fancy_stuff"
]

Each provided migration and group must contain an unique version and a name summarized by the following structure:

// Execution order of migrations is dictated by their numeric declaration order.

migrations
+-- 1__initial (Group)
    +-- 1__create_author.sql (Migration)
    +-- 2__create_post.sql (Migration)
+-- 2__fancy_stuff (Group)
    +-- 1__something_fancy.sql (Migration)
wtx.toml

The SQL file itself is composed by two parts, one for migrations (-- wtx IN section) and another for rollbacks (-- wtx OUT section).

-- wtx IN

CREATE TABLE author (
  id INT NOT NULL PRIMARY KEY,
  added TIMESTAMP NOT NULL,
  birthdate DATE NOT NULL,
  email VARCHAR(100) NOT NULL,
  first_name VARCHAR(50) NOT NULL,
  last_name VARCHAR(50) NOT NULL
);

-- wtx OUT

DROP TABLE author;

One cool thing about the expected file configuration is that it can also be divided into smaller pieces, for example, the above migration could be transformed into 1__author_up.sql and 1__author_down.sql.

-- 1__author_up.sql

CREATE TABLE author (
  id INT NOT NULL PRIMARY KEY,
  added TIMESTAMP NOT NULL,
  birthdate DATE NOT NULL,
  email VARCHAR(100) NOT NULL,
  first_name VARCHAR(50) NOT NULL,
  last_name VARCHAR(50) NOT NULL
);
-- 1__author_down.sql

DROP TABLE author;
migrations
+-- 1__some_group (Group)
    +-- 1__author (Migration directory)
        +-- 1__author_down.sql (Down migration)
        +-- 1__author_up.sql (Up migration)
        +-- 1__author.toml (Optional configuration)
wtx.toml

Library

The library gives freedom to arrange groups and uses some external crates, bringing ~10 additional dependencies into your application. If this overhead is not acceptable, then you probably should discard the library and use the CLI binary instead as part of a custom deployment strategy.

extern crate tokio;
extern crate wtx;

use std::path::Path;
use wtx::database::{schema_manager::Commands, DEFAULT_URI_VAR};
use wtx::misc::Vector;

#[tokio::main]
async fn main() {
  let mut commands = Commands::with_executor(());
  commands
    .migrate_from_dir(
      (&mut String::default(), &mut Vector::default()),
      Path::new("my_custom_migration_group_path"),
    )
    .await
    .unwrap();
}

Embedded migrations

To make deployment easier, the final binary of your application can embed all necessary migrations through the binary that is available in the wtx-ui crate.

#![allow(unused)]
fn main() {
extern crate wtx;

// This is an example! The actual contents are filled by the `wtx-ui embed-migrations` binary call.
mod embedded_migrations {
  pub(crate) static GROUPS: wtx::database::schema_manager::EmbeddedMigrationsTy = &[];
}

use wtx::database::schema_manager::Commands;
use wtx::misc::Vector;

async fn migrate() -> wtx::Result<()> {
  Commands::with_executor(())
    .migrate_from_groups((&mut String::new(), &mut Vector::new()), embedded_migrations::GROUPS)
    .await
}
}

Conditional migrations

If one particular migration needs to be executed in a specific set of databases, then it is possible to use the -- wtx dbs parameter in a file.

-- wtx dbs mssql,postgres

-- wtx IN

CREATE SCHEMA foo;

-- wtx OUT

DROP SCHEMA foo;

Repeatable migrations

Repeatability can be specified with -- wtx repeatability SOME_VALUE where SOME_VALUE can be either always (regardless of the checksum) or on-checksum-change (runs only when the checksums changes).

-- wtx dbs postgres
-- wtx repeatability always

-- wtx IN

CREATE OR REPLACE PROCEDURE something() LANGUAGE SQL AS $$ $$

-- wtx OUT

DROP PROCEDURE something();

Keep in mind that repeatable migrations might break subsequent operations, therefore, you must known what you are doing. If desirable, they can be separated into dedicated groups.

migrations/1__initial_repeatable_migrations
migrations/2__normal_migrations
migrations/3__final_repeatable_migrations

Namespaces/Schemas

For supported databases, there is no direct user parameter that inserts migrations inside a single database schema but it is possible to specify the schema inside the SQL file and arrange the migration groups structure in a way that most suits you.

-- wtx IN

CREATE TABLE cool_department_schema.author (
  id INT NOT NULL PRIMARY KEY,
  full_name VARCHAR(50) NOT NULL
);

-- wtx OUT

DROP TABLE cool_department_schema.author;

gRPC

Basic implementation that currently only supports unary calls. gRPC is an high-performance remote procedure call framework developed by Google that enables efficient communication between distributed systems, particularly in microservices architectures.

wtx does not provide built-in deserialization or serialization utilities capable of manipulate protobuf files. Instead, users are free to choose any third-party that generates Rust bindings and implements the internal Deserialize and Serialize traits.

Due to the lack of an official parser, the definitions of a Service must be manually typed.

Independent benchmarks are available https://github.com/LesnyRumcajs/grpc_bench.

To use this functionality, it is necessary to activate the grpc feature.

Client Example

//! gRPC client that uses the structure definitions found in the `wtx_instances::grpc_bindings`
//! module.
//!
//! This snippet requires ~40 dependencies and has an optimized binary size of ~700K.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use std::borrow::Cow;
use wtx::{
  data_transformation::dnsn::QuickProtobuf,
  grpc::Client,
  http::{client_framework::ClientFramework, ReqResBuffer, ReqResData},
};
use wtx_instances::grpc_bindings::wtx::{GenericRequest, GenericResponse};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let mut client = Client::new(ClientFramework::tokio(1).build(), QuickProtobuf);
  let mut rrb = ReqResBuffer::empty();
  rrb.uri.reset(format_args!("http://127.0.0.1:9000"))?;
  let res = client
    .send_unary_req(
      ("wtx", "GenericService", "generic_method"),
      GenericRequest {
        generic_request_field0: Cow::Borrowed(b"generic_request_value"),
        generic_request_field1: 123,
      },
      rrb,
    )
    .await?;
  let generic_response: GenericResponse = client.des_from_res_bytes(res.rrd.body())?;
  println!("{:?}", generic_response);
  Ok(())
}

Server Example

//! gRPC server that uses the structure definitions found in the `wtx_instances::grpc_bindings`
//! module.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use std::borrow::Cow;
use wtx::{
  data_transformation::dnsn::QuickProtobuf,
  grpc::{GrpcManager, GrpcMiddleware},
  http::{
    server_framework::{post, Router, ServerFrameworkBuilder, State},
    ReqResBuffer, StatusCode,
  },
  misc::{simple_seed, Xorshift64},
};
use wtx_instances::grpc_bindings::wtx::{GenericRequest, GenericResponse};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let router = Router::new(
    wtx::paths!(("wtx.GenericService/generic_method", post(wtx_generic_service_generic_method))),
    GrpcMiddleware,
  )?;
  ServerFrameworkBuilder::new(router)
    .with_req_aux(|| QuickProtobuf::default())
    .tokio_rustls(
      (wtx_instances::CERT, wtx_instances::KEY),
      &wtx_instances::host_from_args(),
      Xorshift64::from(simple_seed()),
      |error| eprintln!("{error}"),
      |_| Ok(()),
    )
    .await
}

async fn wtx_generic_service_generic_method(
  state: State<'_, (), GrpcManager<QuickProtobuf>, ReqResBuffer>,
) -> wtx::Result<StatusCode> {
  let _generic_request: GenericRequest =
    state.stream_aux.des_from_req_bytes(&state.req.rrd.body)?;
  state.req.rrd.clear();
  state.stream_aux.ser_to_res_bytes(
    &mut state.req.rrd.body,
    GenericResponse {
      generic_response_field0: Cow::Borrowed(b"generic_response_value"),
      generic_response_field1: 321,
    },
  )?;
  Ok(StatusCode::Ok)
}

HTTP/2

Implementation of RFC7541 and RFC9113. HTTP/2 is the second major version of the Hypertext Transfer Protocol, introduced in 2015 to improve web performance, it addresses limitations of HTTP/1.1 while maintaining backwards compatibility.

Passes the hpack-test-case and the h2spec test suites. Due to official deprecation, prioritization is not supported and due to the lack of third-party support, server-push is also not supported.

To use this functionality, it is necessary to activate the http2 feature.

Client Example

//! Fetches an URI using low-level HTTP/2 resources.
//!
//! This snippet requires ~25 dependencies and has an optimized binary size of ~700K.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use tokio::net::TcpStream;
use wtx::{
  http::{Method, ReqResBuffer, Request},
  http2::{Http2Buffer, Http2ErrorCode, Http2Params, Http2Tokio},
  misc::{from_utf8_basic, simple_seed, Uri, Xorshift64},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let uri = Uri::new("http://www.example.com");
  let (frame_reader, mut http2) = Http2Tokio::connect(
    Http2Buffer::new(Xorshift64::from(simple_seed())),
    Http2Params::default(),
    TcpStream::connect(uri.hostname_with_implied_port()).await?.into_split(),
  )
  .await?;
  let _jh = tokio::spawn(frame_reader);
  let rrb = ReqResBuffer::empty();
  let mut stream = http2.stream().await?;
  stream.send_req(Request::http2(Method::Get, b"Hello!"), &uri.to_ref()).await?;
  let (_, res_rrb) = stream.recv_res(rrb).await?;
  stream.common().clear(false).await?;
  println!("{}", from_utf8_basic(&res_rrb.body)?);
  http2.send_go_away(Http2ErrorCode::NoError).await;
  Ok(())
}

Server Example

//! HTTP/2 server that uses optioned parameters.

extern crate tokio;
extern crate tokio_rustls;
extern crate wtx;
extern crate wtx_instances;

use tokio::{io::WriteHalf, net::TcpStream};
use tokio_rustls::server::TlsStream;
use wtx::{
  http::{
    is_web_socket_handshake, AutoStream, ManualServerStreamTokio, OperationMode, OptionedServer,
    ReqResBuffer, Response, StatusCode,
  },
  http2::{Http2Buffer, Http2Params, WebSocketOverStream},
  misc::{simple_seed, TokioRustlsAcceptor, Vector, Xorshift64},
  web_socket::{Frame, OpCode},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  OptionedServer::http2_tokio(
    &wtx_instances::host_from_args(),
    auto,
    || {
      Ok((
        (),
        Http2Buffer::new(Xorshift64::from(simple_seed())),
        Http2Params::default()
          .set_enable_connect_protocol(true)
          .set_max_hpack_len((128 * 1024, 128 * 1024)),
      ))
    },
    |error| eprintln!("{error}"),
    manual,
    |_, protocol, req, _| {
      Ok((
        (),
        if is_web_socket_handshake(&mut req.rrd.headers, req.method, protocol) {
          OperationMode::Manual
        } else {
          OperationMode::Auto
        },
      ))
    },
    || Ok((Vector::new(), ReqResBuffer::empty())),
    (
      || {
        TokioRustlsAcceptor::without_client_auth()
          .http2()
          .build_with_cert_chain_and_priv_key(wtx_instances::CERT, wtx_instances::KEY)
      },
      |acceptor| acceptor.clone(),
      |acceptor, stream| async move { Ok(tokio::io::split(acceptor.accept(stream).await?)) },
    ),
  )
  .await
}

async fn auto(
  _: (),
  mut ha: AutoStream<(), Vector<u8>>,
) -> Result<Response<ReqResBuffer>, wtx::Error> {
  ha.req.rrd.clear();
  Ok(ha.req.into_response(StatusCode::Ok))
}

async fn manual(
  _: (),
  mut hm: ManualServerStreamTokio<(), Http2Buffer, Vector<u8>, WriteHalf<TlsStream<TcpStream>>>,
) -> Result<(), wtx::Error> {
  let rng = Xorshift64::from(simple_seed());
  hm.req.rrd.headers.clear();
  let mut wos = WebSocketOverStream::new(&hm.req.rrd.headers, false, rng, hm.stream).await?;
  loop {
    let mut frame = wos.read_frame(&mut hm.stream_aux).await?;
    match (frame.op_code(), frame.text_payload()) {
      (_, Some(elem)) => println!("{elem}"),
      (OpCode::Close, _) => break,
      _ => {}
    }
    wos.write_frame(&mut Frame::new_fin(OpCode::Text, frame.payload_mut())).await?;
  }
  wos.close().await?;
  Ok(())
}

HTTP Client Framework

High-level pool of HTTP clients that currently only supports HTTP/2. Allows multiple connections that can be referenced in concurrent scenarios.

To use this functionality, it is necessary to activate the http-client-framework feature.

Example

//! Fetches and prints the response body of a provided URI.
//!
//! This snippet requires ~25 dependencies and has an optimized binary size of ~700K.
//!
//! Currently, only HTTP/2 is supported.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use wtx::{
  http::{client_framework::ClientFramework, Method, ReqResBuffer},
  misc::{from_utf8_basic, Uri},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let uri = Uri::new("http://www.example.com");
  let buffer = ReqResBuffer::empty();
  let client = ClientFramework::tokio(1).build();
  let res = client.send(Method::Get, buffer, &uri.to_ref()).await?;
  println!("{}", from_utf8_basic(&res.rrd.body)?);
  Ok(())
}

HTTP Server Framework

A small and fast to compile framework that can interact with many built-in features.

  • Databases
  • JSON
  • Middlewares
  • Streaming
  • URI router
  • WebSocket

If dynamic or nested routes are needed, then please activate the matchit feature. Without it, only simple and flat routes will work.

To use this functionality, it is necessary to activate the http-server-framework feature.

HTTP/2 Benchmarks

Example

//! An HTTP server framework showcasing nested routes, middlewares, manual streams, dynamic routes,
//! PostgreSQL connections and JSON deserialization/serialization.
//!
//! Currently, only HTTP/2 is supported.
//!
//! This snippet requires ~50 dependencies and has an optimized binary size of ~900K.

extern crate serde;
extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use core::{fmt::Write, ops::ControlFlow};
use tokio::net::{tcp::OwnedWriteHalf, TcpStream};
use wtx::{
  database::{Executor, Record},
  http::{
    server_framework::{
      get, post, Middleware, PathOwned, Router, SerdeJson, ServerFrameworkBuilder, StateClean,
    },
    ManualStream, ReqResBuffer, Request, Response, StatusCode,
  },
  http2::{Http2Buffer, Http2DataTokio, Http2ErrorCode, ServerStream},
  misc::{simple_seed, Xorshift64},
  pool::{PostgresRM, SimplePoolTokio},
};

type Pool = SimplePoolTokio<PostgresRM<wtx::Error, TcpStream>>;

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let router = Router::paths(wtx::paths!(
    ("/db/{id}", get(db)),
    ("/json", post(json)),
    (
      "/say",
      Router::new(wtx::paths!(("/hello", get(hello)), ("/world", get(world))), CustomMiddleware,)?,
    ),
    ("/stream", get(stream)),
  ))?;
  let rm = PostgresRM::tokio("postgres://USER:PASSWORD@localhost/DB_NAME".into());
  let pool = Pool::new(4, rm);
  ServerFrameworkBuilder::new(router)
    .with_req_aux(move || pool.clone())
    .tokio(
      &wtx_instances::host_from_args(),
      Xorshift64::from(simple_seed()),
      |error| eprintln!("{error:?}"),
      |_| Ok(()),
    )
    .await
}

#[derive(serde::Deserialize)]
struct DeserializeExample {
  _foo: i32,
  _bar: u64,
}

#[derive(serde::Serialize)]
struct SerializeExample {
  _baz: [u8; 4],
}

async fn db(
  state: StateClean<'_, (), Pool, ReqResBuffer>,
  PathOwned(id): PathOwned<u32>,
) -> wtx::Result<StatusCode> {
  let mut lock = state.stream_aux.get().await?;
  let record = lock.fetch_with_stmt("SELECT name FROM persons WHERE id = $1", (id,)).await?;
  let name = record.decode::<_, &str>(0)?;
  state.req.rrd.body.write_fmt(format_args!("Person of id `1` has name `{name}`"))?;
  Ok(StatusCode::Ok)
}

async fn hello() -> &'static str {
  "hello"
}

async fn json(_: SerdeJson<DeserializeExample>) -> wtx::Result<SerdeJson<SerializeExample>> {
  Ok(SerdeJson(SerializeExample { _baz: [1, 2, 3, 4] }))
}

async fn stream(
  mut manual_stream: ManualStream<
    (),
    ServerStream<Http2DataTokio<Http2Buffer, OwnedWriteHalf, false>>,
    Pool,
  >,
) -> wtx::Result<()> {
  manual_stream.stream.common().send_go_away(Http2ErrorCode::NoError).await;
  Ok(())
}

async fn world() -> &'static str {
  "world"
}

struct CustomMiddleware;

impl Middleware<(), wtx::Error, Pool> for CustomMiddleware {
  type Aux = ();

  #[inline]
  fn aux(&self) -> Self::Aux {
    ()
  }

  async fn req(
    &self,
    _: &mut (),
    _: &mut Self::Aux,
    _: &mut Request<ReqResBuffer>,
    _: &mut Pool,
  ) -> wtx::Result<ControlFlow<StatusCode, ()>> {
    println!("Inspecting request");
    Ok(ControlFlow::Continue(()))
  }

  async fn res(
    &self,
    _: &mut (),
    _: &mut Self::Aux,
    _: Response<&mut ReqResBuffer>,
    _: &mut Pool,
  ) -> wtx::Result<ControlFlow<StatusCode, ()>> {
    println!("Inspecting response");
    Ok(ControlFlow::Continue(()))
  }
}

Pool

An asynchronous pool of arbitrary objects where each element is dynamically created or re-created when invalid.

Can also be used for database connections, which is quite handy because it enhances the performance of executing commands and alleviates the use of hardware resources.

To use this functionality, it is necessary to activate the pool feature.

Example

//! Minimal code that shows the creation of a management structure that always yields `123`.

extern crate tokio;
extern crate wtx;

use wtx::pool::{ResourceManager, SimplePoolTokio};

pub struct CustomManager;

impl ResourceManager for CustomManager {
  type CreateAux = ();
  type Error = wtx::Error;
  type RecycleAux = ();
  type Resource = i32;

  async fn create(&self, _: &Self::CreateAux) -> Result<Self::Resource, Self::Error> {
    Ok(123)
  }

  async fn is_invalid(&self, _: &Self::Resource) -> bool {
    false
  }

  async fn recycle(&self, _: &Self::RecycleAux, _: &mut Self::Resource) -> Result<(), Self::Error> {
    Ok(())
  }
}

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let pool = SimplePoolTokio::new(1, CustomManager);
  let resource = ***pool.get().await?;
  assert_eq!(resource, 123);
  Ok(())
}

UI tools

wtx-ui is a standalone crate intended to allow interactions with the wtx project through an user interface. At the current time only CLI interfaces are available.

  • Embeds SQL migrations for schema-manager. Activation feature is called embed-migrations.
  • Runs SQL migrations managed by schema-manager. Activation feature is called schema-manager or schema-manager-dev.
  • Performs very basic WebSocket Client/Server operations. Activation feature is called web-socket.
  • Makes requests to arbitrary URIs mimicking the interface of cURL. Activation feature is called http-client.

WebSocket

Implementation of RFC6455 and RFC7692. WebSocket is a communication protocol that enables full-duplex communication between a client (typically a web browser) and a server over a single TCP connection. Unlike traditional HTTP, which is request-response based, WebSocket allows real-time data exchange without the need for polling.

In-house benchmarks are available at https://c410-f3r.github.io/wtx-bench. If you are aware of other benchmark tools, please open a discussion in the GitHub project.

To use this functionality, it is necessary to activate the web-socket feature.

WebSocket Benchmark

Autobahn Reports

  1. fuzzingclient
  2. fuzzingserver

Compression

The "permessage-deflate" extension is the only supported compression format and is backed by the zlib-rs project that performs as well as zlib-ng.

To get the most performance possible, try compiling your program with RUSTFLAGS='-C target-cpu=native' to allow zlib-rs to use more efficient SIMD instructions.

No masking

Although not officially endorsed, the no-masking parameter described at https://datatracker.ietf.org/doc/html/draft-damjanovic-websockets-nomasking-02 is supported to increase performance. If such a thing is not desirable, please make sure to check the handshake parameters to avoid accidental scenarios.

To make everything work as intended both parties, client and server, need to implement this feature. For example, web browser won't stop masking frames.

Client Example

//! WebSocket CLI client that enables real-time communication by allowing users to send and
//! receive messages through typing.
//!
//! This snippet requires ~35 dependencies and has an optimized binary size of ~550K.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use tokio::{
  io::{AsyncBufReadExt, BufReader},
  net::TcpStream,
};
use wtx::{
  misc::{simple_seed, Uri, Xorshift64},
  web_socket::{Frame, OpCode, WebSocket, WebSocketBuffer},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let uri = Uri::new("ws://www.example.com");
  let mut ws = WebSocket::connect(
    (),
    [],
    false,
    Xorshift64::from(simple_seed()),
    TcpStream::connect(uri.hostname_with_implied_port()).await?,
    &uri.to_ref(),
    WebSocketBuffer::default(),
    |_| wtx::Result::Ok(()),
  )
  .await?;
  let mut buffer = Vec::new();
  let mut reader = BufReader::new(tokio::io::stdin());
  loop {
    tokio::select! {
      frame_rslt = ws.read_frame() => {
        let frame = frame_rslt?;
        match (frame.op_code(), frame.text_payload()) {
          (_, Some(elem)) => println!("{elem}"),
          (OpCode::Close, _) => break,
          _ => {}
        }
      }
      read_rslt = reader.read_until(b'\n', &mut buffer) => {
        let _ = read_rslt?;
        ws.write_frame(&mut Frame::new_fin(OpCode::Text, &mut buffer)).await?;
      }
    }
  }
  Ok(())
}

Server Example

//! Serves requests using low-level WebSockets resources along side self-made certificates.

extern crate tokio;
extern crate tokio_rustls;
extern crate wtx;
extern crate wtx_instances;

use tokio::net::TcpStream;
use tokio_rustls::server::TlsStream;
use wtx::{
  http::OptionedServer,
  misc::TokioRustlsAcceptor,
  web_socket::{OpCode, WebSocket, WebSocketBuffer},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  OptionedServer::web_socket_tokio(
    &wtx_instances::host_from_args(),
    None,
    || {},
    |error| eprintln!("{error}"),
    handle,
    (
      || {
        TokioRustlsAcceptor::without_client_auth()
          .build_with_cert_chain_and_priv_key(wtx_instances::CERT, wtx_instances::KEY)
      },
      |acceptor| acceptor.clone(),
      |acceptor, stream| async move { Ok(acceptor.accept(stream).await?) },
    ),
  )
  .await
}

async fn handle(
  mut ws: WebSocket<(), TlsStream<TcpStream>, &mut WebSocketBuffer, false>,
) -> wtx::Result<()> {
  let (mut common, mut reader, mut writer) = ws.parts_mut();
  loop {
    let mut frame = reader.read_frame(&mut common).await?;
    match frame.op_code() {
      OpCode::Binary | OpCode::Text => {
        writer.write_frame(&mut common, &mut frame).await?;
      }
      OpCode::Close => break,
      _ => {}
    }
  }
  Ok(())
}

WebSocket over HTTP/2

At the current time only servers support the handshake procedure defined in RFC8441.

While HTTP/2 inherently supports full-duplex communication, web browsers typically don't expose this functionality directly to developers and that is why WebSocket tunneling over HTTP/2 is important.

  1. Servers can efficiently handle multiple concurrent streams within a single TCP connection
  2. Client applications can continue using existing WebSocket APIs without modification

For this particular scenario the no-masking parameter defined in https://datatracker.ietf.org/doc/html/draft-damjanovic-websockets-nomasking-02 is also supported.

To use this functionality, it is necessary to activate the http2 and web-socket features.

Example

//! Low level HTTP/2 server that only accepts one WebSocket stream for demonstration purposes.
//!
//! It is worth noting that the optioned server in the `http2-server` example automatically
//! handles WebSocket connections.

extern crate tokio;
extern crate wtx;
extern crate wtx_instances;

use core::mem;
use tokio::net::TcpListener;
use wtx::{
  http::{is_web_socket_handshake, Headers, ReqResBuffer},
  http2::{Http2Buffer, Http2Params, Http2Tokio, WebSocketOverStream},
  misc::{simple_seed, Either, TokioRustlsAcceptor, Vector, Xorshift64},
  web_socket::{Frame, OpCode},
};

#[tokio::main]
async fn main() -> wtx::Result<()> {
  let listener = TcpListener::bind(&wtx_instances::host_from_args()).await?;
  let mut rng = Xorshift64::from(simple_seed());
  let (tcp_stream, _) = listener.accept().await?;
  let acceptor = TokioRustlsAcceptor::without_client_auth()
    .http2()
    .build_with_cert_chain_and_priv_key(wtx_instances::CERT, wtx_instances::KEY)?;
  let (frame_reader, mut http2) = Http2Tokio::accept(
    Http2Buffer::new(&mut rng),
    Http2Params::default()
      .set_enable_connect_protocol(true)
      .set_max_hpack_len((128 * 1024, 128 * 1024)),
    tokio::io::split(acceptor.accept(tcp_stream).await?),
  )
  .await?;
  tokio::spawn(frame_reader);
  let (mut stream, headers_opt) = match http2
    .stream(ReqResBuffer::empty(), |req, protocol| {
      let rslt = is_web_socket_handshake(&req.rrd.headers, req.method, protocol);
      rslt.then(|| mem::take(&mut req.rrd.headers))
    })
    .await?
  {
    Either::Left(_) => return Ok(()),
    Either::Right(elem) => elem,
  };
  let Some(_headers) = headers_opt else {
    return Ok(());
  };
  let mut buffer = Vector::new();
  let mut wos = WebSocketOverStream::new(&Headers::new(), false, rng, &mut stream).await?;
  loop {
    let mut frame = wos.read_frame(&mut buffer).await?;
    match (frame.op_code(), frame.text_payload()) {
      (_, Some(elem)) => println!("{elem}"),
      (OpCode::Close, _) => break,
      _ => {}
    }
    wos.write_frame(&mut Frame::new_fin(OpCode::Text, frame.payload_mut())).await?;
  }
  wos.close().await?;
  stream.common().clear(false).await?;
  Ok(())
}