泰涨知识 | Get Started with Flink Programming in 10 Minutes
I. Socket Stream as Flink's Data Source
1. Write pom.xml
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.0</version>
</dependency>
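All of the dependency snippets in this post go inside the <dependencies> element of the project's pom.xml. A minimal skeleton, where groupId, artifactId, and version are placeholders for your own project (a Scala build plugin such as scala-maven-plugin is also needed to compile the Scala code, and is not shown here):
<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>flink-demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <dependencies>
    <!-- the Flink dependencies above go here -->
  </dependencies>
</project>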
2. Write the code
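A minimal word-count sketch; the object name flink_demo1 is arbitrary, and the host 192.168.80.145 and port 4444 match the socket used throughout this post:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object flink_demo1 {
  def main(args: Array[String]): Unit = {
    //1. Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //2. Read lines from the socket stream
    val text = env.socketTextStream("192.168.80.145", 4444, '\n')
    //3. Split each line on spaces, map each word to (word, 1), and sum the counts per word
    val counts = text
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
    //4. Print the running counts to the console
    counts.print()
    //5. Trigger execution
    env.execute("socket flink source")
  }
}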
3. On the server, run the nc -l command to send data to Flink:
nc -l 4444
#If the server was a minimal install, the nc command may not exist and has to be installed manually
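On a CentOS-style system (assumed here, matching the paths used later in this post), installing it typically looks like:
yum install -y nc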
4. View the output. Back in IDEA, run the code, then type words (separated by spaces) into the nc terminal; the computed results appear in the IDEA console:
https://pic1.zhimg.com/80/v2-51b4c193393559fc0ad7923920b84821_720w.webp?source=d16d100b
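For example, typing hadoop flink flink prints running counts along these lines (the subtask prefix and ordering printed by Flink vary from run to run):
(hadoop,1)
(flink,1)
(flink,2)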
II. Kafka as Flink's Data Source
1. Execution logic
https://picx.zhimg.com/80/v2-92cdee7f9b6baf4e0880ee5b75bf060f_720w.webp?source=d16d100b
2. Write pom.xml
<!-- Flink-Kafka integration -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.14.0</version>
</dependency>
3. Write the code (a FlinkKafkaConsumer reads the topic and prints it; the object name flink_demo2 and the group.id value are arbitrary):
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
object flink_demo2 {
  def main(args: Array[String]): Unit = {
    //1. Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //implicit conversions for the Scala API
    import org.apache.flink.api.scala._
    //2. Execution logic
    //2.1 The Kafka topic to read from
    val topic = "bigdata"
    //2.2 Configure the other Kafka parameters
    val prop = new Properties()
    //Kafka server address and port
    prop.setProperty("bootstrap.servers", "192.168.80.145:9092")
    prop.setProperty("group.id", "flink-consumer")
    //2.3 Use the Kafka consumer as the source and print what it reads
    val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
    env.addSource(myConsumer).print()
    //3. Trigger execution
    env.execute("kafka flink source")
  }
}
4. View the results. On the server, start Kafka:
4.1 Start ZooKeeper:
zookeeper-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/zookeeper.properties
4.2 In another terminal, start Kafka:
kafka-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/server.properties
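If the bigdata topic does not exist yet and broker auto-creation is disabled, create it first; a typical command for Kafka 2.4:
kafka-topics.sh --create --bootstrap-server master:9092 --topic bigdata --partitions 1 --replication-factor 1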
4.3 In another terminal, start a console producer:
kafka-console-producer.sh --broker-list master:9092 --topic bigdata
4.4 Type data into the producer terminal; the results then appear in the IDEA console (Flink):
https://pic1.zhimg.com/80/v2-1310531cfb90d104c8ad698a784e7cc8_720w.webp?source=d16d100b
III. Kafka as Flink's Output Sink
1. Execution logic
A socket stream acts as Flink's data source and sends data to Flink; Flink then forwards that data to Kafka (i.e., Kafka acts as Flink's sink, or equivalently, Flink acts as a Kafka producer). A consumer in a server terminal can then consume the data Flink sends over:
https://pic1.zhimg.com/80/v2-88af547262a3bb2d09d1b41f8baf7c3c_720w.webp?source=d16d100b
2. Write pom.xml
Identical to the dependency used when Kafka is Flink's data source:
<!-- Flink-Kafka integration -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.14.0</version>
</dependency>
3. Write the code
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import java.util.Properties
object flink_demo3 {
  def main(args: Array[String]): Unit = {
    //1. Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.80.145", 4444, '\n')
    //2. Execution logic
    //2.1 The Kafka topic to write to
    val topic = "bigdata"
    //2.2 Configure the other Kafka parameters
    val prop = new Properties()
    //Kafka server address and port
    prop.setProperty("bootstrap.servers", "192.168.80.145:9092")
    //2.3 Build the producer from the topic, a plain-string serializer, and the properties
    val myProducer = new FlinkKafkaProducer[String](topic, new SimpleStringSchema(), prop)
    text.print()
    text.addSink(myProducer)
    //3. Trigger execution
    env.execute("kafka flink sink")
  }
}
4. Start the socket stream in a terminal. On the server, run the nc -l command to send data to Flink:
nc -l 4444
5. View the results
5.1 Prerequisite: ZooKeeper and Kafka are already running. If they are not, run the following:
#Start ZooKeeper
zookeeper-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/zookeeper.properties
#In another terminal, start Kafka
kafka-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/server.properties
5.2 Back in IDEA you can see the content the socket stream sends to Flink.
5.3 Start a consumer in a server terminal and check whether Flink sent the data to Kafka; a typical command:
kafka-console-consumer.sh --bootstrap-server master:9092 --topic bigdata --from-beginning
https://pica.zhimg.com/80/v2-70b40cddf94f9e4403e406acfaabbc99_720w.webp?source=d16d100b
IV. Redis as Flink's Output Sink
1. Execution logic
https://pica.zhimg.com/80/v2-efb3073c09fc0fd0c542bd647e39eea5_720w.webp?source=d16d100b
2. Write pom.xml
<!-- Flink-Redis integration -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.12</artifactId>
<version>1.1.0</version>
</dependency>
3. Write the code
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.StringUtils
/**
* Use Redis as Flink's sink
*/
object flink_demo4 {
  class myRedisMapper extends RedisMapper[(String, String)] {
    //use the Redis SET command to write each pair
    override def getCommandDescription: RedisCommandDescription =
      new RedisCommandDescription(RedisCommand.SET)
    //the first tuple element is the Redis key, the second is the value
    override def getKeyFromData(t: (String, String)): String = t._1
    override def getValueFromData(t: (String, String)): String = t._2
  }
  def main(args: Array[String]): Unit = {
    //1. Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Read from the socket stream (i.e., use a socket as Flink's source)
    val text = env.socketTextStream("192.168.80.145", 4444, '\n')
    //implicit conversions for the Scala API
    import org.apache.flink.api.scala._
    //Process the incoming data
    val data = text
      .filter(!StringUtils.isNullOrWhitespaceOnly(_)) //filter out blank lines
      .flatMap(_.split(" "))  //split on spaces
      .map((_, 1))            //turn each word into a (word, 1) pair
      .keyBy(_._1).sum(1)     //sum per key, i.e., add 1 for each occurrence of the same key
      //at this point the stream holds pairs like ("hadoop",1) ("flink",2)
      .map(
        x => {
          println(x) //print each pair to the console
          (x._1, x._2.toString) //the counts are integers, but this mapper writes string values, so convert them
        }
      )
    //Configure the Redis connection pool
    val redisConfig = new FlinkJedisPoolConfig.Builder().setHost("192.168.80.145").setPort(6379).build()
    //Build the Redis sink for Flink
    val redisSink = new RedisSink[(String, String)](redisConfig, new myRedisMapper)
    //Attach redisSink as Flink's sink
    data.addSink(redisSink)
    //Execute
    env.execute()
  }
}
4. Modify the Redis configuration:
vi /usr/local/src/redis-6.2.7/redis.conf
#① In redis.conf, comment out the bind 127.0.0.1 line by adding # in front of it:
#bind 127.0.0.1 -::1
#② Find protected-mode yes and change it to protected-mode no
5. Start the Redis server. The terminal will block; do not stop it:
redis-server /usr/local/src/redis-6.2.7/redis.conf
6. In another terminal, start the Redis command line:
redis-cli
7. In another terminal, start the socket stream:
nc -l 4444
8. Back in IDEA, run the code
Then type content into the socket-stream terminal; the output shows up in the IDEA console:
https://picx.zhimg.com/80/v2-4f98f6430a9698ef0bb1677646d9bbb0_720w.webp?source=d16d100b
9. Check the results in the Redis command-line terminal
Back in the redis-cli terminal, run the keys * command to see the content Flink wrote into Redis:
https://pica.zhimg.com/80/v2-a26b771d1dc8668fee006f0108f9df13_720w.webp?source=d16d100b
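A sample session, assuming the words hadoop and flink were typed as in the code comment above (your keys and counts will differ):
127.0.0.1:6379> keys *
1) "hadoop"
2) "flink"
127.0.0.1:6379> get flink
"2"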