User service with CQRS + ES example in Rust (Part 3)

CQRS with Hexagonal Architecture and DDD

In this part we will take a break from ES and focus on modeling the domain and building the hexagonal architecture.

If you want to do real DDD, there's no getting around modeling the domain. It's a necessary task and one of my favorites. Remember there is no such thing as "no model": no model probably means a bad model. Modeling normally means defining the aggregate roots of the service, the value objects, and their relationships. The output could be a loose (non-strict) UML diagram, or just the code of the main classes and, of course, their tests.

Although modeling seems very object-oriented (you define the aggregate roots, the "objects", the value objects and so on), we should still apply some functional programming thinking: for one, we should strive for a "pure domain", calculations only, pure functions only. Executing the domain with the same input will always give back the same output. We should be able to cover the whole domain with simple unit tests with no mocks (stubs are fine for convenience). You can make use of any tool you have as a developer, and I bet you will eventually need to define a "struct" or some class; functional programming is just another tool under your belt. Just strive for a purely functional domain (no actions/impure functions allowed) and make sure to show with a test how you expect the "clients" to use/execute your domain.

Here we will apply the tactical patterns of DDD, basically aggregate roots and value objects, to model our domain. We should ask the domain expert many questions, and since this is a personal project, the one who answers them all is me (it doesn't help that I have a severe case of constant hesitation).

Modeling the Domain

Basically, we just have one aggregate root and its value objects. This is the result:

#[derive(Serialize, Deserialize, Debug)]
pub struct User {
    id: UserId, // the initial default state doesn't even have an id.
    email: UserEmail,
    password: UserPassword,
    is_registered: bool,
    recorded_events: Vec<UserDomainEvent>
}

The recorded events are not really part of modeling the domain (and neither is the derive stuff; it's just helpful, and since I am new to Rust, I am not sure how "impure" I am making the domain with it).

We need somewhere in our domain from which to eventually publish all the domain events that happened during its execution. A sensible place to keep them is the aggregate itself: it helps with the mental image of persisting those events in a message store, where the aggregate id acts as a kind of "primary key" in the DB, so you can easily filter all the events that affected this entity.

But it's not the only solution; you could publish them from domain services instead. Keeping them in the aggregate does introduce implementation concerns into our domain, but it's still valid: ES is a way of implementing the domain, and it's good to see such details explicitly in the code.
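To make that concrete, something like this minimal sketch is enough to collect the events inside the aggregate (record is a made-up helper name; recorded_events matches what the command handler uses later, and it assumes UserDomainEvent derives Clone):

impl User {
    // Called by the domain methods every time they produce a domain event.
    fn record(&mut self, event: UserDomainEvent) {
        self.recorded_events.push(event);
    }

    // Exposes the events produced during this execution so the outer layers
    // can persist or publish them.
    pub fn recorded_events(&self) -> Vec<UserDomainEvent> {
        self.recorded_events.clone()
    }
}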

Value Objects

The rest of the struct's fields are value objects, except is_registered, which is just a boolean.

Value objects are one of the most powerful and most disregarded tactical patterns of DDD.

Value objects can be hard to identify and are often wrongly modeled as entities. All of this deserves a full blog entry, so let me oversimplify by saying that you may consider a value object an entity attribute.

Let's focus on the password value object. First let's look at the tests, where I explain how I want it to be used (even though I want this value object to be accessed only through the aggregate root).

#[cfg(test)]
mod tests {
    use super::*;
    use claim::{assert_err, assert_ok};
    #[test]
    fn validate_password() {
        let password = "secret_password";
        let user_password = UserPassword::new(password).unwrap();
        let password_attempt = "secret_password";

        assert_ok!(user_password.verify_password(password_attempt));
    }
}

The test above explains how we expect this value object to be used. It's a unit test, so in practice the access point won't be this direct but through the aggregate root. Still, the unit test makes it clear how we want to use it:

  1. Given a secret,

  2. when a UserPassword has been created with that same secret,

  3. then verifying the password should return Ok().
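To give an idea of what sits behind that test, here is a minimal sketch of the value object. It is not the repo's code: the real hashing (argon2, in the spirit of Zero To Production) is replaced here by a plain comparison, and the InvalidPassword error variant is an assumed name.

pub struct UserPassword {
    hashed_password: String,
}

impl UserPassword {
    pub fn new(password: &str) -> Result<Self, UserDomainError> {
        // Business invariant: don't accept obviously weak secrets.
        if password.len() < 8 {
            return Err(UserDomainError::InvalidPassword(
                "password must be at least 8 characters long".to_string(),
            ));
        }
        // The real value object stores a salted hash here, not the plain text.
        Ok(UserPassword { hashed_password: password.to_string() })
    }

    pub fn verify_password(&self, password_attempt: &str) -> Result<(), UserDomainError> {
        if self.hashed_password == password_attempt {
            Ok(())
        } else {
            Err(UserDomainError::IncorrectPassword)
        }
    }
}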

See here github.com/eniltrexAdmin/crappy-user for the rest of the value objects and their tests.

Value Objects and Rust

In weakly typed languages, value objects are of immense help: they are a safeguard to make sure that the building blocks of your domain are always valid and consistent. In strongly typed languages such as Rust, they might seem like extra overhead, and it certainly felt that way when I went for them. The fact that there are Rust crates, such as EmailAddress or ones for passwords, that out of the box can be used as perfectly valid value objects only adds to this feeling of doing unnecessary extra work. (Well, in this case some of it was unnecessary work; when you look at the code you'll see that UserEmail goes a bit too far.)

Why did I still go for them? Well, they still give a lot of advantages. A typed value is normally not enough: there are business invariants that need to be satisfied, and therefore value objects are still necessary. Having dedicated value objects also keeps us in line with the Single Responsibility Principle from SOLID; otherwise we would end up with very long tests in the main entity just to check those business invariants.
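As a tiny illustration, a hypothetical UserEmail only needs its constructor to enforce the invariant (the check below is deliberately naive, and InvalidUserEmail is an assumed error variant):

#[derive(Debug, Clone, PartialEq)]
pub struct UserEmail(String);

impl UserEmail {
    pub fn new(email: &str) -> Result<Self, UserDomainError> {
        // Business invariant: an email must at least contain an '@'.
        if !email.contains('@') {
            return Err(UserDomainError::InvalidUserEmail(email.to_string()));
        }
        Ok(UserEmail(email.to_string()))
    }

    pub fn value(&self) -> &str {
        &self.0
    }
}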

One downside is the number of files (partly because I decided to keep them in separate files rather than writing a bible in a single one), and all the imports that come with that. But in Rust probably the biggest downside is the borrowing system: using value objects to encapsulate adds a lot of extra code and makes the whole borrowing, referencing, and consuming business harder to follow.

TDD

If we move now to the aggregate root, you will also see how the object is expected to be used. There are two kinds of tests: the ones that show how the aggregate root should be used, namely "test_register_user" or "test_authenticate", and all the tests about "apply_xxx_event". The latter are internal tests; unit testing those should be a must, but those functions aren't accessible from outside, as is obvious from the missing pub modifier.

If we focus on the authenticate, the test goes like this:

    #[test]
    fn authenticate_user() {
        let id = Uuid::new_v4();
        let email = "francesc.travesa@mymail.com".to_string();
        let password_hash = "password_hash".to_string();
        let user = simulate_fetch_user(id, &email, &password_hash);

        let result = user.authenticate_user(&password_hash);
        let events = result.unwrap();
        assert_eq!(1, events.len());
        let event = events.get(0).unwrap();

        match event {
            UserDomainEvent::UserAuthenticated(user_authenticated_event) => {
                assert_eq!(user_authenticated_event.id, id);
                assert_eq!(
                    Utc::now().round_subsecs(3),
                    user_authenticated_event.occurred_at().clone().round_subsecs(3)
                );
            },
            wrong_domain_event => {
                panic!(
                    "event generated was not of type UserAuthenticated but of type {}",
                    wrong_domain_event.event_type()
                );
            }
        }
        // Now the failed attempt.
        let result = user.authenticate_user("wrong_password");
        let events = result.unwrap();
        assert_eq!(1, events.len());
        let event = events.get(0).unwrap();
        match event {
            UserDomainEvent::UserAuthenticationFailed(user_authentication_failed_event) => {
                assert_eq!(user_authentication_failed_event.id, id);
            },
            wrong_domain_event => {
                panic!(
                    "event generated was not of type UserAuthenticationFailed but of type {}",
                    wrong_domain_event.event_type()
                );
            }
        }
        [...]

and the actual code:

    pub fn authenticate_user(
        &self,
        password_attempt: &str,
    ) -> Result<Vec<UserDomainEvent>, UserDomainError> {
        let result = self
            .password_as_ref()
            .verify_password(password_attempt);
        match result {
            Ok(_) => {
                let event = UserSuccessfullyAuthenticated {
                    id: *self.id.value(),
                    occurred_at: Utc::now(),
                };
                Ok(vec![UserDomainEvent::UserAuthenticated(event)])
            },
            Err(UserDomainError::IncorrectPassword) => {
                let event = UserAuthenticationFailed {
                    id: *self.id.value(),
                    occurred_at: Utc::now(),
                };
                Ok(vec![UserDomainEvent::UserAuthenticationFailed(event)])
            },
            Err(error) => Err(error),
        }
    }

There are a myriad of things to discuss about the domain; don't take my code as the ultimate truth. It's just a working example, and you might feel more comfortable doing things differently.

Event Sourcing Repositories

In the previous post I explained that aggregate roots get built by applying, one by one, all the events that happened in the past, making sure the resulting aggregate root fulfills all the business invariants.
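As a reminder of what that replay looks like inside the aggregate, here is a rough sketch of an apply function (the names follow this post, but the UserRegistered variant and its fields are assumptions; the real code splits this into the apply_xxx_event functions mentioned earlier):

impl User {
    pub fn apply(&mut self, event: UserDomainEvent) {
        match event {
            // The registration event carries everything needed to rebuild the state.
            UserDomainEvent::UserRegistered(event) => {
                self.id = UserId::new(event.id);
                self.email = event.email;
                self.password = event.password;
                self.is_registered = true;
            }
            // Authentication attempts are part of the history but don't change
            // the persisted state of the user.
            UserDomainEvent::UserAuthenticated(_)
            | UserDomainEvent::UserAuthenticationFailed(_) => {}
        }
    }
}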

The one in charge of doing that is the repository (also a DDD pattern). A typical repository would look like this:

#[async_trait]
pub trait UserRepository {
    async fn find_by_id(&self, user_id: UserId) -> Result<User, UserDomainError>;
    async fn add(&self, user: User) -> Result<(), UserDomainError>;
}

But with event sourcing we never save the aggregate root; instead, we save its events, and we load those events back to rebuild its final state:

#[async_trait]
pub trait UserRepository {
     async fn load(&self, user_id: UserId) -> Result<User, UserDomainError>;
     async fn save_events(
        &self,
        user_id: UserId,
        events: Vec<UserDomainEvent>,
    ) -> Result<(), UserDomainError>;
}

The repository will make use of the event store, the component that actually connects to the DB. Following Hexagonal Architecture, and Martin Fowler's advice to leave implementation decisions for as late as possible, we don't yet know which persistence system we are going to use: a SQL DB, Redis, or whatever.

We just know we have an event store that can load events and save them:

#[async_trait]
pub trait EventStoreInterface<A>: Send + Sync
where
    A: EventSourcedAggregate,
{
    async fn load_events(
        &self,
        aggregate_id: &Uuid,
    ) -> Result<Vec<EventEnvelope<A>>, EventStoreError>;
    async fn save_events(&self, events: Vec<EventEnvelope<A>>) -> Result<(), EventStoreError>;
    async fn load_all_events(
        &self,
        last_event_read: i64
    ) -> Result<Vec<EventEnvelope<A>>, EventStoreError>;
}

(Not sure if Rust has a naming convention for traits... since I come from PHP, those are interfaces...)

Now that we have defined the trait for the event store, we can code the repository:

#[async_trait]
impl<ES> UserRepository for UserEventStoreRepository<ES>
where
    ES: EventStoreInterface<User>,
{
    async fn load(&self, user_id: UserId) -> Result<User, UserDomainError> {
        let events_to_apply = self.store.load_events(user_id.value()).await.map_err(
            |event_store_error: EventStoreError| {
                UserDomainError::CouldNotLoadUserEvents(event_store_error.to_string())
            },
        )?;
        let mut user = User::default();
        for event in events_to_apply {
            user.apply(event.payload);
        }
        Ok(user)
    }

    async fn save_events(
        &self,
        user_id: UserId,
        events: Vec<UserDomainEvent>,
    ) -> Result<(), UserDomainError> {
        let mut wrapped_events: Vec<EventEnvelope<User>> = Vec::new();
        for payload in events {
            wrapped_events.push(EventEnvelope {
                aggregate_id: *user_id.value(),
                occurred_at: payload.occurred_at(),
                payload,
                metadata: Default::default(),
            })
        }
        self.store.save_events(wrapped_events).await.map_err(
            |event_store_error: EventStoreError| {
                UserDomainError::CouldNotSaveUserEvents(event_store_error.to_string())
            },
        )
    }
}

Special mention here for the loading of the user, which returns User::default(); we could have made it return a "not found" error when there are no events for that aggregate, for example. The way I am doing it here considers that "default" is actually "before we started counting", like a zero. It sounds philosophical, but it's a good excuse for not having to deal with different kinds of errors, and for being lazy.
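For reference, that "zero" state is just the Default implementation, roughly along these lines (a sketch; it assumes the value objects implement Default themselves, for example a nil UUID behind UserId):

impl Default for User {
    fn default() -> Self {
        User {
            // "Before starting counting": no real id, no email, no password,
            // not registered, and no events recorded yet.
            id: UserId::default(),
            email: UserEmail::default(),
            password: UserPassword::default(),
            is_registered: false,
            recorded_events: Vec::new(),
        }
    }
}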

Hexagonal Architecture

We have the domain, and we have some actions on our user; the tests show how we expect the user aggregate to be used. But how do we access the domain? Now it's time to move outside the domain and start building the other layers.

Hexagonal Architecture normally has three layers: each one can see the layers inside it, but the inner layers cannot see outward. Closest to the external world we have the infrastructure layer, where the controllers and the access points are available to clients; everything that is an "impure" function in functional programming terms should go in this layer. Accessing the database, for example, can give a different result every time it's executed; that's an impure action, so it belongs in the infrastructure.

Inside the infrastructure, protecting the domain, there's the layer that translates from the infrastructure and executes the domain as intended. It relies heavily on dependency inversion to keep the pure functions separate from the impure ones that live in the infrastructure.

And inside it all, there's the domain, which we carefully modeled, following DDD practices. DDD works very well with Hexagonal Architecture.
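Pictured as code, the three layers could map to a module layout like this (purely illustrative; the actual module names in the repo may differ):

// Outer layer: everything impure (HTTP, Postgres, configuration).
mod infrastructure {
    // pub mod http_controllers;
    // pub mod postgres_event_store;
}

// Middle layer: command/query handlers and the repository traits they depend on.
mod application {
    // pub mod register_user_command_handler;
    // pub mod user_repository;
}

// Inner layer: the pure domain, aggregates, value objects and domain events.
mod domain {
    // pub mod user;
}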

CQRS and Hexagonal Architecture

CQRS can also be based on Hexagonal Architecture and some may say that it is built upon it.

CQRS stands for Command Query Responsibility Segregation and relies basically on a bunch of handlers: command handlers (that deal with commands), query handlers (that deal with... queries), and even event handlers and projection handlers.

All of those handlers describe how the user wants to interact with our application, and that matches hexagonal architecture perfectly. Looking at the layers, with the protected, pure domain inside and the impure infra outside, the middle layer is the perfect place for all those handlers that translate what the impure world wants into executions of our pure domain.

Throughout this blog post I have been focusing on the write model and on executing the business logic, so we will focus now on the command side. Let's take a look at the function signature of the register user command handler:

pub async fn register_user_command_handler(
    user_event_store_repository: &impl UserRepository,
    command: RegisterUserCommand,
) -> Result<(), UserDomainError> {..}

It expects the command to handle and something that implements UserRepository. In more OO languages the repository would normally be a constructor dependency; Rust is not so OO.

The rest of the function is also quite straightforward:

    // either form is valid: construct the value objects here, or pass plain types and construct them inside User.
    let user_id = UserId::new(command.id);
    let user_email = UserEmail::new(command.email.as_str())?;
    let user_password = UserPassword::new(command.password.as_str())?;

    let mut user = user_event_store_repository.load(user_id).await?;
    user.register_user(user_id, user_email, user_password)?;
    user_event_store_repository
        .save_events(user_id, user.recorded_events())
        .await

We build the pieces of our pure domain until we can execute it: the email and the password. Then we fetch the user (which will be the empty default user explained above) and we execute the domain: user.register_user(user_id, user_email, user_password)?;

If everything goes well, we save the domain events resulting from that action in our DB. That's all we need to save.
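To make "executing the domain" a bit more tangible, here is a hypothetical sketch of that method (the UserRegistered variant, the UserRegisteredDomainEvent::new constructor and the UserAlreadyRegistered error are assumed names; the real code lives in the repo). The pattern is: check the invariants, build the event, apply it, record it.

impl User {
    pub fn register_user(
        &mut self,
        id: UserId,
        email: UserEmail,
        password: UserPassword,
    ) -> Result<(), UserDomainError> {
        // Business invariant: the same user cannot be registered twice.
        if self.is_registered {
            return Err(UserDomainError::UserAlreadyRegistered);
        }
        let event = UserDomainEvent::UserRegistered(
            UserRegisteredDomainEvent::new(id, email, password),
        );
        // State changes go through the same apply() used when replaying history,
        // and the event is recorded so the repository can persist it afterwards.
        self.apply(event.clone());
        self.recorded_events.push(event);
        Ok(())
    }
}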

The command handler for authenticating a user is very similar and follows the same pattern.

pub async fn authenticate_user_command_handler(
    user_event_store_repository: &impl UserRepository,
    command: AuthenticateUserCommand,
) -> Result<UserDomainEvent, UserDomainError> {
    let user_id = UserId::new(command.id);
    let user = user_event_store_repository.load(user_id).await?;

    let events = user.authenticate_user(&command.password_attempt.expose_secret())?;
    // Normally command handlers do not return anything; I am breaking the rule here
    // because of the nature of authenticating a user, which should return credentials.
    // It's an action that doesn't really fit CQRS (or I'm too lazy to think of a better approach to this).
    let authentication_result = events.first().unwrap().clone();
    user_event_store_repository
        .save_events(user_id, events)
        .await?;
    return Ok(authentication_result);
}

The infrastructure

So we have the two inner layers; how do we access them? Through the outermost layer, the infrastructure layer. This is where all the "messy" stuff happens. It's still a very important layer, and the one that brings the project from pure theory (the domain) into the real world. Without infrastructure, there's only air.

The infrastructure code has been heavily influenced by the book "Zero To Production In Rust" (see credits). Following the book, I am using Actix as the web server/framework. I will explain this part in more detail in following blog posts; for now you can just check the code (the code is again here).

I will focus only on the things that matter for this blog post, which are the HTTP "controllers" and the repository implementations.

Controllers

Let's see what the controller for registering the user looks like:

pub async fn register_user(
    request: web::Json<RegisterUserCommand>,
    pool: web::Data<PgPool>,
) -> Result<HttpResponse, UserDomainError> {
    let postgre_event_store_repository = EventStorePostgres::new_event_store(&pool);
    let user_repository = UserEventStoreRepository {
        store: postgre_event_store_repository,
    };

    register_user_command_handler(&user_repository, request.into_inner()).await?;
    Ok(HttpResponse::Created().json(NoContentResponse {}))
}

Having a strong domain makes for very simple controllers and command handlers; the controller is just a way to wire them up and execute them.
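For context, mounting this controller in Actix looks roughly like this (a hypothetical sketch: the route path and port are made up, it assumes UserDomainError implements Actix's ResponseError so the ? in the controller can turn into an HTTP response, and the real startup code in the repo follows Zero To Production):

use actix_web::{web, App, HttpServer};
use sqlx::PgPool;

pub async fn run(pool: PgPool) -> std::io::Result<()> {
    // Share the connection pool with every handler, as the controller above expects.
    let pool = web::Data::new(pool);
    HttpServer::new(move || {
        App::new()
            .app_data(pool.clone())
            .route("/register", web::post().to(register_user))
    })
    .bind("127.0.0.1:8000")?
    .run()
    .await
}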

We are making use of Actix's ability to transform the request into the command via request: web::Json<RegisterUserCommand>. Since commands are DTOs made of plain types, here's the command:

#[derive(Debug, Serialize, Deserialize)]
pub struct RegisterUserCommand {
    pub id: Uuid,
    pub email: String,
    pub password: String,
}

Just by adding those #[derive] macros (it's the Deserialize one that matters for an incoming JSON body), Actix can convert the request into our command.
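As a quick illustration of that, a small unit test like this (hypothetical; it assumes serde_json is available as a dev-dependency and that the uuid crate has its serde feature enabled) shows the command being deserialized from a JSON body:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn register_user_command_deserializes_from_json() {
        // The same shape Actix receives in the request body.
        let body = r#"{
            "id": "6c9c4c3e-0a2b-4e7a-9a59-1d2f3a4b5c6d",
            "email": "francesc.travesa@mymail.com",
            "password": "secret_password"
        }"#;

        let command: RegisterUserCommand = serde_json::from_str(body).unwrap();
        assert_eq!(command.email, "francesc.travesa@mymail.com");
        assert_eq!(command.password, "secret_password");
    }
}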

No Command Bus?

Normally we would make the command go through a command bus, which is in charge of finding the corresponding handler. There are two advantages to using a bus:

  1. We can add some infrastructure to decouple the request from the actual work (some queues)

  2. We can add middleware and enforce some consistency in how all commands are treated.

For commands, I am not going to add infrastructure; I want the actions to happen synchronously. Therefore the effort of using a bus for this part doesn't make sense. (I don't need middleware or enforced consistency either, because I am a very good coder; sarcasm here.)

If we don't have a command bus, the way to reach the command handler is basically to call it directly, as you see in the code above. That's the hexagonal architecture way. Dead simple.

Don't worry, we will see buses and their infrastructure when we get to the event handlers part (not yet done in the code).

Event Store implementation

Let's focus now on the implementation of the event store, the last interesting piece for this blog post. Here the credit is due mainly to the cqrs-es crate.

This is the struct declaration and the constructor:

pub struct EventStorePostgres<'a, A>
where
    A: EventSourcedAggregate + Send + Sync,
{
    pub pool: &'a PgPool,
    _phantom: PhantomData<A>,
}
impl<'a, A> EventStorePostgres<'a, A>
where
    A: EventSourcedAggregate + Send + Sync,
{
    pub fn new_event_store(pool: &'a PgPool) -> Self {
        EventStorePostgres {
            pool,
            _phantom: PhantomData,
        }
    }
}

PhantomData is there to keep the Rust compiler happy: the struct is generic over A but doesn't actually store a value of type A, so we need a zero-sized marker for the otherwise unused type parameter.

Let's see how we implement the event store trait. First, the impl block header:

#[async_trait]
impl<A> EventStoreInterface<A> for EventStorePostgres<'_, A>
where
    A: EventSourcedAggregate + Debug,
{

And now, loading events for example:

    #[tracing::instrument(name = "Loading events from the message store", skip(self))]
    async fn load_events(
        &self,
        aggregate_id: &Uuid,
    ) -> Result<Vec<EventEnvelope<A>>, EventStoreError> {
        let mut rows = sqlx::query("SELECT aggregate_type, aggregate_id, event_type, event_version, payload, metadata, timestamp
                  FROM events
                  WHERE aggregate_type = $1 AND aggregate_id = $2
                  ORDER BY sequence")
            .bind(A::aggregate_type())
            .bind(aggregate_id)
            .fetch(self.pool);
        let mut result: Vec<EventEnvelope<A>> = Default::default();
        while let Some(row) = rows.try_next().await? {
            result.push(SerializedEvent::from_row(&row)?.try_into()?);
        }

        Ok(result)
    }

It's the actual query to the database (using sqlx), iterating over the results, which come back not as our domain event envelopes but in the SQL database shape (called serialized events), and then trying to transform each one into the actual domain event envelope that our domain understands.
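For reference, the intermediate row shape implied by that SELECT looks roughly like this (a sketch: the exact field types in the repo, and whether payload and metadata are kept as serde_json::Value, are assumptions):

use chrono::{DateTime, Utc};
use uuid::Uuid;

// One row of the events table, before it's turned into an EventEnvelope<A>.
pub struct SerializedEvent {
    pub aggregate_type: String,
    pub aggregate_id: Uuid,
    pub event_type: String,
    pub event_version: String,
    pub payload: serde_json::Value,
    pub metadata: serde_json::Value,
    pub timestamp: DateTime<Utc>,
}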

The whole picture

This has been a long ride. Here we've focused on the command side of CQRS and how it all fits with hexagonal architecture and DDD. We are still missing the other side, the query side, and how everything is tied together through domain events.

What we've seen is that, at the end of the request, the result is just some domain events saved in the DB. That's all the command side needs to worry about for now.

Final words

We are still missing the whole query side of CQRS. I will also explain better how to run this on localhost and how to deploy it to production. Everything about the infrastructure that I have not explained here (less architectural/theoretical and more practical stuff) will be covered then.

Again, you can take a look at the whole code here: github.com/eniltrexAdmin/crappy-user, but beware! I always leave it in a working state, BUT there are concepts that I might change while I am writing this blog, so be careful with the parts that are not yet covered in this blog series.

Credits

Let me cite again all the sources I heavily based this project upon, where all credit is due. They have been my introduction to either Rust or the CQRS and ES patterns, and they are all very good. I strongly recommend reading the code/the books; they explain everything better than I do.

I am writing this mainly to show that "mortals" can still try to make/code/architect something that is more or less correct and, of course, valid. There are many points of view, and it's OK to put your own touch, flaws included, into your projects. That's what makes them worth working on.

CQRS_ES crate

crates.io/crates/cqrs-es

docs.rs/cqrs-es/latest/cqrs_es

The implementation of the event store is almost a copy/paste from this crate.

I didn't end up using the crate itself since the whole point of this project was to experiment. Also, it felt like I was coupling a lot of my domain to this crate. That's OK for value objects or other things that keep the domain a pure calculation, but this crate is an entire framework, with its own way of working, so it was hard to separate the concerns.

Still, if you want to go fast, I would recommend using it; it's a working solution out of the box!

Zero to Production In Rust

app.gumroad.com/d/40684410d99d242f75b4a96b2..

The infra code is heavily based on this book. After the main Rust books available on the web, this one is probably a must.

Many things you will see in the code related to the infrastructure, Actix, the configuration, the connection to the database and even the docker image (amazing job there!) come from this book.

CQRS by example

leanpub.com/cqrs-by-example

The best explanation of CQRS based on Hexagonal Architecture and DDD that I've read. Everything from those authors is just gold.

Practical microservices

amazon.com/gp/product/B0899K5R4

This is a different point of view, but 90% consistent with the ones above. It just doesn't use hexagonal architecture or DDD concepts.