登录
首页 >  文章 >  java教程

通过异步和非阻塞架构实现 Java 整体现代化以获得更好的性能

来源:dev.to

时间:2024-11-23 15:49:30 456浏览 收藏

从现在开始,努力学习吧!本文《通过异步和非阻塞架构实现 Java 整体现代化以获得更好的性能》主要讲解了等等相关知识点,我会在golang学习网中持续更新相关的系列文章,欢迎大家关注并积极留言建议。下面就先一起来看一下本篇正文内容吧,希望能帮到你!

通过异步和非阻塞架构实现 Java 整体现代化以获得更好的性能

在最近的一个项目中,我对用 dropwizard 编写的老化的整体 java web 服务进行了现代化改造。该服务通过 aws lambda 函数处理许多第三方 (3p) 依赖项,但由于架构的同步、阻塞性质,性能滞后。该设置的 p99 延迟为 20 秒,在等待无服务器功能完成时阻塞请求线程。这种阻塞导致线程池饱和,导致流量高峰时请求频繁失败。

识别性能瓶颈

问题的症结是每个对 lambda 函数的请求都会占用 java 服务中的一个请求线程。由于这些 3p 函数通常需要相当长的时间才能完成,因此处理它们的线程将保持阻塞状态,从而消耗资源并限制可扩展性。以下是此阻塞行为在代码中的示例:

// blocking code example
public string calllambdaservice(string payload) {
    string response = externallambdaservice.invoke(payload);
    return response;
}

在此示例中,calllambdaservice 方法会等待,直到 externallambdaservice.invoke() 返回响应。同时,没有其他任务可以使用该线程。

解决方案:迁移到异步、非阻塞模式

为了解决这些瓶颈,我使用异步和非阻塞方法重新构建了服务。此更改涉及使用调用 lambda 函数的 http 客户端来使用 org.asynchttpclient 库中的 asynchttpclient,该库在内部使用 eventloopgroup 异步处理请求。

使用 asynchttpclient 有助于卸载阻塞操作,而无需消耗池中的线程。以下是更新后的非阻塞调用的示例:

// non-blocking code example
public completablefuture<string> calllambdaserviceasync(string payload) {
    return completablefuture.supplyasync(() -> {
        return asynchttpclient.invoke(payload);
    });
}

利用 java 的 completablefuture 来链接异步调用

除了使单个调用成为非阻塞之外,我还使用 completablefuture 链接了多个依赖项调用。使用 thencombine 和 thenapply 等方法,我可以异步获取并组合来自多个源的数据,从而显着提高吞吐量。

completablefuture<string> future1 = calllambdaserviceasync(payload1);
completablefuture<string> future2 = calllambdaserviceasync(payload2);

completablefuture<string> combinedresult = future1.thencombine(future2, (result1, result2) -> {
    return processresults(result1, result2);
});

使用自定义 safeasyncresponse 类引入类型安全

在实现过程中,我观察到 java 的默认 asyncresponse 对象缺乏类型安全性,允许传递任意 java 对象。为了解决这个问题,我创建了一个带有泛型的 safeasyncresponse 类,它确保只能返回指定的响应类型,从而提高可维护性并降低运行时错误的风险。如果多次写入响应,此类还会记录错误。

public class safeasyncresponse<t> {
    private static final logger logger = logger.getlogger(safeasyncresponse.class.getname());
    private final asyncresponse asyncresponse;
    private final atomicinteger invocationcount = new atomicinteger(0);

    private safeasyncresponse(asyncresponse asyncresponse) {
        this.asyncresponse = asyncresponse;
    }

    /**
     * factory method to create a safeasyncresponse from an asyncresponse.
     *
     * @param asyncresponse the asyncresponse to wrap
     * @param <t>           the type of the response
     * @return a new instance of safeasyncresponse
     */
    public static <t> safeasyncresponse<t> from(asyncresponse asyncresponse) {
        return new safeasyncresponse<>(asyncresponse);
    }

