泰克Tech 发表于 2024-6-7 15:37:27

泰涨知识 | 10分钟带你快速上手Flink编程

本帖最后由 泰克Tech 于 2024-6-7 15:38 编辑

一、套接字流作为Flink的数据源1. 编写pom.xml
<pre style="overflow-wrap: initial; background: rgb(248, 248, 250); border-radius: 4px; font-size: 0.9em; overflow: auto; padding: calc(0.888889em); word-break: initial; color: rgb(25, 27, 31);"><code class="language-text" style="background-color: inherit; border-radius: 0px; font-family: Menlo, Monaco, Consolas, &quot;Andale Mono&quot;, &quot;lucida console&quot;, &quot;Courier New&quot;, monospace; font-size: inherit;"> <!--   Flink依赖   -->
       <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-scala_2.12</artifactId>
         <version>1.14.0</version>
       </dependency>
       <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-streaming-scala_2.12</artifactId>
         <version>1.14.0</version>
       </dependency>
       <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-clients_2.12</artifactId>
         <version>1.14.0</version>
       </dependency></code></pre>

2. 编写代码



3.去到服务器,执行nc -l命令发送数据到Flink
<span style="font-family: Consolas, &quot;Liberation Mono&quot;, Menlo, Courier, monospace; white-space: pre; color: rgb(51, 51, 51); font-size: 14px; letter-spacing: 1px;">#如果服务器是最小化安装的,可能不存在nc命令,需要手动安装</span>

4.输出结果回到IDEA运行代码,在终端输入文字(使用空格分开),在idea终端会显示计算结果:https://pic1.zhimg.com/80/v2-51b4c193393559fc0ad7923920b84821_720w.webp?source=d16d100b


二、Kafka作为Flink的数据源1.执行逻辑https://picx.zhimg.com/80/v2-92cdee7f9b6baf4e0880ee5b75bf060f_720w.webp?source=d16d100b2.编写pom.xml
<pre style="overflow-wrap: initial; background: rgb(248, 248, 250); border-radius: 4px; font-size: 0.9em; overflow: auto; padding: calc(0.888889em); word-break: initial; color: rgb(25, 27, 31);"><code class="language-text" style="background-color: inherit; border-radius: 0px; font-family: Menlo, Monaco, Consolas, &quot;Andale Mono&quot;, &quot;lucida console&quot;, &quot;Courier New&quot;, monospace; font-size: inherit;"><!--   Flink集成Kafka   -->
       <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-kafka_2.12</artifactId>
         <version>1.14.0</version>
       </dependency></code></pre>


3.编写代码
<pre style="overflow-wrap: initial; background: rgb(248, 248, 250); border-radius: 4px; font-size: 0.9em; overflow: auto; padding: calc(0.888889em); word-break: initial; color: rgb(25, 27, 31);"><code class="language-text" style="background-color: inherit; border-radius: 0px; font-family: Menlo, Monaco, Consolas, &quot;Andale Mono&quot;, &quot;lucida console&quot;, &quot;Courier New&quot;, monospace; font-size: inherit;">import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

import java.util.Properties

object flink_demo3 {
def main(args: Array): Unit = {

   //1.创建执行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment

   val text = env.socketTextStream("192.168.80.145",4444,'\n')

   //2.执行逻辑
   //2.1 kafka的主题
   val topic = "bigdata"
   //2.2 配置其他的Kafka参数
   val prop = new Properties()
   // Kafka的服务器路径及端口
   prop.setProperty("bootstrap.servers", "192.168.80.145:9092")
   
   val myProducer = new FlinkKafkaProducer("192.168.80.145:9092",topic,new SimpleStringSchema())
   
   text.print()
   text.addSink(myProducer)

   //3.执行计算
   env.execute("kafka flink sink")
}
}</code></pre>

4.查看结果去到服务器启动Kafka:4.1 启动zookeeperzookeeper-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/zookeeper.properties

4.2 另起一个终端,启动Kafkakafka-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/server.properties

4.3 另起一个终端,启动生产者kafka-console-producer.sh --broker-list master:9092 --topic bigdata

4.4 在生产者端口输入数据,然后就能在IDEA终端(Flink)查看到结果https://pic1.zhimg.com/80/v2-1310531cfb90d104c8ad698a784e7cc8_720w.webp?source=d16d100b三、Kafka作为Flink的输出Sink1.执行逻辑套接字流作为Flink的数据源Source,将数据发送给Flink,Flink再将数据发送给Kafka(相当于Kafka作为Flink的Sink,或者说Flink作为Kafka的生产者),服务器终端的消费者可以消费到Flink发送过来的数据:https://pic1.zhimg.com/80/v2-88af547262a3bb2d09d1b41f8baf7c3c_720w.webp?source=d16d100b2.编写pom.xml和Kafka作为Flink的数据源一致:<!--   Flink集成Kafka   -->
       <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-kafka_2.12</artifactId>
         <version>1.14.0</version>
       </dependency>
