# Introduction

This book aims to describe a superior approach to build heavily asynchronous and distributed applications based on the actor model. Major part of the book is about the elfo framework and is illustrated with best practices of its usage. The second part of the book tells you about the best corporate practices of asynchronous applications' architecture.

## Goals

• Assist in building fault-tolerant systems.
• Be performant enough for low-latency systems.
• Be observable, provide a lot of metrics to detect problems.
• Provide built-in support of exposing log events, dumps, metrics, and trace events.
• Distributing actors across several machines should be as simple as possible.

## Non-goals

• Provide the most performant way to communicate between actors.
• Provide any HTTP server.

## Features

• Asynchronous actors with supervision and custom life cycle.
• Two-level routing system: between actor groups (pipelining) and inside them (sharding).
• Multiple protocols: actors (so-called gates) can handle messages from different protocols.
• Multiple patterns of communication: regular messages, request-response (TODO: subscriptions).
• Config updating and distribution on the fly.
• Appropriate for both low latency and high throughput tasks.
• Tracing: all messages have trace_id that spread across the system implicitly.
• Telemetry (via the metrics crate).
• Dumping: messages can be stored for further debugging.
• Seamless distribution across nodes TODO.
• Utils for simple testing.
• Utils for benchmarking TODO.

# Comparison aka "Why not X"?

## Where X = CSP

Usage of CSP in Rust can be illustrated in the following example:

// "Processes"
async fn read_file(path: &str, tx: Sender<Chunk>) { .. }
async fn decode_chunks(rx: Receiver<Chunk>, tx: Sender<SomeEvent>) { .. }
async fn process_events(rx: Receiver<SomeEvent>) { .. }

// "Channels"
let (chunks_tx, chunks_rx) = channel(100);
let (events_tx, events_rx) = channel(100);

spawn(decode_chunks(chunks_rx, events_tx));
spawn(process_events(events_rx));


The CSP approach is a perfect solution that doesn't require expertise in any frameworks for tools or simple applications with well-defined technical specifications and a small number of communications between processes. If this is your case, just use CSP.

However, complex applications tend to get more and more complicated over time, and their development and maintenance quickly become harder than in the actor model.

### Pros of CSP

• Implementation of channels can be chosen by a developer, while actor frameworks determine a mailbox implementation.
• Processes can share the same channel using MPMC channels to implement work-stealing behavior. It's not an option for actors, where a mailbox is owned by exactly one actor.

### Cons of CSP

• Processes in CSP are anonymous, while actors have identities. It means it's hard to distinguish logs and metrics because processes don't have names. Thus, the observability of CSP is much worse than that of the actor model.
• Actors are more decoupled; they can be discovered using some sort of service locators and even changed on the fly, e.g., due to restart.
• Actors can be distributed across several machines because they don't have to send messages directly to a mailbox; they can have a network before it.
• To add more connections between processes, we need to use more channels in one case and combine messages into big enumerations with unrelated items in other cases.

TODO

TODO

# Actors

The most important part of the actor model is, of course, the actor itself. It can be challenging to give the exact definition of this term. However, we can define an actor through its properties:

• An actor is a unit of logic encapsulation

Actors solve a specific task instead of doing it all at once.

• An actor is a unit of scheduling

Different threads cannot execute the same actor simultaneously1. However, many actors are executed concurrently, often parallel in many threads.

• An actor is a unit of data encapsulation

Actors shouldn't share their data with other actors, shouldn't expose implementation details, etc. Do not communicate by sharing memory; instead, share memory by communicating.

• An actor is a unit of failure encapsulation

Actors can fail, and it doesn't affect the work of other actors directly2.

• An actor is a unit of communication

Actors can communicate with others by sending and receiving messages. Actors are uniquely identified by their addresses.

These properties allow us to build highly scalable and fault-tolerant systems relatively thinkable and straightforwardly without using complex concurrent data structures.

1

Actually, sometimes it's useful to use rayon or alternatives. It's possible in elfo by wrapping threads into an actor's scope.

2

Actors should be designed to be tolerant to failures of other actors.

## A Mailbox

Every actor has his own mailbox, a queue containing envelopes sent by other actors to this one.

What's the envelope? The envelope is a wrapper around a message that includes also some useful metadata: the sender's address, time of sending moment, and some other information that is not so important for now.

A mailbox is the main source of messages for any actor. Messages are handled sequentially.

A mailbox can become full if the corresponding actor doesn't have time to process the message flow. In this case, the sending actor can decide to drop the message, wait for space in the mailbox or resend after some time. Such strategies will be discussed later.

## Functional actors

Let's define a some simple actor using elfo and figure out what's happening.

The simplest way to define an actor is functional style.

For example, let's define the simplest counter:

use elfo::prelude::*;

#[message]
pub struct Increment {
pub delta: u32,
}

#[message(ret = u32)]
pub struct GetValue;

pub fn counter() -> Schema {
ActorGroup::new().exec(|mut ctx| async move {
// Private state of the actor.
let mut value = 0;

// The main actor loop: receive a message, handle it, repeat.
// Returns None and breaks the loop if actor's mailbox is closed
// (usually when the system terminates).
while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
Increment { delta } => {
value += delta;
},
// It's a syntax for requests.
(GetValue, token) => {
// ... and responses.
ctx.respond(token, value);
},
})
}
})
}


We haven't discussed actor groups yet, so don't pay attention for now.

Instead, let's talk about other things in the example:

• ctx.recv() allows us to wait for the next message asynchronously. Thus, if the mailbox is empty, the actor will return control to the scheduler instead of spending CPU cycles or sleeping.
• msg! allows us to unpack envelopes and match against different types of messages. It's required, because Rust's match must include patterns for the same data type only. However, we want to support different messages, often defined in different crates. Also, reusing the match syntax is highly desired in order to work well with tooling like rustfmt and rust-analyzer.
• (RequestType, token) is the syntax for handling requests. token can be used no more than once, thanks to Rust, so we cannot accidentally respond to the request twice. Also, the compiler will warn if we forget to handle token. If the token is explicitly dropped without responding, the sending side will get the special error and decide whether it's normal or not.

