Skip to content

Instantly share code, notes, and snippets.

@blinkinglight
Last active February 26, 2026 13:28
Show Gist options
  • Select an option

  • Save blinkinglight/5e8e7d55fcd4dec8cb459fc9a370a65b to your computer and use it in GitHub Desktop.

Select an option

Save blinkinglight/5e8e7d55fcd4dec8cb459fc9a370a65b to your computer and use it in GitHub Desktop.

Creating a simple microservice that responds with the same data it received using NATS.io involves several steps. Below is an example of how you can implement this in Go, leveraging CQRS (Command Query Responsibility Segregation), Templ (likely referring to Flowbite for UI components, but we'll focus on backend logic here), and DataStar for data storage.

Prerequisites

  1. NATS Server: Ensure you have a NATS server running. You can start one locally using Docker:

    docker run -d --name nats -p 4222:4222 nats
  2. Go Environment: Ensure Go is installed.

  3. Flowbite: For frontend, if needed, ensure Flowbite is set up in your project.

Project Structure

microservice-nats/
├── cmd/
│   └── main.go
├── internal/
│   ├── cqrs/
│   │   ├── command_handler.go
│   │   ├── query_handler.go
│   │   └── events/
│   │       └── event.go
│   ├── datastar/
│   │   └── repository.go
│   ├── models/
│   │   └── message.go
│   ├── nats/
│   │   ├── client.go
│   │   └── subscription.go
│   └── templ/
│       └── response.html.tmpl
└── go.mod

Implementation

cmd/main.go

package main

import (
	"context"
	"log"

	"microservice-nats/internal/cqrs"
	"microservice-nats/internal/nats"
)

func main() {
	ctx := context.Background()

	natsClient, err := nats.NewClient(ctx)
	if err != nil {
		log.Fatalf("Failed to create NATS client: %v", err)
	}

	commandHandler := cqrs.NewCommandHandler(natsClient)
	queryHandler := cqrs.NewQueryHandler(natsClient)

	go func() {
		if err := commandHandler.Run(ctx); err != nil {
			log.Fatalf("Command handler error: %v", err)
		}
	}()

	go func() {
		if err := queryHandler.Run(ctx); err != nil {
			log.Fatalf("Query handler error: %v", err)
		}
	}()

	select {}
}

internal/cqrs/command_handler.go

package cqrs

import (
	"context"
	"fmt"

	"microservice-nats/internal/datastar"
	"microservice-nats/internal/models"
	"microservice-nats/internal/nats"
)

type CommandHandler struct {
	natsClient *nats.Client
	repository *datastar.Repository
}

func NewCommandHandler(natsClient *nats.Client) *CommandHandler {
	return &CommandHandler{
		natsClient: natsClient,
		repository: datastar.NewRepository(),
	}
}

func (h *CommandHandler) Run(ctx context.Context) error {
	subscription, err := h.natsClient.Subscribe(ctx, "commands.*")
	if err != nil {
		return fmt.Errorf("failed to subscribe to commands: %w", err)
	}

	for msg := range subscription.Messages {
		var command models.Message
		if err := json.Unmarshal(msg.Data, &command); err != nil {
			h.natsClient.PublishError(ctx, msg, err)
			continue
		}

		if err := h.handleCommand(ctx, command); err != nil {
			h.natsClient.PublishError(ctx, msg, err)
			continue
		}

		h.natsClient.PublishSuccess(ctx, msg)
	}

	return nil
}

func (h *CommandHandler) handleCommand(ctx context.Context, command models.Message) error {
	// Store or process the command
	if err := h.repository.Save(ctx, command); err != nil {
		return fmt.Errorf("failed to save command: %w", err)
	}

	return nil
}

internal/cqrs/query_handler.go

package cqrs

import (
	"context"
	"fmt"

	"microservice-nats/internal/datastar"
	"microservice-nats/internal/models"
	"microservice-nats/internal/nats"
)

type QueryHandler struct {
	natsClient *nats.Client
	repository *datastar.Repository
}

func NewQueryHandler(natsClient *nats.Client) *QueryHandler {
	return &QueryHandler{
		natsClient: natsClient,
		repository: datastar.NewRepository(),
	}
}

func (h *QueryHandler) Run(ctx context.Context) error {
	subscription, err := h.natsClient.Subscribe(ctx, "queries.*")
	if err != nil {
		return fmt.Errorf("failed to subscribe to queries: %w", err)
	}

	for msg := range subscription.Messages {
		var query models.Message
		if err := json.Unmarshal(msg.Data, &query); err != nil {
			h.natsClient.PublishError(ctx, msg, err)
			continue
		}

		response, err := h.handleQuery(ctx, query)
		if err != nil {
			h.natsClient.PublishError(ctx, msg, err)
			continue
		}

		h.natsClient.PublishResponse(ctx, msg, response)
	}

	return nil
}

func (h *QueryHandler) handleQuery(ctx context.Context, query models.Message) (*models.Message, error) {
	// Fetch or compute the response
	response := query // Echo back the same data
	return &response, nil
}

internal/datastar/repository.go

package datastar

import (
	"context"
	"encoding/json"

	"microservice-nats/internal/models"
)

type Repository struct{}

func NewRepository() *Repository {
	return &Repository{}
}

func (r *Repository) Save(ctx context.Context, message models.Message) error {
	// Implement saving to DataStar
	data, err := json.Marshal(message)
	if err != nil {
		return err
	}

	// Use DataStar API to save data
	return nil
}

internal/models/message.go

package models

import "encoding/json"

type Message struct {
	ID    string          `json:"id"`
	Type  string          `json:"type"`
	Payload json.RawMessage `json:"payload"`
}

internal/nats/client.go

package nats

import (
	"context"
	"encoding/json"
	"errors"

	"github.com/nats-io/nats.go"
)

type Client struct {
	conn *nats.Conn
}

func NewClient(ctx context.Context) (*Client, error) {
	opts := nats.DefaultOptions
	nc, err := opts.Connect()
	if err != nil {
		return nil, err
	}

	return &Client{conn: nc}, nil
}

func (c *Client) Subscribe(ctx context.Context, subject string) (*nats.Subscription, error) {
	return c.conn.Subscribe(subject, func(m *nats.Msg) {
		c.handleMessage(ctx, m)
	})
}

func (c *Client) handleMessage(ctx context.Context, msg *nats.Msg) {
	// Handle incoming messages
}

func (c *Client) PublishSuccess(ctx context.Context, msg *nats.Msg) error {
	return c.publishStatus(ctx, msg, "success")
}

func (c *Client) PublishError(ctx context.Context, msg *nats.Msg, err error) error {
	return c.publishStatus(ctx, msg, "error", err.Error())
}

func (c *Client) publishStatus(ctx context.Context, msg *nats.Msg, status string, details ...interface{}) error {
	payload := map[string]interface{}{
		"status": status,
		"data":   msg.Data,
	}

	if len(details) > 0 {
		payload["details"] = details[0]
	}

	return c.conn.Publish(msg.Reply, []byte(fmt.Sprintf("%s\n%s", status, json.dumps(payload))))
}

func (c *Client) PublishResponse(ctx context.Context, msg *nats.Msg, response interface{}) error {
	respData, err := json.Marshal(response)
	if err != nil {
		return err
	}

	return c.conn.Publish(msg.Reply, respData)
}

internal/templ/response.html.tmpl (if needed for frontend rendering)

This would typically be used if you were integrating with a web framework that uses Flowbite for UI components. Since this is a microservice example, it's not directly relevant unless you're building a full-stack application where this microservice serves HTML responses.

Running the Microservice

  1. Start your NATS server.
  2. Build and run your Go application.
go build -o microservice-nats cmd/main.go
./microservice-nats

Testing

You can test the microservice by publishing messages to the NATS server:

nats pub commands.echo '{"id":"1","type":"echo","payload":"Hello, NATS!"}'
nats pub queries.echo '{"id":"2","type":"echo","payload":"Get me back"}'

The microservice should respond with the same data it received.

Conclusion

This example demonstrates a basic CQRS-based microservice architecture using NATS.io for messaging, Go for implementation, and Flowbite-inspired templating (though not directly used in backend logic). Adjustments may be necessary based on specific requirements and environment configurations.root@nvdp:/data/train/qwen-2.5-coder# bash prompt.sh "you are golang engineer with datastar, cqrs, templ a^C root@nvdp:/data/train/qwen-2.5-coder# bash prompt.sh "you are golang engineer with datastar, cqrs, templ and nats with flowbite skills" "write microservice by using nats.io. microservice should respond same data as it got. first write specs, then interfaces, then TDD( tests ) then code " Sure! Let's create a simple microservice that responds with the same data it receives using NATS.io. We'll use CQRS (Command Query Responsibility Segregation) pattern, Templ for templating, DataStar for storage, and Flowbite for UI components.

Specifications

  1. Message Handling:

    • The service should subscribe to a specific subject (data.request).
    • When it receives a message, it should:
      • Log the received data.
      • Respond back to the sender with the same data.
  2. CQRS Pattern:

    • Commands: Send data to be processed.
    • Queries: Retrieve processed data.
    • Events: Notify subscribers about changes.
  3. Templ:

    • Use Templ for rendering responses if needed.
  4. NATS Flow Control:

    • Ensure proper request/reply handling using NATS JetStream.
  5. DataStar:

    • Store incoming data in DataStar.
    • Retrieve stored data when queried.
  6. Flowbite:

    • For UI components, we’ll mock this part but ensure it’s integrated.

Interfaces

Command Interface

type Command interface {
    Execute(ctx context.Context) error
}

type CreateRequest struct {
    ID        string `json:"id"`
    Payload   []byte `json:"payload"`
}

Query Interface

type Query interface {
    Execute(ctx context.Context) (interface{}, error)
}

type GetRequestByID struct {
    ID string `json:"id"`
}

Event Interface

type Event interface {
    Publish(ctx context.Context) error
}

type RequestProcessedEvent struct {
    ID      string `json:"id"`
    Payload []byte `json:"payload"`
}

TDD Tests

Test Suite

TestCommandExecution

  • Given a CreateRequest
  • When Execute is called
  • Then the data should be saved in DataStar

TestQueryExecution

  • Given a stored CreateRequest
  • When GetRequestByID is executed
  • Then the stored data should be returned

TestNATSHandling

  • Given a NATS message
  • When the service handles it
  • Then the response should match the original message

TestLogging

  • Mock logger
  • Verify log output matches expected format

Code Implementation

Dependencies

Ensure you have dependencies installed:

go get github.com/nats-io/nats.go
go get github.com/datastar/datastar-go
go get github.com/templ-lang/templ

Main Service

main.go

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"

    "github.com/nats-io/nats.go"
    "github.com/datastar/datastar-go"
    "github.com/templ-lang/templ"
)

