Skip to content
Snippets Groups Projects
Unverified Commit 1d6e3495 authored by kirinzhong's avatar kirinzhong Committed by GitHub
Browse files

fix: 完善事件处理中 panic 的逻辑 (#18)

parent 4f3f30af
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,7 @@ import (
"context"
"errors"
"fmt"
"runtime/debug"
"sync"
"time"
"unicode/utf8"
......@@ -408,7 +409,7 @@ func (e *EventBus) doRetryStrategy(service *ServicePO, failedIDs []int64) (retry
return
}
func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (success, failed []int64) {
func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (success, failed, panics []int64) {
events := make(chan *EventPO, len(eventPOs))
for _, e := range eventPOs {
events <- e
......@@ -420,18 +421,23 @@ func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (suc
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
}
}()
for po := range events {
cb := func(ctx context.Context, po *EventPO) {
defer func() {
if r := recover(); r != nil {
e.logger.Error(fmt.Errorf("err: %v stack:%s", r, string(debug.Stack())), fmt.Sprintf("panic while handling event(%s)", po.EventID))
panics = append(panics, po.ID)
}
}()
if err := e.cb(ctx, po.Event); err != nil {
failed = append(failed, po.ID)
} else {
success = append(success, po.ID)
}
}
for po := range events {
cb(ctx, po)
}
}()
}
wg.Wait()
......@@ -462,10 +468,14 @@ func (e *EventBus) handleEvents() error {
return nil
}
_, failedIDs := e.dispatchEvents(ctx, events)
_, failedIDs, panicIDs := e.dispatchEvents(ctx, events)
retry, failed := e.doRetryStrategy(service, failedIDs)
service.Retry = retry
service.Failed = append(service.Failed, failed...)
for _, id := range panicIDs {
service.Failed = append(service.Failed, &RetryInfo{ID: id})
}
if len(scanEvents) > 0 {
last := scanEvents[len(scanEvents)-1]
service.Offset = last.ID
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment