// engine.go
//
// Copyright 2023 Bytedance Ltd. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dddfirework

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	stdlog "log"
	"os"
	"reflect"
	"sort"
	"strings"

	"github.com/go-logr/logr"
	"github.com/go-logr/stdr"
	"github.com/rs/xid"
)

// Sentinel errors used by the engine.
var (
	// ErrBreak interrupts the current flow; ResultErrOrBreak turns it into a
	// Result with Break set instead of an error.
	ErrBreak = fmt.Errorf("break process")
	// ErrEntityNotFound reports that the requested entity does not exist.
	ErrEntityNotFound = fmt.Errorf("entity not found")
	// ErrEntityRepeated reports that the entity was already added.
	ErrEntityRepeated = fmt.Errorf("entity already added")
)

// defaultLogger is the logger NewEngine installs when no WithLogger option is
// given. It writes to stderr with the standard date/time and short-file flags
// and is named "ddd_engine".
var defaultLogger = stdr.New(stdlog.New(os.Stderr, "", stdlog.LstdFlags|stdlog.Lshortfile)).WithName("ddd_engine")

// ILock is the keyed lock the engine depends on.
type ILock interface {
	// Lock acquires the lock for key and returns an opaque handle (keyLock)
	// together with any acquisition error.
	Lock(ctx context.Context, key string) (keyLock interface{}, err error)
	// UnLock releases the lock identified by keyLock, the handle type that
	// Lock returns.
	UnLock(ctx context.Context, keyLock interface{}) error
}

// IIDGenerator produces identifiers for entities that do not have one yet.
type IIDGenerator interface {
	// NewID returns a fresh identifier, or an error if none can be produced.
	NewID() (string, error)
}

// defaultIDGenerator is the IIDGenerator that NewEngine installs by default.
// It derives identifiers from xid.
type defaultIDGenerator struct{}

// NewID returns the string form of a newly generated xid. It never fails.
func (d *defaultIDGenerator) NewID() (string, error) {
	return xid.New().String(), nil
}

// EntityContainer keeps track of every aggregate-root entity in the domain.
type EntityContainer struct {
	BaseEntity

	roots   []IEntity // aggregate-root entities currently held
	deleted []IEntity // every entity that has been deleted
}

// GetChildren exposes the aggregate roots as the container's only child
// group, stored under the key "meta".
func (w *EntityContainer) GetChildren() map[string][]IEntity {
	children := make(map[string][]IEntity, 1)
	children["meta"] = w.roots
	return children
}

// GetDeleted returns the entities recorded as deleted, either through Remove
// or through Recycle. The returned slice is the container's own slice, not a
// copy.
func (w *EntityContainer) GetDeleted() []IEntity {
	return w.deleted
}

// SetChildren replaces the aggregate roots with a private copy of roots.
// The container mutates its roots slice later on, so the copy keeps those
// mutations from leaking into the caller's slice.
func (w *EntityContainer) SetChildren(roots []IEntity) {
	cloned := make([]IEntity, 0, len(roots))
	w.roots = append(cloned, roots...)
}

// Add appends root to the aggregate roots. It returns ErrEntityRepeated when
// the very same entity value is already present.
func (w *EntityContainer) Add(root IEntity) error {
	for i := range w.roots {
		if w.roots[i] == root {
			return ErrEntityRepeated
		}
	}

	w.roots = append(w.roots, root)
	return nil
}

// Remove drops every occurrence of root from the aggregate roots and records
// root as deleted. It returns ErrEntityNotFound when root is not present.
func (w *EntityContainer) Remove(root IEntity) error {
	// Filter in place: kept shares the backing array of w.roots.
	kept := w.roots[:0]
	for _, candidate := range w.roots {
		if candidate != root {
			kept = append(kept, candidate)
		}
	}
	if len(kept) == len(w.roots) {
		return ErrEntityNotFound
	}
	w.roots = kept
	w.deleted = append(w.deleted, root)
	return nil
}

// Recycle records e as a deleted entity by appending it to the deleted list.
// It does not touch the aggregate roots.
func (w *EntityContainer) Recycle(e IEntity) {
	w.deleted = append(w.deleted, e)
}

// ErrList bundles several errors into a single error value.
type ErrList []error

// Error joins the messages of all contained errors with ", ".
// An empty list yields the empty string.
func (e ErrList) Error() string {
	var msg strings.Builder
	for i, err := range e {
		if i > 0 {
			msg.WriteString(", ")
		}
		msg.WriteString(err.Error())
	}
	return msg.String()
}

// Result is the outcome of running a stage.
type Result struct {
	// Error is the failure, if any.
	Error   error
	// Break is set when the flow was interrupted with ErrBreak.
	Break   bool
	// Actions lists the persistence actions that were executed.
	Actions []*Action
	// Output is an arbitrary payload attached through the stage.
	Output  interface{}
}

// ResultErrors builds a Result whose Error aggregates the given errors as an
// ErrList.
//
// Nil entries are dropped, because ErrList.Error calls Error on every
// element and would panic on a nil one. When no non-nil error remains the
// returned Result carries no error at all, instead of a non-nil but empty
// ErrList.
func ResultErrors(err ...error) *Result {
	errs := make(ErrList, 0, len(err))
	for _, item := range err {
		if item != nil {
			errs = append(errs, item)
		}
	}
	if len(errs) == 0 {
		return &Result{}
	}
	return &Result{Error: errs}
}