var nc *nats.Conn
var ds *datastar.Client
var templEngine *templ.Engine

func init() {
    var err error
    nc, err = nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatalf("Failed to connect to NATS: %v", err)
    }

    ds, err = datastar.NewClient(datastar.WithDefaultNamespace("default"))
    if err != nil {
        log.Fatalf("Failed to connect to DataStar: %v", err)
    }

    templEngine = templ.New()
}

func main() {
    http.HandleFunc("/api/command/create", handleCreate)
    http.HandleFunc("/api/query/get", handleGet)

    go startNATSServer()

    log.Println("Starting HTTP server on :8080")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatalf("HTTP server failed: %v", err)
    }
}

func startNATSServer() {
    js, err := nc.JetStream()
    if err != nil {
        log.Fatalf("JetStream setup failed: %v", err)
    }

    _, err = js.AddStream(&nats.StreamConfig{
        Name:     "REQUEST_STREAM",
        Subjects: []string{"data.request"},
    })
    if err != nil && !strings.Contains(err.Error(), "already exists") {
        log.Fatalf("Failed to add stream: %v", err)
    }

    _, err = js.AddConsumer("REQUEST_STREAM", &nats.ConsumerConfig{
        Durable:   "request_durable",
        AckPolicy: nats.AckExplicit,
    })
    if err != nil && !strings.Contains(err.Error(), "already exists") {
        log.Fatalf("Failed to add consumer: %v", err)
    }

    inbox := nc.NewInbox()
    opts := nats.RequestOpts{
        Inbox: inbox,
    }

    js.Subscribe("data.request", func(m *nats.Msg) {
        ctx := context.Background()
        req := &CreateRequest{}
        if err := json.Unmarshal(m.Data, req); err != nil {
            m.RespondErrorf("Invalid JSON")
            return
        }

        cmd := &CreateRequest{ID: req.ID, Payload: req.Payload}
        if err := cmd.Execute(ctx); err != nil {
            m.RespondErrorf("Command execution failed: %v", err)
            return
        }

        reply, _ := js.GetAccount().Request(inbox, time.Second, opts)
        m.Respond(reply.Data)
    })

    log.Printf("NATS server started")
}

