登录
首页 >  文章 >  php教程

并发循环中使用AmpPromises全解析

时间:2025-09-27 10:39:31 148浏览 收藏

在使用Amp框架进行并发编程,尤其是在循环中使用Promises时,你是否遇到了Promise不返回或程序hang住的问题?本文以一个实际的下载场景为例,深入解析了如何在并发循环中正确使用`Amp\Promise\all()`,避免Promise执行上下文错误。我们将提供一种有效的解决方案,即通过`Amp\call()`确保循环代码在正确的协程上下文中执行,从而解决Promise不返回或抛出异常的难题。阅读本文,你将全面掌握Amp Promises在并发循环中的应用技巧,优化你的异步编程模型,提升应用性能。

并发循环中的 Amp Promises 使用指南

本文将解决在使用 Amp 框架进行并发编程时,特别是在循环中使用 Promises 时遇到的问题。通过一个实际的下载场景示例,展示了如何正确地使用 Amp\Promise\all() 来并发执行多个 Promise,并提供了一种解决 Promise 在循环中不返回或抛出异常的方案,帮助开发者更好地理解和应用 Amp 的异步编程模型。

在使用 Amp 框架进行异步编程时,经常会遇到需要在循环中并发执行多个任务的场景。例如,从多个 URL 下载数据,或者处理大量数据块。然而,直接在循环中使用 Amp\Promise\all() 可能会遇到一些问题,例如 Promise 不返回结果或者没有抛出异常,导致程序hang住。

下面的示例代码模拟了一个下载场景,将一个大文件分割成多个 chunk,然后并发下载这些 chunk:

<?php

use Amp\Deferred;
use Amp\Loop;
use Amp\Promise;

class Downloader {

    private $chunkCount = 10;
    private $chunkSize = 1024;
    private $connections = 3;
    private $size = 10240;
    private $path = '/tmp';

    public function __construct(private $app) {}

    /**
     * Start Download
     *
     * @return void
     */
    private function download() {
        $chunks = [];
        for ($i = 0; $i < $this->chunkCount + 20; $i++) {
            $start = $i * $this->chunkSize;
            $end = ($i + 1) * $this->chunkSize;

            if ($i == $this->chunkCount - 1) {
                $end = $this->size;
            }

            $chunks[] = (object) ['id' => ($i + 1), 'start' => $start, 'end' => $end, 'path' => $this->path . "/" . $i];
        }

        $chunkedChunks = array_chunk($chunks, $this->connections);

        foreach ($chunkedChunks as $key => $chunkedChunk) {
            $urls = [
                'https://secure.php.net',
                'https://amphp.org',
                'https://github.com',
            ];

            $promises = [];
            foreach ($urls as $url) {
                $promises[$url] = \Amp\call(function() use ($url) {
                    $deferred = new Deferred();

                    Loop::delay(3 * 1000, function () use ($url, $deferred) {
                        $deferred->resolve($url);
                    });

                    return $deferred->promise();
                });
            }

            $responses = yield Promise\all($promises);

            foreach ($responses as $url => $response) {
                \printf("Read %d bytes from %s\n", \strlen($response), $url);
            }
        }
    }

    public function runDownload() {
        Loop::run(function () {
            yield $this->download();
        });
    }
}

// 使用示例
Loop::run(function () {
    $downloader = new Downloader(null);
    yield \Amp\call(function() use ($downloader) {
        $downloader->download();
    });
});

在这个例子中,我们首先将要下载的内容分割成多个 chunk,然后将这些 chunk 分组,每组包含一定数量的 chunk,数量由 $this->connections 决定。然后,我们遍历这些分组,为每个分组创建一组 Promise,并使用 Amp\Promise\all() 来并发执行这些 Promise。

问题分析与解决方案

最初的代码中,yield Amp\Promise\all($promises) 并没有返回任何结果,也没有抛出任何异常。这可能是因为 Promise 的执行上下文出现问题。

解决方案是将整个 foreach 循环块包裹在一个 Amp\call() 中,这样可以确保循环中的代码在正确的 Amp 协程上下文中执行。修改后的代码如下:

<?php

use Amp\Deferred;
use Amp\Loop;
use Amp\Promise;

class Downloader {

    private $chunkCount = 10;
    private $chunkSize = 1024;
    private $connections = 3;
    private $size = 10240;
    private $path = '/tmp';

    public function __construct(private $app) {}

    /**
     * Start Download
     *
     * @return void
     */
    private function download() {
        $chunks = [];
        for ($i = 0; $i < $this->chunkCount + 20; $i++) {
            $start = $i * $this->chunkSize;
            $end = ($i + 1) * $this->chunkSize;

            if ($i == $this->chunkCount - 1) {
                $end = $this->size;
            }

            $chunks[] = (object) ['id' => ($i + 1), 'start' => $start, 'end' => $end, 'path' => $this->path . "/" . $i];
        }

        $chunkedChunks = array_chunk($chunks, $this->connections);

        yield \Amp\call(function() use ($chunkedChunks) {
            foreach ($chunkedChunks as $key => $chunkedChunk) {
                $urls = [
                    'https://secure.php.net',
                    'https://amphp.org',
                    'https://github.com',
                ];

                $promises = [];
                foreach ($urls as $url) {
                    $promises[$url] = \Amp\call(function() use ($url) {
                        $deferred = new Deferred();

                        Loop::delay(3 * 1000, function () use ($url, $deferred) {
                            $deferred->resolve($url);
                        });

                        return $deferred->promise();
                    });
                }

                $responses = yield Promise\all($promises);

                foreach ($responses as $url => $response) {
                    \printf("Read %d bytes from %s\n", \strlen($response), $url);
                }
            }
        });
    }

    public function runDownload() {
        Loop::run(function () {
            yield $this->download();
        });
    }
}

// 使用示例
Loop::run(function () {
    $downloader = new Downloader(null);
    yield \Amp\call(function() use ($downloader) {
        $downloader->download();
    });
});

注意事项与总结

  • 协程上下文: 在使用 Amp 进行并发编程时,务必确保代码在正确的协程上下文中执行。Amp\call() 可以用来创建一个新的协程,并确保代码在其中执行。
  • 错误处理: 在并发执行 Promise 时,需要注意错误处理。可以使用 try-catch 块来捕获 Promise 执行过程中抛出的异常。
  • 性能优化: 并发连接数 $this->connections 需要根据实际情况进行调整,以达到最佳的下载速度。过多的并发连接可能会导致服务器压力过大,而过少的并发连接可能会导致下载速度过慢。

通过本文的介绍,相信你已经掌握了在循环中使用 Amp Promises 的基本方法,并能够解决 Promise 不返回或抛出异常的问题。在实际开发中,可以根据具体场景进行调整和优化,以达到最佳的并发性能。

今天带大家了解了的相关知识,希望对你有所帮助;关于文章的技术知识我们会一点点深入介绍,欢迎大家关注golang学习网公众号,一起学习编程~

相关阅读
更多>
最新阅读
更多>
课程推荐
更多>