Skip to content

Emitter

Thread-safe async event system with context support for decoupled module communication

Overview

Event-Driven Architecture

Base Framework's Emitter provides a thread-safe, async event system that enables decoupled communication between structures. It supports synchronous and asynchronous event emission, context cancellation, timeouts, and automatic panic recovery for robust event handling.

Thread-Safe

Concurrent-safe operations with read-write mutexes ensuring data integrity across goroutines.

Async Events

Support for both blocking and non-blocking event emission with goroutine-based execution.

Context Support

Built-in context cancellation and timeout support for better control over event processing.

Basic Usage

Creating and Using an Emitter

Emitter Initialization and Basic Events

go
import (
    "base/core/emitter"
    "base/core/logger"
)

// Initialize emitter
e := emitter.New()

// Register event listeners
e.On("user.created", func(data any) {
    if user, ok := data.(*User); ok {
        log.Info("New user registered",
            logger.String("email", user.Email),
            logger.Int("id", int(user.ID)))
    }
})

e.On("user.created", func(data any) {
    // Multiple listeners can handle the same event
    if user, ok := data.(*User); ok {
        // Send welcome email, update analytics, etc.
        sendWelcomeEmail(user)
    }
})

// Emit events
user := &User{ID: 1, Email: "john@example.com"}
e.Emit("user.created", user) // Blocks until all listeners complete

// Service integration example
type UserService struct {
    DB      *gorm.DB
    Emitter *emitter.Emitter
    Logger  logger.Logger
}

func (s *UserService) CreateUser(req CreateUserRequest) (*User, error) {
    user := &User{
        Email: req.Email,
        Name:  req.Name,
    }
    
    if err := s.DB.Create(user).Error; err != nil {
        return nil, err
    }
    
    // Emit event after successful creation
    s.Emitter.Emit("user.created", user)
    
    return user, nil
}

API Reference

Core Methods

New() *Emitter

Creates a new emitter instance with initialized listener map.

go
e := emitter.New()

On(event string, listener func(any))

Registers an event listener for the specified event name. Multiple listeners can be registered for the same event.

go
e.On("user.login", func(data any) { ... })

Emit(event string, data any)

Emits an event synchronously, blocking until all listeners complete. Uses WaitGroup internally to ensure all goroutines finish.

go
e.Emit("order.completed", order)

EmitAsync(event string, data any)

Emits an event asynchronously without blocking. Listeners run in separate goroutines and the method returns immediately.

go
e.EmitAsync("analytics.track", eventData)

EmitWithContext(ctx context.Context, event string, data any) error

Emits an event with context support. Returns an error if the context is cancelled before all listeners complete.

go
err := e.EmitWithContext(ctx, "file.process", fileData)

EmitWithTimeout(event string, data any, timeout time.Duration) error

Emits an event with a timeout. Returns an error if listeners don't complete within the specified duration.

go
err := e.EmitWithTimeout("heavy.task", data, 30*time.Second)

ListenerCount(event string) int

Returns the number of listeners registered for a specific event.

go
count := e.ListenerCount("user.created")

EventNames() []string

Returns all registered event names as a slice of strings.

go
events := e.EventNames()

Clear()

Removes all registered listeners for all events. Useful for testing or cleanup.

go
e.Clear()

Async Events & Context Support

Asynchronous Event Patterns

Non-blocking Event Emission

go
// Non-blocking events for analytics, logging, notifications
e.EmitAsync("analytics.page_view", analyticsData)
e.EmitAsync("audit.log", auditEvent)
e.EmitAsync("notification.send", notificationData)

// Async event handlers
e.On("file.uploaded", func(data any) {
    if file, ok := data.(*UploadedFile); ok {
        // Heavy processing that shouldn't block the request
        go processImageThumbnails(file)
        go scanForVirus(file)
        go updateSearchIndex(file)
    }
})

