Coffeewards, es un sistema de puntos para fidelización de los clientes.
Por cada compra que realizan los clientes, suman puntos que luego pueden canjear por cafes gratuitos.
Nota: Los gráficos y diagramas aquí presentes son ilustrativos y apuntan a transmitir el concepto del sistema. Pueden presentarse abreviaciones o alteraciones tanto de las entidades como de las operaciones en pos de simplificar el entendimiento de las características esenciales.
El sistema se implementa como una arquitectura cliente-servidor, donde las cafeteras se conectan a su servidor local para sumar o usar puntos a medida que recibe pedidos.
Los servidores se comunican entre si para mantener consistente el estado de las cuentas de forma distribuida. Esto se implementa mediante transacciones en 2 fases.
Se distinguen 4 acciones principales:
- Reservar puntos
Lock
- Soltar puntos reservados
Free
- Consumir puntos reservados
Consume
- Añadir puntos
Add
Para reservar puntos se requiere que por lo menos la mitad de los servidores estén disponibles. En cambio, las otras transacciones (asumiendo que los puntos fueron previamente reservados si fuese necesario) no deberían fallar y pueden quedar pendientes hasta que sea posible resolverlas.
Al procesar una orden, primero se reservan los puntos necesarios y al finalizarla se añaden/liberan/consumen los puntos reservados. La respectiva cuenta solo se bloquea mientras se procesan estas transacciones y no mientras se prepara el café, lo cual permite que se puedan procesar pedidos de una misma cuenta de manera concurrente.
- Se asume que las cafeteras no pierden conexión con el servidor local.
- Se asume que los servidores pueden perder conexión con la red, pero siguen siendo parte de la misma durante toda la ejecución.
- Se asume que no habrá agentes externos al sistema que intenten afectarlo.
- El proceso del servidor no es interrumpido de manera inesperada.
El programa de la cafetera se encarga de recibir pedidos de los clientes desde un archivo csv
y de procesarlos.
Este se implementa utilizando un esquema de modelo de actores:
flowchart LR
ot(OrderTaker) --> sa{ }
sa --> oh1(OrderHandler)
sa --> oh2(OrderHandler)
sa --> oh3(OrderHandler)
oh1 --> ps(PointStorage)
oh2 --> ps(PointStorage)
oh3 --> ps(PointStorage)
OrderTaker
: Recibe los pedidos y los delega.OrderHandler
: Prepara los cafes. Hay uno por dispenser.PointStorage
: Se encarga de las operaciones de puntos, comunicándose con el servidor local.
sequenceDiagram
participant oh as OrderHandler
participant ps as PointStorage
oh ->> ps: reservar puntos
ps -->> oh: Err
note over oh,ps: Error
sequenceDiagram
participant oh as OrderHandler
participant ps as PointStorage
oh ->> ps: reservar puntos
ps -->> oh: Ok
note over oh: prepara cafe correctamente
oh ->> ps: consumir puntos
ps -->> oh: Ok
note over oh,ps: Éxito
sequenceDiagram
participant oh as OrderHandler
participant ps as PointStorage
oh ->> ps: reservar puntos
ps -->> oh: Ok
note over oh: falla en preparar cafe
oh ->> ps: liberar puntos
ps -->> oh: Ok
note over oh,ps: Error
El servidor local se encarga tanto de recibir y procesar los mensajes de los clientes como de comunicarse con el resto de los servidores para mantener consistente el estado de las cuentas.
Toda la comunicación se realiza mediante TCP.
Cuando un cliente abre una conexión, el servidor crea un hilo para manejarla.
En este recibe pedidos (order
) y los maneja secuencialmente hasta que el cliente se desconecta.
El servidor le responderá al cliente si el pedido fue exitoso o no.
Los servidores abren una conexión para cada comunicación con otro servidor. Una comunicación entrante se resuelve en un nuevo hilo y puede implicar el intercambio de varios mensajes.
Los tipos de comunicación son:
PING
- Se utiliza para verificar si el servidor objetivo tiene conexión.
- Secuencia:
PingRequest
,PingResponse
CONNECT
- Se utiliza para conectar un nuevo servidor a la red.
- Secuencia:
ConnectRequest(new_server)
,ConnectResponse(servers)
SYNC
- Se utiliza para sincronizar el estado de las cuentas.
- Secuencia:
SyncRequest
,SyncResponse(point_map)
TRANSACTION
- Se utiliza para realizar una transacción distribuida.
Cuando un servidor no recibe respuesta de ningún otro, tanto al realizar una transacción como al enviar pings, detecta que está desconectado.
Por otro lado, al recibir algún mensaje o respuesta detecta que esta conectado.
Cuando una transacción falla, pero podría ser resuelta (por ejemplo, una carga de puntos estando desconectado) esta se guarda en una lista de pendientes, que se intentan de procesar en un hilo dedicado.
Cuando el servidor se desconecta (pasa de estado conectado -> desconectado) detiene el procesamiento de pendientes.
Cuando el servidor se reconecta (pasa de estado desconectado -> conectado), primero se sincroniza con los demás servidores y luego reanuda el procesamiento de transacciones pendientes.
El servidor que recibe el pedido hace de coordinador de la transacción.
Las transacciones se ejecutan en 2 fases:
- Preparación [
PREPARE
]- El coordinador intenta tomar el recurso necesario.
- Verifica poder realizar la transacción.
- Comienza una comunicación de tipo
TRANSACTION
con los demás servidores.
- Finalización [
COMMIT
/ABORT
]- Al recibir el mensaje, los servidores locales:
- Intentan tomar el recurso necesario.
- Verifican poder realizar la transacción.
- Responden
Proceed
oAbort
según corresponda.
- Al recibir las respuestas
- Si más de la mitad respondieron
Proceed
, y ningunoAbort
:- El coordinador envía
Proceed
a los demás servidores. - Todos los servidores aplican la transacción.
- El coordinador envía
- Si faltan suficientes respuestas o alguna es
Abort
:- El coordinador envía
Abort
a los demás servidores. - Agrega la transacción a la lista de pendientes, si puede ser resuelta más adelante.
- El coordinador envía
- Si más de la mitad respondieron
- Al recibir el mensaje, los servidores locales:
Debido a su funcionamiento, bloqueando un solo recurso y resolviendo de manera consiguiente, no surgen deadlocks.
Aun asi se implementa un mecanismo similar a wait-die
para cancelar transacciones.
sequenceDiagram
participant co as Coordinator
participant s1 as Server
participant s2 as Server
co ->> s1: TRANSACTION
co ->> s2: TRANSACTION
s1 -->> co: Proceed
s2 -->> co: Proceed
co ->> s1: Proceed
co ->> s2: Proceed
note over co,s2: Transacción Exitosa
sequenceDiagram
participant co as Coordinator
participant s1 as Server
participant s2 as Server
co ->> s1: TRANSACTION
co ->> s2: TRANSACTION
s1 -->> co: Proceed
s2 -->> co: Abort
co ->> s1: Abort
co ->> s2: Abort
note over co,s2: Transacción Fallida
sequenceDiagram
participant co as Coordinator
participant s1 as Server
participant s2 as Server
participant s3 as Server
co ->> s1: TRANSACTION
co -x s2: TRANSACTION
co -x s3: TRANSACTION
s1 -->> co: Proceed
note over s2: Timeout
note over s3: Timeout
co ->> s1: Abort
co -->> s2: Abort
co -->> s3: Abort
note over co,s3: Transacción Fallida
classDiagram
direction LR
class Server {
listener: TcpListener
listen()
handle_stream(TcpStream)
}
class PointStorage {
servers : Addr[]
pending : Transaction[]
coordinate(Message)
handle(Message)
}
class PointRecord {
available : Int
locked : Int
coordinate(Transaction)
handle(Transaction)
apply(Transaction)
}
class Transaction {
client : Id
amount : Int
action : TxAction
timestamp : Timestamp
olderThan(Transaction)
}
Server -- PointStorage : points
PointStorage *-- PointRecord : points
PointStorage o-- Transaction : pending
PointRecord -- Transaction : holder
flowchart LR
subgraph Local Server
s[Server]
c1(Client) --> s
c2(Client) --> s
s --> c(CoordinateTx)
s --> h(HandleTx)
end
subgraph External Servers
c -.- eh1(HandleTx)
c -.- eh2(HandleTx)
h -.- ec(CoordinateTx)
eh1 --- s1[Server]
eh2 --- s2[Server]
ec --- s2
end
sequenceDiagram
participant c as Client
participant s as Server
participant ps as PointStorage
participant pr as PointRecord
participant e as External
c->>+s: Fill ( id: 1, amount: 1)
s ->>+ ps: coordinate( fill, 1, 1 )
ps ->> pr : coordinate( Transaction )
note over pr,e : Successful Transaction
pr ->> ps: Ok
ps ->> s: Ok
s ->>-c: Ok
sequenceDiagram
participant e as ExternalServer
participant s as Server
participant ps as PointStorage
participant pr as PointRecord
e->>+s: Transaction
s ->> ps: handle( Transaction )
ps ->> pr: handle( Transaction )
pr -->> e : Proceed
e ->> pr: Proceed | Abort
note over pr : Apply | Abort
s -->-e: end connection
El controlador es un programa que está por fuera del sistema principal. Se utiliza para enviar mensajes de control a los servidores.
Estos mensajes pueden ser:
Disconnect
: El servidor descartará todos los mensajes recibidos por otro servidor y fallará en enviar mensajes a otros servidores. Sin embargo, continúa recibiendo pedidos de las cafeteras.Connect
: El servidor recuperará la capacidad de enviar y recibir mensajes a otros servidores.
El programa escucha constantemente por stdin
por comandos indicando la acción a realizar y la dirección del servidor.
Suponiendo que nos encontramos en el root del proyecto.
make
correfmt
,test
yclippy
para el espacio de trabajo.- Coffee maker:
cargo run --bin coffee_maker <local_server> [<orders>] [sucess_chance]
- Local server:
cargo run --bin local_server <address> [<known_server_address>]
- Controller:
cargo run --bin controller
<Disconnect/Connect> <address>
- Tests:
cargo test
Nota: Las direcciones son de la forma
ip:puerto
opuerto
(en cuyo caso se usalocalhost
)
Los crates utilizados para el presente trabajo práctico fueron:
- futures: para el uso de futures dentro del contexto de actores en la implementación de la cafetera.
- actix y actix-rt: para la implementación de actores en la cafetera.
- tracing y tracing-subscriber: para loggear eventos tanto en la cafetera como en el servidor.
- rayon: para procesar paralelamente los streams dentro del servidor.
- serde y serde_json: para la serialización y deserialización de los mensajes.
- num_cpus: para obtener la cantidad de CPU cores disponibles en el sistema. Usado en la threadpool.
- std-semaphore: para la sincronización dentro de las transacciones pendientes (estados online y offline).
- serial_test: para serializar la ejecución de los tests de integración.