3.编写代码import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

import java.util.Properties

object flink_demo3 {
def main(args: Array): Unit = {

   //1.创建执行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment

   val text = env.socketTextStream("192.168.80.145",4444,'\n')

   //2.执行逻辑
   //2.1 kafka的主题
   val topic = "bigdata"
   //2.2 配置其他的Kafka参数
   val prop = new Properties()
   // Kafka的服务器路径及端口
   prop.setProperty("bootstrap.servers", "192.168.80.145:9092")
   
   val myProducer = new FlinkKafkaProducer("192.168.80.145:9092",topic,new SimpleStringSchema())
   
   text.print()
   text.addSink(myProducer)

   //3.执行计算
   env.execute("kafka flink sink")
}
}


4.在终端启动套接字流去到服务器,执行nc -l命令发送数据到Flink:nc -l 4444
5.查看结果
5.1 前提:Zookeeper、Kafka已经启动了,如果没有启动,执行下述命令:#启动zookeeper
zookeeper-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/zookeeper.properties
#另起一个终端,启动Kafka
kafka-server-start.sh /usr/local/src/kafka_2.12-2.4.1/config/server.properties

5.2 回到IDEA即可看到套接字流发送到Flink的内容。
5.3 在服务器终端启动一个消费者,然后查看Flink是否将数据发送给Kafka。https://pica.zhimg.com/80/v2-70b40cddf94f9e4403e406acfaabbc99_720w.webp?source=d16d100b四、Redis作为Flink的输出Sink1.执行逻辑https://pica.zhimg.com/80/v2-efb3073c09fc0fd0c542bd647e39eea5_720w.webp?source=d16d100b2.编写pom.xml<!--   Flink集成Redis   -->
       <dependency>
         <groupId>org.apache.bahir</groupId>
         <artifactId>flink-connector-redis_2.12</artifactId>
         <version>1.1.0</version>
       </dependency>
3.编写代码import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.StringUtils

/**
* 使用Redis作为Flink的Sink
*/
object flink_demo4 {

class myRedisMapper extends RedisMapper[(String,String)]{
   override def getCommandDescription: RedisCommandDescription =
   new RedisCommandDescription(RedisCommand.SET)

   override def getKeyFromData(t: (String, String)): String = t._1

   override def getValueFromData(t: (String, String)): String = t._2
}

def main(args: Array): Unit = {


   //1.创建执行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment

   //从套接字流获取数据(即使用socket作为Flink的Source)
   val text = env.socketTextStream("192.168.80.145", 4444, '\n')

   //隐式转换
   import org.apache.flink.api.scala._

   //对获取到的数据进行处理
   val data = text
    .filter(!StringUtils.isNullOrWhitespaceOnly(_))//过滤空行
    .flatMap(_.split(" "))//根据空格拆分
    .map((_,1))//转换为键值对
    .keyBy(0).sum(1)//根据键统计值,即相同key的值加1
   //到这个位置获取到的内容相当于("hadoop",1)("flink",2)
    .map(
   x=>{
       println(x)//在控制台打印键值对
      (x._1,x._2.toString) //因为之前的键值对的值是整型,但Redis不支持整型,故将其转换成字符串
    }
)
   //配置Redis连接池
   val redisConfig = new FlinkJedisPoolConfig.Builder().setHost("192.168.80.145").setPort(6379).build()

   //配置Redis为Flink的sink
   val redisSink = new RedisSink[(String,String)](redisConfig,new myRedisMapper)
   //指定redisSink为Flink的Sink
   data.addSink(redisSink)

   //执行代码
   env.execute()
}
}
4.修改Redis配置vi /usr/local/src/redis-6.2.7/redis.conf

#①在配置文件redis.conf修改bind 127.0.0.1,在最前面加入#注释他
#bind 127.0.0.1 -::1

#找到protected-mode yes将其修改为protected-mode no
5.启动Redis的服务,终端会卡住,不要停止它redis-server /usr/local/src/redis-6.2.7/redis.conf
6.另起一个终端,启动Redis命令行redis-cli
7.另起一个终端,启动套接字流nc -l 4444

8.回到IDEA,运行代码
然后再套接字流终端输入内容,即可在IDEA终端查看到输出的内容:https://picx.zhimg.com/80/v2-4f98f6430a9698ef0bb1677646d9bbb0_720w.webp?source=d16d100b
9.回到Redis命令行终端查看结果
回到Redis命令行终端,输入keys *命令即可查看到Flink输入Redis的内容:https://pica.zhimg.com/80/v2-a26b771d1dc8668fee006f0108f9df13_720w.webp?source=d16d100b

扫码获取更多学习资料、备考资讯、岗位内推机会~
https://cdn.hh010.com/data/attachment/forum/202405/11/170143odaf4kgz35tdgzjc.jpg

mawr1985 发表于 2024-6-7 17:07:33

{:6_267:}
页: [1]
查看完整版本: 泰涨知识 | 10分钟带你快速上手Flink编程