Now let's define another actor to communicate with the counter:

use elfo::prelude::*;
use counter::{Increment, GetValue};

pub fn sample() -> Schema {
ActorGroup::new().exec(|ctx| async move {
// Increment the counter, we aren't interested in errors.
let _ = ctx.send(Increment { delta: 1 }).await;
// ... and again.
let _ = ctx.send(Increment { delta: 3 }).await;

// Request the current counter's value and wait for the response.
if let Ok(value) = ctx.request(GetValue).resolve().await {
tracing::info!(value, "got it!");
}
}
}


We haven't connected our actors in any way, this will be discussed later.

TODO

# Actor lifecycle

An actor goes through several stages in life. Transitions between stages are accompanied by statuses. Statuses help us to understand better what's happening with actors. So, a good way to understand actor lifecycle is to get familiar with statuses.

## Statuses

• Initializing

An initial status. The actor doesn't handle incoming messages and is doing some initialization, e.g. subscribing to other actors, collecting an initial state, connecting to DB, etc.

• Normal

The actor handles incoming messages.

This status appears on first ctx.(try_)recv() call.

• Terminating

An actor is preparing to termination, e.g. doing some cleanup, flushing data, etc.

It happens when the actor's mailbox is closed and all messages are handled. Additionally, if the actor uses TerminationPolicy::manually, it also happens when Terminate is received.

• Terminated

A terminal status. The actor's exec() finished without errors.

• Alarming

The actor has some long term problem, but still handles messages, maybe in a special way.

Currently, this status can be set manually only.

• Failed

A terminal status. The actor panicked or his exec() returns Err.

## Built-in status transitions

The schema doesn't include the Alarming status, because it can be set only manually for now.

From the point of view of the main actor's loop:

async fn exec(mut ctx: Context) {
// Status: Initializing
//  subscribe to other actors, connect to DB, etc

while let Some(envelope) = ctx.recv().await {
// Status: Normal
//  handle messages
}

// Status: Terminating
//  make cleanup, flush data, etc
} // Status: Terminated or Failed


## Manual status management

It's possible to avoid managing statuses totally, built-in logic is reasonable enough. However, with the increasing complexity of actors, it can be helpful to provide more information about the current status.

The basic way to change status:

ctx.set_status(ActorStatus::ALARMING);


Also, details can be provided with each status:

ctx.set_status(ActorStatus::INITIALIZING.with_details("loading state"));


## Subscribing to actor's statuses

TODO: SubscribeToActorStatuses, ActorStatusReport

# Communication

Actors can communicate in many ways depending on the situation and desired guarantees.

## Fire and Forget

The most straightforward way is to send a message with minimal guarantees.

However, it's possible to choose the desired behavior by calling the most appropriate method. All variants can be described by the following template: (try_|unbounded_)send(_to).

Methods with the _to suffix allow to specify a destination address as the first argument. Otherwise, the routing subsystem will be used.

The prefix describes what should happen if the destination mailbox is full:

SyntaxError casesDescription
send().awaitClosedBlocks until a destination mailbox has space for new messages
try_send()Full, ClosedReturns Full if the mailbox is full
unbounded_send()ClosedForcibly increase the mailbox, not implemented yet

All methods can return Closed if the destination actor is closed.

The form send().await is used when desired behavior is backpressure, while try_send() is for actors with predicted latency, when it's acceptable to lose messages or it can be handled manually. unbounded_send() should be avoided because it can increase mailboxes unpredictably, leading to OOM.

### Examples

#[message]
struct SomeMessage;

// Do not care if the target actor is closed or full.
let _ = ctx.try_send(SomeMessage);

// Block if the destination mailbox is full and ignore if closed.
let _ = ctx.send(SomeMessage).await;

// Fail the current actor if the destination is closed.
ctx.send(SomeMessage).await?;

// Manually implement backpressure, e.g. store messages.
if let Err(err) = ctx.try_send(SomeMessage) {
let msg = err.into_inner(); // msg: SomeMessage
}


## Blocking Request-Response

Some communications between actors require response message being sent back to the sender.

TODO

### Examples

#[message(ret = Result<(), DoSomethingRejected>)]
struct DoSomething;

#[message]
struct DoSomethingRejected(String);

TODO


TODO

TODO

TODO

TODO

TODO

# Actor Groups

Before talking about actor groups, we need to consider another vital topic: scaling.

## Scaling

Scaling is helpful both to increase throughput of a system and reduce latency.

Two known ways to do scaling are pipelining and sharding.

### Pipelining

Pipelining implies dividing the system into several sequential parts. Thus, instead of doing all work in one actor, different actors do the work in parts.

Pipelining increases minimal possible latency because of overhead costs but can reduce maximal latency because of parallel execution of different parts. Throughput is the minimum of parts' throughputs. However, total throughput increases because every actor in the pipeline does less work.

Usually, it's not necessary to think about pipelining because well-designed systems are already divided into multiple actors according to their responsibilities, not because of performance requirements.

### Sharding

Sharding implies multiple running actors with the same, possible parameterized, code and responsibility.

Sharding requires some work to route messages to the corresponding shard. Thus, throughput is increased sublinearly. Similarly to pipelining, maximal latency is reduced because arrival messages see ahead of themself fewer messages.

## Actor Groups

Actor groups in elfo are a solution for the sharding problem. Each actor has some actor key, a unique label inside a group, that can be used for routing purposes.

The group's router is not an actor but some shared code that's executed on sending side to determine which actor should receive a message. Usually, routers are stateless; thus, this approach is more performant and scalable than routers implemented as separate actors.

elfo doesn't support running actors without actor groups. Instead, it's ok to use groups with only one actor inside if it's meaningless to do sharding for now.

Note that actor keys can be arbitrary structures, not mandatory strings.

See the next chapter to get more details about routing.

## Stability

TODO: move to the first usage.

It's useful to know the term stability. Stable systems are restricted only by input rate and don't have 100% utilization. We usually want to minimize latency in such systems while keeping throughput higher than real-life requirements. Most of systems are supposed to be stable and have predictable latency. Unstable systems have 100% utilization and reach their throughput. For instance, ETL systems.

# Routing

Routing is the process of searching for the actor that will handle a sent message. In order to append the message to his mailbox, we need to discover the actor's address.

elfo offers a two-level routing system. What does it mean? Messages can pass through up to two steps of routing, depending on used methods, as shown on the following diagram:

1. If we don't know any address (ctx.(try_)send(msg) and ctx.request(msg)), the inter-group router is called to determine which groups are interested in the message. Then, the corresponding inner-group router is called for each interested group to decide which shards should receive the message.
2. Only the inner-group router is called if we already know a group's address (ctx.(try_)send_to(group_addr, msg) and ctx.request_to(group_addr, msg)).
3. If we already know an actor's address (ctx.(try_)send_to(actor_addr, msg) and ctx.request_to(actor_addr, msg)), nothing additional is done because we already know which actor should handle the message.

Note that if several actors are interested in the message, each receives a copy of the message. It's ok for messages without heap-allocated fields or for rare messages, but for the big ones consider wrapping into Arc to reduce the clone() overhead.

## Inter-group routing

Inter-group routing is responsible for connecting actor groups among themselves.

Let's consider the following architecture scheme:

Actor groups and connections between them are defined in so-called "topology":

fn topology(config_path: &str) -> elfo::Topology {
let topology = elfo::Topology::empty();
let logger = elfo::logger::init();

// Define system groups.
let loggers = topology.local("system.loggers");
let telemeters = topology.local("system.telemeters");
let dumpers = topology.local("system.dumpers");
let pingers = topology.local("system.pingers");
let configurers = topology.local("system.configurers").entrypoint();

// Define user groups.
let group_a = topology.local("group_a");
let group_b = topology.local("group_b");
let group_c = topology.local("group_c");

// Define connections between user actor groups.
group_a.route_to(&group_b, |e| { // "e" means "Envelope"
msg!(match e {
MessageX | MessageY => true,
_ => false,
})
});
group_a.route_to(&group_c, |e| {
msg!(match e {
MessageX => true,
_ => false,
})
});
group_b.route_to(&group_c, |e| {
msg!(match e {
MessageZ => true,
_ => false,
})
});
group_c.route_to(&group_a, |e| {
msg!(match e {
MessageW => true,
_ => false,
})
});

// Mount specific implementations.
loggers.mount(logger);
telemeters.mount(elfo::telemeter::new());
dumpers.mount(elfo::dumper::new());
pingers.mount(elfo::pinger::new(&topology));

// Actors can use topology as a service locator.
// Usually it should be used for utilities only.
configurers.mount(elfo::configurer::from_path(&topology, config_path));

group_a.mount(group_a::new());
group_b.mount(group_b::new());
group_c.mount(group_c::new());

topology
}


Not all messages should be specified at this level. Usually, only requests and some multicast events are specified here, but not messages that will be passed directly, such as responses or events of subscriptions.

## Inner-group routing

Inner-group routing is responsible for choosing which shards should handle incoming messages.

The inner-group router is defined next to the actor implementation, in the group declaration.

### Stateless router

use elfo::routers::{Outcome, MapRouter};

ActorGroup::new()
.router(MapRouter::new(|e| {
msg!(match e {
MessageX { key, .. } => Outcome::Unicast(key),
MessageY { keys, .. } => Outcome::Multicast(keys.to_vec()),
_ => Outcome::Default,
})
}))
.exec(exec);


Possible Outcome's variants:

• Outcome::Unicast sends the message only to the actor with a specified key. If there is no active or restarting actor for the key, the new one will be started.
• Outcome::GentleUnicast works like Outcome::Unicast, but doesn't lead to spawning new actors, instead a message is discarded.
• Outcome::Multicast sends to several actors. New actors will be started.
• Outcome::GentleMulticast works like Outcome::Multicast, but doesn't lead to spawning new actors, instead a message is discarded.
• Outcome::Broadcast sends to all active actors.
• Outcome::Discard drops the message (that leads to an error on sending side).
• Outcome::Default works as Outcome::Discard for user-defined messages, but as Outcome::Broadcast for the system ones, such as UpdateConfig, ValidateConfig, Terminate and so on.

### Stateful router

Sometimes we need to use a router with rarely changing state. If the state should be changed often, consider using a dedicated actor as a router.

To create a stateful router use the MapRouter::with_state constructor. Note that the state type should implement Default, Send and Sync.

ActorGroup::new()
.router(MapRouter::with_state(
// Called if the config is changed.
|config: &Config, state| make_new_state(..),
// The routing function.
|e, state| {
msg!(match e { .. })
}
))
.exec(exec);


The state is recreated every time when the config is changed. Useful when all needed information (e.g. a list of actors) can be extracted from the config. Note that recreation doesn't block execution; the state is atomically replaced only once make_new_state(..) is finished.

# Supervision

## Startup

Once the system is started, the system.init actor starts all actors marked as entrypoint() in the topology. Usually, it's only the system.configurers group. Entry points must have empty configs.

Then, system.configurers loads the config file and sends UpdateConfig to all actor groups. Thus, if you need to start actors, the UpdateConfig message must be routed to the necessary keys.

Note that the actor system terminates immediately if the config file is invalid at startup (elfo::start() returns an error).

## Reconfiguration

Reconfiguration can be caused in several cases:

• The configurer receives ReloadConfigs or TryRealodConfigs.
• The process receives SIGHUP, it's an equivalent to receiving ReloadConfigs::default().
• The process receives SIGUSR2, it's an equivalent to receiving ReloadConfigs::default().with_force(true).

What's the difference between default behavior and with_force(true) one? By default, group's up-to-date configs aren't sent across the system. In the force mode, all configs are updated, even unchanged ones.

The reconfiguration process consists of several stages:

1. Config validation: the configurer sends the ValidateConfig request to all groups and waits for responses.
2. If all groups respond Ok(_) or ignore the request, the config is considered valid, and the configurer sends UpdateConfig to all groups. If at least one actor responds Err(_), reconfiguration is aborted, and a new config isn't used.

More information about configs and the reconfiguration process is available on the corresponding page.

## Restart

Three restart policies are implemented now:

• RestartPolicy::on_failures (by default) — actors are restarted only after failures (the exec function returned Err or panicked).
• RestartPolicy::always — actors are restarted after termination (the exec function returned Ok or ()) and failures.
• RestartPolicy::never — actors are never restarted. However, they can be started again on incoming messages.

If the actor is scheduled to be restarted, incoming messages cannot spawn another actor for his key.

Repetitive restarts are limited by a linear backoff mechanism:

• If there are no restarts for 5s, restart an actor immediately.
• Otherwise, increase restart time by 5s and schedule restarting.
• Maximum restart time is limited by 30s.

These constants aren't configurable for now (elfo#62).

The restart policy can be chosen while creating ActorGroup:

use elfo::group::{RestartPolicy, ActorGroup};

ActorGroup::new().restart_policy(RestartPolicy::always())
ActorGroup::new().restart_policy(RestartPolicy::on_failures())
ActorGroup::new().restart_policy(RestartPolicy::never())


## Termination

System termination is started by the system.init actor in several cases:

• Received SIGTERM or SIGINT signals, Unix only.
• Received CTRL_C_EVENT or CTRL_BREAK_EVENT events, Windows only.
• Too high memory usage, Unix only. Now it's 90% (ratio of RSS to total memory) and not configurable (elfo#60).

The process consists of several stages:

1. system.init sends to all user-defined groups "polite" Terminate::default() message.
• Groups' supervisors stop spawning new actors.
• Terminate is routed as a usual message with Outcome::Broadcast by default.
• For TerminationPolicy::closing (by default): the mailbox is closed instantly, ctx.recv() returns None after already stored messages.
• For TerminationPolicy::manually: the mailbox isn't closed, ctx.recv() returns Terminate in order to handle in on your own. Use ctx.close() to close the mailbox.
2. If some groups haven't terminated after 30s, system.init sends Terminate::closing() message.
• Terminate is routed as a usual message with Outcome::Broadcast by default.
• For any policy, the mailbox is closed instantly, ctx.recv() returns None after already stored messages.
3. If some groups haven't terminated after another 15s, system.init stops waiting for that groups.
4. Repeat the above stages for system groups.

Timeouts above aren't configurable for now (elfo#61).

The termination policy can be chosen while creating ActorGroup:

use elfo::group::{TerminationPolicy, ActorGroup};

ActorGroup::new().termination_policy(TerminationPolicy::closing())
ActorGroup::new().termination_policy(TerminationPolicy::manually())


TODO

TODO

# Telemetry

## Introduction

TODO: metrics crate, metric types.

All metrics are provided with the actor_group and, optionally, actor_key labels. The last one is added for actor groups with enabled system.telemetry.per_actor_key option.

TODO: tips, prefer increment_gauge! over gauge!

## Configuration

Telemetry can be configured separately for each actor group. Possible options and their default values:

[some_group]
system.telemetry.per_actor_group = true
system.telemetry.per_actor_key = false


Note that using per_actor_key can highly increase a number of metrics. Use it only for low cardinality groups.

TODO: elfo-telemeter config.

## Built-in metrics

elfo is shipped with a lot of metrics. All of them start with the elfo_ prefix to avoid collisions with user defined metrics.

### Statuses

• Gauge elfo_active_actors{status}

The number of active actors in the specified status.

• Gauge elfo_restarting_actors

The number of actors that will be restarted after some time.

• Counter elfo_actor_status_changes_total{status}

The number of transitions into the specified status.

### Messages

• Counter elfo_sent_messages_total{message, protocol}

The number of sent messages.

• Summary elfo_message_handling_time_seconds{message, protocol}

Spent time on handling the message, measured between (try_)recv() calls. Used to detect slow handlers.

• Summary elfo_message_waiting_time_seconds

Elapsed time between send() and corresponding recv() calls. Usually it represents a time that a message spends in a mailbox. Used to detect places that should be sharded to reduce a total latency.

• Summary elfo_busy_time_seconds

Spent time on polling a task with an actor. More precisely, the time for which the task executor is blocked. Equals to CPU time if blocking IO isn't used.

### Log events

• Counter elfo_emitted_events_total{level}

The number of emitted events per level (Error, Warn, Info, Debug, Trace).

• Counter elfo_limited_events_total{level}

The number of events that haven't been emitted because the limit was reached.

• Counter elfo_lost_events_total{level}

The number of events that hasn't been emitted because the event storage is full.

### Dump events

• Counter elfo_emitted_dumps_total

The number of emitted dumps.

• Counter elfo_limited_dumps_total

The number of dumps that haven't been emitted because the limit was reached.

• Counter elfo_lost_dumps_total

The number of dumps that hasn't been emitted because the dump storage is full.

### Other metrics

TODO: specific to elfo-logger, elfo-dumper, elfo_telemeter

## Derived metrics

TODO

### Incoming/outgoing rate

TODO

rate(elfo_message_handling_time_seconds_count{actor_group="${actor_group:raw}",actor_key=""}[$__rate_interval])


### Waiting time

TODO

rate(elfo_message_waiting_time_seconds{actor_group="${actor_group:raw}",actor_key="",quantile=~"0.75|0.9|0.95"}[$__rate_interval])


### Utilization

TODO

rate(elfo_message_handling_time_seconds_sum{actor_group="${actor_group:raw}",actor_key=""}[$__rate_interval])


### Executor utilization (≈ CPU usage)

TODO

The time for which the task executor is blocked. Equals to CPU time if blocking IO isn't used.

rate(elfo_busy_time_seconds_sum[\$__rate_interval])


TODO

TODO

# Dumping

## Introduction

Dumping is the process of storing incoming and outgoing messages for every actor, including ones from mailboxes and all other sources like timers, streams, and so on. The primary purpose is future tracing, but it also can be used for regression testing.

Dumping has a lot of common with logging, but it's more efficient and optimized for storing a lot of messages, so it has high throughput requirements instead of low latency like in the logging task, where records are supposed to be delivered as soon as possible, especially warnings and errors.

## Usage

### Enable dumping

Dumping is disabled by default until the topology contains the system.dumpers group.

Elfo provides the default implementation for such group, that's available with the full feature and exported as elfo::dumper:

let topology = elfo::Topology::empty();
let dumpers = topology.local("system.dumpers");

// ...

dumpers.mount(elfo::dumper::new());


Besides this, the path to a dump file must be specified in the config:

[system.dumpers]
path = "path/to/dump/file.dump"


### Configure dumping on a per-group basis

Dumping settings can be specified for each actor group individually. Note that the settings can be changed and applied on the fly.

Example:

[some_actor_group]
system.dumping.disabled = true      # false by default
system.dumping.rate_limit = 100500  # 100000 by default


Dumps above the rate limit are lost, but the sequence number is incremented anyway to detect missed messages later.

### Configure dumping on a per-message basis

Simply add the message(dumping = "disabled") attribute to the message. Another and default value of the attribute is "full".

#[message(dumping = "disabled")]
pub struct SomethingHappened {
// ...
}


### Shorten fields of a message

Sometimes the content of messages is too large, for instance, in writing a backend for graph plotting, where every response can contain thousands of points. We don't want to lose additional information about responses, but saving whole messages is very expensive in this case.

For this situation, elfo provides a helper to hide specified fields during serialization, but only in the dumping context. So, these messages still will be properly sent over the network, where serialization is used too.

#[message]
pub struct ChunkProduced {
pub graph_id: GraphId,
#[serde(serialize_with = "elfo::dumping::hide")]
pub points: Vec<(f64, f64)>,   // will be dumped as "<hidden>"
}


Such messages cannot be deserialized properly; that's ok until they are used as input for [regression testing][regression].

TODO

## Local storage

The default implementation of dumpers writes all dumps to a file on the local file system.

Even home-purpose SSDs can achieve 3GiB/s in 2021, which should be more than enough to avoid a bottleneck in this place.

Dumps are stored in an uncompressed way so that they can take a lot of space. So, it's essential to rotate the dump file timely and delete outdated ones.

Note that message ordering between actor groups (and even inside the same actor) can be easily violated because of implementation details. Therefore, in the case of reading from local dump files, you should sort rows by the timestamp field.

## The structure of dump files

Dump files contain messages in the newline-delimited JSON format. Each line is object containing the following properties:

• g — an actor group's name
• k — an optional actor's key
• n — node_no
• ssequence_no, unique inside an actor group
• ttrace_id
• ts — timestamp
• d — direction, "In" or "Out"
• cl — an optional class
• mn — a message's name
• mp — a message's protocol, usually a crate, which contains the message
• mk — a message's kind, "Regular", "Request" or "Response"
• m — a nullable message's body
• c — an optional correlation id, which links requests with corresponding responses

Terms:

• optional means that the property can be omitted, but if it's present, then its value isn't null.
• nullable means that the property is present always, but the value can be null.

The sequence_no field can be used to detect missed messages because of limiting.

TODO: note about classes

## Dump file rotation

elfo::dumper doesn't use any kind of partitioning and relies on an external file rotation mechanism instead. It means some additional configuration is required, but it provides more flexibility and simplifies the dumper.

The dumper listens to the SIGHUP signal to reopen the active dump file. Besides this, the dumper accepts the elfo::dumper::ReopenDumpFile command.

The most popular solution for file rotation is, of course, logrotate.

TODO: logrotate config

Tip: if dumps are not supposed to be delivered to DB, use hard links to save the dump file for later discovery and avoid deletion.

## Remote storage

Depending on your goals, you may or may not want to send your dump files to remote file storage. It can highly improve search capabilities (primarily because of indexing trace_id) and space usage but requires additional infrastructure for dumping. Usually, it's ok for many services to use only local storage. Dumps are stored in a time-ordered way and, thanks to the structure of trace_id, can be used for good enough search. However, elfo doesn't provide the utility to search these files for now.

Ok, so you want to store dumps in DB. The right choice, if you can afford it. What should you do?

The common schema looks like

TODO: add a link to the example with vector.dev and clickhouse

## Implementation details

TODO: implementation has been changed a lot, need to update the section

At a top level, dumping is separated into two parts: the dumping subsystem and the dumper.

The dumping subsystem is based on sharded in-memory storage containing a limited queue of messages. We use a predefined number of shards for now, but we will likely use the number of available cores in the future. Every thread writes to its dedicated shard. Such an approach reduces contention and false sharing between threads.

The dumper sequentially, in a round-robin way, replaces the shard's queue with the extra one, then reads and serializes all messages and writes them to the dump file. All this work happens on a timer tick. Firstly, it's one of the simplest ways to get appropriate batching. Secondly, because the dumper uses tokio::task::spawn_blocking and blocking writes insides, that's more effective than using async tokio::fs directly. The timer approach allows us to reduce the impact on the tokio executor. However, this behavior is going to be improved for environments with io_uring in the future.

The dumper holds the lock only for a small amount of time to replace the queue inside a shard with another one, which was drained by the dumper on the previous tick. Thus, the actual number of queues is one more than shards.

All actors in a group share the same handler with some common things like sequence_no generator and rate limiter. Whenever an actor sends or receives a message, the handler is used to push message to the shard according to the current thread. Thus, messages produced by the same actor can reorder if it's migrated to another thread by a scheduler.

Yep, we can restore order in the dumper, but don't do it now because remote DB is doing it anyway. However, we can add the corresponding option in the future. It's not trivial, although.

# Tracing

TODO

## TraceId

This ID is periodically monotonic and fits great as a primary key for tracing entries. It was accomplished by using rarely (approx. once a year) wrapping timestamp ID component.

TraceId essentially is:

$\operatorname{trace\_id} = \operatorname{timestamp} \dot{} 2^{38} + \operatorname{node\_no} \dot{} 2^{22} + \operatorname{chunk\_no} \dot{} 2^{10} + \operatorname{counter}$

This formula's parameters were chosen carefully to leave a good stock of spare space for TraceId's components inside the available set of 63-bit positive integers. You may see the general idea by looking at the bits distribution table. From MSB to LSB:

BitsDescriptionRangeSource
1Reserved0-
25timestamp in secs0..=33_554_431Clock at runtime
16node_no0..=65_535Externally specified node (process) configuration
12chunk_no0..=4095Some number produced at runtime
10counter1..=1023A counter inside the chunk

The code generating TraceId of course can be optimized by using bit shifts for fast multiplication on a power of two:

trace_id = (timestamp << 38) | (node_no << 22) | (chunk_no << 10) | counter


TraceId uses time as its monotonicity source so timestamp is probably the most important part of the ID. Note that timestamp has pretty rough resolution — in seconds. How long you can count seconds inside 25 bits?

$\operatorname{timestamp}_{max} = \frac{2^{25} - 1}{60 \dot{} 60 \dot{} 24} = \frac{33554431}{86400} \approx 388 \text{ days}$

Which is almost a year plus 23 days. What happens when this almost-one-year term ends? timestamp starts counting from 0 once again:

TIMESTAMP_MAX = (1 << 25) - 1 // 0x1ff_ffff
timestamp = now_s() & TIMESTAMP_MAX


This means that primary key is guaranteed to act as a unique identifier for one year. To keep an order of entries between monotonic periods of TraceId use some fully monotonic (but not necessarily unique) field as a sorting key for your entries (created_at or seq_no / sequential_number).

TraceId with such timestamp part is well optimized to produce a lot of entities worth querying for only a limited period of time: logs, dumps or tracing entries. However you're able to store such entries as long as you like and use data skipping indices for quick time series queries.

To make IDs unique across several instances of your system without any synchronization between them node_no ID component should be externally specified from the system deployment configuration.

TODO: actualize information about thread_id: it's not used by elfo, but appropriate way to generate trace_id in other systems that want to interact with elfo.

thread_id shares bit space with counter. At the start of the system you should determine how much threads your node could possibly have and choose appropriate $$\operatorname{counter}_{max}$$ according to that.

Previous components segregated entries produced by separate threads on every instance of the system at different seconds. To create multiple unique IDs during a single second inside a single thread we use counter ID component.

Let's calculate how much records per second ($$\operatorname{RPS}_{max}$$) allows us to produce counter with the bounds we've chosen. Assuming we have 32 threads:

$\operatorname{RPS}_{max} = \frac{2^{22} - 1}{\operatorname{threads\_count}} = \frac{2^{22} - 1}{2^{5}} \approx 2^{17} \approx 1.3 \dot{} 10^{5} \text{ } \frac{\text{records}}{\text{second}}$

Which seems more than enough for the most of the applications. Note that counter is limited to be at least 1 to keep the invariant: $$\operatorname{id} \geqslant 1$$. Every other component of TraceId could be zero.

# Protocol Evolution

It's hard to update all nodes in distributed systems simultaneously. Firstly, it requires synchronization of releases and teams. Secondly, it means all nodes become unavailable until the update is finished, which can be highly unpleasant for users.

An alternative approach is updating nodes one by one without any kind of locks between them. However, they can become incompatible with each other because the communication protocol has evolved. That's where a protocol evolution appears. If a message is sent to another node, we should be more careful when changing it.

Communication is based on the msgpack format. It's close to JSON, but binary encoded and more feature-rich: it supports arbitrary map keys, special values of floats (NaN, ±inf), and binary data. However, the ability to evolve is the same, so the rules of evolution are also similar.

It's worth noting that the compatibility of nodes can be checked automatically on a per-message basis (because it's known where a message is used on the sender and receiver part). However, it's unimplemented right now.

