Join Sets

Join sets are a core mechanism for handling structured concurrency and asynchronous operations in Obelisk workflows. A join set represents a collection of pending operations (like activity calls or spawned child workflows). A workflow can submit child executions or delay requests to a join set, await their responses, and close the join set.

Defining Features

Join Set Kinds

There are three kinds of join sets: one-off (implicit), named, and generated.

Allowed Named Join Set Charset

The name of a join set is limited to the following charset: alphanumeric characters (both upper and lower case) and the special characters _, - and /.

See Join Set ID for details.
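The charset rule above can be checked before a name is passed to new_join_set_named. The following is a minimal sketch, not part of the Obelisk API: the helper name is_allowed_join_set_name is hypothetical, and it assumes that "alphanumeric" means ASCII letters and digits and that an empty name is rejected. Any further rules that Join Set ID may impose are not modeled.

```rust
/// Hypothetical helper, not part of the Obelisk API: checks a candidate
/// join set name against the charset described above.
fn is_allowed_join_set_name(name: &str) -> bool {
    // Assumption: an empty name is rejected; the text only specifies the charset.
    !name.is_empty()
        && name
            .chars()
            // Assumption: "alphanumeric" is interpreted as ASCII letters and digits.
            .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '/'))
}
```

A workflow could use such a check to sanitize or reject user-provided input instead of unwrapping the result of new_join_set_named.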

API

The API for join sets is defined in workflow-support.wit.

To create a join set in Rust, use:

let named = new_join_set_named("some-name", ClosingStrategy::Complete).unwrap(); // See allowed join set characters
let generated = new_join_set_generated(ClosingStrategy::Complete);

One-off (implicit) Join Set

The following example uses join sets implicitly by calling the child executions directly. Each call marked as 1-5 will create a one-off join set and await the result.

fn star_added(login: String, repo: String) -> Result<(), String> {
    // 1. Persist the user giving a star to the project.
    let description = db::user::add_star_get_description(&login, &repo)?; // Create a one-off join set implicitly.
    if description.is_none() {
        // 2. Fetch the account info from GitHub.
        let info = github::account::account_info(&login)?; // Create a one-off join set implicitly.
        // 3. Fetch the prompt from the database.
        let settings_json = db::llm::get_settings_json()?; // Create a one-off join set implicitly.
        // 4. Generate the user's description.
        let description = llm::respond(&info, &settings_json)?; // Create a one-off join set implicitly.
        // 5. Persist the generated description.
        db::user::update_user_description(&login, &description)?; // Create a one-off join set implicitly.
    }
    Ok(())
}

Named and Generated Join Set

The implicit form has one problem: the workflow cannot handle a child execution failure caused by a timeout or an error. Such a failure becomes an unhandled child execution error that marks the whole execution as failed. This is why the -await-next extension functions return a result with the following error:

/// Error that is thrown by `-await-next` extension functions.
variant await-next-extension-error {
    /// Execution response was awaited and marked as processed, but it finished with an error.
    execution-failed(execution-failed),
    /// All submitted requests and their responses of specified function and join set were already processed.
    all-processed,
    /// Execution response was awaited and marked as processed, but it belongs to a different function.
    /// This can happen when join set contains responses of multiple functions or delay requests.
    function-mismatch(function-mismatch),
}

The following example creates two named join sets and uses them to parallelize account-info and get-settings-json activities.

fn star_added_parallel(login: String, repo: String) -> Result<(), String> {
    // Persist the user giving a star to the project - a direct call that creates and awaits a one-off join set.
    let description = db::user::add_star_get_description(&login, &repo)?; // Create a one-off join set implicitly.
    if description.is_none() {
        // 1. Create two join sets for the two child executions.
        let join_set_info =
            new_join_set_named(&format!("{login}-info"), ClosingStrategy::Complete).unwrap();
        let join_set_settings =
            new_join_set_named("settings", ClosingStrategy::Complete).unwrap();
        // 2. Submit the two child executions asynchronously.
        account_info_submit(&join_set_info, &login);
        get_settings_json_submit(&join_set_settings);
        // 3. Await the results.
        let info = account_info_await_next(&join_set_info)
            .map_err(err_to_string)?
            .1?;
        let settings_json = get_settings_json_await_next(&join_set_settings)
            .map_err(err_to_string)?
            .1?;
        // Generate the user's description - a blocking call.
        let description = llm::respond(&info, &settings_json)?;
        // Persist the generated description - a blocking call.
        db::user::update_user_description(&login, &description)?;
    }
    Ok(())
}
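The example above calls an err_to_string helper that is not shown. A minimal sketch of such a helper follows. It assumes a Rust enum that mirrors the await-next-extension-error variant; the enum below is a stand-in written for illustration, and its String payloads are placeholders for the generated execution-failed and function-mismatch types, whose fields are not described here.

```rust
/// Stand-in for the binding generated from `await-next-extension-error`.
/// Assumption: the payloads are simplified to `String` for illustration.
#[derive(Debug)]
enum AwaitNextExtensionError {
    ExecutionFailed(String),
    AllProcessed,
    FunctionMismatch(String),
}

/// Converts the extension error into the `String` error type used by the workflow,
/// so that it can be propagated with `?` after `map_err`.
fn err_to_string(err: AwaitNextExtensionError) -> String {
    match err {
        AwaitNextExtensionError::ExecutionFailed(details) => {
            format!("child execution failed: {details}")
        }
        AwaitNextExtensionError::AllProcessed => {
            "all responses were already processed".to_string()
        }
        AwaitNextExtensionError::FunctionMismatch(details) => {
            format!("response belongs to a different function: {details}")
        }
    }
}
```

Instead of converting every variant to a string, a workflow could also match on execution-failed directly and run compensating logic.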

The only difference between a named and a generated join set is how it is created. The example above could be slightly simplified by switching to join sets with generated names:

let join_set_info = new_join_set_generated(ClosingStrategy::Complete);

One benefit of naming join sets is that they can be used to group child executions that are logically related, which can be useful for debugging and monitoring.

Homogeneous Join Sets

When a join set contains only child executions of the same function, it is called homogeneous. It is safe to call -await-next until all submitted executions are awaited. Calling -await-next more times will result in an await-next-extension-error::all-processed error.

let join_set = new_join_set_generated(ClosingStrategy::Complete);
// Submit the same execution with different parameters.
for _ in 0..9 {
    let _execution_id = account_info_submit(&join_set, foo());
}
// Await the responses as they arrive.
for _ in 0..9 {
    let (_execution_id, res) = account_info_await_next(&join_set).unwrap();
}
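Because all-processed marks the end of the responses, a join set that holds child executions of a single function can also be drained without tracking how many executions were submitted. The sketch below illustrates the loop shape only. MockJoinSet, its methods, and the simplified AwaitNextError are in-memory stand-ins invented for this example, not Obelisk types, and the mock returns responses in submission order, whereas real responses arrive as child executions finish.

```rust
use std::collections::VecDeque;

/// Simplified stand-in for `await-next-extension-error`; only `all-processed` is modeled.
#[derive(Debug, PartialEq)]
enum AwaitNextError {
    AllProcessed,
}

/// In-memory stand-in for a join set holding executions of one function. Not the Obelisk type.
struct MockJoinSet {
    next_id: u32,
    pending: VecDeque<(u32, String)>,
}

impl MockJoinSet {
    fn new() -> Self {
        MockJoinSet { next_id: 0, pending: VecDeque::new() }
    }

    /// Pretends to submit a child execution and returns its (mock) execution id.
    fn submit(&mut self, login: &str) -> u32 {
        let execution_id = self.next_id;
        self.next_id += 1;
        self.pending.push_back((execution_id, format!("info for {login}")));
        execution_id
    }

    /// Returns the next response, or `AllProcessed` once nothing is left.
    fn await_next(&mut self) -> Result<(u32, String), AwaitNextError> {
        self.pending.pop_front().ok_or(AwaitNextError::AllProcessed)
    }
}

/// Awaits responses until the join set reports that everything was processed.
fn drain(join_set: &mut MockJoinSet) -> Vec<String> {
    let mut responses = Vec::new();
    loop {
        match join_set.await_next() {
            Ok((_execution_id, response)) => responses.push(response),
            Err(AwaitNextError::AllProcessed) => break,
        }
    }
    responses
}
```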

Heterogeneous Join Sets

When a join set contains child executions of multiple functions, or delay requests, it is called heterogeneous. It is generally not safe to call a -await-next extension function on it, because the resolved response might not belong to the awaited function. Instead, use the join-next workflow support function together with the -get extension function that matches the returned response-id.

let join_set = new_join_set_generated(ClosingStrategy::Complete);
// Submit the two child executions and a delay request.
let execution_id_account = account_info_submit(&join_set, &login);
let execution_id_settings = get_settings_json_submit(&join_set);
let _delay_id = submit_delay(&join_set, ScheduleAt::In(Duration::Seconds(10)));
// Use `join-next` to get the first response.
match join_next(&join_set) {
    Ok(ResponseId::DelayId(_)) => println!("delay won"),
    Ok(ResponseId::ExecutionId(execution_id)) => {
        if execution_id.id == execution_id_account.id {
            // use `-get` to obtain the response
            match account_info_get(&execution_id) {
                Ok(resp) => println!("account-info response won: {resp:?}"),
                Err(GetExtensionError::ExecutionFailed(_)) => todo!("handle failure"),
                Err(GetExtensionError::FunctionMismatch(_)) => {
                    unreachable!("this must be account-info based on execution id")
                }
                Err(GetExtensionError::NotFoundInProcessedResponses) => {
                    unreachable!("execution-id must have been processed as it was returned by `join-next`")
                }
            }
        }
    }
    Err(JoinNextError::AllProcessed) => unreachable!("submitted 3 child requests"),
}

Join Set Close

Join sets are automatically closed when they go out of scope or when the workflow execution finishes. During this process, all child executions that the parent execution submitted but did not await will be automatically processed. The parent workflow will be marked as finished only when all unattended child executions are finished.

Currently, only the closing-strategy::complete closing strategy is supported. This means that all child executions that were submitted but not awaited will be awaited during workflow teardown.

In the future, closing-strategy::cancel will attempt to cancel each unattended child execution.
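The scope-based closing described above resembles Rust's Drop mechanism. The following sketch models it with a stand-in type so that the ordering of events can be observed. ScopedJoinSet, its methods, and the shared log are hypothetical and exist only for this illustration; the real join set is closed by the Obelisk runtime, which awaits the actual child executions.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared event log used only to observe the order of operations in this sketch.
type Log = Rc<RefCell<Vec<String>>>;

/// Stand-in for a join set with the `complete` closing strategy. Not the Obelisk type.
struct ScopedJoinSet {
    name: String,
    unawaited: Vec<String>,
    log: Log,
}

impl ScopedJoinSet {
    fn new(name: &str, log: &Log) -> Self {
        ScopedJoinSet { name: name.to_string(), unawaited: Vec::new(), log: Rc::clone(log) }
    }

    /// Records a submitted child execution that nobody has awaited yet.
    fn submit(&mut self, child: &str) {
        self.unawaited.push(child.to_string());
    }
}

impl Drop for ScopedJoinSet {
    /// Models closing: every submitted but unawaited child is awaited first.
    fn drop(&mut self) {
        for child in self.unawaited.drain(..) {
            self.log.borrow_mut().push(format!("{}: awaited {child}", self.name));
        }
        self.log.borrow_mut().push(format!("{}: closed", self.name));
    }
}

/// Each loop iteration owns its join set, so it is closed before the next iteration starts.
fn run_batches(log: &Log) {
    for batch in 0..2 {
        let mut join_set = ScopedJoinSet::new(&format!("batch-{batch}"), log);
        join_set.submit("child-a");
        log.borrow_mut().push(format!("batch-{batch}: submitted"));
        // `join_set` goes out of scope here and is closed.
    }
}
```

In this model the second batch is only submitted after the first join set has been closed, which is the ordering that scoped closing imposes on a workflow.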

Example - Join set without awaiting child execution results

The following example creates a named join set for each user fetched from GitHub and uses it to parallelize the execution of the star_added_parallel function.

fn backfill_parallel(repo: String) -> Result<(), String> {
    let page_size = 5;
    let mut cursor = None;
    while let Some(resp) = // fetch a batch of users using a one-off join set
        github::account::list_stargazers(&repo, page_size, cursor.as_deref())?
    {
        let mut join_set_batch = Vec::new();
        for login in &resp.logins {
            let join_set = new_join_set_named(login, ClosingStrategy::Complete)
                    .expect("github login does not contain illegal characters");
            // `-submit`-ting child executions without `-await`-ing results
            star_added_parallel_submit(
                &join_set,
                login,
                &repo,
            );
            join_set_batch.push(join_set); // closing join set here would kill parallelism
        }
        if resp.logins.len() < usize::from(page_size) {
            // last batch of join sets closed here.
            break;
        }
        cursor = Some(resp.cursor);
        // join set batch closed here.
    }
    Ok(())
}

Note that there is no corresponding -await-next call in this example. The scoped Join Set Close adds back-pressure: the first batch of five child executions is submitted, then implicitly awaited before the while loop begins again:

Lines o1_1 and o2_1 are one-off join sets containing the list-stargazers activity.