// Fire-and-forget pattern for non-critical operations
func (s *PostService) PublishPost(post *Post) error {
    if err := s.DB.Save(post).Error; err != nil {
        return err
    }
    
    // These operations shouldn't block the HTTP response
    s.Emitter.EmitAsync("post.published", post)
    s.Emitter.EmitAsync("analytics.content_created", post)
    s.Emitter.EmitAsync("social.auto_share", post)
    
    return nil
}

// Mixed patterns: critical vs non-critical events
func (s *OrderService) ProcessOrder(order *Order) error {
    // Critical event - wait for completion
    s.Emitter.Emit("order.validated", order)
    
    if err := s.DB.Save(order).Error; err != nil {
        return err
    }
    
    // Critical notifications
    s.Emitter.Emit("order.confirmed", order)
    
    // Non-critical analytics and marketing
    s.Emitter.EmitAsync("analytics.order", order)
    s.Emitter.EmitAsync("marketing.customer_activity", order)
    
    return nil
}

Context and Timeout Handling

Context-Aware Event Processing

go
import (
    "context"
    "time"
)

// Context cancellation example
func (s *FileService) ProcessLargeFile(ctx context.Context, file *File) error {
    // Process file with context support
    if err := s.EmitWithContext(ctx, "file.processing", file); err != nil {
        if errors.Is(err, context.Canceled) {
            s.Logger.Info("File processing cancelled", logger.String("file", file.Name))
            return err
        }
        return err
    }
    return nil
}

// Timeout pattern for slow operations
func (s *ReportService) GenerateReport(data *ReportData) error {
    // Set reasonable timeout for report generation
    err := s.Emitter.EmitWithTimeout("report.generate", data, 5*time.Minute)
    if err != nil {
        if errors.Is(err, context.DeadlineExceeded) {
            s.Logger.Error("Report generation timed out")
            return errors.New("report generation took too long")
        }
        return err
    }
    return nil
}

// HTTP request context integration
func (c *PostController) CreatePost(ctx *router.Context) error {
    var req CreatePostRequest
    if err := ctx.Bind(&req); err != nil {
        return ctx.JSON(400, map[string]string{"error": "Invalid request"})
    }
    
    post := &Post{Title: req.Title, Content: req.Content}
    
    // Use HTTP request context for event emission
    requestCtx := ctx.Request.Context()
    if err := c.Service.EmitWithContext(requestCtx, "post.creating", post); err != nil {
        return ctx.JSON(500, map[string]string{"error": "Failed to process post"})
    }
    
    return ctx.JSON(201, post)
}

// Advanced timeout with cleanup
func (s *EmailService) SendBulkEmail(emails []EmailData) error {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
    defer cancel()
    
    // Create a channel to track progress
    done := make(chan struct{})
    
    go func() {
        defer close(done)
        for _, email := range emails {
            if err := s.Emitter.EmitWithContext(ctx, "email.send", email); err != nil {
                s.Logger.Error("Failed to send email", logger.String("error", err.Error()))
                return
            }
        }
    }()
    
    select {
    case <-done:
        s.Logger.Info("Bulk email sending completed")
        return nil
    case <-ctx.Done():
        s.Logger.Error("Bulk email sending timed out")
        return ctx.Err()
    }
}

Module Integration Patterns

Cross-Module Communication Decoupled Module Design