// ResultError wraps err in a Result. A nil err yields a Result without error.
func ResultError(err error) *Result {
	return &Result{Error: err}
}

// ResultErrOrBreak converts err into a Result. An error matching ErrBreak is
// reported as Break without an Error; anything else is passed to ResultError.
func ResultErrOrBreak(err error) *Result {
	if !errors.Is(err, ErrBreak) {
		return ResultError(err)
	}
	return &Result{Break: true}
}

// DomainBuilder is the helper passed to a BuildFunc; it loads entities
// through the stage it belongs to.
type DomainBuilder struct {
	stage *Stage
}

// Build queries and populates parent and, optionally, its children.
// parent must carry an ID. Each element of children must be a pointer to an
// entity or a pointer to a slice of entities.
func (h DomainBuilder) Build(ctx context.Context, parent IEntity, children ...interface{}) error {
	return h.stage.BuildEntity(ctx, parent, children...)
}

// RootContainer is the handle through which an ActFunc adds or removes
// aggregate-root entities.
//
// NOTE(review): ActFunc receives RootContainer by value, while Add and Remove
// use pointer receivers and append to errs. Errors recorded inside an ActFunc
// therefore land on the callee's copy and are not visible on the container
// that the stage inspects after calling act. Fixing this needs a coordinated
// change here and in the caller.
type RootContainer struct {
	stage *Stage
	errs  []error
}

// Add registers root as a new aggregate root. A failure (for example
// ErrEntityRepeated) is appended to errs rather than returned.
func (h *RootContainer) Add(root IEntity) {
	if err := h.stage.meta.Add(root); err != nil {
		h.errs = append(h.errs, err)
	}
}

// Remove deletes the aggregate root. A failure (for example
// ErrEntityNotFound) is appended to errs rather than returned.
func (h *RootContainer) Remove(root IEntity) {
	if err := h.stage.meta.Remove(root); err != nil {
		h.errs = append(h.errs, err)
	}
}

// BuildFunc loads the aggregate roots a stage operates on. Every returned
// root must already have an ID.
type BuildFunc func(ctx context.Context, h DomainBuilder) (roots []IEntity, err error)
// ActFunc performs the business operation on the roots returned by the
// BuildFunc; container allows adding or removing aggregate roots.
type ActFunc func(ctx context.Context, container RootContainer, roots ...IEntity) error
// PostSaveFunc is a hook registered through WithPostSave that receives the
// stage Result. NOTE(review): presumably invoked after saving; the call site
// is outside this part of the file.
type PostSaveFunc func(ctx context.Context, res *Result)

// EventHandlerConstruct is the constructor of an event handler: a function
// with exactly one parameter and one result. The parameter is a pointer to
// the event data type matching the event; the result is an ICommand.
// Example: func(evt *OrderCreatedEvent) *OnEventCreateCommand
type EventHandlerConstruct interface{}

type Options struct {
	WithTransaction bool
	RecursiveDelete bool         // 删除根实体是否递归删除所有子实体
	EventPersist    EventPersist // 是否保存领域事件到 DB
	Logger          logr.Logger
	EventBus        IEventBus
zhongqiling's avatar
zhongqiling committed
	IDGenerator     IIDGenerator
	PostSaveHooks   []PostSaveFunc
}

// Option is a single configuration item that can be applied to Options.
type Option interface {
	// ApplyToOptions writes this option's setting into the given Options.
	ApplyToOptions(*Options)
}
// TransactionOption switches transactional behaviour on or off.
type TransactionOption bool

// ApplyToOptions copies the flag into Options.WithTransaction.
func (o TransactionOption) ApplyToOptions(opts *Options) {
	opts.WithTransaction = bool(o)
}

// Ready-made values of TransactionOption.
const (
	WithTransaction    = TransactionOption(true)
	WithoutTransaction = TransactionOption(false)
)

// RecursiveDeleteOption controls whether deleting a root also deletes its
// children.
type RecursiveDeleteOption bool

// ApplyToOptions copies the flag into Options.RecursiveDelete.
func (o RecursiveDeleteOption) ApplyToOptions(opts *Options) {
	opts.RecursiveDelete = bool(o)
}

// WithRecursiveDelete enables recursive deletion.
const WithRecursiveDelete RecursiveDeleteOption = true

// LoggerOption carries the logger to install into Options.
type LoggerOption struct {
	logger logr.Logger
}

// ApplyToOptions stores the carried logger in Options.Logger.
func (o LoggerOption) ApplyToOptions(opts *Options) {
	opts.Logger = o.logger
}

// WithLogger returns an option that makes the engine use logger.
func WithLogger(logger logr.Logger) LoggerOption {
	var opt LoggerOption
	opt.logger = logger
	return opt
}

// EventBusOption carries the event bus to install into Options.
type EventBusOption struct {
	eventBus IEventBus
}

// ApplyToOptions stores the carried event bus in Options.EventBus.
func (o EventBusOption) ApplyToOptions(opts *Options) {
	opts.EventBus = o.eventBus
}

