Spring Boot集成Akka Stream快速入门Demo

news2024/9/19 20:14:31

1.什么是Akka Stream?

Akka Streams是一个用于处理和传输元素序列的库。它建立在Akka Actors之上,使流的摄入和处理变得简单。由于它是建立在Akka Actors之上的,它为Akka现有的actor模型提供了一个更高层次的抽象。Akka流由3个主要部分组成--Source、Flow、Sink--任何非循环流至少由2个部分Source、Sink和任意数量的Flow元素组成。这里我们可以说Source和Sink是Flow的特殊情况。这里Flow位于Source和Sink之间,因为它们是应用于Source数据的转换。

stream

Akka流的特点

  • Akka-streams对于快速流数据非常有用。
  • 它避免了管理角色所需的大量模板代码。
  • 它最适合于基于大数据的应用。
  • 由于它是建立在Akka工具包上的,我们将获得所有Akka工具包的好处,如反应性、分布式、位置透明性、集群、Remoting等。
  • 它提供了可重用性,这意味着一旦我们设计了数据流图,我们就可以重复使用它的任何次数。

2.代码工程

 实验目标

熟悉akka stream 相关概念

pom.xml

<!-- Akka Streams -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.13</artifactId>
    <version>2.6.0</version>
</dependency>

config

package com.et.akka.config;

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnClass(akka.stream.javadsl.Source.class)
public class AkkaConfig {

  private final ActorSystem system;

  @Autowired
  public AkkaConfig() {
    system = ActorSystem.create("SpringWebAkkaStreamsSystem");
  }

  @Bean
  @ConditionalOnMissingBean(ActorSystem.class)
  public ActorSystem getActorSystem() {
    return system;
  }

}

akka stream

1.源。

这是你的流的入口。每个流中必须至少有一个源。它需要两个类型参数。第一个代表它所发射的数据类型,第二个是它在运行时可以产生的辅助值的类型。如果不产生,我们就使用Akka提供的NotUsed 类型。它只有一个输出点。源可以被认为是发布者

Source<Integer, NotUsed> source = Source.range(1, 100);

2.Sink :

这是你的流的出口点。每个流中必须至少有一个水槽。Sink 是我们流的最后一个元素。基本上,它是一个由源发送/处理的数据的订阅者。通常它将其输入输出到一些系统IO。它是一个流的终点,因此消耗数据。一个汇有一个单一的输入通道,没有输出通道。当我们想以可重复使用的方式指定数据收集器的行为时,特别需要汇,而且不需要评估流。水槽可以被认为是用户。

Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(System.out::println);

3.流:

流是流中的一个处理步骤。它结合了一个传入通道和一个传出通道,以及通过它的消息的一些转换。如果一个流被连接到一个源,一个新的源就是结果。同样地,一个流连接到一个汇,就会产生一个新的汇。而同时与一个源和一个汇相连的流的结果是RunnableFlow 。因此,它们位于输入和输出通道之间,但只要它们不与源或汇相连,它们本身就不对应于其中一种味道。这里,流位于源和汇之间,因为它们是应用于源数据的转换。

Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).filter(MyStream::isPrime);

4.RunnableGraph :

一个两端分别连接到Source和Sink的Flow可以被运行(),被称为RunnableGraph。即使通过连接所有的源、汇和不同的操作符来构建RunnableGraph,也不会有数据流经它。这就是Materialization的作用!

 RunnableGraph<NotUsed> graph = source.to(sink);
 graph.run(actorSystem);

5.Materializer :

Akka流中的流和图就像准备一个蓝图/执行计划。流的物化是将流的描述和分配它所需的所有必要资源的过程,以便运行。这意味着启动处理的Actor,以及根据流的需要,在引擎盖下的更多内容。在运行(物化)RunnableGraph后,我们会得到指定类型的物化值。每个流操作者都可以产生一个物化的值。Akka有.toMat ,以表明我们要转换源和汇的物化值。

source.via(flow).to(sink).run(materializer);

具体类的信息如下:

package com.et.akka.stream;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


import javax.annotation.PostConstruct;
import java.util.concurrent.CompletionStage;

