Using Axum + Tonic for Duplex connection for REST and GRPC
- postgresql
- rust
- cargo
- sqlx-cli
- cargo-runner
Set up Database
- add the ENV to your .bashrc or .zshrc
export DATABASE_URL="postgres://username@localhost:5432/ddd"
we can use cargo-runner to set up ENV, press CMD+SHIFT+R
then pick env then add DATABASE_URL="postgres://username@localhost:5432/ddd"
the next time we run our app with CMD + R, that ENV will be passed on the command
- Create Database
sqlx database create
- Create migration
sqlx migrate add -r <name>
Note: if sqlx didn't produce 2 files with *.up.sql and *.down.sql, you need to manually rename the file and add the down migration
- Migration
sqlx migrate run
- Prepare Sqlx Compile Time Check
cargo sqlx prepare
NOTE: you need to add on your vscode settings.json
{
"rust-analyzer.cargo.extraEnv": {
"SQLX_OFFLINE": "1",
},
}
every time you run this there will be new files added in .sqlx
folder
- Reset Migration
sqlx database reset
- Build our proto file
create a new proto file in ./proto
folder
go to build.rs then press CMD + R (with cargo runner)
1. Command
use serde::{de::DeserializeOwned};
/// Marker trait for command types: implementors must be deserializable into owned data.
// NOTE(review): no reason is recorded for this lint suppression; presumably the trait is
// not referenced everywhere yet — confirm whether it is still needed.
#[allow(dead_code)]
pub trait Command: DeserializeOwned {}
2. Events
use serde::{de::DeserializeOwned, ser::Serialize};
/// Marker trait for event types.
///
/// Implementors must be serializable, deserializable into owned data,
/// `Send + Sync + Unpin`, and free of non-`'static` borrows.
// NOTE(review): no reason is recorded for this lint suppression — confirm it is still needed.
#[allow(dead_code)]
pub trait Event:
    Serialize + DeserializeOwned + Send + Sync + Unpin + 'static
{
}
3. Model/Entity
use serde::{de::DeserializeOwned, ser::Serialize};
/// Marker trait for model/entity types.
///
/// Implementors must be serializable, deserializable into owned data,
/// `Send + Sync + Unpin`, and free of non-`'static` borrows.
// NOTE(review): no reason is recorded for this lint suppression — confirm it is still needed.
#[allow(dead_code)]
pub trait Model:
    Serialize + DeserializeOwned + Send + Sync + Unpin + 'static
{
}
1. Create Routes Api
/// Named REST routes of the application.
///
/// Convert a variant into its path pattern with `<&'static str>::from(..)` or `.into()`.
pub enum Api {
    /// Resolves to the `/users` path.
    CreateUser,
    /// Resolves to the `/users/:id` path (`:id` is a path parameter).
    GetUser,
}

/// Maps each [`Api`] variant to its static path pattern.
impl From<Api> for &'static str {
    fn from(route: Api) -> Self {
        match route {
            Api::GetUser => "/users/:id",
            Api::CreateUser => "/users",
        }
    }
}
2. Impl Command
use serde::Deserialize;
use crate::domain::Command;
/// Command carrying the data needed to create a user.
#[derive(Debug, Deserialize)]
pub struct CreateUser {
    /// Requested username.
    pub username: String,
    /// Requested email address.
    pub email: String,
}

// Marks `CreateUser` as a domain command.
impl Command for CreateUser {}
3. Impl Event
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::domain::Event;
/// Event payload describing a user that was created.
#[derive(Debug, Serialize, Deserialize)]
pub struct UserCreated {
    /// Id of the created user.
    pub id: Uuid,
    /// Username of the created user.
    pub username: String,
    /// Email address of the created user.
    pub email: String,
}

// Marks `UserCreated` as a domain event.
impl Event for UserCreated {}
4. impl Model
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::domain::Model;
/// User entity.
// NOTE(review): presumably the fields mirror the columns of the `users` table — confirm
// against the migration before changing them.
#[derive(Debug, Serialize, Deserialize)]
pub struct User {
    /// Unique identifier of the user.
    pub id: Uuid,
    /// Username of the user.
    pub username: String,
    /// Email address of the user.
    pub email: String,
}

// Marks `User` as a domain model.
impl Model for User {}
5. Create Repository
use axum::async_trait;
use uuid::Uuid;
use crate::{
events::UserCreated,
models::{self},
};
#[async_trait]
pub trait UserRepository {
async fn save_user(&self, user: models::User) -> Result<(), sqlx::Error>;
async fn save_event(&self, event: UserCreated) -> Result<(), sqlx::Error>;
async fn find_user_by_id(&self, id: Uuid) -> Result<Option<models::User>, sqlx::Error>;
}
- Note: you might need to use
axum::async_trait
for async fn
6. Impl Repository
use axum::async_trait;
use sqlx::{Pool, Postgres};
use uuid::Uuid;
use crate::{events::UserCreated, models, repositories::UserRepository};
/// Repository handle that wraps a Postgres connection pool.
///
/// Derives `Debug` so that types embedding it can derive `Debug` as well.
#[derive(Clone, Debug)]
pub struct PgPool {
    /// Connection pool that queries are executed against.
    db: Pool<Postgres>,
}