// User module emits events
type UserService struct {
DB      *gorm.DB
Emitter *emitter.Emitter
Logger  logger.Logger
}
func (s *UserService) CreateUser(req CreateUserRequest) (*User, error) {
user := &User{
Email: req.Email,
Name:  req.Name,
}
if err := s.DB.Create(user).Error; err != nil {
return nil, err
}
// Emit events for other structures to handle
s.Emitter.Emit("user.created", user)
s.Emitter.EmitAsync("analytics.user_registered", user)
return user, nil
}
// Notification module listens for user events
type NotificationService struct {
Emitter *emitter.Emitter
Logger  logger.Logger
Email   email.Sender
}
func (s *NotificationService) Init() {
// Listen for user events without direct coupling
s.Emitter.On("user.created", func(data any) {
if user, ok := data.(*User); ok {
s.sendWelcomeEmail(user)
}
})
s.Emitter.On("user.password_reset", func(data any) {
if event, ok := data.(*PasswordResetEvent); ok {
s.sendPasswordResetEmail(event.User, event.Token)
}
})
}
// Analytics module tracks user behavior
type AnalyticsService struct {
Emitter *emitter.Emitter
Logger  logger.Logger
DB      *gorm.DB
}
func (s *AnalyticsService) Init() {
// Track various user events
s.Emitter.On("user.created", s.trackUserRegistration)
s.Emitter.On("user.login", s.trackUserLogin)
s.Emitter.On("post.created", s.trackContentCreation)
s.Emitter.On("order.completed", s.trackPurchase)
}
func (s *AnalyticsService) trackUserRegistration(data any) {
if user, ok := data.(*User); ok {
event := &AnalyticsEvent{
Type:      "user_registration",
UserID:    user.ID,
Timestamp: time.Now(),
Data:      map[string]any{"email": user.Email},
}
if err := s.DB.Create(event).Error; err != nil {
s.Logger.Error("Failed to track user registration",
logger.String("error", err.Error()))
}
}
}
// Post module with file handling
type PostService struct {
DB      *gorm.DB
Emitter *emitter.Emitter
Storage storage.Storage
Logger  logger.Logger
}
func (s *PostService) Init() {
// Listen for file upload events from storage
s.Emitter.On("post.featured_image.uploaded", func(data any) {
if post, ok := data.(*Post); ok {
s.Logger.Info("Featured image uploaded",
logger.Int("post_id", int(post.ID)),
logger.String("image", post.FeaturedImageURL))
}
})
// Listen for file deletion events
s.Emitter.On("post.featured_image.deleted", func(data any) {
if post, ok := data.(*Post); ok {
s.Logger.Info("Featured image deleted",
logger.Int("post_id", int(post.ID)))
}
})
}
func (s *PostService) CreatePost(req CreatePostRequest) (*Post, error) {
post := &Post{
Title:   req.Title,
Content: req.Content,
UserID:  req.UserID,
}
if err := s.DB.Create(post).Error; err != nil {
return nil, err
}
// Emit creation event for other structures
s.Emitter.Emit("post.created", post)
return post, nil
}

Module Registration with Emitter Dependency Injection Pattern

// Module structure with emitter injection
type PostModule struct {
Service    *PostService
Controller *PostController
Router     *router.RouterGroup
Emitter    *emitter.Emitter
Logger     logger.Logger
}
func NewPostModule(
db *gorm.DB,
router *router.RouterGroup,
log logger.Logger,
emitter *emitter.Emitter,
storage *storage.ActiveStorage,
) module.Module {
service := &PostService{
DB:      db,
Emitter: emitter,
Logger:  log,
Storage: storage,
}
controller := &PostController{
Service: service,
Logger:  log,
}
// Initialize event listeners
service.Init()
// Register routes
router.GET("/posts", controller.GetPosts)
router.POST("/posts", controller.CreatePost)
router.PUT("/posts/:id", controller.UpdatePost)
router.DELETE("/posts/:id", controller.DeletePost)
return &PostModule{
Service:    service,
Controller: controller,
Router:     router,
Emitter:    emitter,
Logger:     log,
}
}
// App initialization with emitter
func (app *App) initModules() *App {
// Create shared emitter instance
emitter := emitter.New()
// Register structures with emitter dependency injection
authModule := authentication.NewAuthenticationModule(
app.db,
app.router.Group("/auth"),
app.emailSender,
app.logger,
emitter,
)
postModule := NewPostModule(
app.db,
app.router.Group("/api/v1"),
app.logger,
emitter,
app.storage,
)
notificationModule := notification.NewNotificationModule(
app.router.Group("/notifications"),
app.logger,
emitter,
app.emailSender,
)
// Store structures
app.modules = []module.Module{
authModule,
postModule,
notificationModule,
}
return app
}
// Module initialization helper
type ModuleInitializer struct {
DB          *gorm.DB
Router      *router.RouterGroup
Logger      logger.Logger
Emitter     *emitter.Emitter
Storage     *storage.ActiveStorage
EmailSender email.Sender
}
func (m *ModuleInitializer) InitializeAll() {
// Initialize all structures with shared dependencies
modules := []module.Module{
authentication.NewAuthenticationModule(m.DB, m.Router.Group("/auth"), m.EmailSender, m.Logger, m.Emitter),
media.NewMediaModule(m.DB, m.Router.Group("/media"), m.Storage, m.Emitter, m.Logger),
translation.NewTranslationModule(m.DB, m.Router.Group("/translations"), m.Logger, m.Emitter, m.Storage),
}
for _, mod := range modules {
if initializer, ok := mod.(interface{ Init() }); ok {
initializer.Init()
}
}
}

