登录
首页 >  文章 >  java教程

RxJava2.x中如何优雅地在Observable的onComplete后取消订阅?

时间:2024-12-04 12:40:03 155浏览 收藏

珍惜时间,勤奋学习!今天给大家带来《RxJava2.x中如何优雅地在Observable的onComplete后取消订阅?》,正文内容主要涉及到等等,如果你正在学习文章,或者是对文章有疑问,欢迎大家关注我!后面我会持续更新相关内容的,希望都能帮到正在学习的大家!

RxJava2.x中如何优雅地在Observable的onComplete后取消订阅?

rxjava的observable订阅时在oncomplete被调用时取消订阅

在非android环境中使用rxjava2.x时,如果需要在observable的oncomplete被调用时取消订阅,可以采用以下方法:

在oncomplete中设置completablefuture.complete通知调用方已结束:

table.subscribe(tableins -> {
            // system.out.println("-------information-------");
            system.out.println(tableins);
        }, throwable -> {
            throw new schemaexportexception(throwable);
        }, new action() {
            @override
            public void run() throws exception {
                system.out.println("complete");
                // 在这里取消订阅
                completablefuture.complete();
            }
        });

调用方可以根据completablefuture来判断observable是否已完成:

long startStamp = System.currentTimeMillis();
// Flowable
Flowable<Table> tableFlowable = result.getAll(dbName.get(), strategy).flatMap(new Function<Table, Publisher<Table>>() {
            @Override
            public Publisher<Table> apply(@NonNull Table table) throws Exception {
                return result.getTableColumn(table).flatMap(new Function<List<Column>, SingleSource<Table>>() {
                    @Override
                    public SingleSource<Table> apply(@NonNull List<Column> columns) throws Exception {
                        return Single.just(table.fillColumn(columns));
                    }
                }).flatMapPublisher(new Function<Table, Publisher<? extends Table>>() {
                    @Override
                    public Publisher<? extends Table> apply(@NonNull Table table) throws Exception {
                        return Flowable.just(table);
                    }
                });
            }
        });
Disposable disposable = null;
try {
            disposable = out.flush(info, tableFlowable);
            CompletableFuture<String> future = out.getFuture();
            while (!future.isDone()) {
                logger.info("[ERE-Flowable]未完成,线程休眠1秒");
                Thread.currentThread().sleep(1000, 0);
            }
            String result = future.get();
            logger.info("[ERE-Flowable]完成, 结果:" + result);
            if (result.equals("OK")) {
                long finishStamp = System.currentTimeMillis();
                clearHander(disposable, "[ERE-Flowable]RxJava disposed because complete, WithTime: " + (finishStamp - startStamp));
            }
        } catch (Exception e) {
            clearHander(disposable, "[ERE-Flowable]RxJava disposed has Exception: " + e.getMessage());
        }

今天关于《RxJava2.x中如何优雅地在Observable的onComplete后取消订阅?》的内容介绍就到此结束,如果有什么疑问或者建议,可以在golang学习网公众号下多多回复交流;文中若有不正之处,也希望回复留言以告知!

相关阅读
更多>
最新阅读
更多>
课程推荐
更多>