// WithEventBus returns an option that makes the engine use eventBus.
func WithEventBus(eventBus IEventBus) EventBusOption {
	var opt EventBusOption
	opt.eventBus = eventBus
	return opt
}

// DTimerOption carries the timer to install into Options.
type DTimerOption struct {
	timer ITimer
}

// ApplyToOptions stores the carried timer in Options.Timer.
func (o DTimerOption) ApplyToOptions(opts *Options) {
	opts.Timer = o.timer
}

// WithTimer returns an option that makes the engine use timer.
func WithTimer(timer ITimer) DTimerOption {
	var opt DTimerOption
	opt.timer = timer
	return opt
}

zhongqiling's avatar
zhongqiling committed
type EventPersist func(event *DomainEvent) (IModel, error)

// EventSaveOption is the Option form of an EventPersist function.
type EventSaveOption EventPersist

// ApplyToOptions stores the function in Options.EventPersist.
func (o EventSaveOption) ApplyToOptions(opts *Options) {
	opts.EventPersist = EventPersist(o)
}

// WithEventPersist returns an option that persists domain events through f.
func WithEventPersist(f EventPersist) EventSaveOption {
	opt := EventSaveOption(f)
	return opt
}

// IDGeneratorOption carries the ID generator to install into Options.
type IDGeneratorOption struct {
	idGen IIDGenerator
}

// ApplyToOptions stores the carried generator in Options.IDGenerator.
func (o IDGeneratorOption) ApplyToOptions(opts *Options) {
	opts.IDGenerator = o.idGen
}

// WithIDGenerator returns an option that makes the engine use idGen.
func WithIDGenerator(idGen IIDGenerator) IDGeneratorOption {
	var opt IDGeneratorOption
	opt.idGen = idGen
	return opt
}

// PostSaveOption is the Option form of a PostSaveFunc.
type PostSaveOption PostSaveFunc

// ApplyToOptions appends the hook to Options.PostSaveHooks.
//
// Options values are copied by value (a Stage starts from a copy of the
// Engine's Options), so several copies can share one backing array for
// PostSaveHooks. The full slice expression caps the capacity at the current
// length, which forces append to allocate a new array instead of writing
// into spare capacity that another copy may also append into.
func (t PostSaveOption) ApplyToOptions(opts *Options) {
	hooks := opts.PostSaveHooks
	opts.PostSaveHooks = append(hooks[:len(hooks):len(hooks)], PostSaveFunc(t))
}

// WithPostSave returns an option that registers f as a post-save hook.
func WithPostSave(f PostSaveFunc) PostSaveOption {
	return PostSaveOption(f)
}

type Engine struct {
	locker      ILock
	executor    IExecutor
	idGenerator IIDGenerator
	eventbus    IEventBus
zhongqiling's avatar
zhongqiling committed
	logger      logr.Logger
	options     Options
}

func NewEngine(l ILock, e IExecutor, opts ...Option) *Engine {
	options := Options{
		// 默认开启事务
		WithTransaction: true,
		Logger:          defaultLogger,
		IDGenerator:     &defaultIDGenerator{},
		EventBus:        &noEventBus{},
zhongqiling's avatar
zhongqiling committed
	}
	for _, opt := range opts {
		opt.ApplyToOptions(&options)
	}
	eventBus := options.EventBus
	eventBus.RegisterEventHandler(onEvent)
	if txEB, ok := eventBus.(ITransactionEventBus); ok {
		txEB.RegisterEventTXChecker(onTXChecker)
	}
	timer := options.Timer
	timer.RegisterTimerHandler(onTimer)
zhongqiling's avatar
zhongqiling committed
	return &Engine{
		locker:      l,
		executor:    e,
		eventbus:    eventBus,
zhongqiling's avatar
zhongqiling committed
		options:     options,
		logger:      options.Logger,
		idGenerator: options.IDGenerator,
	}
}

func (e *Engine) NewStage() *Stage {
	return &Stage{
		locker:      e.locker,
		executor:    e.executor,
		eventBus:    e.eventbus,
zhongqiling's avatar
zhongqiling committed
		idGenerator: e.idGenerator,
		meta:        &EntityContainer{},
		result:      &Result{},
		options:     e.options,
		logger:      e.logger,
	}
}

// Create adds roots as new aggregate roots in a fresh stage and saves it.
func (e *Engine) Create(ctx context.Context, roots ...IEntity) *Result {
	addAll := func(_ context.Context, container RootContainer, _ ...IEntity) error {
		for i := range roots {
			container.Add(roots[i])
		}
		return nil
	}
	return e.NewStage().Act(addAll).Save(ctx)
}

// Delete uses roots as the built aggregate roots of a fresh stage, removes
// each of them from the container and saves the stage.
func (e *Engine) Delete(ctx context.Context, roots ...IEntity) *Result {
	load := func(_ context.Context, _ DomainBuilder) ([]IEntity, error) {
		return roots, nil
	}
	removeAll := func(_ context.Context, container RootContainer, built ...IEntity) error {
		for i := range built {
			container.Remove(built[i])
		}
		return nil
	}
	return e.NewStage().Build(load).Act(removeAll).Save(ctx)
}

