KafkaWriter 是一个用于将数据写入 Kafka 的 Scala 类,以下是一个简单的 KafkaWriter 样例代码,包括创建 KafkaWriter 实例、配置参数以及发送消息的方法。
(图片来源网络,侵删)import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} class KafkaWriter(brokers: String, topic: String) { private val props = new Properties() props.put("bootstrap.servers", brokers) props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") private val producer = new KafkaProducer[String, String](props) def sendMessage(key: String, value: String): Unit = { val record = new ProducerRecord[String, String](topic, key, value) producer.send(record) } def close(): Unit = { producer.close() } } object KafkaWriterExample { def main(args: Array[String]): Unit = { val kafkaWriter = new KafkaWriter("localhost:9092", "testtopic") // 发送消息 kafkaWriter.sendMessage("key1", "value1") kafkaWriter.sendMessage("key2", "value2") // 关闭生产者 kafkaWriter.close() } }
在这个示例中,我们首先导入了所需的 Kafka 相关类,我们创建了一个名为KafkaWriter
的类,它接受两个参数:brokers
(Kafka 集群地址)和topic
(要写入的主题)。
在KafkaWriter
类中,我们定义了一个名为props
的属性对象,用于存储 Kafka 生产者的配置信息,我们创建了一个名为producer
的 KafkaProducer 实例,并使用props
作为配置参数。
我们还定义了一个名为sendMessage
的方法,该方法接受一个键和一个值作为参数,并将它们封装在一个ProducerRecord
对象中,我们使用producer.send()
方法将记录发送到 Kafka。
我们定义了一个名为close
的方法,用于关闭 KafkaProducer 实例。
在KafkaWriterExample
对象中,我们创建了一个KafkaWriter
实例,并向其发送了两条消息,我们调用close
方法关闭生产者。
最新评论
本站CDN与莫名CDN同款、亚太CDN、速度还不错,值得推荐。
感谢推荐我们公司产品、有什么活动会第一时间公布!
我在用这类站群服务器、还可以. 用很多年了。