## The common principle

Messages are defined in so-called protocol crates. They shouldn't contain any logic, only message definitions and convenient constructors, getters, and setters for them. To understand the common principle of evolution, it needs to distinguish nodes that send a message from those that receive it. Senders and receivers are compiled with different versions of the same protocol crate.

The common principle states: senders should send a more specific version of a message than receivers accept.

In this way, if a message in the protocol becomes more specific (e.g., it gets more fields), then senders must be updated first. And, vice versa, if a message becomes less specific (e.g., losing fields), receivers must be updated first. Some changes allow any orders of updating; see the next section for details.

In the case of downgrading nodes, the update order is the opposite.

Let's consider some example of evolution:

It's a simple example of evolution with only adding required fields, but it helps to understand the common principle. Providing more fields from a sender than a receiver expects is always acceptable because a receiver can skip these fields during deserialization. If a receiver is updated first, it will expect more fields, and deserialization will fail.

This principle can be propagated to multiple updates as well:

At step 2, the message becomes more specific, so senders are updated first. At step 4, the message becomes less specific, so receivers are updated first.

## Examples

 Action Current version Next version Update ordering Adding a new required field #[message] struct Sample { a: u32, }  #[message] struct Sample { a: u32, b: u32, }  Sender, Receiver Adding a new optional field #[message] struct Sample { a: u32, }  #[message] struct Sample { a: u32, #[serde(default)] b: u32, c: Option, }  Any Renaming a field #[message] struct Sample { a: u32, }  #[message] struct Sample { #[serde(alias = "a")] b: u32, }  Receiver, Sender Removing a required field #[message] struct Sample { a: u32, b: u32, }  #[message] struct Sample { a: u32, }  Receiver, Sender Promotion of numbers1 #[message] struct Sample { a: u32, }  #[message] struct Sample { a: u64, // Any numeric // a: i16 }  Any (Un)wrapping into a newtype #[message] struct Sample { a: u32, }  #[message] struct Sample { a: U32, } #[message(part)] struct U32(u32);  Any Adding a new variant #[message] enum Sample { A(u32), }  #[message] enum Sample { A(u32), B(u32), }  Receiver, Sender Renaming a variant #[message] enum Sample { A(u32), }  #[message] enum Sample { #[serde(alias = "A")] B(u32), }  Receiver, Sender Removing a variant #[message] enum Sample { A(u32), B(u32), }  #[message] enum Sample { A(u32), }  Sender, Receiver Replacing a field with an union (a untagged enumeration) #[message] struct Sample { a: u32, }  #[message] struct Sample { a: NumOrStr, } #[message(part)] #[serde(untagged)] enum NumOrStr { Num(u32), Str(String), }  Any Removing a variant #[message] enum Sample { A(u32), B(u32), }  #[message] enum Sample { A(u32), }  Sender, Receiver Enumerations with serde(other) #[message] enum Sample { A, B, }  #[message] enum Sample { A, #[serde(other)] O, }  Receiver, Sender
