C#应用 - 事件总线

发布时间 2023-10-08 17:14:27作者: tossorrow

前言

发布订阅模式很常见,每个发布者和订阅者之间都搭建了一条小线路,随着功能越来越多,事件和委托就会满天飞,就像私拉电线的蜘蛛网一样。这时候可能需要一种集中式的事件处理方法,即事件总线。

1,简介

事件总线就像一个集线器,原本直接从起点到终点的连接,如今全部都要经过事件总线,发布者和订阅者完全地解耦了。发布者只需要向事件总线发起事件,不需要关心事件处理。订阅者只需要处理事件总线派发过来的事件,不需要关心事件的来源。

2,设计

2.1 设计思路

我希望事件总线是简单整洁灵活的

  • 定义一个事件接口IEventData,所有的事件类都应该继承此接口,用不同的类型代表不同的事件,并且事件类包含了全部事件信息。
  • 事件总线维护一个字典Dictionary<Type, List<Action<IEventData>>,第一个泛型参数Type表示事件类型,第二个泛型参数List<Action<IEventData>>表示事件处理委托列表。(此处用Action举例,Func是类似的)
  • 订阅者手动向事件总线注册事件处理委托
  • 发布者创建一个IEventData实例即可向事件总线触发事件,把事件处理委托列表调用一遍

2.2 设计实现

2.2.1 IEventData

//事件接口,所有的事件都要实现该接口
public interface IEventData
{
}

一个空接口就行,应用时再根据业务定义事件类

2.2.2 EventBus

public class EventBus
{
    public static EventBus Default = new EventBus();  //单例

    private readonly Dictionary<Type, List<object>> eventDataAndActionHandlerDic;  //Action<IEventData>
    private static readonly object 字典锁 = new object();

    private EventBus()
    {
        eventDataAndActionHandlerDic = new Dictionary<Type, List<object>>();
    }

    //手动注册事件处理方法
    public void Register<TEventData>(Action<TEventData> action) where TEventData : IEventData
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }
        lock (字典锁)
        {
            if (!eventDataAndActionHandlerDic.ContainsKey(typeof(TEventData)))
            {
                eventDataAndActionHandlerDic.Add(typeof(TEventData), new List<object>());
            }
            List<object> actionList = eventDataAndActionHandlerDic[typeof(TEventData)];
            if (!actionList.Contains(action))
            {
                actionList.Add(action);
            }
        }
    }

    //手动注销事件处理方法
    public void UnRegister<TEventData>(Action<TEventData> action) where TEventData : IEventData
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }
        lock (字典锁)
        {
            if (eventDataAndActionHandlerDic.ContainsKey(typeof(TEventData)))
            {
                List<object> actionList = eventDataAndActionHandlerDic[typeof(TEventData)];
                actionList.Remove(action);
            }
        }
    }

    //触发事件
    public void Trigger<TEventData>(TEventData eventData) where TEventData : IEventData
    {
        if (eventData == null)
        {
            throw new ArgumentNullException(nameof(eventData));
        }
        List<object> actionList = null;
        lock (字典锁)
        {
            if (eventDataAndActionHandlerDic.ContainsKey(typeof(TEventData)))
            {
                actionList = eventDataAndActionHandlerDic[typeof(TEventData)];
            }
        }
        if(actionList != null)
        {
            for (var index = 0; index < actionList.Count; index++)
            {
                actionList[index].GetType().GetMethod("Invoke").Invoke(actionList[index], new object[] { eventData });
            }
        }
    }
}

很简单,就3个方法(此处用Action举例,Func是类似的)

  • Register:将订阅者的事件处理委托添加到事件处理委托列表
  • UnRegister:将订阅者的事件处理委托从事件处理委托列表里移除
  • Trigger:根据事件类型从字典里拿到事件处理委托列表都调用一遍

2.2.3 用起来

假设一个应用场景,一个气象台发布天气,多个电视台接收天气。
创建一个winform项目。

public partial class Form1 : Form
{
    public Form1()
    {
        InitializeComponent();
    }
    private void 订阅天气_Click(object sender, EventArgs e)
    {
        EventBus.Default.Register<EventData<string>>(中国电视台.收到天气);
        EventBus.Default.Register<EventData<string>>(米国电视台.收到天气);
    }
    private void 取消订阅天气_Click(object sender, EventArgs e)
    {
        EventBus.Default.UnRegister<EventData<string>>(中国电视台.收到天气);
        EventBus.Default.UnRegister<EventData<string>>(米国电视台.收到天气);
    }

