Skip to content
Snippets Groups Projects
eventbus_test.go 15.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • zhongqiling's avatar
    zhongqiling committed
    //
    // Copyright 2023 Bytedance Ltd. and/or its affiliates
    //
    // Licensed under the Apache License, Version 2.0 (the "License");
    // you may not use this file except in compliance with the License.
    // You may obtain a copy of the License at
    //
    //     http://www.apache.org/licenses/LICENSE-2.0
    //
    // Unless required by applicable law or agreed to in writing, software
    // distributed under the License is distributed on an "AS IS" BASIS,
    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    // See the License for the specific language governing permissions and
    // limitations under the License.
    
    package mysql
    
    import (
    	"context"
    	"fmt"
    	"math/rand"
    	"sync"
    	"testing"
    	"time"
    
    	"github.com/stretchr/testify/assert"
    	"gorm.io/driver/sqlite"
    	"gorm.io/gorm"
    
    	"github.com/bytedance/dddfirework"
    	exec_mysql "github.com/bytedance/dddfirework/executor/mysql"
    	"github.com/bytedance/dddfirework/testsuit"
    )
    
    type testEntity struct {
    	dddfirework.BaseEntity
    
    	Name string
    }
    
    type testPO struct {
    	ID   string
    	Name string
    }
    
    func (o *testPO) GetID() string {
    	return o.ID
    }
    
    func (o *testPO) TableName() string {
    	return "test"
    }
    
    type testEvent struct {
    	EType string
    	Data  string
    }
    
    func (t *testEvent) GetType() dddfirework.EventType {
    	return dddfirework.EventType(t.EType)
    }
    
    func (t *testEvent) GetSender() string {
    	return t.Data
    }
    
    func initModel(db *gorm.DB) {
    	if err := db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}, &Transaction{}); err != nil {
    		panic(err)
    	}
    
    	exec_mysql.RegisterEntity2Model(&testEntity{}, func(entity, parent dddfirework.IEntity, op dddfirework.OpType) (exec_mysql.IModel, error) {
    		e := entity.(*testEntity)
    		return &testPO{
    			ID:   e.GetID(),
    			Name: e.Name,
    		}, nil
    	}, func(po exec_mysql.IModel, do dddfirework.IEntity) error {
    		p, d := po.(*testPO), do.(*testEntity)
    		d.SetID(p.ID)
    		d.Name = p.Name
    		return nil
    	})
    }
    
    func init() {
    	db := testsuit.InitMysql()
    	initModel(db)
    	db.Where("1 = 1").Delete(&EventPO{})
    	db.Where("1 = 1").Delete(&ServicePO{})
    
    	events := make([]*EventPO, 0)
    	createTime := time.Now()
    	e, _ := eventPersist(&dddfirework.DomainEvent{
    		ID:        "0",
    		Type:      "one",
    		Payload:   []byte("{}"),
    		CreatedAt: createTime,
    	})
    	events = append(events, e)
    
    	for i := 1; i < 10; i++ {
    		e, _ := eventPersist(&dddfirework.DomainEvent{
    			ID:        fmt.Sprintf("%d", i),
    			Type:      "test",
    			Payload:   []byte("{}"),
    			CreatedAt: createTime,
    		})
    		createTime = createTime.Add(time.Millisecond)
    		events = append(events, e)
    	}
    	for i := 10; i < 20; i++ {
    		e, _ := eventPersist(&dddfirework.DomainEvent{
    			ID:        fmt.Sprintf("%d", i),
    			Type:      "create_same_time",
    			Payload:   []byte("{}"),
    			CreatedAt: createTime,
    		})
    		events = append(events, e)
    	}
    
    	if err := db.Create(events).Error; err != nil {
    		panic(err)
    	}
    }
    
    func TestEventBusConcurrent(t *testing.T) {
    	db := testsuit.InitMysql()
    
    	mu := sync.Mutex{}
    	ids := make(map[string]bool)
    	events := make([]*dddfirework.DomainEvent, 0)
    
    	eventBus := NewEventBus("test_concurrent", db, func(opt *Options) {
    		opt.LimitPerRun = 5
    		opt.ConsumeConcurrent = 1
    		offset := int64(0)
    		opt.DefaultOffset = &offset
    	})
    	eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error {
    		mu.Lock()
    		defer mu.Unlock()
    		ids[evt.ID] = true
    		events = append(events, evt)
    		return nil
    	})
    
    	wg := sync.WaitGroup{}
    	for i := 0; i < 10; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			for j := 0; j < 10; j++ {
    				_ = eventBus.handleEvents()
    			}
    		}()
    	}
    	wg.Wait()
    
    	var eventCount int64
    	db.Model(&EventPO{}).Count(&eventCount)
    	// 保证所有事件都能消费到
    	assert.Equal(t, eventCount, int64(len(ids)))
    	curr := time.Time{}
    	// 保证消费顺序一定是递增的
    	for _, e := range events {
    		assert.GreaterOrEqual(t, e.CreatedAt, curr)
    		curr = e.CreatedAt
    	}
    }
    
    func TestEventBusRetry(t *testing.T) {
    	db := testsuit.InitMysql()
    
    	mu := sync.Mutex{}
    	counts := map[string]int{}
    	eventBus := NewEventBus("test_retry", db, func(opt *Options) {
    		opt.LimitPerRun = 10
    		opt.RetryStrategy = &LimitRetry{
    			Limit: 5,
    		}
    		offset := int64(0)
    		opt.DefaultOffset = &offset
    	})
    	eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error {
    		mu.Lock()
    		counts[evt.ID] += 1
    		mu.Unlock()
    
    		if evt.ID == "0" {
    			return fmt.Errorf("retry")
    		}
    		return nil
    	})
    
    	for i := 0; i < 30; i++ {
    		if err := eventBus.handleEvents(); err != nil {
    			assert.NoError(t, err)
    		}
    		time.Sleep(time.Millisecond * 10)
    	}
    
    	var eventCount int64
    	db.Model(&EventPO{}).Count(&eventCount)
    	assert.Equal(t, eventCount, int64(len(counts)))
    	for id, count := range counts {
    		if id == "0" {
    			assert.Equal(t, 7, count)
    		} else {
    			assert.Equal(t, 1, count)
    		}
    	}
    }
    
    func TestEventBusFailed(t *testing.T) {
    	ctx := context.Background()
    	db := testsuit.InitMysql()
    
    	eventBus := NewEventBus("test_fail", db, func(opt *Options) {
    		opt.RetryStrategy = &LimitRetry{
    			Limit: 2,
    		}
    	})
    	eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error {
    		if evt.Type == "test_failed" {
    			return fmt.Errorf("failed")
    		}
    		return nil
    	})
    
    	for i := 0; i < 10; i++ {
    		err := eventBus.Dispatch(ctx, dddfirework.NewDomainEvent(&testEvent{EType: "test_failed", Data: "failed"}))
    		assert.NoError(t, err)
    	}
    
    	for i := 0; i < 20; i++ {
    		err := eventBus.handleEvents()
    		assert.NoError(t, err)
    	}
    
    	service := &ServicePO{}
    	err := db.Where("name = ?", "test_fail").First(service).Error
    	assert.NoError(t, err)
    	assert.Len(t, service.Failed, 10)
    }
    
    func TestEngine(t *testing.T) {
    	db := testsuit.InitMysql()
    
    	ctx := context.Background()
    	data := ""
    	wg := sync.WaitGroup{}
    	wg.Add(2)
    	dddfirework.RegisterEventHandler("test_engine", func(ctx context.Context, evt *testEvent) error {
    		data = evt.Data
    		wg.Done()
    		return nil
    	})
    	dddfirework.RegisterEventHandler("test_engine_tx", func(ctx context.Context, evt *testEvent) error {
    		data = evt.Data
    		wg.Done()
    		return nil
    	})
    
    	eventBus := NewEventBus("test_engine", db)
    	eventBus.Start(ctx)
    
    	engine := dddfirework.NewEngine(nil, exec_mysql.NewExecutor(db), eventBus.Options()...)
    
    	res := engine.NewStage().Main(func(ctx context.Context, repo *dddfirework.Repository) error {
    
    zhongqiling's avatar
    zhongqiling committed
    		e := &testEntity{Name: "hello"}
    		e.AddEvent(&testEvent{EType: "test_engine", Data: e.Name})
    		e.AddEvent(&testEvent{EType: "test_engine_tx", Data: e.Name}, dddfirework.WithSendType(dddfirework.SendTypeTransaction))
    
    		repo.Add(e)
    		return nil
    
    zhongqiling's avatar
    zhongqiling committed
    	}).Save(ctx)
    
    	wg.Wait()
    	assert.NoError(t, res.Error)
    	assert.Equal(t, "hello", data)
    }
    
    type mockExecutor struct {
    }
    
    func (f *mockExecutor) Begin(ctx context.Context) (context.Context, error) { return ctx, nil }
    func (f *mockExecutor) Commit(ctx context.Context) error                   { return fmt.Errorf("failed") }
    func (f *mockExecutor) RollBack(ctx context.Context) error                 { return fmt.Errorf("failed") }
    
    func (f *mockExecutor) Entity2Model(entity, parent dddfirework.IEntity, op dddfirework.OpType) (dddfirework.IModel, error) {
    	return entity, nil
    }
    
    func (f *mockExecutor) Model2Entity(model dddfirework.IModel, entity dddfirework.IEntity) error {
    	return nil
    }
    
    func (f *mockExecutor) Exec(ctx context.Context, action *dddfirework.Action) error { return nil }
    
    func TestTXChecker(t *testing.T) {
    	db := testsuit.InitMysql()
    
    	ctx := context.Background()
    	data := ""
    
    	dddfirework.RegisterEventTXChecker("test_commit_failed", func(evt *testEvent) dddfirework.TXStatus {
    		data = evt.Data
    		return dddfirework.TXCommit
    	})
    
    	eventBus := NewEventBus("test_engine", db, func(opt *Options) {
    		opt.TXCheckTimeout = time.Millisecond * 200
    	})
    	eventBus.Start(ctx)
    
    	engine := dddfirework.NewEngine(nil, &mockExecutor{}, eventBus.Options()...)
    
    	res := engine.NewStage().Main(func(ctx context.Context, repo *dddfirework.Repository) error {
    
    zhongqiling's avatar
    zhongqiling committed
    		e := &testEntity{Name: "test_commit_failed"}
    		e.AddEvent(&testEvent{EType: "test_commit_failed", Data: e.Name}, dddfirework.WithSendType(dddfirework.SendTypeTransaction))
    
    		repo.Add(e)
    		return nil
    
    zhongqiling's avatar
    zhongqiling committed
    	}).Save(ctx)
    
    	time.Sleep(time.Second * 1)
    
    	assert.Error(t, res.Error)
    	assert.Equal(t, "test_commit_failed", data)
    }
    
    func TestOuter(t *testing.T) {
    	ctx := context.Background()
    	db := testsuit.InitMysql()
    	eventBus := NewEventBus("test_engine", db)
    	eventBus.Start(ctx)
    
    	var data string
    
    	dddfirework.RegisterEventBus(eventBus)
    	dddfirework.RegisterEventHandler("test_outer", func(ctx context.Context, evt *testEvent) error {
    		data = evt.Data
    		return nil
    	})
    
    	err := eventBus.Dispatch(ctx, dddfirework.NewDomainEvent(&testEvent{EType: "test_outer", Data: "gujuji"}))
    	assert.NoError(t, err)
    
    	err = eventBus.handleEvents()
    	assert.NoError(t, err)
    	assert.Equal(t, "gujuji", data)
    }
    
    func TestCommit(t *testing.T) {
    	ctx := context.Background()
    	db := testsuit.InitMysql()
    
    	eventBus := NewEventBus("test_transaction", db)
    	evt := dddfirework.NewDomainEvent(&testEvent{EType: "test_transaction", Data: "ttt"})
    	ctx, err := eventBus.DispatchBegin(ctx, evt)
    	assert.NoError(t, err)
    
    	count := int64(0)
    	err = db.Model(&EventPO{}).Where("id = ?", evt.ID).Count(&count).Error
    	assert.NoError(t, err)
    	assert.Equal(t, 0, int(count))
    
    	err = eventBus.Commit(ctx)
    	assert.NoError(t, err)
    
    	err = db.Model(&EventPO{}).Where("event_id = ?", evt.ID).Count(&count).Error
    	assert.NoError(t, err)
    	assert.Equal(t, 1, int(count))
    }
    
    func TestCommitTimeout(t *testing.T) {
    	ctx := context.Background()
    	db := testsuit.InitMysql()
    
    	eventBus := NewEventBus("test_transaction", db, func(opt *Options) {
    		opt.TXCheckTimeout = time.Millisecond * 200
    	})
    	eventBus.RegisterEventTXChecker(func(evt *dddfirework.DomainEvent) dddfirework.TXStatus {
    		return dddfirework.TXCommit
    	})
    	eventBus.Start(ctx)
    
    	evt := dddfirework.NewDomainEvent(&testEvent{EType: "test_transaction", Data: "ttt"})
    	_, err := eventBus.DispatchBegin(ctx, evt)
    	assert.NoError(t, err)
    
    	time.Sleep(time.Second)
    
    	count := int64(0)
    	err = db.Model(&EventPO{}).Where("event_id = ?", evt.ID).Count(&count).Error
    	assert.NoError(t, err)
    	assert.Equal(t, 1, int(count))
    }
    
    func TestRollback(t *testing.T) {
    	ctx := context.Background()
    	db := testsuit.InitMysql()
    
    	eventBus := NewEventBus("test_transaction", db, func(opt *Options) {
    		offset := int64(0)
    		opt.DefaultOffset = &offset
    	})
    	evt := dddfirework.NewDomainEvent(&testEvent{EType: "test_transaction", Data: "ttt"})
    	ctx, err := eventBus.DispatchBegin(ctx, evt)
    	assert.NoError(t, err)
    
    	err = eventBus.Rollback(ctx)
    	assert.NoError(t, err)
    
    	count := int64(0)
    	err = db.Model(&EventPO{}).Where("id = ?", evt.ID).Count(&count).Error
    	assert.NoError(t, err)
    	assert.Equal(t, 0, int(count))
    }
    
    func TestClean(t *testing.T) {
    	ctx := context.Background()
    	// 使用sqlite防止数据被其它单测影响
    	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
    	assert.NoError(t, err)
    	err = db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{})
    	assert.NoError(t, err)
    
    	eventBus := NewEventBus("test_clean", db, func(opt *Options) {
    		// 消费完成的事件保留时间为0
    		opt.RetentionTime = 0 * time.Hour
    	})
    	// 插入一些事件
    	num := 10
    	for num > 0 {
    		evt := dddfirework.NewDomainEvent(&testEvent{EType: "test_clean", Data: "ttt"})
    		err = eventBus.Dispatch(ctx, evt)
    		// 必须停1ms,因为mysql datetime 只精确到1ms,1ms以内的event无法立即清理
    		time.Sleep(1 * time.Millisecond)
    		assert.NoError(t, err)
    		num--
    	}
    
    	eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error {
    		t.Logf("handle event ID %s,CreatedAt %s", evt.ID, evt.CreatedAt)
    		return nil
    	})
    	// 处理事件
    	err = eventBus.handleEvents()
    	assert.NoError(t, err)
    	// 清理事件
    	err = eventBus.cleanEvents()
    	assert.NoError(t, err)
    
    	// 校验 lowestServicePO.offset 之前的event 其它的都删掉了
    	var slowestServicePO ServicePO
    	err = db.Order("offset asc").Take(&slowestServicePO).Error
    	assert.NoError(t, err)
    
    	count := int64(0)
    	err = db.Model(&EventPO{}).Where("id < ?", slowestServicePO.Offset).Count(&count).Error
    	assert.NoError(t, err)
    	assert.Equal(t, 0, int(count))
    }
    
    // 测试failed 或 retry 事件未被删除
    func TestCleanFailed(t *testing.T) {
    	ctx := context.Background()
    	// 使用sqlite防止数据被其它单测影响
    	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
    	assert.NoError(t, err)
    	err = db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{})
    	assert.NoError(t, err)
    
    	eventBus := NewEventBus("test_clean", db, func(opt *Options) {
    		// 消费完成的事件保留时间为0
    		opt.RetentionTime = 0 * time.Hour
    	})
    	// 插入一些事件
    	num := 10
    	for num > 0 {
    		evt := dddfirework.NewDomainEvent(&testEvent{EType: "test_clean", Data: "ttt"})
    		err = eventBus.Dispatch(ctx, evt)
    		// 必须停1ms,因为mysql datetime 只精确到1ms,1ms以内的event无法立即清理
    		time.Sleep(1 * time.Millisecond)
    		assert.NoError(t, err)
    		num--
    	}
    
    	eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error {
    		t.Logf("handle event ID %s,CreatedAt %s", evt.ID, evt.CreatedAt)
    		if rand.Intn(2) > 0 {
    			return fmt.Errorf("test clean error")
    		}
    		return nil
    	})
    	// 处理事件
    	err = eventBus.handleEvents()
    	assert.NoError(t, err)
    	// 清理事件
    	err = eventBus.cleanEvents()
    	assert.NoError(t, err)
    
    	// 校验除了 failed 和 retry中的event 其它的都删掉了
    	var servicePO ServicePO
    	err = db.Where("name = ?", "test_clean").Take(&servicePO).Error
    	assert.NoError(t, err)
    
    	eventIDs := make([]int64, 0)
    	for _, si := range servicePO.Retry {
    		eventIDs = append(eventIDs, si.ID)
    	}
    	for _, si := range servicePO.Failed {
    		eventIDs = append(eventIDs, si.ID)
    	}
    
    	count := int64(0)
    	err = db.Model(&EventPO{}).Where("id in ?", eventIDs).Count(&count).Error
    	assert.NoError(t, err)
    	assert.Equal(t, len(eventIDs), int(count))
    }
    
    func TestCleanConcurrent(t *testing.T) {
    	ctx := context.Background()
    	// sqlite在并发场景下部分特性与mysql不同
    	// 使用独立database防止数据被其它单测影响
    	db := testsuit.InitMysql()
    	db = testsuit.InitMysqlWithDatabase(db, "test_clean")
    	if err := db.AutoMigrate(&EventPO{}, &ServicePO{}, &testPO{}); err != nil {
    		panic(err)
    	}
    	eventBus := NewEventBus("test_clean", db, func(opt *Options) {
    		// 消费完成的事件保留时间为0
    		opt.RetentionTime = 0 * time.Hour
    	})
    	// 插入一些事件
    	num := 30
    	for num > 0 {
    		evt := dddfirework.NewDomainEvent(&testEvent{EType: "test_clean", Data: "ttt"})
    		err := eventBus.Dispatch(ctx, evt)
    		// 必须停1ms,因为mysql datetime 只精确到1ms,1ms以内的event无法立即清理
    		time.Sleep(1 * time.Millisecond)
    		assert.NoError(t, err)
    		num--
    	}
    
    	eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error {
    		t.Logf("handle event ID %s,CreatedAt %s", evt.ID, evt.CreatedAt)
    		return nil
    	})
    	// 处理事件
    	err := eventBus.handleEvents()
    	assert.NoError(t, err)
    	// 并发清理事件
    	wg := sync.WaitGroup{}
    	for i := 0; i < 10; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			err := eventBus.cleanEvents()
    			assert.NoError(t, err)
    		}()
    	}
    	wg.Wait()
    
    	// 校验 lowestServicePO.event_created_at 之前的event 其它的都删掉了
    	var slowestServicePO ServicePO
    	err = db.Order("offset asc").Take(&slowestServicePO).Error
    	assert.NoError(t, err)
    
    	count := int64(0)
    	err = db.Model(&EventPO{}).Where("id < ?", slowestServicePO.Offset).Count(&count).Error
    	assert.NoError(t, err)
    	assert.Equal(t, 0, int(count))
    }