1

Promotion to/from i128 and u128 doesn't work due to msgpack's implementation details.

# Project Structure

Although the project structure can be arbitrary, it looks reasonable to provide some advices here:

• Use a separate crate for every actor group.
• It speeds up a build time.
• It helps to isolate code.
• Use separate crates for protocols, where messages are defined. Internal messages (timer ticks, inner group messages etc) are defined inside a group's crate.
• Actor crates shouldn't depend on each other. Instead, they should depend on protocol crates.
• It helps to achieve loosely coupling.
• Prefer integration tests (tests/) over unit tests (mod tests). Unit tests are still fine for internal structures.
• Integration tests use no implementation details and relies only on protocols.
• Store libraries (additional code potentially used by multiple actors) separately from actors' code. It's ok to depends on protocols in that code, but don't depend on actors. libs/ in the example below.
• Store different parts of a group in dedicated files, it siplifies navigation:
• exec() and message handling in actor.rs.
• An inner-group router in router.rs.
• A group's config in config.rs.
• Store topology definitions separately from actors' code. Services can share actors. Also, it's a good place for additional files like systemd units and so on. services/ in the example below.

For instance:

actors/
<actor_name_1>/
src/
mod.rs
actor.rs
config.rs
router.rs
tests/
helpers.rs
test_<name_1>.rs
test_<name_2>.rs
Cargo.toml
<actor_name_2>/
protocol/
src/
lib.rs
Cargo.toml
libs/
<lib_name_1>/
<lib_name_2>/
services/
<service_name_1>/
<service_name_2>/
Cargo.toml
Cargo.lock
rustfmt.toml


