Spark Streaming 3.0.0 (Scala): writing message data to Kafka
The writing here is actually done by calling the Kafka client directly. We simulate a series of randomly generated records and write them to Kafka continuously, forming a steady stream of message data.

1. Add dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.1.0</version>
</dependency>
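If you build with sbt instead of Maven, an equivalent build.sbt fragment might look like the sketch below (same versions as the Maven snippet; the sbt form itself is an assumption, not from the original article). Note that spark-streaming-kafka-0-10 transitively pulls in kafka-clients, which is the only dependency the producer code below actually exercises.

// build.sbt — a minimal sketch assuming the same versions as the Maven snippet above
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "3.0.0",
  "org.apache.spark" %% "spark-streaming"            % "3.0.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.1.0"
)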
2. Test code
package com.demo

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object MockerRealTime {

  /**
   * Mock data.
   *
   * Format: timestamp area city userid adid
   *         (a point in time, a region, a city, a user id, an ad id)
   */
  def generateMockData(): Array[String] = {
    val array: ArrayBuffer[String] = ArrayBuffer[String]()
    val CityRandomOpt = RandomOptions(
      RanOpt(CityInfo(1, "北京", "华北"), 30),
      RanOpt(CityInfo(2, "上海", "华东"), 30),
      RanOpt(CityInfo(3, "广州", "华南"), 10),
      RanOpt(CityInfo(4, "深圳", "华南"), 20),
      RanOpt(CityInfo(5, "天津", "华北"), 10)
    )
    val random = new Random()
    // Simulated real-time records:
    // timestamp province city userid adid
    for (i <- 0 to 50) {
      val timestamp: Long = System.currentTimeMillis()
      val cityInfo: CityInfo = CityRandomOpt.getRandomOpt
      val city: String = cityInfo.city_name
      val area: String = cityInfo.area
      val adid: Int = 1 + random.nextInt(6)
      val userid: Int = 1 + random.nextInt(6)
      // Concatenate the fields into one record
      array += timestamp + " " + area + " " + city + " " + userid + " " + adid
    }
    array.toArray
  }

  def createKafkaProducer(broker: String): KafkaProducer[String, String] = {
    // Create the configuration object
    val prop = new Properties()
    // Add the configuration
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    // Build the Kafka producer from the configuration
    new KafkaProducer[String, String](prop)
  }

  def main(args: Array[String]): Unit = {
    // Read the Kafka settings from config.properties
    val config: Properties = PropertiesUtil.load("config.properties")
    val broker: String = config.getProperty("kafka.broker.list")
    val topic = "test"
    // Create the Kafka producer
    val kafkaProducer: KafkaProducer[String, String] = createKafkaProducer(broker)
    while (true) {
      // Generate random records and send them to the Kafka cluster through the producer
      for (line <- generateMockData()) {
        kafkaProducer.send(new ProducerRecord[String, String](topic, line))
        println(line)
      }
      Thread.sleep(2000)
    }
  }
}
The Kafka producer configuration is built in createKafkaProducer. The essential settings are the broker address and port (bootstrap servers) plus the key and value serializers; the target topic is not part of the producer configuration but is passed with every ProducerRecord in main.
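For anything beyond a demo you would usually also pin down delivery guarantees and batching. As a minimal sketch using standard ProducerConfig keys, with illustrative values that are not from the original article, the following lines could be added inside createKafkaProducer:

// Optional reliability and batching settings; the values here are illustrative
prop.put(ProducerConfig.ACKS_CONFIG, "all")    // wait for all in-sync replicas to acknowledge
prop.put(ProducerConfig.RETRIES_CONFIG, "3")   // retry transient send failures
prop.put(ProducerConfig.LINGER_MS_CONFIG, "5") // allow a few milliseconds for batching records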
3. Kafka configuration (config.properties)
# Kafka configuration
kafka.broker.list=192.168.22.56:9092
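kafka.broker.list is an application-level key that PropertiesUtil reads; its value ends up as the producer's bootstrap.servers, so several brokers can be listed comma-separated. A hypothetical multi-broker example (the second host is made up):

kafka.broker.list=192.168.22.56:9092,192.168.22.57:9092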
4. Helper code (PropertiesUtil.scala)
package com.demo

import java.io.InputStreamReader
import java.util.Properties

object PropertiesUtil {
  def load(propertiesName: String): Properties = {
    val prop = new Properties()
    // Read the file from the classpath as UTF-8
    prop.load(new InputStreamReader(
      Thread.currentThread().getContextClassLoader.getResourceAsStream(propertiesName), "UTF-8"))
    prop
  }
}
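Because loading goes through the context class loader, config.properties only has to be on the classpath, typically under src/main/resources in a Maven project. A quick sanity check, assuming the configuration from step 3:

val props = PropertiesUtil.load("config.properties")
println(props.getProperty("kafka.broker.list")) // expected: 192.168.22.56:9092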
5. Helper code (RandomOptions.scala)
package com.demo

import scala.collection.mutable.ListBuffer
import scala.util.Random

case class RanOpt[T](value: T, weight: Int)

object RandomOptions {
  def apply[T](opts: RanOpt[T]*): RandomOptions[T] = {
    val randomOptions = new RandomOptions[T]()
    for (opt <- opts) {
      randomOptions.totalWeight += opt.weight
      // Add the value once per unit of weight, so heavier options occupy more slots
      for (i <- 1 to opt.weight) {
        randomOptions.optsBuffer += opt.value
      }
    }
    randomOptions
  }
}

// Instances are built through the companion apply method above
class RandomOptions[T](opts: RanOpt[T]*) {
  var totalWeight = 0
  var optsBuffer = new ListBuffer[T]

  // Pick a uniformly random slot; weights translate into selection probability
  def getRandomOpt: T = {
    val randomNum: Int = new Random().nextInt(totalWeight)
    optsBuffer(randomNum)
  }
}
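Since apply expands each option into optsBuffer once per unit of weight, getRandomOpt returns options in proportion to their weights. A small hypothetical sanity check:

// With weights 70/30, roughly 700 of 1000 draws should be "A"
val opts = RandomOptions(RanOpt("A", 70), RanOpt("B", 30))
val draws = (1 to 1000).map(_ => opts.getRandomOpt)
println(draws.count(_ == "A"))

The expansion keeps memory proportional to the total weight, which is fine for small weights like the city table above; for large weights, sampling against cumulative weights would avoid materializing the buffer.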
6. Helper code (CityInfo.scala)
package com.demo

/**
 * City information.
 *
 * @param city_id   city id
 * @param city_name city name
 * @param area      region the city belongs to
 */
case class CityInfo(city_id: Long, city_name: String, area: String)
7. Run the program and test
You can watch the output in the IDEA console and in Kafka's command-line consumer at the same time.
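To follow along on the Kafka side, the stock console consumer works; the command below assumes it is run from the Kafka installation directory, with broker and topic matching the configuration above:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.22.56:9092 --topic test

The IDEA console prints records such as the following: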
1645151518980 华南 深圳 6 6
1645151518980 华南 深圳 2 3
1645151518980 华南 深圳 4 6
1645151518980 华东 上海 3 6
1645151518980 华北 北京 2 4
1645151518980 华东 上海 6 2
1645151518980 华北 北京 2 1
The Kafka console consumer shows the same message stream.
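To actually consume this mock stream with Spark Streaming, which is what the spark-streaming and spark-streaming-kafka-0-10 dependencies from step 1 are for, a minimal sketch could look like the following. The object name, consumer group id, and batch interval are assumptions, not from the original article:

package com.demo

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object MockStreamReader {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("MockStreamReader")
    val ssc = new StreamingContext(conf, Seconds(5)) // hypothetical 5-second batches

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "192.168.22.56:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "mock-stream-reader", // hypothetical consumer group
      "auto.offset.reset"  -> "latest"
    )

    // Subscribe to the same "test" topic the producer writes to
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Seq("test"), kafkaParams)
    )

    // Print the raw "timestamp area city userid adid" records each batch
    stream.map(_.value).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Running this alongside MockerRealTime should print each batch of mock records every few seconds.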

