Skip to content
Snippets Groups Projects
engine.go 30.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • zhongqiling's avatar
    zhongqiling committed
    	if err != nil {
    		return err
    	}
    
    	if err := e.exec(ctx, actions); err != nil {
    		return err
    	}
    	// 执行保存后的 hook
    	for _, c := range changed {
    		for _, entity := range c.children {
    			if err := execHook(ctx, entity, c.changeType, false); err != nil {
    				return err
    			}
    		}
    	}
    
    	// 回收被删除的子实体,被删除的子实体仍可能有事件需要发送
    	e.recycle(changed)
    	return nil
    }
    
    func execHook(ctx context.Context, entity IEntity, ct changeType, isBefore bool) error {
    	if ct == newChildren && isBefore {
    		if saver, ok := entity.(IBeforeCreate); ok {
    			if err := saver.BeforeCreate(ctx); err != nil {
    				return err
    			}
    		}
    	} else if ct == newChildren && !isBefore {
    		if saver, ok := entity.(IAfterCreate); ok {
    			if err := saver.AfterCreate(ctx); err != nil {
    				return err
    			}
    		}
    	} else if ct == dirtyChildren && isBefore {
    		if saver, ok := entity.(IBeforeUpdate); ok {
    			if err := saver.BeforeUpdate(ctx); err != nil {
    				return err
    			}
    		}
    	} else if ct == dirtyChildren && !isBefore {
    		if saver, ok := entity.(IAfterUpdate); ok {
    			if err := saver.AfterUpdate(ctx); err != nil {
    				return err
    			}
    		}
    	} else if ct == deleteChildren && isBefore {
    		if saver, ok := entity.(IBeforeDelete); ok {
    			if err := saver.BeforeDelete(ctx); err != nil {
    				return err
    			}
    		}
    	} else if ct == deleteChildren && !isBefore {
    		if saver, ok := entity.(IAfterDelete); ok {
    			if err := saver.AfterDelete(ctx); err != nil {
    				return err
    			}
    		}
    	}
    	return nil
    }
    
    func (e *Stage) exec(ctx context.Context, actions []*Action) error {
    	for _, a := range actions {
    		if err := e.executor.Exec(ctx, a); err != nil {
    			return err
    		}
    	}
    	e.result.Actions = append(e.result.Actions, actions...)
    	return nil
    }
    
    func (e *Stage) setOutput(data interface{}) {
    	e.result.Output = data
    }
    
    func (e *Stage) do(ctx context.Context) *Result {
    	// 创建聚合
    	var err error
    
    	if e.main != nil {
    		if err := e.main(ctx, Repository{stage: e}); err != nil {
    
    zhongqiling's avatar
    zhongqiling committed
    			return ResultErrOrBreak(err)
    		}
    	}
    
    	err = e.persist(ctx)
    	if err != nil {
    		return ResultErrors(err)
    	}
    
    	events := e.collectEvents()
    	if len(events) > 0 && e.options.EventPersist != nil {
    		action, err := e.makeEventPersistAction(events)
    		if err != nil {
    			return ResultErrors(err)
    		}
    		if err := e.exec(ctx, []*Action{action}); err != nil {
    			return ResultError(err)
    		}
    	}
    
    	// 发送领域事件
    	if len(events) > 0 {
    		if e.eventBus == nil {
    			return ResultErrors(ErrNoEventBusFound)
    		}
    		if err := e.dispatchEvents(ctx, events); err != nil {
    			return ResultErrors(err)
    		}
    	}
    
    	return e.result
    }
    
    type doSave func(ctx context.Context) *Result
    
    func (e *Stage) runOnLock(f doSave, lockKeys ...string) doSave {
    	return func(ctx context.Context) *Result {
    		sort.Strings(lockKeys)
    		for i := 1; i < len(lockKeys); i++ {
    			if lockKeys[i] == lockKeys[i-1] {
    				return ResultErrors(fmt.Errorf("lockKey(%s) repeated", lockKeys[i]))
    			}
    		}
    
    		var lockErr error
    		ls := make([]interface{}, 0)
    		for _, id := range lockKeys {
    			l, err := e.locker.Lock(ctx, fmt.Sprintf("ddd_engine_%s", id))
    			if err != nil {
    				lockErr = fmt.Errorf("acquiring redis locker failed: %v", err)
    				break
    			}
    			ls = append(ls, l)
    		}
    		defer func() {
    			for _, l := range ls {
    				if err := e.locker.UnLock(ctx, l); err != nil {
    					e.logger.Error(err, "unlock failed")
    				}
    			}
    		}()
    
    		if lockErr != nil {
    			return ResultErrors(lockErr)
    		}
    		return f(ctx)
    	}
    }
    
    func (e *Stage) runWithTransaction(f doSave) doSave {
    	return func(ctx context.Context) *Result {
    		ctx, err := e.executor.Begin(ctx)
    		if err != nil {
    			return ResultErrors(err)
    		}
    		defer func() {
    			if r := recover(); r != nil {
    				if err := e.executor.RollBack(ctx); err != nil {
    					e.logger.Error(err, "rollback failed")
    				}
    				panic(r)
    			}
    		}()
    		result := f(ctx)
    		if result.Error != nil || result.Break {
    			if err := e.executor.RollBack(ctx); err != nil {
    				e.logger.Error(err, "rollback failed", "err", err)
    			}
    			// 回滚消息事务
    			if e.eventBus != nil && e.eventCtx != nil {
    				if txEventBus, ok := e.eventBus.(ITransactionEventBus); ok {
    					_ = txEventBus.Rollback(e.eventCtx)
    				}
    			}
    			return result
    		}
    		if err := e.executor.Commit(ctx); err != nil {
    			result.Error = err
    			e.logger.Error(err, "commit failed", "err", err)
    		} else {
    			// commit 成功后才提交消息事务
    			// 注意 commit 失败不会执行 eventBus.Commit,需要依靠 eventbus 的回查机制确认最终结果
    			if e.eventBus != nil && e.eventCtx != nil {
    				if txEventBus, ok := e.eventBus.(ITransactionEventBus); ok {
    					_ = txEventBus.Commit(e.eventCtx)
    				}
    			}
    		}
    
    		return result
    	}
    }
    
    func (e *Stage) Save(ctx context.Context) *Result {
    	do := e.do
    	if e.options.WithTransaction {
    		do = e.runWithTransaction(do)
    	}
    
    	if len(e.lockKeys) > 0 {
    		do = e.runOnLock(do, e.lockKeys...)
    	}
    
    	res := do(ctx)
    	if res.Error == nil && len(e.options.PostSaveHooks) > 0 {
    		for _, h := range e.options.PostSaveHooks {
    			h(ctx, res)
    		}
    	}
    	return res
    }