@Component
public class MyStream {
    @Autowired
    private ActorSystem actorSystem;
    @PostConstruct
    public void run() {
        Source<Integer, NotUsed> source = Source.range(1, 10);
        Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(System.out::println);

        RunnableGraph<NotUsed> graph = source.to(sink);

        graph.run(actorSystem);
    }
}

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • GitHub - Harries/springboot-demo: a simple springboot demo with some components for example: redis,solr,rockmq and so on.(akka)

3.测试

  • 启动Spring Boot工程
  • 控制台输出所有质数

4.引用

  • https://doc.akka.io/docs/akka/current/stream/stream-quickstart.html
  • Spring Boot集成Akka Stream快速入门Demo | Harries Blog™

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2132452.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从0开始学习RocketMQ:快速部署启动

快速部署 快速部署一个单节点单副本 RocketMQ 服务&#xff0c;并完成简单的消息收发。 安装Apache RocketMQ 下载地址&#xff1a;RocketMQ官网下载 这里我们下载二进制包&#xff1a;rocketmq-all-5.3.0-bin-release.zip 直接解压即可&#xff1a;tar -zxvf rocketmq-all…

光伏开发:工商业光伏的流程管理全面解析

一、项目准备阶段 1、资源寻觅与沟通 首要任务是寻找适合的工商业屋顶或空地资源&#xff0c;并与业主初步交流&#xff0c;了解其意向、屋顶条件及用电情况。这一阶段的关键在于建立信任关系&#xff0c;为后续工作奠定基础。 2、资料收集与核查 全面收集业主资料&#xff…

算法练习题26——多项式输出(模拟)

输入格式 输入共有 2 行 第一行 1 个整数&#xff0c;n&#xff0c;表示一元多项式的次数。 第二行有 n1 个整数&#xff0c;其中第 i 个整数表示第 n−i1 次项的系数&#xff0c;每两个整数之间用空格隔开。 输出格式 输出共 1 行&#xff0c;按题目所述格式输出多项式。…

Navicat BI 中创建自定义字段:计算字段

在数据库设计和开发中&#xff0c;避免存储任何可以从其他字段计算或重建的数据是一种惯例。因此&#xff0c;在 Navicat BI 中构建图表时&#xff0c;你可能会缺少一些数据。但这不是问题&#xff0c;因为 Navicat BI 提供了专门用于此目的的计算字段。在今天的博客中&#xf…

网站按钮检测系统源码分享

网站按钮检测检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Computer Vis…

浅谈MVC设计模式

1 前言 1.1 内容概要 熟悉使用JSON工具&#xff0c;完成Java对象&#xff08;Map&#xff09;和Json字符串之间的相互转换&#xff08;注意提供构造器和getter/setter方法&#xff09; 注意事项&#xff1a;不管使用的是什么JSON工具&#xff0c;都要提供类的无参构造方法和…

基于SpringBoot的宠物寄领养网站管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 【前后端分离】基于JavaSpringBootVueMySQL的宠物寄领养网站…

北斗卫星系统信号介绍

覆盖范围亚太区域全球范围 卫星数量35颗区域服务卫星30颗全球服务卫星 信号频段B1I, B2IB1C, B2a, B3, 兼容GPS/Galileo 定位精度区域内10米全球2.5~5米&#xff0c;中国内更高 新增功能区域短报文通信全球短报文通信、星基增强、精密定位 抗干扰能力相对有限更强 互操作…

无人机 PX4 飞控 | 如何检测状态估计EKF性能

无人机 PX4 飞控 | 如何检测状态估计EKF性能 前言检查EKF性能缺少pyulog问题解决脚本崩溃没有输出文件生成对应文件 结语 前言 ECL &#xff08;Estimation and Control Library&#xff0c;估计和控制库&#xff09;&#xff0c;其中的状态估计使用扩展卡尔曼滤波算法&#x…

图像检测【YOLOv5】——深度学习

Anaconda的安装配置&#xff1a;&#xff08;Anaconda是一个开源的Python发行版本&#xff0c;包括Conda、Python以及很多安装好的工具包&#xff0c;比如&#xff1a;numpy&#xff0c;pandas等&#xff0c;其中conda是一个开源包和环境管理器&#xff0c;可以用于在同一个电脑…