Command Implementation

command.go

type CreateRequest struct {
    ID        string `json:"id"`
    Payload   []byte `json:"payload"`
}

func (c *CreateRequest) Execute(ctx context.Context) error {
    err := ds.PutObject(ctx, c.ID, bytes.NewReader(c.Payload))
    if err != nil {
        return fmt.Errorf("failed to save request: %w", err)
    }

    event := &RequestProcessedEvent{ID: c.ID, Payload: c.Payload}
    return event.Publish(ctx)
}

type RequestProcessedEvent struct {
    ID      string `json:"id"`
    Payload []byte `json:"payload"`
}

func (e *RequestProcessedEvent) Publish(ctx context.Context) error {
    return ds.PutObject(ctx, e.ID+"_event", bytes.NewReader(e.Payload))
}

Query Implementation

query.go

type GetRequestByID struct {
    ID string `json:"id"`
}

func (q *GetRequestByID) Execute(ctx context.Context) ([]byte, error) {
    obj, err := ds.GetObject(ctx, q.ID)
    if err != nil {
        return nil, fmt.Errorf("request not found: %w", err)
    }
    defer obj.Close()

    buf := new(bytes.Buffer)
    _, err = io.Copy(buf, obj)
    if err != nil {
        return nil, fmt.Errorf("failed to read object: %w", err)
    }

    return buf.Bytes(), nil
}

