博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark_to_kakfa
阅读量:4970 次
发布时间:2019-06-12

本文共 2211 字,大约阅读时间需要 7 分钟。

package kafkaimport java.io.InputStreamimport java.text.SimpleDateFormatimport java.util.{Date, HashMap, Properties}import com.google.gson.JsonObjectimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionobject ExpandTimes {  val prop = new Properties()  val is: InputStream = this.getClass().getResourceAsStream("/conf.properties")  prop.load(is)  val ENVIRONMENT_SETING = "expandtimes_brokers_prd"  private val brokers = prop.getProperty(ENVIRONMENT_SETING)  // Zookeeper connection properties  private val props = new HashMap[String, Object]()  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,    "org.apache.kafka.common.serialization.StringSerializer")  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,    "org.apache.kafka.common.serialization.StringSerializer")  private val producer = new KafkaProducer[String, String](this.props)  def main(args: Array[String]): Unit = {    val sparkConf = new SparkConf().setAppName("ExpandTimes")    val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()    val date = new Date(new Date().getTime - 86400000L)    val dateFormat = new SimpleDateFormat("yyyyMMdd")    val statisDate = dateFormat.format(date)    val querySql1 = "select member_id,times from sospdm.tdm_rpt_sign_expand_great_seven_d where statis_date = " + statisDate    val resultDF1 = spark.sql(querySql1)    if(!ENVIRONMENT_SETING.contains("prd")){      resultDF1.show(10)    }    resultDF1.rdd.foreach(row => {      val member_id: String = row.getAs[String]("member_id").toString()      val times: Int = row.getAs[Int]("times").toInt      val json = new JsonObject()      json.addProperty("memberId", member_id).toString      json.addProperty("times", times).toString      kafkaProducerSend(json.toString)    })    def kafkaProducerSend(args: String) {      if (args != null) {        val topic = "sign_status_count"        val message = new ProducerRecord[String, String](topic, null, args)        producer.send(message)      }    }  }}

 

转载于:https://www.cnblogs.com/yin-fei/p/10772873.html

你可能感兴趣的文章
[SDOI2008]洞穴勘测
查看>>
NOI2014 购票
查看>>
Difference between Linearizability and Serializability
查看>>
电影《绿皮书》
查看>>
IDEA使用操作文档
查看>>
如何对网课、游戏直播等进行录屏
查看>>
UIView
查看>>
有关去掉谷歌及火狐浏览器文本框 数字类型 上下箭头的方法
查看>>
MySQL数据迁移到SQL Server
查看>>
复杂链表的复制(python)
查看>>
添加日期选择控件
查看>>
jquery.cookie.js操作cookie
查看>>
javascript遍历数组
查看>>
bzoj4765: 普通计算姬 (分块 && BIT)
查看>>
thinkphp5-----模板中函数的使用
查看>>
POJ-3211 Washing Clothes[01背包问题]
查看>>
[BZOJ4832][Lydsy1704月赛]抵制克苏恩
查看>>
数据库三范式
查看>>
看完漫画秒懂区块链
查看>>
开发工具,做一个有效率的开发者
查看>>