From 1d6e34958bb9c3d84ebd86a1a2df4486e5399ae2 Mon Sep 17 00:00:00 2001 From: kirinzhong <144225553+kirinzhong@users.noreply.github.com> Date: Wed, 6 Dec 2023 20:31:45 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=AE=8C=E5=96=84=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E5=A4=84=E7=90=86=E4=B8=AD=20panic=20=E7=9A=84=E9=80=BB?= =?UTF-8?q?=E8=BE=91=20(#18)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- eventbus/mysql/eventbus.go | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/eventbus/mysql/eventbus.go b/eventbus/mysql/eventbus.go index 3920ba2..ba758ed 100644 --- a/eventbus/mysql/eventbus.go +++ b/eventbus/mysql/eventbus.go @@ -19,6 +19,7 @@ import ( "context" "errors" "fmt" + "runtime/debug" "sync" "time" "unicode/utf8" @@ -408,7 +409,7 @@ func (e *EventBus) doRetryStrategy(service *ServicePO, failedIDs []int64) (retry return } -func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (success, failed []int64) { +func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (success, failed, panics []int64) { events := make(chan *EventPO, len(eventPOs)) for _, e := range eventPOs { events <- e @@ -420,18 +421,23 @@ func (e *EventBus) dispatchEvents(ctx context.Context, eventPOs []*EventPO) (suc wg.Add(1) go func() { defer wg.Done() - defer func() { - if r := recover(); r != nil { - } - }() - for po := range events { + cb := func(ctx context.Context, po *EventPO) { + defer func() { + if r := recover(); r != nil { + e.logger.Error(fmt.Errorf("err: %v stack:%s", r, string(debug.Stack())), fmt.Sprintf("panic while handling event(%s)", po.EventID)) + panics = append(panics, po.ID) + } + }() if err := e.cb(ctx, po.Event); err != nil { failed = append(failed, po.ID) } else { success = append(success, po.ID) } } + for po := range events { + cb(ctx, po) + } }() } wg.Wait() @@ -462,10 +468,14 @@ func (e *EventBus) handleEvents() error { return nil } - _, failedIDs := e.dispatchEvents(ctx, events) + _, failedIDs, panicIDs := e.dispatchEvents(ctx, events) retry, failed := e.doRetryStrategy(service, failedIDs) service.Retry = retry service.Failed = append(service.Failed, failed...) + for _, id := range panicIDs { + service.Failed = append(service.Failed, &RetryInfo{ID: id}) + } + if len(scanEvents) > 0 { last := scanEvents[len(scanEvents)-1] service.Offset = last.ID -- GitLab