impl PgPool {
    /// Wraps an existing connection pool.
    pub fn new(db: Pool<Postgres>) -> Self {
        Self { db }
    }
}
#[async_trait]
impl UserRepository for PgPool {
async fn save_user(&self, user: models::User) -> Result<(), sqlx::Error> {
sqlx::query!(
"INSERT INTO users (id,username,email) VALUES ($1,$2,$3)",
user.id,
&user.username,
&user.email,
)
.execute(&self.db)
.await?;
Ok(())
}
async fn save_event(&self, event: UserCreated) -> Result<(), sqlx::Error> {
let payload = serde_json::to_value(&event).unwrap();
sqlx::query!(
"INSERT INTO events (id,payload) VALUES ($1,$2)",
Uuid::now_v7(),
payload
)
.execute(&self.db)
.await?;
Ok(())
}
async fn find_user_by_id(&self, id: Uuid) -> Result<Option<models::User>, sqlx::Error> {
sqlx::query_as!(models::User, "SELECT * from users WHERE id = $1", id)
.fetch_optional(&self.db)
.await
}
}
7. SQLX Compile Time Check
Note: This is only required if we use query!
and query_as!
macro.
cargo sqlx prepare
7. Create Service Provider
use uuid::Uuid;
use crate::{
commands::CreateUser,
db,
events::UserCreated,
models::{self},
repositories::UserRepository,
};
#[derive(Clone)]
pub struct UserService {
pub repo: db::PgPool,
}
impl UserService {
pub fn new(repo: db::PgPool) -> Self {
Self { repo }
}
pub async fn handle_create_user(&self, cmd: CreateUser) -> Result<(), sqlx::Error> {
let user = models::User {
id: Uuid::now_v7(),
username: cmd.username,
email: cmd.email,
};
let event = UserCreated {
id: user.id,
username: user.username.clone(),
email: user.email.clone(),
};
self.repo.save_user(user).await?;
self.repo.save_event(event).await?;
Ok(())
}
pub async fn handle_get_user_by_id(
&self,
id: Uuid,
) -> Result<Option<models::User>, sqlx::Error> {
self.repo.find_user_by_id(id).await
}
}
- Note: You are not limited to one repo to inject here
8. Create Route Handler Function (Controller)
use axum::{
extract::{Path, State},
response::IntoResponse,
Json,
};
use tracing::{error, info};
use uuid::Uuid;
use crate::{
commands,
services::{self, UserService},
};
pub async fn create_user(
State(handler): State<UserService>,
Json(payload): Json<commands::CreateUser>,
) -> impl IntoResponse {
match handler.handle_create_user(payload).await {
Ok(_) => {
info!("User Created");
"User created".into_response()
}
Err(_) => {
error!("Failed to Create User");
"Failed to create user".into_response()
}
}
}
pub async fn get_user_by_id(
State(state): State<services::UserService>,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
match state.handle_get_user_by_id(id).await {
Ok(Some(user)) => {
info!("User Found:\n {:#?}", user);
Json(user).into_response()
}
Ok(None) => {
info!("User Not Found");
"User not found".into_response()
}
Err(_) => {
error!("Failed to Fetch User");
"Failed to get user".into_response()
}
}
}
- Note: You have access to State as first Parameter
1. Create Protobuf
Note: It is important to note that any Commands / Queries are equivalent to RPCs,
while both Request and Response can be used by either the REST or the gRPC service.
Through the build.rs we made all Requests and Responses De/Serializable
syntax = "proto3";

package users;

// Every command and query is declared as an rpc;
// the request and response messages carry their data.
service UserService {
  rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
}

// Input of CreateUser.
message CreateUserRequest {
  string username = 1;
  string email = 2;
}

// CreateUser returns no data.
message CreateUserResponse {}

// Input of GetUser: the user id as a string.
message GetUserRequest {
  string id = 1;
}

