Apache SeaTunnel: A Next-Generation, High-Performance, Distributed Tool for Massive Data Integration, from Getting Started to Practice

Updated: 2024-01-08

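About Apache SeaTunnel

Apache SeaTunnel, formerly named Waterdrop, was renamed SeaTunnel in October 2021 and applied to join the Apache Incubator. It has since published more than 40 releases and is used in production at many companies, including J.P.Morgan, ByteDance, Stey, China Mobile, Foxconn, Tencent Cloud, Gridsum, Zhongke Big Data Research Institute (中科大数据研究院), 360, Shopee, Bilibili, Sina, Sogou, and Vipshop. It is widely applied to massive heterogeneous data integration, CDC data synchronization, SaaS data integration, and multi-source data processing.

On December 9, 2021, Apache SeaTunnel was accepted as an Apache Incubator project by unanimous vote.

On May 17, 2023, the Apache Board approved the graduation resolution for Apache SeaTunnel, ending an 18-month incubation and making Apache SeaTunnel an Apache Top-Level Project.

Apache SeaTunnel is a next-generation, high-performance, distributed tool for massive data integration. It supports hundreds of data sources (Database/Cloud/SaaS), handles both real-time CDC and batch synchronization of massive data, and can synchronize data at the trillion-record scale stably and efficiently.

As a data integration platform that is easy to use, very fast, and supports both real-time streaming and offline batch processing, Apache SeaTunnel has the following features and advantages: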

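  • Rich and extensible connectors: SeaTunnel provides a connector API that does not depend on a specific execution engine. Connectors (Source, Transform, Sink) developed against this API can run on several engines, currently SeaTunnel Engine, Flink, and Spark.
  • Connector plugins: the plugin design lets users develop their own connectors and integrate them into the SeaTunnel project. SeaTunnel currently supports more than 100 connectors, and the number keeps growing.
  • Batch-stream integration: connectors developed on the SeaTunnel Connector API are compatible with offline, real-time, full, and incremental synchronization scenarios, which greatly reduces the difficulty of managing data integration tasks.
  • Support for a distributed snapshot algorithm to guarantee data consistency.
  • Multi-engine support: SeaTunnel uses SeaTunnel Engine for data synchronization by default. It can also use Flink or Spark as the execution engine for connectors, to fit an enterprise's existing technology stack. Multiple versions of Spark and Flink are supported.
  • JDBC multiplexing and multi-table database log parsing: SeaTunnel supports multi-table or whole-database synchronization, which solves the problem of excessive JDBC connections. It also supports multi-table or whole-database log reading and parsing, which avoids the repeated log reading and parsing that CDC multi-table synchronization would otherwise require.
  • High throughput and low latency: SeaTunnel supports parallel reads and writes, providing stable, reliable, high-throughput, low-latency data synchronization.
  • Comprehensive real-time monitoring: SeaTunnel exposes detailed monitoring information for every step of the synchronization process, so users can easily see the number of records read and written, data size, QPS, and similar metrics.
  • Two job development methods: coding and canvas design. The SeaTunnel web project (https://github.com/apache/seatunnel-web) provides visual management of jobs, scheduling, running, and monitoring.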

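SeaTunnel system architecture

SeaTunnel installation and deployment

SeaTunnel Engine is the default engine of SeaTunnel, and the SeaTunnel installation package already contains everything SeaTunnel Engine needs.

Getting the SeaTunnel installation package

  1. Download the prebuilt binary package directly

https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz

After the download finishes, upload the archive to the server and extract it

# Extract into the /opt/module directory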
tar -zxvf apache-seatunnel-2.3.3-bin.tar.gz -C /opt/module
  2. Build the installation package from source
  • Get the source from https://seatunnel.apache.org/download or https://github.com/apache/seatunnel.git
  • Build the installation package with Maven: ./mvnw -U -T 1C clean install -DskipTests -D"maven.test.skip"=true -D"maven.javadoc.skip"=true -D"checkstyle.skip"=true -D"license.skipAddThirdParty"
  • The installation package is then available under ${Your_code_dir}/seatunnel-dist/target, for example apache-seatunnel-2.3.3-SNAPSHOT-bin.tar.gz

Configure SEATUNNEL_HOME
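
The article gives no commands for this step. A minimal sketch, assuming the archive was extracted under /opt/module as in the download step (so the install directory is /opt/module/apache-seatunnel-2.3.3), could look like this; adjust the path to your own layout, and put the two export lines in a shell profile if they should persist across sessions.

```shell
# Assumed install path: the 2.3.3 archive extracted under /opt/module.
export SEATUNNEL_HOME=/opt/module/apache-seatunnel-2.3.3
# Make the launcher scripts in bin/ reachable from any directory.
export PATH="$PATH:$SEATUNNEL_HOME/bin"
echo "SEATUNNEL_HOME=$SEATUNNEL_HOME"
```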

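Install connector plugins

Starting with 2.2.0-beta, the binary package no longer ships connector dependencies by default, so before first use you need to run the following command to install the connectors. (You can also download connectors manually from the Apache Maven Repository [https://repo.maven.apache.org/maven2/org/apache/seatunnel/] and move them into the connectors/seatunnel directory.)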

sh bin/install-plugin.sh 2.3.3

To pin the connector version explicitly, pass it as the argument. For version 2.3.3, run

sh bin/install-plugin.sh 2.3.3

Usually you do not need every connector plugin, so you can list the ones you need in config/plugin_config. For example, if you only need the connector-console plugin, edit config/plugin_config to contain

--seatunnel-connectors--
connector-console
--end--

For the sample job to work, the following plugins are needed

--seatunnel-connectors--
connector-fake
connector-console
--end--
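
The plugin list above can also be written from the command line. The sketch below is an illustration, not part of the original walkthrough: it falls back to a scratch directory when SEATUNNEL_HOME is unset so it can be tried without touching a real installation, and it only writes the file. The install script itself is left as a comment because it downloads jars from the network.

```shell
# Fall back to a scratch directory so the sketch is safe to run anywhere (assumption).
SEATUNNEL_HOME="${SEATUNNEL_HOME:-$(mktemp -d)}"
mkdir -p "$SEATUNNEL_HOME/config"

# Keep only the two connectors the sample job needs.
cat > "$SEATUNNEL_HOME/config/plugin_config" <<'EOF'
--seatunnel-connectors--
connector-fake
connector-console
--end--
EOF

cat "$SEATUNNEL_HOME/config/plugin_config"
# On a real installation, follow up with:
#   cd "$SEATUNNEL_HOME" && sh bin/install-plugin.sh 2.3.3
```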

All supported connectors and their corresponding plugin_config names are listed in ${SEATUNNEL_HOME}/connectors/plugin-mapping.properties

To install a V2 connector plugin manually, download the connector jar you need and place it under ${SEATUNNEL_HOME}/connectors/seatunnel

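Running the sample job

Define a job configuration file

We can use the official template config/v2.batch.config.template directly. It defines three sections: env, source, and sink

env: runtime settings, such as parallelism, job mode (BATCH/STREAMING), checkpointing, and so on

source: the definition of the data source

sink: the definition of the destination

Run the command: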

./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local
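
To make the env/source/sink layout concrete, here is a sketch of a small batch job written to a scratch file. It is modeled on the shipped template but is not a verbatim copy: the file name and the row count are illustrative assumptions, and the option names (execution.parallelism, result_table_name, row.num, schema.fields) are the FakeSource and env options as I understand them for 2.3.x, so check them against the template in your own installation.

```shell
# Scratch location for the job file (hypothetical path, not from the article).
JOB_DIR="$(mktemp -d)"
JOB_FILE="$JOB_DIR/fake-to-console.conf"

cat > "$JOB_FILE" <<'EOF'
env {
  # BATCH runs once and exits; STREAMING keeps running
  job.mode = "BATCH"
  execution.parallelism = 2
}

source {
  # Generates synthetic rows, so no external system is needed
  FakeSource {
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  # Prints every row to standard output
  Console {}
}
EOF

cat "$JOB_FILE"
# On a real installation: ./bin/seatunnel.sh --config "$JOB_FILE" -e local
```

Using FakeSource and Console keeps the first run free of external dependencies, which is why the sample job only requires the connector-fake and connector-console plugins.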

I am using 2.3.3, and the run fails with java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap

Error output:

2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Fatal Error,

2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Please submit bug report in https://github.com/apache/seatunnel/issues

2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Reason:SeaTunnel job executed failed

2023-11-08 17:47:32,244 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap
	at com.hazelcast.spi.impl.AbstractInvocationFuture.wrapInCompletionException(AbstractInvocationFuture.java:1347)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.cascadeException(AbstractInvocationFuture.java:1340)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.access$200(AbstractInvocationFuture.java:65)
	at com.hazelcast.spi.impl.AbstractInvocationFuture$ApplyNode.execute(AbstractInvocationFuture.java:1478)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockOtherNode(AbstractInvocationFuture.java:797)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockAll(AbstractInvocationFuture.java:759)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1235)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionally(AbstractInvocationFuture.java:709)
	at com.hazelcast.client.impl.spi.impl.ClientInvocation.completeExceptionally(ClientInvocation.java:294)
	at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyExceptionWithOwnedPermission(ClientInvocation.java:321)
	at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyException(ClientInvocation.java:304)
	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.handleResponse(ClientResponseHandlerSupplier.java:164)
	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.process(ClientResponseHandlerSupplier.java:141)
	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.access$300(ClientResponseHandlerSupplier.java:60)
	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:251)
	at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:243)
	at com.hazelcast.client.impl.connection.tcp.TcpClientConnection.handleClientMessage(TcpClientConnection.java:245)
	at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.handleMessage(ClientMessageDecoder.java:135)
	at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.onRead(ClientMessageDecoder.java:89)
	at com.hazelcast.internal.networking.nio.NioInboundPipeline.process(NioInboundPipeline.java:136)
	at com.hazelcast.internal.networking.nio.NioThread.processSelectionKey(NioThread.java:383)
	at com.hazelcast.internal.networking.nio.NioThread.processSelectionKeys(NioThread.java:368)
	at com.hazelcast.internal.networking.nio.NioThread.selectLoop(NioThread.java:294)
	at com.hazelcast.internal.networking.nio.NioThread.executeRun(NioThread.java:249)
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan$Builder.<init>(CheckpointPlan.java:66)

Download the missing jar

Download link [https://repo.maven.apache.org/maven2/org/apache/seatunnel/seatunnel-hadoop3-3.1.4-uber/2.3.2/seatunnel-hadoop3-3.1.4-uber-2.3.2-optional.jar]

Place the jar under $SEATUNNEL_HOME/lib

Run again:

./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local

Result

2023-11-08 17:55:32,575 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 9223372036854775807
2023-11-08 17:55:32,621 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(9223372036854775807/1@774572734674894849) notify finished!
2023-11-08 17:55:32,621 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, checkpoint:org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint@7e4b2ce6
2023-11-08 17:55:32,627 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start clean pending checkpoint cause CheckpointCoordinator completed.
2023-11-08 17:55:32,628 INFO  org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - Turn checkpoint_state_774572734674894849_1 state from null to FINISHED
2023-11-08 17:55:32,672 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 20000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1}
2023-11-08 17:55:32,672 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1} complete with state FINISHED
2023-11-08 17:55:32,672 INFO  org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1}, state FINISHED
2023-11-08 17:55:32,674 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SplitEnumerator (1/1)] turn to end state FINISHED.
2023-11-08 17:55:32,674 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SplitEnumerator (1/1)] end with state FINISHED
2023-11-08 17:55:32,684 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 50000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000}
2023-11-08 17:55:32,684 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 50001, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001}
2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 40001, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001}
2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 40000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000}
2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001} complete with state FINISHED
2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000} complete with state FINISHED
2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001}, state FINISHED
2023-11-08 17:55:33,480 INFO  org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000}, state FINISHED
2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (1/2)] turn to end state FINISHED.
2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (2/2)] turn to end state FINISHED.
2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (1/2)] end with state FINISHED
2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (2/2)] end with state FINISHED
2023-11-08 17:55:33,482 INFO  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] end with state FINISHED
2023-11-08 17:55:33,506 INFO  org.apache.seatunnel.engine.server.master.JobMaster - release the pipeline Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] resource
2023-11-08 17:55:33,507 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=1, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'}
2023-11-08 17:55:33,507 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=2, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'}
2023-11-08 17:55:33,507 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=3, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'}
2023-11-08 17:55:33,510 INFO  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] turn to end state FINISHED.
2023-11-08 17:55:33,511 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job SeaTunnel_Job (774572734674894849) end with state FINISHED
2023-11-08 17:55:33,523 INFO  org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (774572734674894849) end with state FINISHED
2023-11-08 17:55:33,546 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand -
***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2023-11-08 17:55:30
End Time                  : 2023-11-08 17:55:33
Total Time(s)             :                   2
Total Read Count          :                  32
Total Write Count         :                  32
Total Failed Count        :                   0
***********************************************

2023-11-08 17:55:33,546 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2023-11-08 17:55:33,548 INFO  com.hazelcast.internal.server.tcp.TcpServerConnection - [localhost]:5801 [seatunnel-610439] [5.1] Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33602, qualifier=null, endpoint=[127.0.0.1]:33602, remoteUuid=38fbe3e1-876c-48e9-b145-1989888393ab, alive=false, connectionType=JVM, planeIndex=-1] closed. Reason: Connection closed by the other side
2023-11-08 17:55:33,548 INFO  com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [seatunnel-610439] [5.1] Removed connection to endpoint: [localhost]:5801:6f7f4921-7d71-42af-b26a-6f11a7960118, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:33602->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2023-11-08 17:55:33.543, lastWriteTime=2023-11-08 17:55:33.523, closedTime=2023-11-08 17:55:33.547, connected server version=5.1}
2023-11-08 17:55:33,548 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2023-11-08 17:55:33,549 INFO  com.hazelcast.client.impl.ClientEndpointManager - [localhost]:5801 [seatunnel-610439] [5.1] Destroying ClientEndpoint{connection=Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33602, qualifier=null, endpoint=[127.0.0.1]:33602, remoteUuid=38fbe3e1-876c-48e9-b145-1989888393ab, alive=false, connectionType=JVM, planeIndex=-1], clientUuid=38fbe3e1-876c-48e9-b145-1989888393ab, clientName=hz.client_1, authenticated=true, clientVersion=5.1, creationTime=1699437330660, latest clientAttributes=lastStatisticsCollectionTime=1699437330713,enterprise=false,clientType=JVM,clientVersion=5.1,clusterConnectionTimestamp=1699437330653,clientAddress=127.0.0.1,clientName=hz.client_1,credentials.principal=null,os.committedVirtualMemorySize=7048228864,os.freePhysicalMemorySize=15730012160,os.freeSwapSpaceSize=0,os.maxFileDescriptorCount=65535,os.openFileDescriptorCount=51,os.processCpuTime=5290000000,os.systemLoadAverage=0.86,os.totalPhysicalMemorySize=33566306304,os.totalSwapSpaceSize=0,runtime.availableProcessors=8,runtime.freeMemory=994659352,runtime.maxMemory=1029177344,runtime.totalMemory=1029177344,runtime.uptime=2424,runtime.usedMemory=34517992, labels=[]}
2023-11-08 17:55:33,550 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2023-11-08 17:55:33,550 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client......
2023-11-08 17:55:33,551 INFO  com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-610439] [5.1] [localhost]:5801 is SHUTTING_DOWN
2023-11-08 17:55:33,553 INFO  com.hazelcast.internal.partition.impl.MigrationManager - [localhost]:5801 [seatunnel-610439] [5.1] Shutdown request of Member [localhost]:5801 - 6f7f4921-7d71-42af-b26a-6f11a7960118 this is handled
2023-11-08 17:55:33,557 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Shutting down connection manager...
2023-11-08 17:55:33,558 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Shutting down node engine...
2023-11-08 17:55:35,980 INFO  com.hazelcast.instance.impl.NodeExtension - [localhost]:5801 [seatunnel-610439] [5.1] Destroying node NodeExtension.
2023-11-08 17:55:35,980 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Hazelcast Shutdown is completed in 2427 ms.
2023-11-08 17:55:35,980 INFO  com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-610439] [5.1] [localhost]:5801 is SHUTDOWN
2023-11-08 17:55:35,980 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed HazelcastInstance ......
2023-11-08 17:55:35,981 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ......
2023-11-08 17:55:35,981 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal

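Log output like the above means the setup works

SeaTunnel example: mysql-to-mysql

Next, let's test a MySQL-to-MySQL synchronization with SeaTunnel

First, the MySQL JDBC dependency is needed: either list connector-jdbc in config/plugin_config and rerun the install script, or place the connector-jdbc jar directly under $SEATUNNEL_HOME/connectors/seatunnel

Create the MySQL databases and tables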

create database test_01;
create table test_01.user(userid int(4) primary key not null auto_increment,username varchar(16) not null);

create database test_02;
create table test_02.user(userid int(4) primary key not null auto_increment,username varchar(16) not null);

Insert data

insert into test_01.user (username) values ("zhangsan");
insert into test_01.user (username) values ("lisi");

Create the synchronization job configuration file

env {
  job.mode = "BATCH"
}

# Source database
source {
  jdbc {
    url = "jdbc:mysql://172.1.1.54:3306/test_01"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "admin"
    password = "xxxxxxx"
    generate_sink_sql = true
    database = "test_01"
    table = "user"
    query = "select * from test_01.user"
  }
}

transform {
}

# Target database
sink {
  jdbc {
    url = "jdbc:mysql://172.1.1.54:3306/test_02"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "admin"
    password = "xxxxxx"
    generate_sink_sql = true
    database = "test_02"
    table = "user"
  }

}

Run the command:

./bin/seatunnel.sh -e LOCAL -c ./config/mysql-to-mysql.conf

Output like the following means the synchronization succeeded:

2023-11-08 20:55:00,468 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774617897816293377, slot: SlotProfile{worker=[localhost]:5801, slotID=1, ownerJobID=774617897816293377, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='36c05043-4938-47e0-927c-94e7cac4f749'}
2023-11-08 20:55:00,468 INFO  org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774617897816293377, slot: SlotProfile{worker=[localhost]:5801, slotID=2, ownerJobID=774617897816293377, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='36c05043-4938-47e0-927c-94e7cac4f749'}
2023-11-08 20:55:00,470 INFO  org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774617897816293377), Pipeline: [(1/1)] turn to end state FINISHED.
2023-11-08 20:55:00,471 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job SeaTunnel_Job (774617897816293377) end with state FINISHED
2023-11-08 20:55:00,483 INFO  org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (774617897816293377) end with state FINISHED
2023-11-08 20:55:00,511 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand -
***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2023-11-08 20:54:58
End Time                  : 2023-11-08 20:55:00
Total Time(s)             :                   1
Total Read Count          :                   2
Total Write Count         :                   2
Total Failed Count        :                   0
***********************************************

2023-11-08 20:55:00,511 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN
2023-11-08 20:55:00,514 INFO  com.hazelcast.internal.server.tcp.TcpServerConnection - [localhost]:5801 [seatunnel-250845] [5.1] Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33254, qualifier=null, endpoint=[127.0.0.1]:33254, remoteUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, alive=false, connectionType=JVM, planeIndex=-1] closed. Reason: Connection closed by the other side
2023-11-08 20:55:00,514 INFO  com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [seatunnel-250845] [5.1] Removed connection to endpoint: [localhost]:5801:1373b501-f19f-47a2-9ef2-66d14e5a31c0, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:33254->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2023-11-08 20:55:00.508, lastWriteTime=2023-11-08 20:55:00.483, closedTime=2023-11-08 20:55:00.512, connected server version=5.1}
2023-11-08 20:55:00,515 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED
2023-11-08 20:55:00,516 INFO  com.hazelcast.client.impl.ClientEndpointManager - [localhost]:5801 [seatunnel-250845] [5.1] Destroying ClientEndpoint{connection=Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33254, qualifier=null, endpoint=[127.0.0.1]:33254, remoteUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, alive=false, connectionType=JVM, planeIndex=-1], clientUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, clientName=hz.client_1, authenticated=true, clientVersion=5.1, creationTime=1699448098390, latest clientAttributes=lastStatisticsCollectionTime=1699448098443,enterprise=false,clientType=JVM,clientVersion=5.1,clusterConnectionTimestamp=1699448098383,clientAddress=127.0.0.1,clientName=hz.client_1,credentials.principal=null,os.committedVirtualMemorySize=7048228864,os.freePhysicalMemorySize=12655243264,os.freeSwapSpaceSize=0,os.maxFileDescriptorCount=65535,os.openFileDescriptorCount=51,os.processCpuTime=5340000000,os.systemLoadAverage=0.24,os.totalPhysicalMemorySize=33566306304,os.totalSwapSpaceSize=0,runtime.availableProcessors=8,runtime.freeMemory=994543624,runtime.maxMemory=1029177344,runtime.totalMemory=1029177344,runtime.uptime=2526,runtime.usedMemory=34633720, labels=[]}
2023-11-08 20:55:00,516 INFO  com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN
2023-11-08 20:55:00,516 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client......
2023-11-08 20:55:00,517 INFO  com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-250845] [5.1] [localhost]:5801 is SHUTTING_DOWN
2023-11-08 20:55:00,520 INFO  com.hazelcast.internal.partition.impl.MigrationManager - [localhost]:5801 [seatunnel-250845] [5.1] Shutdown request of Member [localhost]:5801 - 1373b501-f19f-47a2-9ef2-66d14e5a31c0 this is handled
2023-11-08 20:55:00,523 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Shutting down connection manager...
2023-11-08 20:55:00,525 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Shutting down node engine...
2023-11-08 20:55:03,377 INFO  com.hazelcast.instance.impl.NodeExtension - [localhost]:5801 [seatunnel-250845] [5.1] Destroying node NodeExtension.
2023-11-08 20:55:03,377 INFO  com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Hazelcast Shutdown is completed in 2858 ms.
2023-11-08 20:55:03,378 INFO  com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-250845] [5.1] [localhost]:5801 is SHUTDOWN
2023-11-08 20:55:03,378 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed HazelcastInstance ......
2023-11-08 20:55:03,378 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ......
2023-11-08 20:55:03,379 INFO  org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal

Check the synchronization result

MySQL [(none)]>
MySQL [(none)]> select * from test_02.user;
+--------+----------+
| userid | username |
+--------+----------+
|      1 | zhangsan |
|      2 | lisi     |
+--------+----------+
2 rows in set (0.00 sec)

Integrating SeaTunnel with the Flink and Spark engines

Edit the seatunnel-env.sh file

Set FLINK_HOME to the Flink installation directory

Set SPARK_HOME to the Spark installation directory

FLINK_HOME=/data/flink-1.14.5/
SPARK_HOME=/data/spark-2.4.6/

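Start the Flink cluster

./bin/start-cluster.sh

Run the job, again using mysql-to-mysql.conf as the example

Note: to synchronize MySQL, the JDBC driver jar must be placed in the flink/lib directory. This works the same way as doing data synchronization with Flink directly: all required dependencies are provided to Flink.

./bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/mysql-to-mysql.conf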

The output is as follows:

Execute SeaTunnel Flink Job: ${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink /mnt/apache-seatunnel-2.3.3/starter/seatunnel-flink-13-starter.jar --config ./config/mysql-to-mysql.conf --name SeaTunnel
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/kmr/flink1/1/flink-1.14.5/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/mnt/kmr/hadoop/1/hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID a643a1990705f817479b1d2880c9f038
Program execution finished
Job with JobID a643a1990705f817479b1d2880c9f038 has finished.
Job Runtime: 2356 ms
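
The output above shows seatunnel-flink-13-starter.jar being submitted for Flink 1.14.5. As I understand the 2.3.x layout, the flink-13 starter script covers Flink 1.12.x through 1.14.x and the flink-15 script covers 1.15.x and 1.16.x. The helper below is a hypothetical sketch of that mapping (the function name is mine, and versions outside that range are not handled), useful for avoiding a script and Flink version mismatch.

```shell
# Hypothetical helper: choose the SeaTunnel Flink starter script for a Flink version.
# Assumes the 2.3.x mapping: 1.12.x-1.14.x -> flink-13, 1.15.x-1.16.x -> flink-15.
pick_flink_starter() {
  # Extract the minor version, e.g. "14" from "1.14.5".
  minor=$(printf '%s' "$1" | cut -d. -f2)
  if [ "$minor" -ge 15 ]; then
    echo "start-seatunnel-flink-15-connector-v2.sh"
  else
    echo "start-seatunnel-flink-13-connector-v2.sh"
  fi
}

pick_flink_starter 1.14.5
```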

Truncate the test_02.user table, then import with Spark

MySQL [(none)]> truncate table test_02.user;
Query OK, 0 rows affected (0.01 sec)

Run the import command

./bin/start-seatunnel-spark-2-connector-v2.sh \
--master local[2] \
--deploy-mode client \
--config ./config/mysql-to-mysql.conf

The output is as follows:

23/11/08 21:19:57 INFO executor.FieldNamedPreparedStatement: PrepareStatement sql is:
INSERT INTO `test_02`.`user` (`userid`, `username`) VALUES (?, ?)

23/11/08 21:19:57 INFO v2.DataWritingSparkTask: Commit authorized for partition 0 (task 0, attempt 0, stage 0.0)
23/11/08 21:19:57 INFO v2.DataWritingSparkTask: Committed partition 0 (task 0, attempt 0, stage 0.0)
23/11/08 21:19:57 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1148 bytes result sent to driver
23/11/08 21:19:57 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1486 ms on localhost (executor driver) (1/1)
23/11/08 21:19:57 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
23/11/08 21:19:57 INFO scheduler.DAGScheduler: ResultStage 0 (save at SinkExecuteProcessor.java:117) finished in 1.568 s
23/11/08 21:19:57 INFO scheduler.DAGScheduler: Job 0 finished: save at SinkExecuteProcessor.java:117, took 1.610054 s
23/11/08 21:19:57 INFO v2.WriteToDataSourceV2Exec: Data source writer org.apache.seatunnel.translation.spark.sink.writer.SparkDataSourceWriter@25ea068e is committing.
23/11/08 21:19:57 INFO v2.WriteToDataSourceV2Exec: Data source writer org.apache.seatunnel.translation.spark.sink.writer.SparkDataSourceWriter@25ea068e committed.
23/11/08 21:19:57 INFO execution.SparkExecution: Spark Execution started
23/11/08 21:19:57 INFO spark.SparkContext: Invoking stop() from shutdown hook
23/11/08 21:19:57 INFO server.AbstractConnector: Stopped Spark@6e8a9c30{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
23/11/08 21:19:57 INFO ui.SparkUI: Stopped Spark web UI at http://kmr-b55b8d33-gn-0a6e9139-az1-master-1-2.ksc.com:4040
23/11/08 21:19:57 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/11/08 21:19:57 INFO memory.MemoryStore: MemoryStore cleared
23/11/08 21:19:57 INFO storage.BlockManager: BlockManager stopped
23/11/08 21:19:57 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
23/11/08 21:19:57 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/11/08 21:19:57 INFO spark.SparkContext: Successfully stopped SparkContext
23/11/08 21:19:57 INFO util.ShutdownHookManager: Shutdown hook called
23/11/08 21:19:57 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-6dfc2825-0da5-4e4c-9623-bddcccab0849

Query the table: the data has been imported successfully

MySQL [(none)]> select * from test_02.user;
+--------+----------+
| userid | username |
+--------+----------+
|      1 | zhangsan |
|      2 | lisi     |
+--------+----------+
