什么是 Spring Cloud Data Flow
Spring Cloud Data Flow 为创建可组合数据的微服务提供统一的服务,这些服务是基于流和基于ETL的数据处理模式。
什么是 ETL
ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过萃取(extract)、转置(transform)、加载(load)至目的端的过程。ETL 的应用领域:
- 大型装备工况数据的采集分析
- 内部业务系统的数据集成
- 日志分析
体系结构
Spring Cloud Data Flow简化了专注于数据处理的应用程序的开发和部署。该体系结构的主要概念是应用程序(Applications),数据流服务器(Data Flow Server)和目标运行时(target runtime)。
应用程序有两种形式:
- 长期存活的应用程序。这些应用程序可以是Stream的一部分,在这种情况下,通过消息传递中间件消耗或生成无限量的数据。或者,它们可以是长期存在的任意应用程序,并且不一定使用消息传递中间件。
- 短期任务应用程序。处理有限的数据集然后终止。
根据运行时的不同,应用程序可以通过两种方式打包:
- Spring Boot uber-jar(可执行文件),托管在maven存储库,文件或HTTP(S)中。
- Docker 镜像。
运行时是应用程序执行的位置。指的是其他应用程序部署的平台。支持的平台有:
- Cloud Foundry
- Kubernetes
- Local Server
Spring Cloud Data Flow Server将流应用程序的部署和运行时状态委派给Spring Cloud Skipper Server。从而使它具有在运行时在Stream中更新和回滚应用程序的功能,以及将应用程序部署到多个平台的功能。Spring Cloud Data Flow Server 还负责:
- 通过解释和执行Stream DSL,来确定多个长期存活的应用程序的数据流逻辑。
- 启动一个长期存活的任务应用程序。
- 解释和执行组合任务DSL,确定多个短期存活应用程序的数据流逻辑。
- 执行应用程序到运行时映射的部署清单 – 例如,设置初始实例数,内存要求和数据分区。
- 提供已部署应用程序的运行时状态。

在此体系结构中,Stream DSL(domain-specific language)将发布到Data Flow Server。基于DSL应用程序名称映射到Maven文件和Docker镜像,http-source和cassandra-sink应用程序部署在目标运行时上。把数据发送到HTTP应用程序,之后存储在Cassandra中。
流
Stream是一组长期存活的Spring Cloud Stream应用程序,它们通过消息中间件相互通信。基于文本的DSL定义了应用程序之间的配置和数据流。Spring提供了许多应用程序来实现常见用例,但您通常会创建一个自定义Spring Cloud Stream应用程序来实现自定义业务逻辑。Stream的一般生命周期是:
- 注册申请。
- 创建流定义(Stream DSL)。
- 部署流。
- 取消部署或销毁流。
- 流中的升级或回滚应用程序。
对于部署流,Data Flow Server 将部署工作委派给 Spring Cloud Skipper。
应用程序
Spring Cloud Stream Application Starters 提供了开箱即用的Spring Cloud Stream应用程序。应用程序的分类:
- suorce – 连接到外部资源以轮询接收数据,并发布到默认“输出”通道;
- processor – 从“输入”通道接收数据并对其进行处理,将结果发送到默认的“输出”通道上;
- sink – 连接到外部资源的接收器,将接收的数据发送到默认的“输入”通道。
示例中用到的应用:
- Http Source – 它侦听HTTP请求并将正文作为消息有效内容发出。如果Content-Type 匹配 text/* 或 application/json,则有效负载为 String,否则有效负载为字节数组。
- Log Sink – 输出应用程日志数据以供检查。
示例演示
事先准备
- 安装 MySQL
- 安装 RabbitMQ
下载服务应用程序包并启动
- Spring Cloud Data Flow Server
- Spring Cloud Data Flow Shell
- Spring Cloud Skipper
启动服务先先要增加配置文件:
spring.datasource.url=jdbc:mysql://localhost:3306/spring_dataflow
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=org.mariadb.jdbc.Driver
spring.cloud.dataflow.applicationProperties.stream.spring.rabbitmq.host=192.168.153.130
spring.cloud.dataflow.applicationProperties.stream.spring.rabbitmq.port=5672
spring.cloud.dataflow.applicationProperties.stream.spring.rabbitmq.username=admin
spring.cloud.dataflow.applicationProperties.stream.spring.rabbitmq.password=admin
部署流应用
注册应用
dataflow:>app register --name http --type source --uri maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.0.RELEASE
dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.1.0.RELEASE
创建流
dataflow:>stream create --name httptest --definition "http --server.port=9000 | log" --deploy
参考文档
http://docs.spring.io/spring-cloud-dataflow/docs/2.0.2.RELEASE/reference/htmlsingle/