// Output of GetUser.
message GetUserResponse {
  string id = 1;
  string username = 2;
  string email = 3;
}
2. Impl From Trait on a Command
Note: Here we made use of the generated Message
on proto like CreateUserRequest
to a command CreateUser
impl From<CreateUserRequest> for CreateUser {
fn from(value: CreateUserRequest) -> Self {
CreateUser {
email: value.email,
username: value.username,
}
}
}
Note: This will help us in the gRPC impl to just use CreateUser::from(request.into_inner())
and convert any request into a command that we can use with our service provider
3. Create/Add Grpc Service
Note: It is important that you pass the db connection pool as state to the service provider
use sqlx::{Pool, Postgres};
use tonic_reflection::pb::v1alpha::FILE_DESCRIPTOR_SET;
use super::users::GrpcUserServiceImpl;
pub fn services(pool: Pool<Postgres>) -> axum::routing::Router {
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
.build()
.unwrap();
tonic::transport::Server::builder()
.accept_http1(true)
.add_service(reflection_service)
.add_service(GrpcUserServiceImpl::new(pool.clone()))
/// Add your new Service here
.into_router()
}
Note: you can enable tonic_web
on specific service by doing this
.add_service(tonic_web::enable(GrpcUserServiceImpl::new(pool.clone())))
4. Implement Rpc
use sqlx::{Pool, Postgres};
use tonic::{Request, Response, Status};
use tracing::{error, info};
use uuid::Uuid;
use crate::{
commands::CreateUser,
proto::{
user_service_server::{UserService as GrpcUserService, UserServiceServer},
CreateUserRequest, CreateUserResponse, GetUserRequest, GetUserResponse,
},
services::UserService,
PostgreSQL,
};
/// gRPC front end that delegates to the application-level `UserService`.
// NOTE(review): `#[derive(Debug)]` requires `UserService: Debug` — confirm that
// `UserService` (and the repository it holds) implement `Debug`.
#[derive(Debug)]
pub struct GrpcUserServiceImpl {
    /// Application service the RPC handlers delegate to.
    repo: UserService,
}

impl GrpcUserServiceImpl {
    /// Builds the tonic server wrapper for the user service on top of `pool`.
    ///
    /// Returns the generated `UserServiceServer`, not `Self`, so the result can be
    /// passed directly to `add_service`.
    // NOTE(review): `PostgreSQL` is imported from the crate root; presumably it is the
    // repository type that `UserService::new` accepts — confirm.
    pub fn new(pool: Pool<Postgres>) -> UserServiceServer<GrpcUserServiceImpl> {
        // `pool` is owned here, so it is moved in, not cloned.
        let user_service = UserService::new(PostgreSQL::new(pool));
        UserServiceServer::new(GrpcUserServiceImpl { repo: user_service })
    }
}
#[tonic::async_trait]
impl GrpcUserService for GrpcUserServiceImpl {
    /// Handles the `CreateUser` RPC.
    ///
    /// Converts the request into a `CreateUser` command and delegates to the service.
    ///
    /// # Errors
    /// * `already_exists` when the database reports a unique-constraint violation
    /// * `internal` for any other failure
    async fn create_user(
        &self,
        request: Request<CreateUserRequest>,
    ) -> Result<Response<CreateUserResponse>, Status> {
        let command = CreateUser::from(request.into_inner());
        match self.repo.handle_create_user(command).await {
            Ok(()) => {
                info!("User Created");
                Ok(Response::new(CreateUserResponse {}))
            }
            Err(e) => {
                error!("{}", e);
                // Only a unique-constraint violation means the user already exists;
                // anything else (connection loss, bad SQL, ...) is a server-side failure.
                let duplicate = e
                    .as_database_error()
                    .map_or(false, |db| db.is_unique_violation());
                if duplicate {
                    Err(Status::already_exists("User already Exists"))
                } else {
                    Err(Status::internal("Failed to create user"))
                }
            }
        }
    }

    /// Handles the `GetUser` RPC.
    ///
    /// # Errors
    /// * `invalid_argument` when the request `id` is not a valid UUID
    /// * `not_found` when no user has that id
    /// * `internal` when the lookup itself fails
    async fn get_user(
        &self,
        request: Request<GetUserRequest>,
    ) -> Result<Response<GetUserResponse>, Status> {
        // The id comes from the client, so a malformed value is rejected, not unwrapped.
        let id = Uuid::parse_str(&request.into_inner().id).map_err(|e| {
            error!("{}", e);
            Status::invalid_argument("Invalid user id")
        })?;
        match self.repo.handle_get_user_by_id(id).await {
            Ok(Some(user)) => {
                info!("User Found:\n{:#?}", user);
                let response = Response::new(GetUserResponse {
                    id: user.id.to_string(),
                    email: user.email,
                    username: user.username,
                });
                Ok(response)
            }
            Ok(None) => Err(Status::not_found("User Not Found")),
            Err(e) => {
                error!("{}", e);
                // A failed lookup is not the same as a missing user.
                Err(Status::internal("Failed to get user"))
            }
        }
    }
}
- Using Curl
curl -X POST \
-H "Content-Type: application/json" \
-d '{"username": "testuser", "email": "testuser@example.com"}' \
http://127.0.0.1:80/users
- Using Postman
- Create new GRPC
- Enter Url:
grpc://localhost:80
- Import
users.proto
- Add the Payload on
Message
{
"username": "uriah",
"email": "ceo@goldcoders.dev"
}
- Using Curl
curl localhost:80/users/01911459-8cfa-7e91-9f2a-4d3da4faa526
- Using Postman
- Create new GRPC
- Enter Url:
grpc://localhost:80
- Import
users.proto
- Add the Payload on
Message
{
"id": "01911459-8cfa-7e91-9f2a-4d3da4faa526",
}