Sources

In the context of the elfo actor system, sources serve as conduits for integrating various streams of incoming messages, such as timers, signals, futures, and streams. They allow for a seamless amalgamation of these additional streams with the messages arriving in the mailbox. Consequently, features like tracing, telemetry, and dumps are uniformly available for both regular and source-generated messages.

You can instantiate sources using dedicated constructors that correspond to different types. It's important to note that initially, these sources are inactive; they only start generating messages once they are linked to the context through the method ctx.attach(_). This method also returns a handler, facilitating the management of the source. Here is how you can utilize this method:

let unattached_source = SomeSource::new();
let source_handle = ctx.attach(unattached_source);

If necessary, you can detach sources at any time by invoking the handle.terminate() method, as shown below:

source_handle.terminate();

Under the hood, the storage and utilization of sources are optimized significantly to support multiple sources at the same time, thanks to the unicycle crate. While the system supports an unlimited number of sources without offering backpressure, it's essential to moderate the usage to prevent potential out-of-memory (OOM) errors.

Intervals

The Interval source is designed to generate messages at a defined time period.

To activate the source, employ either the start(period) or start_after(delay, period) methods, as shown below:

use elfo::time::Interval;

#[message]
struct MyTick; // adhering to best practice by using the 'Tick' suffix

ctx.attach(Interval::new(MyTick))
    .start(Duration::from_secs(42));

while let Some(envelope) = ctx.recv().await {
    msg!(match envelope {
        MyTick => { /* handling code here */ },
    });
}

Adjusting the period

In instances where you need to adjust the timer's interval, possibly as a result of configuration changes, the interval.set_period() method comes in handy:

use elfo::{time::Interval, messages::ConfigUpdated};

#[message]
struct MyTick; // adhering to best practice by using the 'Tick' suffix

let interval = ctx.attach(Interval::new(MyTick));
interval.start(ctx.config().period);

while let Some(envelope) = ctx.recv().await {
    msg!(match envelope {
        ConfigUpdated => {
            interval.set_period(ctx.config().period);
        },
        MyTick => { /* handling code here */ },
    });
}

To halt the timer without detaching the interval, use interval.stop(). This method differs from interval.terminate() as it allows for the possibility to restart the timer later using interval.start(period) or start_after(delay, period) methods.

It's essential to note that calling interval.start() at different points can yield varied behavior compared to invoking interval.set_period() on an already active interval. The interval.set_period() method solely modifies the existing interval without resetting the time origin, contrasting with the rescheduling functions (start_* methods). Here's a visual representation to illustrate the differences between these two approaches:

set_period(10s): | 5s | 5s | 5s |  # 10s  |   10s   |
start(10s):      | 5s | 5s | 5s |  #   10s   |   10s   |
                                   #
                              called here

Tracing

Every message starts a new trace, thus a new TraceId is generated and assigned to the current scope.

Delays

The Delay source is designed to generate one message after a specified time:

use elfo::time::Delay;

#[message]
struct MyTick; // adhering to best practice by using the 'Tick' suffix

while let Some(envelope) = ctx.recv().await {
    msg!(match envelope {
        SomeEvent => {
            ctx.attach(Delay::new(ctx.config().delay, MyTick));
        },
        MyTick => { /* handling code here */ },
    });
}

This source is detached automatically after emitting a message, there is no way to reschedule it. To stop delay before emitting, use the delay.terminate() method.

Tracing

The emitted message continues the current trace. The reason for it is that this source is usually used for delaying specific action, so logically it's continues the current trace.

Signals

The Signal source is designed to generate a message once a signal is received:

use elfo::signal::{Signal, SignalKind};

#[message]
struct ReloadFile;

ctx.attach(Signal::new(SignalKind::UnixHangup, ReloadFile));

while let Some(envelope) = ctx.recv().await {
    msg!(match envelope {
        ReloadFile => { /* handling code here */ },
    });
}

It's based on the tokio implementation, so it should be useful to read about caveats.

Tracing

Every message starts a new trace, thus a new trace id is generated and assigned to the current scope.

Streams

The Stream source is designed to wrap existing futures/streams of messages. Items can be either any instance of Message or Result<impl Message, impl Message>.

Once stream is exhausted, it's detached automatically.

Futures

Utilize Stream::once() when implementing subtasks such as initiating a background request:

use elfo::stream::Stream;

#[message]
struct DataFetched(u32);

#[message]
struct FetchDataFailed(String);

async fn fetch_data() -> Result<DataFetched, FetchDataFailed> {
    // ... implementation details ...
}

while let Some(envelope) = ctx.recv().await {
    msg!(match envelope {
        SomeEvent => {
            ctx.attach(Stream::once(fetch_data()));
        },
        DataFetched => { /* handling code here */ },
        FetchDataFailed => { /* error handling code here */ },
    });
}

futures::Stream

Stream::from_futures03 is used to wrap existing futures::Stream:

use elfo::stream::Stream;

#[message]
struct MyItem(u32);

let stream = futures::stream::iter(vec![MyItem(0), MyItem(1)]);
ctx.attach(Stream::from_futures03(stream));

while let Some(envelope) = ctx.recv().await {
    msg!(match envelope {
        MyItem => { /* handling code here */ },
    });
}

To produce messages of different types from the stream, it's possible to cast specific messages into AnyMessage (undocumented for now):

futures::stream::iter(vec![MyItem(0).upcast(), AnotherItem.upcast()])

Generators

Stream::generate is an alternative to the async-stream crate, offering the same functionality without the need for macros, thereby being formatted by rustfmt:

use elfo::stream::Stream;

#[message]
struct SomeMessage(u32);

#[message]
struct AnotherMessage;

ctx.attach(Stream::generate(|mut e| async move {
    e.emit(SomeMessage(42)).await;
    e.emit(AnotherMessage).await;
}));

while let Some(envelope) = ctx.recv().await {
    msg!(match envelope {
        SomeMessage(no) | AnotherMessage => { /* handling code here */ },
    });
}

Tracing

The trace handling varies depending upon the method used to create the stream:

  • For Stream::from_futures03(): each message initiates a new trace.
  • For Stream::once() and Stream::generate(): the existing trace is continued.

To override the current trace, leverage scope::set_trace_id() at any time.