public class ConfigurableFilterStage extends GraphStage> {
private final Inlet in = Inlet.create("filter.in");
private final Outlet out = Outlet.create("filter.out");
private final Shape shape = FlowShape.of(in, out);
private final AtomicReference isEnabled = new AtomicReference<>(true);
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape) {
{
setHandler(in, new AbstractInHandler() {
@Override
public void onPush() {
final T elem = grab(in);
if (isEnabled.get()) push(out, elem);
else pull(in);
}
});
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() {
pull(in);
}
});
}
// 安全暴露的更新入口(必须由流线程调用)
public void updateEnabled(boolean enabled) {
isEnabled.set(enabled);
}
};
}
// 提供 Materialized Value:返回可安全调用的更新器
@Override
public Attributes initialAttributes() {
return super.initialAttributes().and(Attributes.name("configurable-filter"));
}
public static class ConfigUpdater {
private final GraphStageLogic logic;
public ConfigUpdater(GraphStageLogic logic) {
this.logic = logic;
}
public void enable(boolean enabled) {
// 确保在流执行线程中运行
logic.getAsyncCallback((Boolean b) -> ((ConfigurableFilterStage) logic.stage()).updateEnabled(b))
.invoke(enabled);
}
}
}
使用时:
// Build a three-element source and run it through the switchable filter.
final Source<String, akka.NotUsed> source = Source.from(Arrays.asList("x", "y", "z"));
final Flow<String, String, ConfigurableFilterStage.ConfigUpdater> flow =
    Flow.fromGraph(new ConfigurableFilterStage<String>());
// viaMat(..., Keep.right()) keeps the filter's ConfigUpdater; toMat(..., Keep.both()) then
// pairs it with the sink's completion stage, so the updater is the FIRST element of the pair.
final akka.japi.Pair<
        ConfigurableFilterStage.ConfigUpdater, java.util.concurrent.CompletionStage<akka.Done>>
    materialized =
        source.viaMat(flow, Keep.right()).toMat(Sink.foreach(System.out::println), Keep.both())
            .run(materializer);
// Toggle the filter from outside the stream.
// NOTE(review): the stream starts running as soon as run(...) returns, so with a source this
// short some or all elements may already have been printed before the switch is seen.
materialized.first().enable(false);
方案三:委托给 Actor 进行集中决策(适合复杂策略)
当配置逻辑涉及状态机、外部 API 调用或需强一致性时,使用 ask 将每个元素路由至 Actor:
// Classic actor that decides, per element, whether the element may pass.
val decisionActor = system.actorOf(Props[DecisionActor])

// Ask the actor about every element (at most 4 questions in flight; results are emitted in
// completion order, not input order) and keep only the elements it approves.
val flow = Flow[String]
  .mapAsyncUnordered(4) { s =>
    decisionActor
      .ask(AskDecision(s))
      // Classic ask returns Future[Any]; narrow it to the expected Boolean reply.
      .mapTo[Boolean]
      .map(enabled => if (enabled) Some(s) else None)
  }
  // Unwrap approved elements and drop rejected ones without ever putting null in the stream.
  .collect { case Some(s) => s }