    /**
     * resume the async response with a successful result.
     *
     * @param response the successful response of type t
     */
    public void withsuccess(t response) {
        if (invocationcount.incrementandget() > 1) {
            logerror("withsuccess");
            return;
        }
        asyncresponse.resume(response);
    }

    /**
     * resume the async response with an error.
     *
     * @param error the throwable representing the error
     */
    public void witherror(throwable error) {
        if (invocationcount.incrementandget() > 1) {
            logerror("witherror");
            return;
        }
        asyncresponse.resume(error);
    }

    /**
     * logs an error message indicating multiple invocations.
     *
     * @param methodname the name of the method that was invoked multiple times
     */
    private void logerror(string methodname) {
        logger.severe(() -> string.format(
            "safeasyncresponse method '%s' invoked more than once. ignoring subsequent invocations.", methodname
        ));
    }
}

safeasyncresponse 的示例用法

@get
@path("/example")
public void exampleendpoint(@suspended asyncresponse asyncresponse) {
    safeasyncresponse<string> saferesponse = safeasyncresponse.from(asyncresponse);

    // simulate success
    saferesponse.withsuccess("operation successful!");

    // simulate multiple invocations (only the first one will be processed)
    saferesponse.witherror(new runtimeexception("this should not be processed"));
    saferesponse.withsuccess("this will be ignored");
}

测试和性能提升

为了验证这些更改的有效性,我使用虚拟线程编写了负载测试来模拟单台计算机上的最大吞吐量。我生成了不同级别的无服务器函数执行时间(范围从 1 到 20 秒),发现新的异步非阻塞实现在执行时间较短时将吞吐量提高了 8 倍,在执行时间较长时吞吐量提高了约 4 倍。

在设置这些负载测试时,我确保调整客户端级别的连接限制以最大化吞吐量,这对于避免异步系统中的瓶颈至关重要。

发现 http 客户端中的隐藏错误

在运行这些压力测试时,我在我们的自定义 http 客户端中发现了一个隐藏的错误。客户端使用连接超时设置为 integer.max_value 的信号量,这意味着如果客户端用完可用连接,它将无限期地阻塞线程。解决此错误对于防止高负载场景中潜在的死锁至关重要。

虚拟线程和传统异步代码之间的选择

人们可能想知道为什么我们不简单地切换到虚拟线程,虚拟线程可以通过允许线程阻塞而不需要大量的资源成本来减少对异步代码的需求。然而,虚拟线程当前存在一个限制:它们在同步操作期间被固定。这意味着当虚拟线程进入同步块时,它无法卸载,可能会阻塞操作系统资源,直到操作完成。

例如:

synchronized byte[] getData() {
    byte[] buf = ...;
    int nread = socket.getInputStream().read(buf);  // Can block here
    ...
}

在此代码中,如果由于没有可用数据而导致读取阻塞,则虚拟线程将被固定到操作系统线程,从而防止其卸载并阻塞操作系统线程。

幸运的是,随着 jep 491 的出现,java 开发人员可以期待虚拟线程行为的改进,其中可以更有效地处理同步代码中的阻塞操作,而不会耗尽平台线程。

结论

通过将我们的服务重构为异步非阻塞架构,我们实现了显着的性能改进。通过实现 asynchttpclient、引入 safeasyncresponse 来实现类型安全以及进行负载测试,我们能够优化 java 服务并极大提高吞吐量。该项目是单体应用程序现代化方面的一次有价值的实践,并揭示了适当的异步实践对可扩展性的重要性。

随着 java 的发展,我们将来也许能够更有效地利用虚拟线程,但就目前而言,异步和非阻塞架构仍然是高延迟、依赖第三方的服务中性能优化的重要方法。

以上就是本文的全部内容了,是否有顺利帮助你解决问题?若是能给你带来学习上的帮助,请大家多多支持golang学习网!更多关于文章的相关知识,也可关注golang学习网公众号。

声明:本文转载于:dev.to 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>