Error Handling & Best Practices

Panic Recovery & Error Resilience

Robust Event Handling

go
// The emitter automatically recovers from panics in listeners
func (s *PostService) Init() {
    s.Emitter.On("post.created", func(data any) {
        // Even if this panics, other listeners will still execute
        panic("something went wrong")
    })
    
    s.Emitter.On("post.created", func(data any) {
        // This will still run despite the panic above
        if post, ok := data.(*Post); ok {
            s.Logger.Info("Post created", logger.Int("id", int(post.ID)))
        }
    })
}

// Best practice: Handle errors gracefully in listeners
func (s *EmailService) Init() {
    s.Emitter.On("user.created", func(data any) {
        user, ok := data.(*User)
        if !ok {
            s.Logger.Error("Invalid data type for user.created event")
            return
        }
        
        if err := s.sendWelcomeEmail(user); err != nil {
            s.Logger.Error("Failed to send welcome email",
                logger.String("error", err.Error()),
                logger.String("user_email", user.Email))
            // Don't panic - log and continue
            return
        }
        
        s.Logger.Info("Welcome email sent", logger.String("email", user.Email))
    })
}

// Defensive event emission with error checking
func (s *OrderService) ProcessOrder(order *Order) error {
    // Validate order before processing
    if order == nil {
        return errors.New("order cannot be nil")
    }
    
    // Process order logic
    if err := s.DB.Save(order).Error; err != nil {
        return fmt.Errorf("failed to save order: %w", err)
    }
    
    // Safe event emission with timeout
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    if err := s.Emitter.EmitWithContext(ctx, "order.processed", order); err != nil {
        // Log error but don't fail the entire operation
        s.Logger.Error("Failed to emit order.processed event",
            logger.String("error", err.Error()),
            logger.Int("order_id", int(order.ID)))
    }
    
    return nil
}

// Testing event listeners with mock data
func TestPostServiceEvents(t *testing.T) {
    emitter := emitter.New()
    service := &PostService{Emitter: emitter}
    
    var receivedPost *Post
    emitter.On("post.created", func(data any) {
        if post, ok := data.(*Post); ok {
            receivedPost = post
        }
    })
    
    testPost := &Post{ID: 1, Title: "Test Post"}
    emitter.Emit("post.created", testPost)
    
    assert.Equal(t, testPost, receivedPost)
}

// Event listener cleanup for tests
func (s *TestSuite) TearDown() {
    // Clear all listeners after each test
    s.emitter.Clear()
}

// Monitoring event listener performance
func (s *AnalyticsService) trackEventPerformance() {
    s.Emitter.On("performance.track", func(data any) {
        start := time.Now()
        defer func() {
            duration := time.Since(start)
            s.Logger.Debug("Event processing time",
                logger.String("duration", duration.String()))
        }()
        
        // Process analytics data
        s.processAnalyticsData(data)
    })
}

Best Practices

  • Use EmitAsync for non-critical operations like analytics
  • Always validate data types in event listeners
  • Handle errors gracefully - don't panic in listeners
  • Use meaningful event names following module.action pattern
  • Implement proper logging in event handlers
  • Use context cancellation for long-running operations
  • Clear listeners in tests to avoid interference