# Pattern: ID Generation

It should be noted that ID generation approach which was chosen in your app affects your app's architecture and overall performance. Here we discuss how to design good ID and show a couple of great ID designs.

## Choosing the domain for your IDs

Despite of wide spread of 64-bit computer architecture several popular technologies still don't support 64-bit unsigned integer numbers completely. For example several popular versions of Java have only i64 type so you can't have positive integer numbers bigger than $$2^{63} - 1$$ without excess overhead. Therefore a lot of Java-powered apps (e.g. Graylog) have limitations on IDs' domain.

At some external apps you can gain a couple of problems because of IDs less than 1, e.g. value 0 may be explicitly forbidden. Avoiding 0 value in IDs simplifies some popular memory optimizations and allows you to use zero values instead of Nullable columns which can improve performance in some databases (e.g. ClickHouse).

This leads us to the following integer numbers' domain for IDs:

MIN = 1
MAX = 2 ^ 63 - 1 =
= 9_223_372_036_854_775_807 =
= 0x7fff_ffff_ffff_ffff ≈
≈ 9.2e18


Probably today you don't need to be compatible with such domain-limiting systems but in most cases such limitations are very easy to withstand and there's no reason for your project not to be ready for integration with such obsolete systems in the future. Such domain allows you to store $$2^{63} - 1$$ entities which is bigger than $$9.2 \dot{} 10^{18}$$ and is enough to uniquely identify entities in almost every possible practical task. You shouldn't forget though that a good ID generation scheme is just a trade-off between a dense domain usage and robust sharded unique IDs generation algorithm.

