Reactive X 是 Reactive Extensions 的缩写,一般简写为 Rx,最初是 LINQ 的一个扩展,由微软的团队开发,Rx 是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,支持大部分流行语言。
Rx是一个函数库,让开发者可以利用可观察序列和 LINQ 风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables 表示异步数据流,用 LINO 操作符查询异步数据流,用 Schedulers 参数化异步数据流的并发处理,Rx可以这样定义︰
Rx = Observables + LINQ + Schedulers
Rx 结合了观察者模式、迭代器模式和函数式编程的精华。UniRx 就是 Unity Reactive Extensions,是为 Unity 平台设计的响应式编程框架。
在游戏中,⼤部分的逻辑都是在时间上异步的。⽐如动画的播放、声⾳的播放、⽹络请求、资源加载/卸载、场景过渡等。
在实现 异步逻辑时经常⽤到⼤量的回调,最终随着项⽬的扩张导致传说中的”回调地狱”。
相对较好地⽅法则是使⽤消息/事件的发送,结果导致“消息满天⻜”,导致代码⾮常难以阅读。
⽽ UniRx 的出现刚好解决了这个问题,它介于回调和事件之间。
常用API
监听 mono 生命周期函数
Observable.EveryUpdate() // 开启 Update 的事件监听
.Subscribe(_ => // 订阅/处理事件
{
Debug.Log("Update");
});
Observable.EveryLateUpdate()
.Subscribe(_ =>
{
Debug.Log("LateUpdate");
});
Observable.EveryFixedUpdate()
.Subscribe(_ =>
{
Debug.Log("FixedUpdate");
});
Observable.EveryEndOfFrame()
.Subscribe(_ =>
{
Debug.Log("EndOfFrame");
});
Observable.EveryApplicationFocus()
.Subscribe(focuse =>
{
Debug.Log("Focus: " + focuse);
});
如果在 Update ⽅法中,除了实现⿏标事件监听这个功能之外,还要实现其他的功能。那么 Update ⾥就会充斥着⼤量的状态判断等逻辑。代码⾮常不容易阅读。
⽽ UniRx 提供了⼀种编程思维,使得平时⼀些⽐较难实现的异步逻辑,使⽤ UniRx 轻松搞定,并且不失代码的可读性
参数下划线 _ 表示帧数,这里用不到就用 _ 表示
定时功能
Observable.Timer(TimeSpan.FromSeconds(5.0f))
.Subscribe(_ =>
{
Debug.Log("do something");
});
AddTo 与 Trigger
Observable.Timer(TimeSpan.FromSeconds(2.0f))
.Subscribe(_ =>
{
Debug.Log("延时两秒");
})
.AddTo(this);
this.OnDestroyAsObservable()
.Subscribe(_ =>
{
Debug.Log("OnDestroy");
});
this.OnCollisionEnterAsObservable()
.Subscribe(collision =>
{
Debug.Log("CollisionEnter");
});
AddTo 会在 GameObject 上添加一个 ObservableDestroyTrigger 脚本监听它的 OnDestroy 事件,当 GameObject 销毁时也会销毁正在运行的 UniRx 任务,即 GameObject 销毁时,这个 Timer 也会销毁,避免空引用异常。
ObservableDestroyTrigger 是一个 Trigger,Trigger 大部分都是都是 XXXAsObsrevable 命名形式的。
在使用 Trigger 的 GameObject 上都会挂上对应的 ObservableXXXTrigger 的脚本,AddTo 这个 API 就是封装了 ObservableDestroyTrigger
Where,First 过滤
Observable.EveryUpdate()
.Where(_ => Input.GetMouseButtonDown(0)) //进行一个鼠标是否点击的判断
.First() //只获取第一次点击事件
.Subscribe(_ =>
{
Debug.Log("left mouse clicked");
});
//也可以写成这样
Observable.EveryUpdate()
.First(_ => Input.GetMouseButtonDown(0)) //只获取第一次点击事件
.Subscribe(_ =>
{
Debug.Log("left mouse clicked");
});
EveryUpdate 是事件的发布者。他会每帧会发送⼀个事件过来。
Subscribe 是事件的接收者,接收的是 EveryUpdate 发送的事件。
Where 则是在事件的发布者和接收者之间的⼀个过滤操作。会过滤掉不满⾜条件的事件
First 又进行了一个过滤
Merge
在 UniRx 世界中,任何东西都是以事件流的形式存在,EveryUpdate 和 Timer 都是开启一条事件流。UniRx 可以开启多条事件流,然后使用 Merge 合并。
private void Start()
{
var leftClickEvents = Observable.EveryUpdate().Where(_ => Input.GetMouseButtonDown(0));
var rightClickEvents = Observable.EveryUpdate().Where(_ => Input.GetMouseButtonDown(1));
//点击左键或右键都会触发
Observable.Merge(leftClickEvents, rightClickEvents)
.Subscribe(_ =>
{
Debug.Log("mouse clicked");
});
}
WhenAll
当所有事件流都结束后触发
IEnumerator A()
{
yield return new WaitForSeconds(1.0f);
Debug.Log("A");
}
IEnumerator B()
{
yield return new WaitForSeconds(2.0f);
Debug.Log("B");
}
void Start()
{
var aStream = Observable.FromCoroutine(_ => A());
var bStream = Observable.FromCoroutine(_ => B());
Observable.WhenAll(aStream, bStream)
.Subscribe(_ =>
{
Debug.Log("全部处理完");
});
}
Start
开启线程
void Start()
{
var threadAStream = Observable.Start(() =>
{
Thread.Sleep(TimeSpan.FromSeconds(1));
return 10;
});
var threadBStream = Observable.Start(() =>
{
Thread.Sleep(TimeSpan.FromSeconds(3));
return 10;
});
Observable.WhenAll(threadAStream, threadBStream)
.ObserveOnMainThread() //转到主线程
.Subscribe(results =>
{
Debug.LogFormat("{0}:{1}", results[0], results[1]);
});
}
UGUI 支持
public Button button;
public Toggle toggle;
public Image image;
public Slider slider;
public InputField inputField;
void Start()
{
button.OnClickAsObservable()
.Subscribe(_ =>
{
Debug.Log("button clicked");
});
toggle.OnValueChangedAsObservable()
.Subscribe(on =>
{
Debug.LogFormat("toggle value changed: {0}", on);
});
image.OnDragAsObservable()
.Subscribe(_ =>
{
Debug.Log("dragging");
});
slider.OnValueChangedAsObservable()
.Subscribe(value =>
{
Debug.Log("slider: " + value);
});
inputField.OnValueChangedAsObservable()
.Subscribe(value =>
{
Debug.Log("input: " + value);
});
inputField.OnEndEditAsObservable()
.Subscribe(value =>
{
Debug.Log("input: " + value);
});
}
查看源码,button.onClick 是 ButtonClickedEvent 类型,而 ButtonClickedEvent 继承自 UnityEvent, button.OnClickAsObservable 是对点击事件进行了封装,同理 UniRx 对其他组件的事件进行了封装
public static IObservable<Unit> OnClickAsObservable(this Button button)
{
return button.onClick.AsObservable();
}
public static IObservable<Unit> AsObservable(this UnityEngine.Events.UnityEvent unityEvent)
{
return Observable.FromEvent<UnityAction>(h => new UnityAction(h), h => unityEvent.AddListener(h), h => unityEvent.RemoveListener(h));
}
public static IObservable<Unit> FromEvent<TDelegate>(Func<Action, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler)
{
return new FromEventObservable<TDelegate>(conversion, addHandler, removeHandler);
}
绑定两个组件
public Toggle mToggle;
public Button mButton;
public Slider mSlider;
public Text mText;
void Start()
{
// 当 mToggle.isOn = true 时,mButton.interactable = true
mToggle.OnValueChangedAsObservable().SubscribeToInteractable(mButton);
// 滑动条变化时更新文本
mSlider.OnValueChangedAsObservable()
.SubscribeToText(mText, x => x.ToString());
}
Select
Select 是转换操作符,用于将一个事件源中的值转换成另一个类型的值
public Button buttonA;
public Button buttonB;
private void Start()
{
// Select将空参数Unit转换为string
buttonA.OnClickAsObservable()
.Select(_ => "A")
.Subscribe(btnId =>
{
Debug.LogFormat("button {0} clicked", btnId);
});
buttonB.OnClickAsObservable()
.Select(_ => "B")
.Subscribe(btnId =>
{
Debug.LogFormat("button {0} clicked", btnId);
});
}
public ReactiveCollection<int> IntList = new ReactiveCollection<int>();
void Start()
{
// Select 把每个值进行平方
IntList.ObserveAdd()
.Select(x => x.Value * x.Value)
.Subscribe(value =>
{
Debug.Log("add:" + value);
});
// 添加后触发事件
IntList.Add(10);
}
ReactiveProperty(响应式属性)
监听属性的变化,发送通知
// 0 是默认值,可序列化展示在面板上
public IntReactiveProperty Age = new IntReactiveProperty(0);
// 泛型写法,序列化很麻烦
public ReactiveProperty<string> Name = new ReactiveProperty<string>("Tom");
void Start()
{
Age.Subscribe(age =>
{
Debug.Log("inner received age changed");
});
// 赋值后触发事件
Age.Value = 10;
}
ReactiveCollection(响应式集合)
ReactiveCollection 类似于 List,监听集合的变化,发送通知
public ReactiveCollection<int> IntList = new ReactiveCollection<int>();
void Start()
{
IntList.ObserveAdd().Subscribe(value => Debug.Log("add:" + value));
IntList.ObserveRemove().Subscribe(value => Debug.Log("remove:" + value));
IntList.ObserveCountChanged().Subscribe(count => Debug.Log("count:" + count));
// 添加后触发事件
IntList.Add(10);
}
ReactiveDictionary(响应式字典)
ReactiveDictionary<string, string> mLanguageCode = new ReactiveDictionary<string, string>
{
{"cn","中文"},
{"en","英文"}
};
void Start()
{
mLanguageCode.ObserveAdd().Subscribe(addedLanguage => Debug.LogFormat("add:{0}", addedLanguage));
mLanguageCode.ObserveRemove().Subscribe(removedLanguage => Debug.LogFormat("remove:{0}", removedLanguage));
mLanguageCode.ObserveCountChanged().Subscribe(count => Debug.LogFormat("count:{0}", count));
mLanguageCode.Add("jp", "日文");
mLanguageCode.Remove("en");
}
MVP
View 依赖于 Presenter:View通过Presenter来获取数据并处理用户输入。
Presenter 依赖于 View 和 Model:负责处理View和Model之间的交互逻辑。
Model 不依赖于 View 或 Presenter:Model是独立的业务逻辑和数据层,它只负责数据的存储、处理和业务逻辑的实现。当Model的状态发生变化时,它可能会通过回调或事件通知Presenter。
// Presenter
public class EnemyExample : MonoBehaviour
{
// View
public Button attackBtn;
public Text HPText;
EnemyModel mEnemy = new EnemyModel(200);
void Start()
{
// UGUI组件和响应式属性绑定
attackBtn.OnClickAsObservable()
.Subscribe(_ =>
{
mEnemy.HP.Value -= 99;
});
mEnemy.HP.SubscribeToText(HPText);
mEnemy.IsDead
.Where(isDead => isDead)
.Select(isDead => !isDead)
.SubscribeToInteractable(attackBtn);
}
}
// Model
public class EnemyModel
{
public ReactiveProperty<long> HP;
public IReadOnlyReactiveProperty<bool> IsDead;
public EnemyModel(long initialHP)
{
HP = new ReactiveProperty<long>(initialHP);
IsDead = HP.Select(hp => hp <= 0).ToReactiveProperty();
}
}
ReactiveCommand
用法类似命令模式,ReactiveCommand 实现 IReactiveCommand<T> 接口
public interface IReactiveCommand<T> : IObservable<T>
{
IReadOnlyReactiveProperty<bool> CanExecute { get; } //内部使用,外部只读
bool Execute(T parameter); //外部调用的
}
当 CanExecute 为 true 时,调用 Execute,Command 才会执行,默认 CanExecute 为 true
void Start()
{
// 默认 CanExecute 为 true
var reactiveCommand = new ReactiveCommand();
reactiveCommand.Subscribe(_ =>
{
Debug.Log("每次Execute成功调用后执行");
});
reactiveCommand.Execute();
reactiveCommand.Execute();
}
泛型作为参数
void Start()
{
var reactiveCommand = new ReactiveCommand<int>();
reactiveCommand.Where(x => x % 2 == 0).Subscribe(x => Debug.LogFormat("{0}是偶数", x));
reactiveCommand.Where(x => x % 2 != 0).Timestamp().Subscribe(x => Debug.LogFormat("{0}是奇数,{1}", x.Value, x.Timestamp));
reactiveCommand.Execute(10);
reactiveCommand.Execute(11);
}
设置事件源
private void Start()
{
var mouseDownStream = Observable.EveryUpdate().Where(_ => Input.GetMouseButtonDown(0)).Select(_ => true);
var mouseUpStream = Observable.EveryUpdate().Where(_ => Input.GetMouseButtonUp(0)).Select(_ => false);
var isMouseUp = Observable.Merge(mouseUpStream, mouseDownStream);
// 设置事件源和 CanExecute 为 false
var reactiveCommand = new ReactiveCommand(isMouseUp, false);
reactiveCommand.Subscribe(_ =>
{
Debug.Log("鼠标按下");
});
Observable.EveryUpdate().Subscribe(_ =>
{
reactiveCommand.Execute();
});
}
AsyncOperation(异步操作)
加载资源
void Start()
{
Resources.LoadAsync<GameObject>("资源名称").AsAsyncOperationObservable()
.Subscribe(resourceRequest =>
{
Instantiate(resourceRequest.asset);
});
}
加载场景
void Start()
{
// 加载进度
var progressObservable = new ScheduledNotifier<float>();
// 加载 Build Settings 中第 0 个场景
SceneManager.LoadSceneAsync(0).AsAsyncOperationObservable(progressObservable)
.Subscribe(_ =>
{
Debug.Log("load done");
});
progressObservable.Subscribe(progress =>
{
Debug.LogFormat("加载了:{0}%", progress * 100);
});
}
类 LINQ 操作符
Distinct
Distinct 意思是清晰的,不同的,用于查询不重复的结果集
List<string> list = new List<string>
{
"张三", "张三", "李四"
};
list.ToObservable()
.Distinct()
.Subscribe(name =>
{
Debug.Log(name);
});
Last
取列表最后一个元素,和 First 相反
class Student
{
public string Name;
public int Age;
}
void Start()
{
List<Student> students = new List<Student>
{
new Student{ Name = "张三", Age = 10 },
new Student{ Name = "张三", Age = 15 },
new Student{ Name = "李四", Age = 21 },
};
students.ToObservable()
.Last(student => student.Name == "张三")
.Subscribe(student =>
{
Debug.Log(student.Age);
});
}
SelectMany
将一个序列中的每个元素投影到另一个序列,并将这些序列合并为一个单一的序列。具体来说,它会对每个元素进行遍历处理,然后将结果序列合并起来
List<string> list = new List<string>
{
"123", "456"
};
list.ToObservable()
.SelectMany(c => c)
.Subscribe(c =>
{
Debug.Log(c);
});
输出
1
2
3
4
5
6
还可以实现协程的顺序执行
void Start()
{
var aStream = Observable.FromCoroutine(A);
var bStream = Observable.FromCoroutine(B);
aStream.SelectMany(bStream)
.Subscribe(_ => Debug.Log("A,B结束"));
}
IEnumerator A()
{
yield return new WaitForSeconds(1f);
Debug.Log("A");
}
IEnumerator B()
{
yield return new WaitForSeconds(1f);
Debug.Log("B");
}
未完待续