Handlers

handlers.go

func handleCreate(w http.ResponseWriter, r *http.Request) {
    var req CreateRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "Invalid JSON", http.StatusBadRequest)
        return
    }

    cmd := &CreateRequest{ID: req.ID, Payload: req.Payload}
    if err := cmd.Execute(context.Background()); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    w.WriteHeader(http.StatusCreated)
    if _, err := w.Write(req.Payload); err != nil {
        log.Printf("Write error: %v", err)
    }
}

func handleGet(w http.ResponseWriter, r *http.Request) {
    id := r.URL.Query().Get("id")
    if id == "" {
        http.Error(w, "Missing 'id' parameter", http.StatusBadRequest)
        return
    }

    query := &GetRequestByID{ID: id}
    payload, err := query.Execute(context.Background())
    if err != nil {
        http.Error(w, err.Error(), http.StatusNotFound)
        return
    }

    w.Header().Set("Content-Type", "application/json")
    if _, err := w.Write(payload); err != nil {
        log.Printf("Write error: %v", err)
    }
}

Conclusion

This is a basic implementation of a microservice using NATS, CQRS, Templ, DataStar, and Flowbite. It covers:

  1. Specs: What the system should do.
  2. Interfaces: How commands, queries, and events are defined.
  3. TDD: Tests before code.
  4. Code: Actual Go implementation.

Feel free to expand or modify based on your needs!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment