#实现自己的操作符
您可以实现自己的 Observable 操作符。本页将向您展示如何操作。
如果您的操作符旨在生成一个 Observable,而不是转换或响应源 Observable,请使用 create( )
方法,而不是尝试手动实现 Observable
。否则,请遵循以下说明。
以下示例展示了如何使用 lift( )
操作符将自定义操作符(在本例中为:myOperator
)与标准 RxJava 操作符链接
Observable foo = barObservable.ofType(Integer).map({it*2}).lift(new myOperator<T>()).map({"transformed by myOperator: " + it});
以下部分将展示如何构建操作符的框架,使其能够与 lift( )
正确配合使用。
将您的操作符定义为一个实现 Operator
接口的公共类,如下所示
public class myOperator<T> implements Operator<T> {
public myOperator( /* any necessary params here */ ) {
/* any necessary initialization here */
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> s) {
return new Subscriber<t>(s) {
@Override
public void onCompleted() {
/* add your own onCompleted behavior here, or just pass the completed notification through: */
if(!s.isUnsubscribed()) {
s.onCompleted();
}
}
@Override
public void onError(Throwable t) {
/* add your own onError behavior here, or just pass the error notification through: */
if(!s.isUnsubscribed()) {
s.onError(t);
}
}
@Override
public void onNext(T item) {
/* this example performs some sort of simple transformation on each incoming item and then passes it along */
if(!s.isUnsubscribed()) {
transformedItem = myOperatorTransformOperation(item);
s.onNext(transformedItem);
}
}
};
}
}
isUnsubscribed( )
状态。不要浪费时间生成订阅者不感兴趣的项目。onNext( )
方法,但这些调用必须是非重叠的。onCompleted( )
或 onError( )
方法中的一个,但不能同时调用,并且只能调用一次,并且之后不能再调用订阅者的 onNext( )
方法。serialize( )
操作符以强制执行正确的行为。onError( )
调用通知订阅者。