Skip to content
Snippets Groups Projects
engine.go 32.1 KiB
Newer Older
zhongqiling's avatar
zhongqiling committed
	pos := make([]IModel, len(events))
	for i, evt := range events {
		po, err := e.options.EventPersist(evt)
		if err != nil {
			return nil, err
		}
		pos[i] = po
	}
	return &Action{
		Op:         OpInsert,
		Models:     pos,
		PrevModels: []IModel{},
	}, nil
}

func (e *Stage) dispatchEvents(ctx context.Context, events []*DomainEvent) (err error) {
	if e.options.DryRun {
		return nil
	}
zhongqiling's avatar
zhongqiling committed
	if !e.options.WithTransaction {
		e.logger.Info("engine not support transaction")
		return e.eventBus.Dispatch(ctx, events...)
	}

	normalEvents, txEvents := make([]*DomainEvent, 0), make([]*DomainEvent, 0)
	for _, evt := range events {
		if evt.SendType == SendTypeTransaction {
			txEvents = append(txEvents, evt)
		} else {
			normalEvents = append(normalEvents, evt)
		}
	}
	if txEventBus, ok := e.eventBus.(ITransactionEventBus); ok {
		if len(txEvents) > 0 {
			e.eventCtx, err = txEventBus.DispatchBegin(ctx, txEvents...)
			if err != nil {
				return err
			}
		}

		if len(normalEvents) > 0 {
			if err = txEventBus.Dispatch(ctx, normalEvents...); err != nil {
				return err
			}
		}
		return
	} else {
		// 如果 eventbus 不支持事务,所有事件默认按照普通方式发送
		return e.eventBus.Dispatch(ctx, events...)
	}
}

func (e *Stage) commit(ctx context.Context) error {
	if err := e.persist(ctx); err != nil {
		return err
	}
	if err := e.flush(); err != nil {
zhongqiling's avatar
zhongqiling committed
		return err
	}

	e.unDirty()
	return nil
}

// persist writes the stage's pending entity changes.
//
// Steps, in order: detect the changed entities, run the before-save hooks,
// assign new IDs, convert the changes into a sequence of persistence actions,
// execute those actions, run the after-save hooks, and finally recycle the
// changed set. The first error aborts the sequence and is returned.
func (e *Stage) persist(ctx context.Context) error {
	changed, err := e.getEntityChanged()
	if err != nil {
		return err
	}

	// runHooks invokes the before- or after-save hook of every child entity
	// in every change group, stopping at the first failure.
	runHooks := func(isBefore bool) error {
		for _, group := range changed {
			for _, child := range group.children {
				if hookErr := execHook(ctx, child, group.changeType, isBefore); hookErr != nil {
					return hookErr
				}
			}
		}
		return nil
	}

	if err = runHooks(true); err != nil {
		return err
	}
	if err = e.putNewID(changed); err != nil {
		return err
	}

	actions, err := e.makeActions(changed)
	if err != nil {
		return err
	}
	if err = e.exec(ctx, actions); err != nil {
		return err
	}
	if err = runHooks(false); err != nil {
		return err
	}

	// Recycle deleted child entities; they may still have events to send.
	e.recycle(changed)
	return nil
}

// execHook invokes the lifecycle hook on entity that matches the change type
// ct and the phase (isBefore selects the Before* hook, otherwise the After*
// hook). The hook is only called when entity implements the corresponding
// interface; otherwise, or for any other change type, nil is returned.
func execHook(ctx context.Context, entity IEntity, ct changeType, isBefore bool) error {
	switch ct {
	case newChildren:
		if isBefore {
			if hook, ok := entity.(IBeforeCreate); ok {
				return hook.BeforeCreate(ctx)
			}
			return nil
		}
		if hook, ok := entity.(IAfterCreate); ok {
			return hook.AfterCreate(ctx)
		}
	case dirtyChildren:
		if isBefore {
			if hook, ok := entity.(IBeforeUpdate); ok {
				return hook.BeforeUpdate(ctx)
			}
			return nil
		}
		if hook, ok := entity.(IAfterUpdate); ok {
			return hook.AfterUpdate(ctx)
		}
	case deleteChildren:
		if isBefore {
			if hook, ok := entity.(IBeforeDelete); ok {
				return hook.BeforeDelete(ctx)
			}
			return nil
		}
		if hook, ok := entity.(IAfterDelete); ok {
			return hook.AfterDelete(ctx)
		}
	}
	return nil
}

func (e *Stage) exec(ctx context.Context, actions []*Action) error {
	if e.options.DryRun {
		return nil
	}
zhongqiling's avatar
zhongqiling committed
	for _, a := range actions {
		if err := e.executor.Exec(ctx, a); err != nil {
			return err
		}
	}
	e.result.Actions = append(e.result.Actions, actions...)
	return nil
}

// setOutput stores data as the Output of the stage's result.
func (e *Stage) setOutput(data interface{}) {
	e.result.Output = data
}

