diff --git a/eventbus/mysql/eventbus.go b/eventbus/mysql/eventbus.go
index 6fe3a5f32864a6a6227bf4c9bd8f24345fb09fbf..5d6cd9df495a90ba9bcf278e92c91c786e379d4e 100644
--- a/eventbus/mysql/eventbus.go
+++ b/eventbus/mysql/eventbus.go
@@ -37,6 +37,7 @@ import (
 const retryInterval = time.Second * 3
 const retryLimit = 5
 const runInterval = time.Millisecond * 100
+const defaultQueueLimit = 1000
 
 // 每天凌晨两点执行clean
 const cleanCron = "0 2 * * *"
@@ -125,6 +126,7 @@ type Options struct {
 	RetryLimit    int             // 重试次数
 	RetryInterval time.Duration   // 重试间隔
 	CustomRetry   []time.Duration // 自定义重试间隔
+	QueueLimit    int             // max length of the retry and failed queues; non-positive values fall back to defaultQueueLimit
 
 	DefaultOffset *int64        // 默认起始 offset
 	RunInterval   time.Duration // 默认轮询间隔
@@ -166,6 +168,7 @@ func NewEventBus(serviceName string, db *gorm.DB, options ...Option) *EventBus {
 	}
 
 	opt := Options{
+		QueueLimit:    defaultQueueLimit,
 		RunInterval:   runInterval,
 		CleanCron:     cleanCron,
 		RetentionTime: retentionTime,
@@ -483,11 +486,11 @@ func (e *EventBus) handleEvents() error {
 		failedIDs, panicIDs := e.dispatchEvents(ctx, events)
 
 		retry, failed := e.doRetryStrategy(service, remainIDs, failedIDs)
-		service.Retry = retry
-		service.Failed = append(service.Failed, failed...)
+		failed = append(service.Failed, failed...)
 		for _, id := range panicIDs {
-			service.Failed = append(service.Failed, &RetryInfo{ID: id})
+			failed = append(failed, &RetryInfo{ID: id})
 		}
+		service.Retry, service.Failed = e.evictEvents(retry, failed)
 
 		if len(scanEvents) > 0 {
 			last := scanEvents[len(scanEvents)-1]
@@ -497,6 +500,36 @@ func (e *EventBus) handleEvents() error {
 	})
 }
 
+// evictEvents caps the retry and failed queues at the configured QueueLimit,
+// removing the oldest entries first (FIFO). Entries evicted from the retry
+// queue are appended to the failed queue; entries evicted from the failed
+// queue are dropped. Each eviction is reported through e.logger.
+func (e *EventBus) evictEvents(retry, failed []*RetryInfo) (evictedRetry, evictedFailed []*RetryInfo) {
+	// A negative limit would make the slice expressions below go out of range
+	// and a zero limit would silently drop every event, so fall back to the default.
+	limit := e.opt.QueueLimit
+	if limit <= 0 {
+		limit = defaultQueueLimit
+	}
+
+	// Retry queue over the limit: move the oldest entries to the failed queue.
+	if overflow := len(retry) - limit; overflow > 0 {
+		evicted := retry[:overflow]
+		failed = append(failed, evicted...)
+		retry = retry[overflow:]
+		e.logger.V(logger.LevelWarn).Info("evict retry events", "count", overflow, "events", evicted)
+	}
+
+	// Failed queue over the limit: discard the oldest entries.
+	if overflow := len(failed) - limit; overflow > 0 {
+		evicted := failed[:overflow]
+		failed = failed[overflow:]
+		e.logger.V(logger.LevelWarn).Info("evict failed events", "count", overflow, "events", evicted)
+	}
+
+	return retry, failed
+}
+
 func (e *EventBus) checkTX(ctx context.Context, tx *Transaction) {
 	ctx = e.ctxWithTX(ctx, tx)
 	evt := tx.Events[0]
diff --git a/eventbus/mysql/eventbus_test.go b/eventbus/mysql/eventbus_test.go
index d79aa1eeb49d01c75656ee125a15e7d68c3b3546..78cdbdcffc849ef37a1fd4de2caf9d94696b92c4 100644
--- a/eventbus/mysql/eventbus_test.go
+++ b/eventbus/mysql/eventbus_test.go
@@ -341,6 +341,44 @@ func TestEventBusFailed(t *testing.T) {
 	assert.Len(t, service.Failed, 10)
 }
 
+func TestEventBusQueueLimit(t *testing.T) {
+	ctx := context.Background()
+	db := testsuit.InitMysql()
+
+	queueLimit := 100
+	eventCount := queueLimit + 100
+	eventBus := NewEventBus("test_queue_limit", db, func(opt *Options) {
+		opt.RetryStrategy = &LimitRetry{
+			Limit: -1,
+		}
+		opt.LimitPerRun = eventCount * 2
+		opt.ConsumeConcurrent = 10
+		opt.QueueLimit = queueLimit
+	})
+	eventBus.RegisterEventHandler(func(ctx context.Context, evt *dddfirework.DomainEvent) error {
+		if evt.Type == "test_queue_limit" {
+			return fmt.Errorf("failed")
+		}
+		return nil
+	})
+
+	for i := 0; i < eventCount; i++ {
+		err := eventBus.Dispatch(ctx, dddfirework.NewDomainEvent(&testEvent{EType: "test_queue_limit", Data: "failed"}))
+		assert.NoError(t, err)
+	}
+
+	err := eventBus.handleEvents()
+	assert.NoError(t, err)
+
+	service := &ServicePO{}
+	err = db.Transaction(func(tx *gorm.DB) error {
+		return tx.Where("name = ?", "test_queue_limit").First(service).Error
+	})
+	assert.NoError(t, err)
+	assert.Len(t, service.Failed, queueLimit)
+
+}
+
 func TestEngine(t *testing.T) {
 	db := testsuit.InitMysql()
 
diff --git a/eventbus/mysql/po.go b/eventbus/mysql/po.go
index 269b20193920d9e3bed9176ff9eeee8d0a925673..877d67d574edac479b9137e4ac41b324f986ab5d 100644
--- a/eventbus/mysql/po.go
+++ b/eventbus/mysql/po.go
@@ -109,11 +109,11 @@ CREATE TABLE `ddd_eventbus_service` (
 */
 type ServicePO struct {
 	Name      string       `gorm:"primaryKey"`
-	Retry     []*RetryInfo `gorm:"serializer:json"` // 重试信息
-	Failed    []*RetryInfo `gorm:"serializer:json"` // 失败信息
-	Offset    int64        `gorm:"column:offset"`   // 消费位置,等于最后一次消费的事件id
-	CreatedAt time.Time    `gorm:"index"`           // 记录创建时间
-	UpdatedAt time.Time    `gorm:"index"`           // 记录的更新时间
+	Retry     []*RetryInfo `gorm:"serializer:json;type:text"` // retry info
+	Failed    []*RetryInfo `gorm:"serializer:json;type:text"` // failure info
+	Offset    int64        `gorm:"column:offset"`             // consume position: ID of the last consumed event
+	CreatedAt time.Time    `gorm:"index"`                     // record creation time
+	UpdatedAt time.Time    `gorm:"index"`                     // record update time
 }
 
 func (o *ServicePO) GetID() string {