#实现自己的操作符
您可以实现自己的 Observable 操作符。本页将向您展示如何操作。
如果您的操作符旨在生成一个 Observable,而不是转换或响应源 Observable,请使用 create( ) 方法,而不是尝试手动实现 Observable。否则,请遵循以下说明。
以下示例展示了如何使用 lift( ) 操作符将自定义操作符(在本例中为:myOperator)与标准 RxJava 操作符链接
Observable foo = barObservable.ofType(Integer).map({it*2}).lift(new myOperator<T>()).map({"transformed by myOperator: " + it});
以下部分将展示如何构建操作符的框架,使其能够与 lift( ) 正确配合使用。
将您的操作符定义为一个实现 Operator 接口的公共类,如下所示
public class myOperator<T> implements Operator<T> {
public myOperator( /* any necessary params here */ ) {
/* any necessary initialization here */
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> s) {
return new Subscriber<t>(s) {
@Override
public void onCompleted() {
/* add your own onCompleted behavior here, or just pass the completed notification through: */
if(!s.isUnsubscribed()) {
s.onCompleted();
}
}
@Override
public void onError(Throwable t) {
/* add your own onError behavior here, or just pass the error notification through: */
if(!s.isUnsubscribed()) {
s.onError(t);
}
}
@Override
public void onNext(T item) {
/* this example performs some sort of simple transformation on each incoming item and then passes it along */
if(!s.isUnsubscribed()) {
transformedItem = myOperatorTransformOperation(item);
s.onNext(transformedItem);
}
}
};
}
}
isUnsubscribed( ) 状态。不要浪费时间生成订阅者不感兴趣的项目。onNext( ) 方法,但这些调用必须是非重叠的。onCompleted( ) 或 onError( ) 方法中的一个,但不能同时调用,并且只能调用一次,并且之后不能再调用订阅者的 onNext( ) 方法。serialize( ) 操作符以强制执行正确的行为。onError( ) 调用通知订阅者。