That's why we recommend using 63-bit positive integers as IDs.

### Systems with ECMAScript

In theory JSON allows you to use numbers of any length, but in a bunch of scenarios it will be great to process or display your data using ECMAScript (JavaScript, widely spread in web browsers) and it has only f64 primitive type for numbers so you can't have positive integer numbers bigger than $$2^{53} - 1$$ without excess overhead. This leads us to the following integer numbers' domain for IDs in systems with ECMAScript:

MIN = 1
MAX = 2 ^ 53 - 1 =
= 9_007_199_254_740_991 =
= 0x1f_ffff_ffff_ffff ≈
≈ 9.0e12


From the other hand you can add an extra serialization step storing your IDs in JSON as strings which will be a decent workaround for this problem.

## Properties of your ID generation algorithm

We call generated IDs sequence monotonic if every subsequent ID is bigger than a previous one as a number.

### Why monotonic IDs are so great

The app should gain parameters for ID generation algorithm after every restart taking into account previously issued IDs. Monotonic IDs generation allows you to take into account only the most recently generated IDs or don't bother about existing entries at all.

Most of the modern storages allow you to take a notable performance advance if your data was put down in an ordered or partially-ordered form. The storage lays down the data according to sorting key which may differ from the primary key (ID). To perform time series queries on partially-ordered data very quickly one can use the cheapest form of data skipping indices — the minmax index.