// RunCommand runs c in a fresh stage. opts are applied to that stage only,
// on top of the engine's options.
func (e *Engine) RunCommand(ctx context.Context, c ICommand, opts ...Option) *Result {
	return e.NewStage().WithOption(opts...).RunCommand(ctx, c)
}

// RegisterEventHandler registers construct as the handler for eventType.
//
// construct must be a function with exactly one parameter, a pointer to the
// event data type, and exactly one result that implements ICommand. Any
// violation panics. When an event arrives, its payload is JSON-decoded into a
// new value of the event data type (or the *DomainEvent itself is passed when
// that is the declared parameter type), construct is called with it, and the
// resulting command is run through the engine.
func (e *Engine) RegisterEventHandler(eventType EventType, construct EventHandlerConstruct) {
	handlerType := reflect.TypeOf(construct)
	if handlerType.Kind() != reflect.Func {
		panic("construct must type of reflect.Func")
	}
	if handlerType.NumIn() != 1 || handlerType.NumOut() != 1 {
		panic("construct num of arg or output must 1")
	}

	evtType := handlerType.In(0)
	if evtType.Kind() != reflect.Ptr {
		panic("event type must be pointer")
	}
	evtType = evtType.Elem() // from here on evtType is the pointed-to type
	outType := handlerType.Out(0)
	if !outType.Implements(cmdType) {
		panic("construct output must be type of ICommand")
	}
	constructFunc := reflect.ValueOf(construct)

	RegisterEventHandler(eventType, func(ctx context.Context, evt *DomainEvent) error {
		var bizEvt reflect.Value
		if evtType == domainEventType {
			// The handler asked for the raw domain event; pass it through.
			bizEvt = reflect.ValueOf(evt)
		} else {
			// Decode the payload into a fresh value of the business event type.
			bizEvt = reflect.New(evtType)
			if err := json.Unmarshal(evt.Payload, bizEvt.Interface()); err != nil {
				e.logger.Error(err, "unmarshal event failed")
				return err
			}
		}

		outputs := constructFunc.Call([]reflect.Value{bizEvt})
		if res := e.RunCommand(ctx, outputs[0].Interface().(ICommand)); res.Error != nil {
			e.logger.Error(res.Error, "event handler exec failed")
			return res.Error
		}
		return nil
	})
}

// RegisterCronTask registers a scheduled task: f is called with the event's
// key and cron expression each time the handler registered for key fires.
// It panics when the engine has no timer, when key already has an event
// handler, or when the timer rejects the cron registration.
func (e *Engine) RegisterCronTask(key EventType, cron string, f func(key, cron string)) {
	if e.timer == nil {
		panic("No ITimer specified")
	}
	if hasEventHandler(key) {
		panic("key has registered")
	}

	RegisterEventHandler(key, func(ctx context.Context, evt *TimerEvent) error {
		f(evt.Key, evt.Cron)
		return nil
	})

	if err := e.timer.RunCron(string(key), cron, nil); err != nil {
		panic(err)
	}
}

