Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
zarnoevic committed Feb 27, 2024
1 parent d380978 commit 7193946
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 11 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ jobs:
cd src/cmd/client
go build -o clientexec
- name: Check if executable was created
- name: Check if client executable was created
run: |
ls src/cmd/client/clientexec
- name: Build the consumerService
run: |
cd src/cmd/server
go build -o serverexec
- name: Check if consumerService executable was created
run: |
ls src/cmd/server/serverexec
18 changes: 15 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,28 @@ Run the following command to start the service:
sudo docker-compose -f docker/rabbitClient.docker-compose.yaml --env-file .env up -d
```

## Running Client
## Running the Client

Run the following command to start the service:
Run the following command to start the Client:

```shell
sudo docker-compose -f docker/client.docker-compose.yaml --scale client=1 --env-file .env up -d
```

### Scaling the client
### Scaling the Client

Change the `--scale client=1` parameter to any number of clients you'd want to run in parallel.

## Running the Server

Run the following command to start the Server:

```shell
sudo docker-compose -f docker/consumerService.docker-compose.yaml --scale consumerService=1 --env-file .env up -d
```

### Scaling the Client

Change the `--scale server=1` parameter to any number of clients you'd want to run in parallel.


17 changes: 17 additions & 0 deletions docker/server.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
FROM --platform=linux/amd64 golang:1.22 as builder

WORKDIR /app

ADD . /app

RUN go mod tidy && go build -o server_exec ./src/cmd/server/

FROM --platform=linux/amd64 alpine:latest

RUN apk --no-cache add ca-certificates

WORKDIR /root/

COPY --from=builder /app/server_exec .

CMD ["./client_exec"]
16 changes: 16 additions & 0 deletions docker/server.docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
version: '3.8'

services:
client:
build:
context: .
dockerfile: server.Dockerfile
environment:
RABBITMQ_USER: ${RABBITMQ_USER}
RABBITMQ_PASSWORD: ${RABBITMQ_PASSWORD}
RABBITMQ_AMQP_PORT: ${RABBITMQ_AMQP_PORT}
RABBITMQ_AMQP_HOST: ${RABBITMQ_AMQP_HOST}
COMMANDS_PATH: /resources/commands.csv
volumes:
- ./resources:/resources

2 changes: 1 addition & 1 deletion src/cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"os"

producerService "github.com/zarnoevic/go-rabbitmq/src/pkg/service/producer"
"github.com/zarnoevic/go-rabbitmq/src/pkg/services/producerService"
)

func main() {
Expand Down
40 changes: 40 additions & 0 deletions src/cmd/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package main

import (
"fmt"
"log"
"os"

"github.com/zarnoevic/go-rabbitmq/src/pkg/orderedmap"
"github.com/zarnoevic/go-rabbitmq/src/pkg/rabbitClient"
"github.com/zarnoevic/go-rabbitmq/src/pkg/services/consumerService"
)

func main() {
rabbitMQUser := os.Getenv("RABBITMQ_USER")
rabbitMQPassword := os.Getenv("RABBITMQ_PASSWORD")
rabbitMQAMQPPort := os.Getenv("RABBITMQ_AMQP_PORT")
rabbitMQAMQPHost := os.Getenv("RABBITMQ_AMQP_HOST")

amqpURL := fmt.Sprintf("amqp://%s:%s@%s:%s/", rabbitMQUser, rabbitMQPassword, rabbitMQAMQPHost, rabbitMQAMQPPort)
queueName := "yourQueueName"

client, err := rabbitClient.NewRabbitClient(amqpURL, queueName)
if err != nil {
log.Fatalf("Failed to create RabbitMQ client: %s", err)
}
defer client.Close()

msgs, err := client.Consume()
if err != nil {
log.Fatalf("Failed to consume messages: %s", err)
}

omap := orderedmap.NewOrderedMap()

processor := consumerService.NewConsumerService(omap)

for msg := range msgs {
processor.ProcessMessage(msg)
}
}
8 changes: 4 additions & 4 deletions src/pkg/orderedmap/orderedmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func NewOrderedMap() *OrderedMap {
}
}

func (om *OrderedMap) AddItem(key, value string) {
func (om *OrderedMap) Add(key, value string) {
om.Lock()
defer om.Unlock()

Expand All @@ -32,7 +32,7 @@ func (om *OrderedMap) AddItem(key, value string) {
om.values[key] = keyValue{key, value}
}

func (om *OrderedMap) DeleteItem(key string) {
func (om *OrderedMap) Delete(key string) {
om.Lock()
defer om.Unlock()

Expand All @@ -47,15 +47,15 @@ func (om *OrderedMap) DeleteItem(key string) {
}
}

func (om *OrderedMap) GetItem(key string) (string, bool) {
func (om *OrderedMap) Get(key string) (string, bool) {
om.RLock()
defer om.RUnlock()

kv, exists := om.values[key]
return kv.value, exists
}

func (om *OrderedMap) GetAllItems() []keyValue {
func (om *OrderedMap) GetAll() []keyValue {
om.RLock()
defer om.RUnlock()

Expand Down
1 change: 0 additions & 1 deletion src/pkg/service/server/service.go

This file was deleted.

88 changes: 88 additions & 0 deletions src/pkg/services/consumerService/consumerService.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package consumerService

import (
"encoding/json"
"fmt"
"log"
"sync"

"github.com/rabbitmq/amqp091-go"
"github.com/zarnoevic/go-rabbitmq/src/pkg/orderedmap"
)

type ConsumerService struct {
omap *orderedmap.OrderedMap
rwmu sync.RWMutex
}

func NewConsumerService(omap *orderedmap.OrderedMap) *ConsumerService {
return &ConsumerService{
omap: omap,
}
}

func (cs *ConsumerService) ProcessMessage(d amqp091.Delivery) {
var command []string
err := json.Unmarshal(d.Body, &command)
if err != nil {
log.Printf("Error unmarshalling message: %s", err)
return
}

switch command[0] {
case "add":
cs.handleAdd(command)
case "get":
cs.handleGet(command)
case "delete":
cs.handleDelete(command)
case "getAll":
cs.handleGetAll()
default:
log.Printf("Unknown command: %s", command[0])
}
}

func (cs *ConsumerService) handleAdd(command []string) {
if len(command) != 3 {
log.Println("Add command requires 2 arguments: key and value")
return
}
cs.rwmu.Lock()
cs.omap.Add(command[1], command[2])
cs.rwmu.Unlock()
}

func (cs *ConsumerService) handleGet(command []string) {
if len(command) != 2 {
log.Println("Get command requires 1 argument: key")
return
}
cs.rwmu.RLock()
value, found := cs.omap.Get(command[1])
cs.rwmu.RUnlock()
if found {
fmt.Printf("Get: %s = %s\n", command[1], value)
} else {
fmt.Printf("Key not found: %s\n", command[1])
}
}

func (cs *ConsumerService) handleDelete(command []string) {
if len(command) != 2 {
log.Println("Delete command requires 1 argument: key")
return
}
cs.rwmu.Lock()
cs.omap.Delete(command[1])
cs.rwmu.Unlock()
}

func (cs *ConsumerService) handleGetAll() {
cs.rwmu.RLock()
allItems := cs.omap.GetAll()
cs.rwmu.RUnlock()
for key, value := range allItems {
fmt.Printf("%s: %s\n", key, value)

Check failure on line 86 in src/pkg/services/consumerService/consumerService.go

View workflow job for this annotation

GitHub Actions / Run Go Tests

fmt.Printf format %s has arg key of wrong type int
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"
"sync"

rabbitClient "github.com/zarnoevic/go-rabbitmq/src/pkg/rabbitClient"
"github.com/zarnoevic/go-rabbitmq/src/pkg/rabbitClient"
)

func ProcessCSV(filePath, amqpURL, queueName string) error {
Expand Down

0 comments on commit 7193946

Please sign in to comment.