Anti-Patterns

  • Don't emit events for every database operation
  • Avoid deep event chains that create circular dependencies
  • Don't use events for synchronous data exchange
  • Never assume listeners will complete successfully
  • Don't emit sensitive data without sanitization
  • Avoid blocking operations in async event handlers
  • Don't ignore context cancellation in listeners

Performance & Thread Safety

Performance Features

  • Thread-safe with read-write mutexes
  • Goroutine-based parallel listener execution
  • Automatic panic recovery in listeners
  • Context cancellation and timeout support
  • Zero allocation for listener management

Thread Safety

Concurrent emissions: Safe
Listener registration: Safe
Event introspection: Safe
Memory consistency: Guaranteed
Goroutine safety: Full support

Performance Monitoring Example

Event Performance Tracking

go
// Performance monitoring service
type EventMonitor struct {
    emitter *emitter.Emitter
    logger  logger.Logger
    metrics map[string]*EventMetrics
    mutex   sync.RWMutex
}

type EventMetrics struct {
    TotalEmissions  int64
    TotalListeners  int64
    AverageLatency  time.Duration
    ErrorCount      int64
}

func (m *EventMonitor) Init() {
    m.metrics = make(map[string]*EventMetrics)
    
    // Monitor all events by wrapping emitter methods
    originalEmit := m.emitter.Emit
    m.emitter.Emit = func(event string, data any) {
        start := time.Now()
        originalEmit(event, data)
        m.recordMetrics(event, time.Since(start))
    }
}

func (m *EventMonitor) recordMetrics(event string, duration time.Duration) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    
    if _, exists := m.metrics[event]; !exists {
        m.metrics[event] = &EventMetrics{}
    }
    
    metrics := m.metrics[event]
    metrics.TotalEmissions++
    metrics.TotalListeners = int64(m.emitter.ListenerCount(event))
    
    // Calculate running average
    if metrics.TotalEmissions == 1 {
        metrics.AverageLatency = duration
    } else {
        metrics.AverageLatency = time.Duration(
            (int64(metrics.AverageLatency)*metrics.TotalEmissions + int64(duration)) /
                (metrics.TotalEmissions + 1),
        )
    }
}

// Get performance stats
func (m *EventMonitor) GetStats() map[string]*EventMetrics {
    m.mutex.RLock()
    defer m.mutex.RUnlock()
    
    stats := make(map[string]*EventMetrics)
    for event, metrics := range m.metrics {
        stats[event] = &EventMetrics{
            TotalEmissions:  metrics.TotalEmissions,
            TotalListeners:  metrics.TotalListeners,
            AverageLatency:  metrics.AverageLatency,
            ErrorCount:      metrics.ErrorCount,
        }
    }
    return stats
}

// Concurrent event emission test
func BenchmarkConcurrentEmit(b *testing.B) {
    emitter := emitter.New()
    
    // Register multiple listeners
    for i := 0; i < 10; i++ {
        emitter.On("benchmark.event", func(data any) {
            time.Sleep(time.Microsecond) // Simulate work
        })
    }
    
    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            emitter.Emit("benchmark.event", "test data")
        }
    })
}

Common Event Patterns

Event Naming Conventions

Entity Lifecycle Events

  • user.created
  • user.updated
  • user.deleted
  • post.published
  • order.completed

Action Events

  • user.login_attempt
  • email.sent
  • file.uploaded
  • payment.processed
  • cache.invalidated

File Events

  • post.image.uploaded
  • user.avatar.deleted
  • document.file.processed

System Events

  • system.startup
  • analytics.track
  • audit.log

Event Design Tips

  • Use dot notation for hierarchical event names (module.action.detail)
  • Keep event data immutable - pass copies, not references to mutable objects
  • Include sufficient context in event data for listeners to work independently
  • Use past tense for completed actions (created, updated) and present for ongoing (creating, updating)
  • Consider event versioning for breaking changes (user.created.v2)
  • Document your events and their data structures for other developers

Released under the MIT License.