// RegisterCronTaskOfCommand 注册定时触发的 ICommand
func (e *Engine) RegisterCronTaskOfCommand(key EventType, cron string, f func(key, cron string) ICommand) {
	if e.timer == nil {
		panic("No ITimer specified")
	}
	if hasEventHandler(key) {
		panic("key has registered")
	}

	e.RegisterEventHandler(key, func(evt *TimerEvent) ICommand {
		return f(evt.Key, evt.Cron)
	})
	if err := e.timer.RunCron(string(key), cron, nil); err != nil {
zhongqiling's avatar
zhongqiling committed
// Stage 取舞台的意思,表示单次运行
type Stage struct {
	lockKeys []string
	build    BuildFunc
	act      ActFunc

	locker      ILock
	executor    IExecutor
	eventBus    IEventBus
zhongqiling's avatar
zhongqiling committed
	idGenerator IIDGenerator
	logger      logr.Logger
	options     Options

	meta     *EntityContainer
	snapshot entitySnapshotPool
	result   *Result
	eventCtx context.Context
}

func (e *Stage) WithOption(opts ...Option) *Stage {
	for _, opt := range opts {
		opt.ApplyToOptions(&e.options)
	}

	eventBus := e.options.EventBus
	eventBus.RegisterEventHandler(onEvent)
	if txEB, ok := eventBus.(ITransactionEventBus); ok {
		txEB.RegisterEventTXChecker(onTXChecker)
	}
	e.eventBus = eventBus

	timer := e.options.Timer
	timer.RegisterTimerHandler(onTimer)
	e.timer = timer
zhongqiling's avatar
zhongqiling committed
472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 
972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000
	e.logger = e.options.Logger
	e.idGenerator = e.options.IDGenerator
	return e
}

// changeType2OP maps an entity change type to the persistence operation that
// realises it. Unrecognised change types map to OpUnknown.
func changeType2OP(t changeType) OpType {
	if t == newChildren {
		return OpInsert
	}
	if t == dirtyChildren {
		return OpUpdate
	}
	if t == deleteChildren || t == clearChildren {
		return OpDelete
	}
	return OpUnknown
}

// BuildEntity queries and populates parent and, optionally, its children.
// parent must carry an ID. Each element of children must be a pointer to an
// entity or a pointer to a slice of entities.
//
// A child entity that cannot be found is not an error; a missing parent is.
// A nil parent or a nil element in children is reported as an error instead
// of causing a nil dereference.
func (e *Stage) BuildEntity(ctx context.Context, parent IEntity, children ...interface{}) error {
	if parent == nil || parent.GetID() == "" {
		return fmt.Errorf("parent must has ID")
	}

	if err := e.buildEntity(ctx, parent, nil); err != nil {
		return err
	}

	for _, item := range children {
		itemType := reflect.TypeOf(item)
		// reflect.TypeOf returns nil for an untyped nil item.
		if itemType == nil || itemType.Kind() != reflect.Ptr {
			return fmt.Errorf("children must be pointer")
		}
		switch {
		case itemType.Elem().Kind() == reflect.Slice:
			if err := e.buildEntitySliceByParent(ctx, parent, item); err != nil {
				return err
			}
		case itemType.Implements(entityType):
			if err := e.buildEntity(ctx, item.(IEntity), parent); err != nil && !errors.Is(err, ErrEntityNotFound) {
				return err
			}
		default:
			return fmt.Errorf("children type must be IEntity")
		}
	}
	return nil
}

// buildEntity queries the store for entity and fills it from the first
// matching model. parent is only used to build the query; it is not loaded.
// At least one of entity and parent must have an ID; parent may be nil.
// It returns ErrEntityNotFound when the query yields no rows.
func (e *Stage) buildEntity(ctx context.Context, entity, parent IEntity) error {
	// At least one of the two must have an ID. parent is nil when a parent
	// entity itself is being built, so it must not be dereferenced blindly.
	if entity.GetID() == "" && (parent == nil || parent.GetID() == "") {
		return fmt.Errorf("entity to build must has id")
	}
	po, err := e.executor.Entity2Model(entity, parent, OpQuery)
	if err != nil {
		return err
	}
	// Allocate a *[]T, where T is the model type, to receive the query result.
	posPointer := reflect.New(reflect.SliceOf(reflect.TypeOf(po)))
	if err := e.executor.Exec(ctx, &Action{
		Op:          OpQuery,
		Query:       po,
		QueryResult: posPointer.Interface(),
	}); err != nil {
		return err
	}
	if posPointer.Elem().Len() == 0 {
		return ErrEntityNotFound
	}
	queryPO := posPointer.Elem().Index(0).Interface()
	return e.executor.Model2Entity(queryPO.(IModel), entity)
}

// buildEntitySliceByParent queries all child entities of parent whose type is
// the element type of the slice that children points to, and appends them to
// that slice.
//
// children must be a pointer to a slice whose element type is a concrete type
// implementing IEntity, either a pointer type or a value type. A slice of an
// interface type is rejected, because no concrete entity can be allocated
// for it.
func (e *Stage) buildEntitySliceByParent(ctx context.Context, parent IEntity, children interface{}) error {
	childrenType := reflect.TypeOf(children)
	if childrenType == nil || childrenType.Kind() != reflect.Ptr || childrenType.Elem().Kind() != reflect.Slice {
		return fmt.Errorf("children must be pointer of slice")
	}
	elemType := childrenType.Elem().Elem()
	if !elemType.Implements(entityType) {
		return fmt.Errorf("element of children must implement IEntity")
	}
	if elemType.Kind() == reflect.Interface {
		return fmt.Errorf("element of children must be a concrete type")
	}

	// eType is the type to allocate; for pointer elements that is the
	// pointed-to type.
	elemIsPtr := elemType.Kind() == reflect.Ptr
	eType := elemType
	if elemIsPtr {
		eType = elemType.Elem()
	}
	entity := reflect.New(eType)
	po, err := e.executor.Entity2Model(entity.Interface().(IEntity), parent, OpQuery)
	if err != nil {
		return err
	}
	posPointer := reflect.New(reflect.SliceOf(reflect.TypeOf(po)))

	if err := e.executor.Exec(ctx, &Action{
		Op:          OpQuery,
		Query:       po,
		QueryResult: posPointer.Interface(),
	}); err != nil {
		return err
	}
	rows := posPointer.Elem()
	if rows.Len() == 0 {
		return nil
	}

	resultVal := reflect.ValueOf(children)
	entitiesVal := resultVal.Elem()
	for i := 0; i < rows.Len(); i++ {
		newEntity := reflect.New(eType)
		if err := e.executor.Model2Entity(rows.Index(i).Interface().(IModel), newEntity.Interface().(IEntity)); err != nil {
			return err
		}
		// reflect.Append requires a value of the slice's element type:
		// the pointer for pointer elements, the pointed-to value otherwise.
		if elemIsPtr {
			entitiesVal = reflect.Append(entitiesVal, newEntity)
		} else {
			entitiesVal = reflect.Append(entitiesVal, newEntity.Elem())
		}
	}
	resultVal.Elem().Set(entitiesVal)
	return nil
}

// Lock sets the lock keys of the stage, replacing any previously set keys,
// and returns the stage for chaining.
func (e *Stage) Lock(keys ...string) *Stage {
	e.lockKeys = keys
	return e
}

// Build sets the function that loads the aggregate roots and returns the
// stage for chaining.
func (e *Stage) Build(f BuildFunc) *Stage {
	e.build = f
	return e
}

// Act sets the function that performs the business operation and returns the
// stage for chaining.
func (e *Stage) Act(f ActFunc) *Stage {
	e.act = f
	return e
}

// RunCommand executes c on this stage. If c implements IStageSetter it first
// receives a StageAgent for the stage. c.Init supplies the lock keys; an Init
// error ends the run (ErrBreak is reported as a break). Then c.PostSave is
// registered as a post-save hook, c.Build and c.Act are installed, and the
// stage is saved.
func (e *Stage) RunCommand(ctx context.Context, c ICommand) *Result {
	if setter, ok := c.(IStageSetter); ok {
		setter.SetStage(StageAgent{st: e})
	}

	lockKeys, initErr := c.Init(ctx)
	if initErr != nil {
		return ResultErrOrBreak(initErr)
	}

	stage := e.WithOption(PostSaveOption(c.PostSave))
	stage = stage.Lock(lockKeys...)
	stage = stage.Build(c.Build)
	stage = stage.Act(c.Act)
	return stage.Save(ctx)
}

// childrenSnapshot returns a copy of children in which every slice is
// duplicated, so later changes to the original slices do not affect it.
// The entities themselves are not copied.
func childrenSnapshot(children map[string][]IEntity) map[string][]IEntity {
	snapshot := make(map[string][]IEntity, len(children))
	for key, entities := range children {
		snapshot[key] = append(make([]IEntity, 0, len(entities)), entities...)
	}
	return snapshot
}

// makeSnapshot rebuilds the stage's snapshot pool by walking every entity
// reachable from the container and recording, per entity, its current model
// and a copy of its children lists. Entities whose type is not registered
// with the executor are still recorded, without a usable model.
func (e *Stage) makeSnapshot() error {
	e.snapshot = entitySnapshotPool{}
	return walk(e.meta, nil, func(entity, parent IEntity, children map[string][]IEntity) error {
		// An entity can be reached more than once; snapshot it only once.
		if _, in := e.snapshot[entity]; in {
			return nil
		}
		po, err := e.executor.Entity2Model(entity, parent, OpQuery)
		// errors.Is takes (err, target). With the arguments reversed, a
		// wrapped ErrEntityNotRegister would not be recognised.
		if err != nil && !errors.Is(err, ErrEntityNotRegister) {
			return err
		}
		e.snapshot[entity] = &entitySnapshot{
			po:       po,
			children: childrenSnapshot(children),
		}
		return nil
	})
}

// unDirty clears the dirty mark on every entity reachable from the container.
func (e *Stage) unDirty() {
	// The callback never fails, so the walk result is ignored.
	_ = walk(e.meta, nil, func(entity, parent IEntity, children map[string][]IEntity) error {
		entity.UnDirty()
		return nil
	})
}

// getEntityChanged computes the entity changes between the snapshot and the
// current container state. Moved entities are resolved first; when
// RecursiveDelete is enabled, deletions are then expanded to child entities.
func (e *Stage) getEntityChanged() ([]*entityChanged, error) {
	// Diff against the snapshot, then resolve entities that moved.
	moved, err := handleEntityMove(entityDiff(e.meta, e.snapshot))
	if err != nil {
		return nil, err
	}
	if !e.options.RecursiveDelete {
		return moved, nil
	}
	// Expand deletions recursively to child entities.
	return recursiveDelete(moved), nil
}

// recycle hands every entity from a deleteChildren change to the container's
// deleted list.
func (e *Stage) recycle(changed []*entityChanged) {
	for _, change := range changed {
		if change.changeType != deleteChildren {
			continue
		}
		for i := range change.children {
			e.meta.Recycle(change.children[i])
		}
	}
}

// putNewID assigns a generated ID to every newly created entity that does
// not have one yet. It stops at the first generator error.
func (e *Stage) putNewID(changes []*entityChanged) error {
	for _, change := range changes {
		if change.changeType != newChildren {
			continue
		}
		for _, child := range change.children {
			if child.GetID() != "" {
				continue
			}
			id, err := e.idGenerator.NewID()
			if err != nil {
				return err
			}
			child.SetID(id)
		}
	}
	return nil
}

// makeActions converts entity changes into persistence actions. Models with
// the same operation and the same model type are merged into one Action.
//
// Actions are returned grouped by operation (insert, update, delete). Within
// an operation they keep the order in which their model type first appeared
// in changes, so the result is deterministic rather than dependent on map
// iteration order. Entities whose type is not registered with the executor
// are logged and skipped.
func (e *Stage) makeActions(changes []*entityChanged) ([]*Action, error) {
	ops := []OpType{OpInsert, OpUpdate, OpDelete}
	// byOpType finds the action to merge into; ordered remembers first-seen order.
	byOpType := make(map[OpType]map[reflect.Type]*Action, len(ops))
	ordered := make(map[OpType][]*Action, len(ops))
	for _, item := range changes {
		op := changeType2OP(item.changeType)
		for _, entity := range item.children {
			po, err := e.executor.Entity2Model(entity, item.parent, op)
			if err != nil {
				if errors.Is(err, ErrEntityNotRegister) {
					e.logger.Info("entity not registered", "type", reflect.TypeOf(entity))
					continue
				}
				return nil, err
			}
			byType, ok := byOpType[op]
			if !ok {
				byType = map[reflect.Type]*Action{}
				byOpType[op] = byType
			}
			poType := reflect.TypeOf(po)
			action, ok := byType[poType]
			if !ok {
				action = &Action{Op: op}
				byType[poType] = action
				ordered[op] = append(ordered[op], action)
			}
			action.Models = append(action.Models, po)
			if op == OpUpdate {
				// Updates also carry the model captured in the snapshot.
				action.PrevModels = append(action.PrevModels, e.snapshot[entity].po)
			}
		}
	}
	actions := make([]*Action, 0)
	for _, op := range ops {
		actions = append(actions, ordered[op]...)
	}
	return actions, nil
}

// collectEvents gathers the domain events of every entity reachable from the
// container and of every deleted entity, de-duplicated by event ID, and
// returns them ordered by creation time and then by ID.
func (e *Stage) collectEvents() []*DomainEvent {
	eventMap := make(map[string]*DomainEvent, 0)
	gather := func(entity, parent IEntity, children map[string][]IEntity) error {
		for _, evt := range entity.GetEvents() {
			eventMap[evt.ID] = evt
		}
		return nil
	}
	_ = walk(e.meta, nil, gather)
	// Deleted entities may still have events to send.
	for _, del := range e.meta.GetDeleted() {
		_ = walk(del, nil, gather)
	}

	events := make([]*DomainEvent, 0, len(eventMap))
	for _, evt := range eventMap {
		events = append(events, evt)
	}
	// Order by creation time, then by ID. The original author notes that IDs
	// come from xid, which increases strictly on a single node.
	sort.SliceStable(events, func(i, j int) bool {
		a, b := events[i], events[j]
		if a.CreatedAt.Equal(b.CreatedAt) {
			return a.ID < b.ID
		}
		return a.CreatedAt.Before(b.CreatedAt)
	})
	return events
}

// makeEventPersistAction converts events into models through
// Options.EventPersist and wraps them in a single insert Action.
// The first conversion error aborts and is returned.
func (e *Stage) makeEventPersistAction(events []*DomainEvent) (*Action, error) {
	models := make([]IModel, 0, len(events))
	for _, evt := range events {
		model, err := e.options.EventPersist(evt)
		if err != nil {
			return nil, err
		}
		models = append(models, model)
	}

	action := &Action{
		Op:         OpInsert,
		Models:     models,
		PrevModels: []IModel{},
	}
	return action, nil
}

// dispatchEvents sends events through the stage's event bus.
//
// Without transactions, or when the bus is not an ITransactionEventBus, all
// events are dispatched the normal way. Otherwise events whose SendType is
// SendTypeTransaction go through DispatchBegin, whose returned context is
// kept in eventCtx, and the remaining events are dispatched normally.
func (e *Stage) dispatchEvents(ctx context.Context, events []*DomainEvent) (err error) {
	if !e.options.WithTransaction {
		e.logger.Info("engine not support transaction")
		return e.eventBus.Dispatch(ctx, events...)
	}

	txEventBus, ok := e.eventBus.(ITransactionEventBus)
	if !ok {
		// A bus without transaction support sends everything the normal way.
		return e.eventBus.Dispatch(ctx, events...)
	}

	var normalEvents, txEvents []*DomainEvent
	for _, evt := range events {
		if evt.SendType == SendTypeTransaction {
			txEvents = append(txEvents, evt)
			continue
		}
		normalEvents = append(normalEvents, evt)
	}

	if len(txEvents) > 0 {
		if e.eventCtx, err = txEventBus.DispatchBegin(ctx, txEvents...); err != nil {
			return err
		}
	}
	if len(normalEvents) > 0 {
		return txEventBus.Dispatch(ctx, normalEvents...)
	}
	return nil
}

// commit persists the pending entity changes, then takes a fresh snapshot and
// clears the dirty marks so that later changes are diffed against the
// just-saved state.
func (e *Stage) commit(ctx context.Context) error {
	if err := e.persist(ctx); err != nil {
		return err
	}
	if err := e.makeSnapshot(); err != nil {
		return err
	}

	e.unDirty()
	return nil
}

// persist detects entity changes and writes them out. In order: run the
// before-save hooks, assign IDs to new entities, convert the changes to
// actions, execute the actions, run the after-save hooks, and recycle deleted
// child entities. The first error aborts the sequence.
func (e *Stage) persist(ctx context.Context) error {
	// Detect entity changes.
	changed, err := e.getEntityChanged()
	if err != nil {
		return err
	}
	// Run the before-save hooks.
	for _, c := range changed {
		for _, entity := range c.children {
			if err := execHook(ctx, entity, c.changeType, true); err != nil {
				return err
			}
		}
	}

	if err := e.putNewID(changed); err != nil {
		return err
	}
	// Convert the changes into a sequence of persistence actions.
	actions, err := e.makeActions(changed)
	if err != nil {
		return err
	}

	if err := e.exec(ctx, actions); err != nil {
		return err
	}
	// Run the after-save hooks.
	for _, c := range changed {
		for _, entity := range c.children {
			if err := execHook(ctx, entity, c.changeType, false); err != nil {
				return err
			}
		}
	}

	// Recycle deleted child entities; they may still have events to send.
	e.recycle(changed)
	return nil
}

// execHook invokes the lifecycle hook on entity that matches the change type
// and phase, if the entity implements it: Before/After Create for
// newChildren, Before/After Update for dirtyChildren, Before/After Delete for
// deleteChildren. isBefore selects the "before" phase. Other change types
// trigger no hook. The hook's error is returned as is.
func execHook(ctx context.Context, entity IEntity, ct changeType, isBefore bool) error {
	switch ct {
	case newChildren:
		if isBefore {
			if hook, ok := entity.(IBeforeCreate); ok {
				return hook.BeforeCreate(ctx)
			}
		} else if hook, ok := entity.(IAfterCreate); ok {
			return hook.AfterCreate(ctx)
		}
	case dirtyChildren:
		if isBefore {
			if hook, ok := entity.(IBeforeUpdate); ok {
				return hook.BeforeUpdate(ctx)
			}
		} else if hook, ok := entity.(IAfterUpdate); ok {
			return hook.AfterUpdate(ctx)
		}
	case deleteChildren:
		if isBefore {
			if hook, ok := entity.(IBeforeDelete); ok {
				return hook.BeforeDelete(ctx)
			}
		} else if hook, ok := entity.(IAfterDelete); ok {
			return hook.AfterDelete(ctx)
		}
	}
	return nil
}

// exec runs actions one by one through the executor and stops at the first
// error. The actions are recorded on the stage result only when all of them
// succeeded.
func (e *Stage) exec(ctx context.Context, actions []*Action) error {
	for _, a := range actions {
		if err := e.executor.Exec(ctx, a); err != nil {
			return err
		}
	}
	e.result.Actions = append(e.result.Actions, actions...)
	return nil
}

// setOutput stores data as the Output of the stage result, replacing any
// earlier value.
func (e *Stage) setOutput(data interface{}) {
	e.result.Output = data
}

// do runs the stage once: build the aggregate roots, snapshot them, run the
// act function, persist the changes, optionally persist the domain events,
// and finally dispatch the events. It returns the accumulated stage result on
// success. On failure it returns a new Result that carries only the error, or
// only Break when build or act returned ErrBreak.
func (e *Stage) do(ctx context.Context) *Result {
	// Build the aggregate.
	var err error
	var buildRoots []IEntity
	if e.build != nil {
		buildRoots, err = e.build(ctx, DomainBuilder{stage: e})
		if err != nil {
			return ResultErrOrBreak(err)
		}
		for _, r := range buildRoots {
			if r.GetID() == "" {
				return ResultErrors(fmt.Errorf("build entities must have ID, for create case, just use container.Add(**) at act func"))
			}
		}
		e.meta.SetChildren(buildRoots)
	}

	// Record the parent/child entity relationships before act changes them.
	if err = e.makeSnapshot(); err != nil {
		return ResultError(err)
	}
	if e.act != nil {
		// NOTE(review): container is passed to act by value, while
		// RootContainer.Add/Remove append to errs through a pointer receiver.
		// Errors recorded inside act therefore end up on act's copy, and the
		// len(container.errs) check below appears never to see them — confirm.
		container := RootContainer{stage: e}
		if err := e.act(ctx, container, buildRoots...); err != nil {
			return ResultErrOrBreak(err)
		} else if len(container.errs) > 0 {
			return ResultErrors(container.errs...)
		}
	}
	err = e.persist(ctx)
	if err != nil {
		return ResultErrors(err)
	}

	// Persist the domain events when an EventPersist function is configured.
	events := e.collectEvents()
	if len(events) > 0 && e.options.EventPersist != nil {
		action, err := e.makeEventPersistAction(events)
		if err != nil {
			return ResultErrors(err)
		}
		if err := e.exec(ctx, []*Action{action}); err != nil {
			return ResultError(err)
		}
	}

	// Dispatch the domain events.
	if len(events) > 0 {
		if e.eventBus == nil {
			return ResultErrors(ErrNoEventBusFound)
		}
		if err := e.dispatchEvents(ctx, events); err != nil {
			return ResultErrors(err)
		}
	}

	return e.result
}

// doSave is a step of the save pipeline: it runs with a context and yields
// the stage Result. runOnLock takes one and returns a wrapped one.
type doSave func(ctx context.Context) *Result

func (e *Stage) runOnLock(f doSave, lockKeys ...string) doSave {
	return func(ctx context.Context) *Result {
		sort.Strings(lockKeys)
		for i := 1; i < len(lockKeys); i++ {
			if lockKeys[i] == lockKeys[i-1] {
				return ResultErrors(fmt.Errorf("lockKey(%s) repeated", lockKeys[i]))
			}
		}

		var lockErr error
		ls := make([]interface{}, 0)
		for _, id := range lockKeys {
			l, err := e.locker.Lock(ctx, fmt.Sprintf("ddd_engine_%s", id))
			if err != nil {
				lockErr = fmt.Errorf("acquiring redis locker failed: %v", err)
				break
			}
			ls = append(ls, l)
		}