计算机网络基本概述

欢迎大家订阅【计算机网络】学习专栏&#xff0c;开启你的计算机网络学习之旅&#xff01; 文章目录 前言一、网络的基本概念二、集线器、交换机和路由器三、互连网与互联网四、网络的类型五、互连网的组成1. 边缘部分2. 核心部分 六、网络协议 前言 计算机网络是现代信息社会…

安装node 报错需要:glibc >= 2.28

--> 解决依赖关系完成 错误&#xff1a;软件包&#xff1a;2:nodejs-18.20.4-1nodesource.x86_64 (nodesource-nodejs) 需要&#xff1a;libm.so.6(GLIBC_2.27)(64bit) 错误&#xff1a;软件包&#xff1a;2:nodejs-18.20.4-1nodesource.x86_64 (nodesource-nodej…

【数据结构篇】~排序(1)之插入排序

排序~插入排序 前言插入排序1.直接插入排序&#xff08;时间复杂度&#xff1a;O(N^2)&#xff09;1.思想2.代码 2.希尔排序(时间复杂度&#xff1a;O(N∙))1.思路简易证明希尔排序的复杂度 2.代码 前言 四大排序&#xff0c;今天解决插入排序 堆排序和冒泡排序已经写过了&am…

C++笔记---继承(上)

1. 继承的简单介绍 1.1 继承的概念 继承(inheritance)机制是面向对象程序设计使代码可以复用的最重要的手段&#xff0c;它允许我们在保持原有类特性的基础上进行扩展&#xff0c;增加方法(成员函数)和属性(成员变量)&#xff0c;这样产生新的类&#xff0c;称派生类。 继承呈…

如何利用 Smarter Balanced 塑造教育领域的 AI 治理

目录 定义挑战 以人为本的设计引领 融入多样性 探索以学生为中心的价值观 探索效果的层次和不同的影响 部位于加利福尼亚州的Smarter Balanced Assessment Consortium 是一个由会员主导的公共组织&#xff0c;为 K-12 和高等教育领域的教育工作者提供评估系统。该组织成立…

09_Tensorflow2图像处理大赏:让你的图片笑出AI感,惊艳朋友圈!

1. 图像处理案例 1.1 逆时针旋转90度 import tensorflow as tf import matplotlib.pyplot as plt import matplotlib.cm as cm import numpy import osdef show_pic(pic,name,cmapNone):显示图像plt.imshow(pic,cmapcmap) plt.axis(off) # 打开坐标轴为 on # 设置图像标题…

C语言数据类型、变量及数据类型的长度、取值范围

文章目录 一、数据类型介绍1.字符型2.整型3.浮点型4.布尔类型 二、变量1.变量的创建2.变量的分类 三、数据类型的长度(字节)1.sizeof 操作符2.各种数据类型的长度3.sizeof中表达式不计算 四、各种类型的取值范围1.signed和unsigned2.数据类型的取值范围 五、整型提升练习1练习2…

【Obsidian】当笔记接入AI,Copilot插件推荐

当笔记接入AI&#xff0c;Copilot插件推荐 自己的知识库笔记如果增加AI功能会怎样&#xff1f;AI的回答完全基于你自己的知识库余料&#xff0c;是不是很有趣。在插件库中有Copilot插件这款插件&#xff0c;可以实现这个梦想。 一、什么是Copilot&#xff1f; 我们知道githu…

el-input-number设置了min值,希望默认值展示为空

data() {return {editForm: {num: undefined, //input}} } <el-input-number v-model.trim"editForm.num" controls-position"right" :min"1" placeholder"请输入" clearable /> 展示效果如下:

C++中的左值(Lvalue)和右值(Rvalue)详解

C中的左值&#xff08;Lvalue&#xff09;和右值&#xff08;Rvalue&#xff09;详解 在C中&#xff0c;左值&#xff08;Lvalue&#xff09;和右值&#xff08;Rvalue&#xff09;的概念是理解表达式和变量的重要基础。为了提高C的性能和灵活性&#xff0c;C11引入了一些新的…