Echo Service
The Echo Service is a simple CQRS-based microservice that receives commands or queries via NATS, processes them (which in this case means doing nothing but passing the message along), and returns the original message.
- NATS: Message transport.
- CQRS: Command Query Responsibility Segregation.
- Commands: The service accepts and echoes back command messages.
- Queries: The service accepts and echoes back query messages.
- Datastore: None (no persistence; purely in-memory echo).
- Framework: Go with Flowbite UI for documentation generation.
-
Command Handling
- Accepts
commandsubjects (cmd.<aggregate>.<action>). - Echoes back the received command object.
- Accepts
-
Query Handling
- Accepts
querysubjects (query.<module>.<entity>). - Echoes back the received query object.
- Accepts
-
Error Handling
- If an invalid subject is received, return a structured error response.
-
Serialization
- Use JSON for all messages.
-
Documentation
- Generate HTML docs using Templ templating engine.
Define a single message type that can be used for both commands and queries.
syntax = "proto3";
package echo;
message Envelope {
string id = 1;
string aggregate = 2;
string action = 3;
string module = 4;
string entity = 5;
map<string, any> payload = 6;
}- Commands:
cmd.<aggregate>.<action> - Queries:
query.<module>.<entity>
- On success: Return the original message.
- On error: Return a structured error object:
{
"error": {
"code": "invalid_request",
"message": "Invalid command format"
}
}Ensure the service correctly parses subjects, echoes back messages, and handles errors.
Verify the service responds over NATS without relying on external dependencies.
Add these to your go.mod:
require (
github.com/nats-io/nats.go v1.20.0
github.com/yourorg/datastar/v2 v2.0.0 // hypothetical Datastar package
github.com/yourorg/templ v1.0.0 // templ templating engine
github.com/samber/option v1.4.0 // optional values
)cmd/echo: Core business logic.internal/nats: NATS client wrapper.internal/model: Echo message model.docs: Generated Flowbite documentation.
package echo_test
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/nats-io/nats.go"
"github.com/yourorg/datastar/v2"
"github.com/yourorg/templ"
"github.com/yourorg/templ/pkg/html"
"github.com/yourorg/templ/pkg/markdown"
"github.com/yourorg/templ/pkg/text"
"github.com/yourorg/templ/pkg/tmpl"
"github.com/yourorg/templ/pkg/ui"
"github.com/yourorg/templ/pkg/util"
"github.com/yourorg/cmd/echo"
"github.com/yourorg/internal/model"
)
func TestHandleCommand(t *testing.T) {
ctx := context.Background()
repo := &datastar.MockRepository{}
svc := echo.New(repo)
cmd := &model.Envelope{ID: "1", Aggregate: "user", Action: "create", Payload: map[string]interface{}{
"name": "John Doe",
}}
reply, err := svc.HandleCommand(ctx, "cmd.user.create", cmd)
require.NoError(t, err)
assert.Equal(t, cmd, reply)
}
func TestHandleCommand_InvalidSubject(t *testing.T) {
ctx := context.Background()
repo := &datastar.MockRepository{}
svc := echo.New(repo)
cmd := &model.Envelope{ID: "1", Aggregate: "user", Action: "create", Payload: map[string]interface{}{
"name": "John Doe",
}}
_, err := svc.HandleCommand(ctx, "unknown.subject", cmd)
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid request")
}package echo_test
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"context"
"github.com/nats-io/nats.go"
"github.com/yourorg/datastar/v2"
"github.com/yourorg/templ"
"github.com/yourorg/templ/pkg/html"
"github.com/yourorg/templ/pkg/markdown"
"github.com/yourorg/templ/pkg/text"
"github.com/yourorg/templ/pkg/tmpl"
"github.com/yourorg/templ/pkg/ui"
"github.com/yourorg/templ/pkg/util"
"github.com/yourorg/cmd/echo"
"github.com/yourorg/internal/model"
)
func TestHandleQuery(t *testing.T) {
ctx := context.Background()
repo := &datastar.MockRepository{}
svc := echo.New(repo)
q := &model.Envelope{ID: "1", Module: "user", Entity: "profile", Payload: map[string]interface{}{
"id": "123",
}}
reply, err := svc.HandleQuery(ctx, "query.user.profile", q)
require.NoError(t, err)
assert.Equal(t, q, reply)
}
func TestHandleQuery_InvalidSubject(t *testing.T) {
ctx := context.Background()
repo := &datastar.MockRepository{}
svc := echo.New(repo)
q := &model.Envelope{ID: "1", Module: "user", Entity: "profile", Payload: map[string]interface{}{
"id": "123",
}}
_, err := svc.HandleQuery(ctx, "unknown.subject", q)
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid request")
}package echo
import (
"context"
"fmt"
"github.com/nats-io/nats.go"
"github.com/yourorg/datastar/v2"
"github.com/yourorg/internal/model"
)
type Service struct {
repo datastar.Repository
}
func New(repo datastar.Repository) *Service {
return &Service{repo: repo}
}
func (s *Service) HandleCommand(ctx context.Context, subject string, cmd *model.Envelope) (*model.Envelope, error) {
if !isValidCommandSubject(subject) {
return nil, fmt.Errorf("invalid request: %w", ErrInvalidRequest)
}
return cmd, nil
}
func (s *Service) HandleQuery(ctx context.Context, subject string, q *model.Envelope) (*model.Envelope, error) {
if !isValidQuerySubject(subject) {
return nil, fmt.Errorf("invalid request: %w", ErrInvalidRequest)
}
return q, nil
}
func isValidCommandSubject(s string) bool {
return s == "cmd.user.create" || s == "cmd.user.update" || s == "cmd.user.delete"
}
func isValidQuerySubject(s string) bool {
return s == "query.user.profile" || s == "query.user.list"
}var (
ErrInvalidRequest = fmt.Errorf("invalid request")
)package nats
import (
"context"
"log"
"time"
"github.com/nats-io/nats.go"
"github.com/yourorg/cmd/echo"
)
type Client struct {
nc *nats.Conn
}
func Dial(url string) (*Client, error) {
nc, err := nats.Connect(url, nats.Timeout(1*time.Second))
if err != nil {
return nil, err
}
return &Client{nc: nc}, nil
}
func (c *Client) Subscribe(ctx context.Context, subject string, handler nats.MsgHandler) error {
_, err := c.nc.QueueSubscribe(subject, "", handler)
if err != nil {
return err
}
return nil
}
func (c *Client) Request(ctx context.Context, subject, reply string, msg []byte) ([]byte, error) {
return c.nc.RequestWithContext(ctx, subject, reply, msg)
}
func (c *Client) Close() error {
return c.nc.Close()
}package echo
import (
"context"
"github.com/nats-io/nats.go"
"github.com/yourorg/internal/nats"
"github.com/yourorg/internal/model"
)
const (
natsReplyPrefix = "reply."
)
func RunNATS(ctx context.Context, svc *Service, nc *nats.Client) error {
subCmd := "cmd.>"
subQry := "query.>"
if err := nc.Subscribe(subCmd, func(m *nats.Msg) {
go handleCommand(ctx, svc, m)
}); err != nil {
return err
}
if err := nc.Subscribe(subQry, func(m *nats.Msg) {
go handleQuery(ctx, svc, m)
}); err != nil {
return err
}
return nil
}
func handleCommand(ctx context.Context, svc *Service, m *nats.Msg) {
cmd, err := decodeEnvelope(m.Data)
if err != nil {
log.Printf("decode error: %v", err)
return
}
reply, err := svc.HandleCommand(ctx, m.Subject, cmd)
if err != nil {
sendError(ctx, nc, m.Reply, err)
return
}
sendReply(ctx, nc, m.Reply, reply)
}
func handleQuery(ctx context.Context, svc *Service, m *nats.Msg) {
q, err := decodeEnvelope(m.Data)
if err != nil {
log.Printf("decode error: %v", err)
return
}
reply, err := svc.HandleQuery(ctx, m.Subject, q)
if err != nil {
sendError(ctx, nc, m.Reply, err)
return
}
sendReply(ctx, nc, m.Reply, reply)
}
func decodeEnvelope(data []byte) (*model.Envelope, error) {
var env model.Envelope
if err := json.Unmarshal(data, &env); err != nil {
return nil, err
}
return &env, nil
}
func sendReply(ctx context.Context, nc *nats.Client, reply string, msg interface{}) {
b, _ := json.Marshal(msg)
nc.Publish(reply, b)
}
func sendError(ctx context.Context, nc *nats.Client, reply string, err error) {
e := &model.ErrorResponse{Code: "error", Message: err.Error()}
b, _ := json.Marshal(e)
nc.Publish(reply, b)
}docs/index.mddocs/api.mddocs/installation.md
---
layout: 'layouts/base.njk'
title: Echo Service Documentation
---
{% from "partials/nav.njk" import nav %}
{% set current = "home" %}
<div class="container mx-auto py-8">
<h1 class="text-3xl font-bold mb-6">Welcome to Echo Service</h1>
{{ nav(current) }}
</div>---
layout: 'layouts/base.njk'
title: API Reference
parent: Documentation
---
# API Reference
This section describes how to interact with the Echo Service via NATS.
## Subjects
### Commands
- Base: `cmd.<aggregate>.<action>`
- Examples:
- `cmd.user.create`
- `cmd.user.update`
### Queries
- Base: `query.<module>.<entity>`
- Examples:
- `query.user.profile`
- `query.user.list`---
layout: 'layouts/base.njk'
title: Installation
parent: Documentation
---
# Installation Guide
To run the Echo Service locally:
1. Install Go.
2. Clone repository.
3. Run: `make start`.- Run Tests: Ensure unit and integration tests pass.
- Start NATS Server: Make sure NATS is running locally or use Docker.
- Run Service: Execute
go run main.go. - Generate Docs: Run templ templating engine to generate HTML files.
- Serve Docs: Use Flowbite CLI or HTTP server to serve generated docs.
Enjoy building your idiomatic NATS-driven microservice!