Partially-monotonic data can be compressed using specialized codecs very effectively, thanks to storing deltas instead of absolute values. Effective data compression is able to lower storage consumption and increase DB throughput significantly.

This means that you should have a good reason to use non-monotonic IDs.

### Level of monotonicity

Depending on the length of monotonic segments in the generated IDs sequence we can divide ID generation algorithms to:

• Fully monotonic — generated ID sequence is monotonic during the whole lifetime of the system (see also IDs produced by DB).
• Periodically monotonic — ID generation algorithm reaches the maximal value for ID several times during the lifetime of the system and starts counting from the beginning of the domain. This means that the storage would contain a couple of monotonic segments e.g. with data for several months long each (see also TraceId).
• With sharded monotonicity — the app generates IDs monotonically for several segments frequently switching between these segments. E.g. every actor generates only monotonic IDs but because of concurrent nature of actors' work the storage should handle a lot of intersecting consecutive IDs inserts (see also DecimalId).

### Monotonicity source

There're several means to gain monotonic IDs. Your choice of them would affect application restart strategy:

### The reasons for non-monotonic IDs

If ID generation algorithm shouldn't look predictive because of information security concerns it's a good reason to use multiplicative hashing or random permutations to generate your IDs.

It's usually a great idea to apply randomness tests to your IDs in such scenarios.