    private 气象台 气象台 = new 气象台();
    private void 播报天气_Click(object sender, EventArgs e)
    {
        气象台.播报天气("下雨");
    }
}

//带泛型负载的事件
public class EventData<TPayload> : IEventData
{
    public TPayload Payload { get; protected set; }

    public EventData(TPayload payload)
    {
        Payload = payload;
    }
}
public class 气象台
{
    public void 播报天气(string 天气)
    {
        EventData<string> eventData = new EventData<string>(天气);
        EventBus.Default.Trigger<EventData<string>>(eventData);  //直接通过事件总线触发即可
    }
}
public class 中国电视台
{
    public static void 收到天气(EventData<string> eventData)
    {
        MessageBox.Show($"中国天气是{eventData.Payload}");
    }
}
public class 米国电视台
{
    public static void 收到天气(EventData<string> eventData)
    {
        MessageBox.Show($"米国天气是{eventData.Payload}");
    }
}

根据业务定义了一个带泛型负载的事件EventData<TPayload>。发布者气象台可以通过Trigger方法发起事件。订阅者电视台通过Register方法注册事件处理委托。先点击订阅天气,再点击播报天气时就会弹窗。

3,问题

3.1 起缘

一切都看起来很棒,直到有一天电视台要拆掉了。

public partial class Form1 : Form
{
    public Form1()
    {
        InitializeComponent();
    }

    private 中国电视台 中国电视台 = new 中国电视台();
    private 米国电视台 米国电视台 = new 米国电视台();
    private void 订阅天气_Click(object sender, EventArgs e)
    {
        EventBus.Default.Register<天气数据, string>(中国电视台.收到天气);
        EventBus.Default.Register<天气数据, string>(米国电视台.收到天气);
    }
    private void 取消订阅天气_Click(object sender, EventArgs e)
    {
        EventBus.Default.UnRegister<天气数据, string>(中国电视台.收到天气);
        EventBus.Default.UnRegister<天气数据, string>(米国电视台.收到天气);
    }
    private void 销毁电视台_Click(object sender, EventArgs e)
    {
        //如果没有取消订阅,内存就泄露了
        中国电视台 = null;
        米国电视台 = null;
        GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced);
        textBox1.AppendText($"销毁电视台\r\n");
    }

    private 气象台 气象台 = new 气象台();
    private void 播报天气_Click(object sender, EventArgs e)
    {
        气象台.播报天气("下雪", 播报天气CallBack);
    }
    private void 播报天气CallBack(string obj)
    {
        textBox1.AppendText($"{obj}\r\n");
    }
}

//带负载的事件
public class 天气数据: IEventData
{
    public string 天气;
    public 天气数据(string 天气)
    {
        this.天气 = 天气;
    }
}

public class 气象台
{
    public void 播报天气(string 天气, Action<string> callBack)
    {
        天气数据 eventData = new 天气数据(天气);
        List<string> list = EventBus.Default.Trigger<天气数据, string>(eventData);  //直接通过事件总线触发即可
        foreach(var str in list)
        {
            callBack?.Invoke(str);
        }
    }
}
public class 中国电视台
{
    public string 收到天气(天气数据 eventData)
    {
        return $"中国电视台收到的天气是{eventData.天气}";
    }
}
public class 米国电视台
{
    public string 收到天气(天气数据 eventData)
    {
        return $"米国电视台收到的天气是{eventData.天气}";
    }
}

先点击订阅天气,再点击播报天气时就会显示天气,这都没问题。此时再点击销毁电视台,再点击播报天气时仍然会显示天气,这是一个常见的内存泄露。

销毁电视台前手动注销事件处理委托可以避免这个问题,但这真的很难保证。有没有一种即便不手动注销也能正常GC的方法呢?有,就是弱引用。这个问题本来和事件总线没啥关系,但如果用到了弱引用就需要对事件总线进行改造。

3.2 改造

注册了事件处理委托之后,事件总线对订阅者就是强引用关系,强引用在,GC永远无法回收被引用者。弱引用不计入引用计数,引用计数归零GC可以正常回收。弱引用在使用时先判断对象是否存在,如果存在才访问对象。
下面将事件总结对事件处理委托的引用改造成弱引用。更确切地说,就是使用弱引用对Dictionary<Type, List<Action<IEventData>>中的List<Action<IEventData>>进行封装。

public class WeakEvent<T>
{
    private class ActionUnit  //弱引用封装
    {
        private WeakReference reference;
        private MethodInfo method;
        private bool noTarget;

