Skip to content

Commit

Permalink
feat: add the basic documentation (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
allisson authored Dec 29, 2023
1 parent da1f017 commit d236c72
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 11 deletions.
182 changes: 181 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,182 @@
# psqlqueue
Simple queue system powered by Golang and PostgreSQL
[![test](https://github.com/allisson/psqlqueue/actions/workflows/test.yml/badge.svg)](https://github.com/allisson/psqlqueue/actions/workflows/test.yml)
[![Go Report Card](https://goreportcard.com/badge/github.com/allisson/psqlqueue)](https://goreportcard.com/report/github.com/allisson/psqlqueue)
[![Docker Repository on Quay](https://quay.io/repository/allisson/psqlqueue/status "Docker Repository on Quay")](https://quay.io/repository/allisson/psqlqueue)

Simple queue system powered by Golang and PostgreSQL.

## quickstart

The idea of this service is to offer a simple queuing system using PostgreSQL as a backend.

First, we need a PostgreSQL database, for this, we will use docker:

```bash
docker run --name postgres-psqlqueue \
--restart unless-stopped \
-e POSTGRES_USER=psqlqueue \
-e POSTGRES_PASSWORD=psqlqueue \
-e POSTGRES_DB=psqlqueue \
-p 5432:5432 \
-d postgres:15-alpine
```

Now let's run the database migrations before starting the server:

```bash
docker run --rm \
-e PSQLQUEUE_DATABASE_URL='postgres://psqlqueue:[email protected]:5432/psqlqueue?sslmode=disable' \
quay.io/allisson/psqlqueue migrate
```

```json
{"time":"2023-12-29T21:11:39.516360369Z","level":"INFO","msg":"migration process started"}
{"time":"2023-12-29T21:11:39.54908151Z","level":"INFO","msg":"migration process finished"}
```

Starting the server:

```bash
docker run --rm \
-e PSQLQUEUE_DATABASE_URL='postgres://psqlqueue:[email protected]:5432/psqlqueue?sslmode=disable' \
-p 8000:8000 \
quay.io/allisson/psqlqueue server
```

```json
{"time":"2023-12-29T21:14:30.898080659Z","level":"INFO","msg":"http server starting","host":"0.0.0.0","port":8000}
```

For creating a new queue we have these fields:
- "id": The identifier of this new queue.
- "ack_deadline_seconds": The maximum time before the consumer should acknowledge the message, after this time the message will be delivered again to consumers.
- "message_retention_seconds": The maximum time in which the message must be delivered to consumers, after this time the message will be marked as expired.
- "delivery_delay_seconds": The number of seconds to postpone the delivery of new messages to consumers.

```bash
curl --location 'http://localhost:8000/v1/queues' \
--header 'Content-Type: application/json' \
--data '{
"id": "my-new-queue",
"ack_deadline_seconds": 30,
"message_retention_seconds": 1209600,
"delivery_delay_seconds": 0
}'
```

```json
{
"id": "my-new-queue",
"ack_deadline_seconds": 30,
"message_retention_seconds": 1209600,
"delivery_delay_seconds": 0,
"created_at": "2023-12-29T21:30:58.682194763Z",
"updated_at": "2023-12-29T21:30:58.682194763Z"
}
```

For creating a new message we have these fields:
- "body": The body of the message.
- "label": A label that allows this message to be filtered.
- "attributes": The message attributes.

```bash
curl --location 'http://localhost:8000/v1/queues/my-new-queue/messages' \
--header 'Content-Type: application/json' \
--data '{
"body": "message body",
"label": "my-label",
"attributes": {"attribute1": "attribute1", "attribute2": "attribute2"}
}'
```

For consuming the messages we have these filters:
- "label": To filter by the message label.
- "limit": To limit the number of messages.

```bash
curl --location 'http://localhost:8000/v1/queues/my-new-queue/messages?limit=1'
```

```json
{
"data": [
{
"id": "01HJVRCQVAD9VBT10MCS74T0EN",
"queue_id": "my-new-queue",
"label": "my-label",
"body": "message body",
"attributes": {
"attribute1": "attribute1",
"attribute2": "attribute2"
},
"delivery_attempts": 1,
"created_at": "2023-12-29T21:41:25.994731Z"
}
],
"limit": 1
}
```

Now you have 30 seconds to execute the ack or nack for this message, first we can do the nack:

```bash
curl --location --request PUT 'http://localhost:8000/v1/queues/my-new-queue/messages/01HJVRCQVAD9VBT10MCS74T0EN/nack' \
--header 'Content-Type: application/json' \
--data '{
"visibility_timeout_seconds": 30
}'
```

Now we need to wait 30 seconds before consuming this message again, after this time:

```bash
curl --location 'http://localhost:8000/v1/queues/my-new-queue/messages?limit=1'
```

```json
{
"data": [
{
"id": "01HJVRCQVAD9VBT10MCS74T0EN",
"queue_id": "my-new-queue",
"label": "my-label",
"body": "message body",
"attributes": {
"attribute1": "attribute1",
"attribute2": "attribute2"
},
"delivery_attempts": 2,
"created_at": "2023-12-29T21:41:25.994731Z"
}
],
"limit": 1
}
```

Now it's time to ack the message:

```bash
curl --location --request PUT 'http://localhost:8000/v1/queues/my-new-queue/messages/01HJVRCQVAD9VBT10MCS74T0EN/ack'
```

Let's try to consume the messages again:

```bash
curl --location 'http://localhost:8000/v1/queues/my-new-queue/messages/?limit=1'
```

```json
{
"data": [],
"limit": 1
}
```

After the ack, the message remains in the database marked as expired, to remove expired messages we can use the cleanup endpoint:

```bash
curl --location --request PUT 'http://localhost:8000/v1/queues/my-new-queue/cleanup'
```

This is the basics of using this service, I recommend that you check the swagger documentation at http://localhost:8000/v1/swagger/index.html to see more options.
2 changes: 1 addition & 1 deletion docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ const docTemplate = `{
},
{
"type": "string",
"description": "Label",
"description": "Filter by label",
"name": "label",
"in": "path"
},
Expand Down
2 changes: 1 addition & 1 deletion docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@
},
{
"type": "string",
"description": "Label",
"description": "Filter by label",
"name": "label",
"in": "path"
},
Expand Down
2 changes: 1 addition & 1 deletion docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ paths:
name: queue_id
required: true
type: string
- description: Label
- description: Filter by label
in: path
name: label
type: string
Expand Down
12 changes: 5 additions & 7 deletions http/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ type messageResponse struct {

// nolint:unused
type messageListRequest struct {
Label *string `form:"label" validate:"optional"`
Offset uint `form:"offset" validate:"required"`
Limit uint `form:"limit" validate:"required"`
Label *string `form:"label" validate:"optional"`
Limit uint `form:"limit" validate:"required"`
} //@name MessageListRequest

// nolint:unused
Expand Down Expand Up @@ -92,7 +91,7 @@ func (m *MessageHandler) Create(c *gin.Context) {
// @Accept json
// @Produce json
// @Param queue_id path string true "Queue id"
// @Param label path string false "Label"
// @Param label path string false "Filter by label"
// @Param limit query int false "The limit indicates the maximum number of items to return"
// @Success 200 {object} messageListResponse
// @Failure 404 {object} errorResponse
Expand All @@ -101,13 +100,12 @@ func (m *MessageHandler) Create(c *gin.Context) {
func (m *MessageHandler) List(c *gin.Context) {
queueID := c.Param("queue_id")

request := messageListRequest{Offset: 0, Limit: 10}
request := messageListRequest{Limit: 10}
if err := c.ShouldBindQuery(&request); err != nil {
slog.Warn("message list request error", "error", err)
}

request.Limit = min(request.Limit, m.cfg.QueueMaxNumberOfMessages)
request.Offset = 0

messages, err := m.messageService.List(c.Request.Context(), queueID, request.Label, request.Limit)
if err != nil {
Expand All @@ -116,7 +114,7 @@ func (m *MessageHandler) List(c *gin.Context) {
return
}

response := listResponse{Data: messages, Offset: request.Offset, Limit: request.Limit}
response := listResponse{Data: messages, Offset: 0, Limit: request.Limit}

c.JSON(http.StatusOK, response)
}
Expand Down

0 comments on commit d236c72

Please sign in to comment.