RxJava取消订阅的各种方式的实现

手动取消订阅

Consumer类型

Observable创建返回disposable取消

public class SecondActivity extends AppCompatActivity {

  private static final String TAG = "SecondActivity";
  private disposable disposable;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_second);
    disposable = Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
        } catch (InterruptedException e) {
          e.printstacktrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
            Log.d(TAG,"accept: "+s);
          }
        });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    Log.d(TAG,"onDestroy: ");
    //取消订阅
    if(disposable != null && !disposable.isdisposed()){
      disposable.dispose();
      Log.d(TAG,"onDestroy: dispose");
    }
  }
}

普通类型Observer

在Observer中获取disposable然后取消

public class ThirdActivity extends AppCompatActivity {
  private static final String TAG = "ThirdActivity";
  disposable disposable;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_third);
    Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          e.printstacktrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
          @Override
          public void onSubscribe(disposable d) {
            disposable = d;
          }

          @Override
          public void onNext(String s) {
            Log.d(TAG,"onNext: "+s);
          }

          @Override
          public void onError(Throwable e) {
            Log.d(TAG,"onError: ");
          }

          @Override
          public void onComplete() {
            Log.d(TAG,"onComplete: ");
          }
        });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    Log.d(TAG,"onDestroy: ");
    //然后在需要取消订阅的地方调用即可
    if (disposable != null && !disposable.isdisposed()) {
      Log.d(TAG,"dispose: ");
      disposable.dispose();
    }
  }
}

disposableObserver类型

利用disposableObserver和SubscribeWith直接返回disposable,然后取消

public class FourthActivity extends AppCompatActivity {
  private static final String TAG = "FourthActivity";
  private disposableObserver<String> observer;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_fourth);
    observer = Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          e.printstacktrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new disposableObserver<String>() {
      @Override
      public void onNext(String o) {
        Log.d(TAG,"onNext: "+o);
      }

      @Override
      public void onError(Throwable e) {
        Log.d(TAG,"onError: ");
      }

      @Override
      public void onComplete() {
        Log.d(TAG,"onComplete: ");
      }
    });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    if (observer != null && !observer.isdisposed()) {
      Log.d(TAG,"dispose: ");
      observer.dispose();
    }
  }
}

取消多个Observer

把多个Observer添加Compositedisposable,一次取消

public class ComdisposableActivity extends AppCompatActivity {

  private disposable disposable1;
  private disposable disposable2;
  private static final String TAG = "ComdisposableActivity";
  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_com_disposable);
    Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          e.printstacktrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOndispose(new Action() {
          @Override
          public void run() throws Exception {
            Log.d(TAG,"run: Unsubscribing subscription from onCreate()");
          }
        })
        .subscribe(new Observer<String>() {
          @Override
          public void onSubscribe(disposable d) {
            disposable1 = d;
          }

          @Override
          public void onNext(String s) {
            Log.d(TAG,"onComplete: ");
          }
        });
    Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        try {
          Thread.sleep(5000);
          emitter.onNext("testInfo");
        } catch (InterruptedException e) {
          e.printstacktrace();
        }
      }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
          @Override
          public void onSubscribe(disposable d) {
            disposable2 = d;
          }

          @Override
          public void onNext(String s) {
            Log.d(TAG,"onComplete: ");
          }
        });
  }

  @Override
  protected void onDestroy() {
    super.onDestroy();
    Compositedisposable compositedisposable = new Compositedisposable();
    //批量添加
    compositedisposable.add(disposable1);
    compositedisposable.add(disposable2);
    //最后一次性全部取消订阅
    compositedisposable.dispose();
  }
}

RxLifecyle取消

OnDestory取消

Observable.interval(1,TimeUnit.SECONDS)
        .doOndispose(new Action() {
          @Override
          public void run() throws Exception {
            Log.d(TAG,"Unsubscribing bindToLifecycle from onDestroy()");
          }
        })
        .compose(this.<Long>bindToLifecycle())
        .subscribe(new Consumer<Long>() {
          @Override
          public void accept(Long num) throws Exception {
            Log.d(TAG,"accept: " + num);
          }
        });

指定生命周期取消

Observable.interval(1,"Unsubscribing UbindUntilEvent from onPause()");
          }
        }).compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
        .subscribe(new Consumer<Long>() {
          @Override
          public void accept(Long aLong) throws Exception {
            Log.d(TAG,"bindUntilEvent accept: " + aLong);
          }
        });

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

相关文章

Android性能优化——之控件的优化 前面讲了图像的优化,接下...
前言 上一篇已经讲了如何实现textView中粗字体效果,里面主要...
最近项目重构,涉及到了数据库和文件下载,发现GreenDao这个...
WebView加载页面的两种方式 一、加载网络页面 加载网络页面,...
给APP全局设置字体主要分为两个方面来介绍 一、给原生界面设...
前言 最近UI大牛出了一版新的效果图,按照IOS的效果做的,页...