// do runs one save pass and returns its Result: it invokes the stage's main
// function with a Repository bound to this stage, persists the entity changes,
// optionally persists the collected domain events, and dispatches them through
// the event bus. Any failure is converted into an error Result.
func (e *Stage) do(ctx context.Context) *Result {
	// Build the aggregate.
	// NOTE(review): the statements below are indented one level deeper and are
	// closed by a lone brace, so an enclosing conditional header appears to be
	// missing from this view — confirm against the original file.
	var err error
		repo := &Repository{stage: e}
		if err := e.main(ctx, repo); err != nil {
zhongqiling's avatar
zhongqiling committed
			return ResultErrOrBreak(err)
		}
		// The repository may have recorded an error while main was running.
		if err := repo.getError(); err != nil {
			return ResultError(err)
		}
zhongqiling's avatar
zhongqiling committed
	}

	err = e.persist(ctx)
	if err != nil {
		return ResultErrors(err)
	}

	// When an EventPersist option is configured, store the collected events
	// through the executor before dispatching them.
	events := e.collectEvents()
	if len(events) > 0 && e.options.EventPersist != nil {
		action, err := e.makeEventPersistAction(events)
		if err != nil {
			return ResultErrors(err)
		}
		if err := e.exec(ctx, []*Action{action}); err != nil {
			return ResultError(err)
		}
	}

	// Dispatch the domain events; having events without an event bus is an error.
	if len(events) > 0 {
		if e.eventBus == nil {
			return ResultErrors(ErrNoEventBusFound)
		}
		if err := e.dispatchEvents(ctx, events); err != nil {
			return ResultErrors(err)
		}
	}

	return e.result
}

// doSave is the signature of a save step: it takes a context and returns the
// resulting *Result. runOnLock and runWithTransaction wrap one doSave in another.
type doSave func(ctx context.Context) *Result

func (e *Stage) runOnLock(f doSave, lockKeys ...string) doSave {
	return func(ctx context.Context) *Result {
		sort.Strings(lockKeys)
		for i := 1; i < len(lockKeys); i++ {
			if lockKeys[i] == lockKeys[i-1] {
				return ResultErrors(fmt.Errorf("lockKey(%s) repeated", lockKeys[i]))
			}
		}

		var lockErr error
		ls := make([]interface{}, 0)
		for _, id := range lockKeys {
			l, err := e.locker.Lock(ctx, fmt.Sprintf("ddd_engine_%s", id))
			if err != nil {
				if lockErr != ErrEntityLocked {
					lockErr = fmt.Errorf("acquire lock failed: %w", err)
				}
zhongqiling's avatar
zhongqiling committed
				break
			}
			ls = append(ls, l)
		}
		defer func() {
			for _, l := range ls {
				if err := e.locker.UnLock(ctx, l); err != nil {
					e.logger.Error(err, "unlock failed")
				}
			}
		}()

		if lockErr != nil {
			return ResultErrors(lockErr)
		}
		return f(ctx)
	}
}

// runWithTransaction wraps f so that it runs inside an executor transaction.
//
// The returned doSave begins a transaction and calls f with the context
// returned by Begin. If f panics, the transaction is rolled back and the panic
// is re-raised. If f's Result carries an error or is marked Break, the
// transaction is rolled back, along with the pending event-bus transaction
// (when the bus implements ITransactionEventBus and e.eventCtx is set).
// Otherwise the transaction is committed; a commit failure is stored in
// Result.Error, and only a successful commit is followed by committing the
// event-bus transaction.
//
// Failures of rollback and of the event-bus commit/rollback are logged and do
// not change the returned Result.
func (e *Stage) runWithTransaction(f doSave) doSave {
	return func(ctx context.Context) *Result {
		ctx, err := e.executor.Begin(ctx)
		if err != nil {
			return ResultErrors(err)
		}
		defer func() {
			if r := recover(); r != nil {
				if err := e.executor.RollBack(ctx); err != nil {
					e.logger.Error(err, "rollback failed")
				}
				panic(r)
			}
		}()

		result := f(ctx)
		if result.Error != nil || result.Break {
			if err := e.executor.RollBack(ctx); err != nil {
				e.logger.Error(err, "rollback failed", "err", err)
			}
			// Roll back the message transaction as well.
			if e.eventBus != nil && e.eventCtx != nil {
				if txEventBus, ok := e.eventBus.(ITransactionEventBus); ok {
					if err := txEventBus.Rollback(e.eventCtx); err != nil {
						e.logger.Error(err, "event transaction rollback failed")
					}
				}
			}
			return result
		}

		if err := e.executor.Commit(ctx); err != nil {
			// The message transaction is deliberately left uncommitted here;
			// the event bus's check-back mechanism has to settle its outcome.
			result.Error = err
			e.logger.Error(err, "commit failed", "err", err)
			return result
		}

		// Commit the message transaction only after the data commit succeeded.
		if e.eventBus != nil && e.eventCtx != nil {
			if txEventBus, ok := e.eventBus.(ITransactionEventBus); ok {
				if err := txEventBus.Commit(e.eventCtx); err != nil {
					e.logger.Error(err, "event transaction commit failed")
				}
			}
		}
		return result
	}
}

// Save runs the stage and returns its Result.
//
// The core step (do) is wrapped in a transaction when the WithTransaction
// option is set, and that in turn is wrapped in lock acquisition when the
// stage has lock keys, so locks are taken before the transaction begins.
// When the Result carries no error, every configured PostSaveHook is called
// with the context and the Result, in order.
func (e *Stage) Save(ctx context.Context) *Result {
	var run doSave = e.do
	if e.options.WithTransaction {
		run = e.runWithTransaction(run)
	}
	if len(e.lockKeys) != 0 {
		run = e.runOnLock(run, e.lockKeys...)
	}

	result := run(ctx)
	if result.Error != nil {
		return result
	}
	for _, hook := range e.options.PostSaveHooks {
		hook(ctx, result)
	}
	return result
}