        public bool IsDead
        {
            get
            {
                return !this.noTarget && !this.reference.IsAlive;
            }
        }
        public ActionUnit(Action<T> action)
        {
            this.noTarget = action.Target == null;  //静态方法没有Target,所以noTarget就是isStaticMethod
            this.reference = new WeakReference(action.Target);  //action.Target是订阅者实例
            this.method = action.Method;  //Method是MethodInfo的实例,即订阅者的事件处理方法
        }
        public bool Equals(Action<T> action)
        {
            return this.reference.Target == action.Target && this.method == action.Method;
        }
        public void Invoke(object[] args)
        {
            this.method.Invoke(this.reference.Target, args);  //reference.Target就是action.Target,WeakReference构造函数中传入的
        }
    }
        
    private List<ActionUnit> actionUnitlist = new List<ActionUnit>();  //弱引用封装列表

    public int Count
    {
        get
        {
            return this.actionUnitlist.Count;
        }
    }

    public void Add(Action<T> action)
    {
        this.actionUnitlist.Add(new ActionUnit(action));
    }
    public void Remove(Action<T> action)
    {
        for (int i = this.actionUnitlist.Count - 1; i > -1; i--)
        {
            if (this.actionUnitlist[i].Equals(action))
            {
                this.actionUnitlist.RemoveAt(i);
            }
        }
    }
    public void Clear()
    {
        this.actionUnitlist.Clear();
    }
    public bool Contains(Action<T> action)
    {
        return this.actionUnitlist.Any(item => item.Equals(action));
    }

    public void Invoke(T arg)
    {
        List<int> removeList = new List<int>();
        for (int i = 0; i < this.actionUnitlist.Count; i++)
        {
            if (this.actionUnitlist[i].IsDead)
            {
                removeList.Add(i);
            }
            else
            {
                this.actionUnitlist[i].Invoke(new object[] { arg });
            }
        }
        for (int i = removeList.Count - 1; i >= 0; i--)
        {
            this.actionUnitlist.RemoveAt(removeList[i]);
        }
    }
}

ActionUnit对应的是Action<TEventData>WeakEvent<T>对应的是List<Action<TEventData>>
添加事件处理委托action时,就使用弱引用表示订阅者new WeakReference(action.Target),这样就不会增加订阅者的引用计数。在调用事件处理委托时,会逐个判断订阅者是否存在,存在则调用事件处理委托,不存在的就从列表中移除掉。

public class EventBus
{
    public static EventBus Default = new EventBus();

    private readonly Dictionary<Type, object> eventDataAndActionHandlerDic;
    private static readonly object 字典锁 = new object();

    private EventBus()
    {
        eventDataAndActionHandlerDic = new Dictionary<Type, object>();
    }

    //手动注册事件处理方法
    public void Register<TEventData>(Action<TEventData> action) where TEventData : IEventData
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }
        lock (字典锁)
        {
            if (!eventDataAndActionHandlerDic.ContainsKey(typeof(TEventData)))
            {
                eventDataAndActionHandlerDic.Add(typeof(TEventData), new WeakEvent<TEventData>());
            }
            object actionList = eventDataAndActionHandlerDic[typeof(TEventData)];
            WeakEvent<TEventData> weakEvent = (WeakEvent<TEventData>)actionList;
            if (!weakEvent.Contains(action))
            {
                weakEvent.Add(action);
            }
        }
    }

    //手动注销事件处理方法
    public void UnRegister<TEventData>(Action<TEventData> action) where TEventData : IEventData
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }
        lock (字典锁)
        {
            if (eventDataAndActionHandlerDic.ContainsKey(typeof(TEventData)))
            {
                object actionList = eventDataAndActionHandlerDic[typeof(TEventData)];
                ((WeakEvent<TEventData>)actionList).Remove(action);
            }
        }
    }

    //触发事件
    public void Trigger<TEventData>(TEventData eventData) where TEventData : IEventData
    {
        if (eventData == null)
        {
            throw new ArgumentNullException(nameof(eventData));
        }
        object actionList = null;
        lock (字典锁)
        {
            if (eventDataAndActionHandlerDic.ContainsKey(typeof(TEventData)))
            {
                actionList = eventDataAndActionHandlerDic[typeof(TEventData)];
            }
        }
        if(actionList != null)
        {
            ((WeakEvent<TEventData>)actionList).Invoke(eventData);
        }
    }
}

EventBus和之前基本差不多,仅将List<Action<TEventData>>替换成WeakEvent<TEventData>

3.3 用起来

业务代码是一样的,结果就是销毁电视台就真的销毁了,再播报也不会显示天气了。