Observable novel = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("连载1"); emitter.onNext("连载2"); emitter.onNext("连载3"); emitter.onComplete(); } });
//Observer who observe something intresested
Observer<String> reader=new Observer<String>() { @Override public void onSubscribe(disposable d) { mdisposable=d; Log.e(TAG,"onSubscribe"); } @Override public void onNext(String value) { if ("2".equals(value)){ mdisposable.dispose(); return; } Log.e(TAG,"onNext:"+value); } @Override public void onError(Throwable e) { Log.e(TAG,"onError="+e.getMessage()); } @Override public void onComplete() { Log.e(TAG,"onComplete()"); } };
//something observable is Now subscribed by observer
novel.subscribe(reader);//一行代码搞定
Observable.from(folders) .flatMap(new Func1<File,Observable<File>>() {
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File,Boolean>() {
@Override
public Boolean call(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File,Bitmap>() {
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
});