Please note that really secure IDs would require security algorithms (or at least some versions of UUID). Algorithms mentioned above are for IDs that should just look random.

### Common or separate domains for several entities' IDs

Should we use common ID generator for several entities?

In such scenario IDs for various entities will never intersect so you have no chance to successfully query entity A by ID of entity B by mistake and you essentially have “fail fast” approach on entities' querying.

Unfortunately common ID generator for several entities will exhaust ID domain more quickly.

## Great IDs case study

### IDs produced by DB

Probably you did already use AUTO_INCREMENT SQL DDL instruction for primary keys. The general idea with it is to never generate IDs by ourselves and instead rely on the counter inside the database as a single source of truth. This means that you don't know the IDs of entities you'd like to insert before you actually insert them so you should essentially support separate schema just for entities' “construction stubs” and your app's latency is bound by DB latency by design. If you need a cycle reference between entities you'd like to insert you're most likely in trouble. If you need database replication you'll be in trouble too.

Alternative — is to use atomic compare-and-swap integrated into several DB engines, for example Cassandra's lightweight transactions. IDs produced by DB are great for storing entities appearing with relatively low frequency (probably less than 10000 per second): like users or accounts.

Such approach gives us fully monotonic IDs and uses the whole 63-bit space to store ID's positive value right away.

### DecimalId

Probably your application already has some kind of launch log storing information about release ticket in the issue tracking system, launch time, user initiated the launch, etc. If you have such entity in your system DecimalId will probably serve to you really good. This ID has sharded monotonicity. The primary idea behind DecimalId is to increase ID's legibility by storing its parts in a separate decimal places.

DecimalId essentially is:

$\operatorname{decimal\_id} = \operatorname{counter} \dot{} 10^{10} + \operatorname{generator\_id} \dot{} 10^{5} + \operatorname{launch\_id}$

This formula corresponds to the following decimal places distribution table:

DigitsDescriptionRangeSource
9counter1..=922_337_202Not synchronized runtime parameter
5generator_id0..=99_999Synchronized runtime parameter
5launch_id1..=99_999, 0 for testsExternally specified and consistent across the whole system during a single launch

The code implementing DecimalId generation can't use bit shift hack as we did for TraceId, but IDs generated using such schema are much more legible. You're able to read their decimal representation like this: cc*ggggglllll (counter, generator ID, launch ID — respectively). For instance:

               ID 14150009200065
ccccggggglllll
▲    ▲    ▲
counter (1415) ─┘    │    │
generator_id (92) ──────┘    │
launch_id (65) ───────────┘


Maximal possible ID value is 922_337_203___68_547___75_807 however counter is limited by 922_337_202 (please note the decrement). Such trick prevents limiting generator_id (next ID component) by 68_547 (now it's limited by 99_999). Note that $$\operatorname{counter}_{min} = 1$$ to keep the invariant: $$\operatorname{id} \geqslant 1$$ because every other conmonent of DecimalId could be zero.

Every time system produces more than $$\operatorname{counter}_{max}$$ elements it requests persistent synchronized counter global for the whole system for the next generator_id. This increment happens at the every start of the app and only once for every $$9.2 \dot{} 10^{8}$$ records so contention on it is negligible.

At every launch of your app launch_id should be taken from the persistent launch log of the system. To make deployment more robust we recommend generating launch_id outside of the app in the deployment configuration.

launch_id = 0 should never appear in production records and may only be used for testing.