博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Structrued Streaming业务数据实时分析
阅读量:4572 次
发布时间:2019-06-08

本文共 8204 字,大约阅读时间需要 27 分钟。

 

 

 

 

 

先启动spark-shell,记得启动nc服务

 

输入以下代码

scala> import org.apache.spark.sql.functions._import org.apache.spark.sql.functions._scala> import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.SparkSessionscala> import spark.implicits._import spark.implicits._scala> val lines = spark.readStream.format("socket").option("host", "bigdata-pro01.kfk.com").option("port", 9999).load()18/03/21 20:55:13 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.lines: org.apache.spark.sql.DataFrame = [value: string]scala> val words = lines.as[String].flatMap(_.split(" "))words: org.apache.spark.sql.Dataset[String] = [value: string]scala> val wordCounts = words.groupBy("value").count()wordCounts: org.apache.spark.sql.DataFrame = [value: string, count: bigint]scala> val query = wordCounts.writeStream.outputMode("complete").format("console").start()query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@4e260e04

 

 

 在nc输入几个单词

 

 

 

 我们再输入一些单词

 

 

 

 

我们改一下代码换成update模式

首先重新启动一次spark-shell,记得启动nc

 

 

 

 

 

 

换成append模式

scala> import org.apache.spark.sql.functions._import org.apache.spark.sql.functions._scala> import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.SparkSessionscala> import spark.implicits._import spark.implicits._scala> val lines = spark.readStream.format("socket").option("host", "bigdata-pro01.kfk.com").option("port", 9999).load()18/03/21 21:32:30 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.lines: org.apache.spark.sql.DataFrame = [value: string]scala> val words = lines.as[String].flatMap(_.split(" "))words: org.apache.spark.sql.Dataset[String] = [value: string]scala> val query = words.writeStream.outputMode("append").format("console").start()query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@19d85bbe

 

 

 

 

 

 

 

 

因为我们之前的kafka的版本低了,我下载一个0.10.0版本的

下载地址 http://kafka.apache.org/downloads

 我们把kafka0.9版本的配置文件直接复制过来

为了快一点我直接在虚拟机里操作了

复制这几个配置文件

把kafka0.10的覆盖掉

 

 修改一下配置文件

 

 把kafka分发都另外的两个节点去

 

 

 

在节点2和节点3也把相应的配置文件修改一下

server.properties

 

 

 

 

 

在idea里重新建一个scala类

 

 

加上如下代码

 

package com.spark.testimport org.apache.sparkimport org.apache.spark.sql.SparkSessionobject StructuredStreamingKafka {  def main(args: Array[String]): Unit = {     val spark=SparkSession.builder().master("local[2]").appName("streaming").getOrCreate()    val df = spark      .readStream      .format("kafka")      .option("kafka.bootstrap.servers", "bigdata-pro01.kfk.com:9092")      .option("subscribe", "weblogs")      .load()    import spark.implicits._   val lines= df.selectExpr("CAST(value AS STRING)").as[String]    val words = lines.flatMap(_.split(" "))    val wordCounts = words.groupBy("value").count()    val query = wordCounts.writeStream      .outputMode("complete")      .format("console")      .start()    query.awaitTermination()  }}

 

跑一下我们的程序

 如果报错了提示需要0.10版本的可以先不用管

我们启动一下kafka

 

 

 

 可以看到程序已经在跑了

 

 

我们在kafak里创建一个生产者

bin/kafka-console-producer.sh --broker-list bigdata-pro01.kfk.com:9092 --topic weblogs

 

 

 我们输入几个单词

 

 

可以看到idea这边的结果

 

我们可以换成update模式

 

 程序跑起来了

 

输入单词

 

 这个是运行的结果

 

 

 

 我们把包上传上来(3个节点都这样做)

 

 

启动spark-shell

 

把代码拷贝进来

 

val df = spark      .readStream      .format("kafka")      .option("kafka.bootstrap.servers", "bigdata-pro01.kfk.com:9092")      .option("subscribe", "weblogs")      .load()    import spark.implicits._   val lines= df.selectExpr("CAST(value AS STRING)").as[String]    val words = lines.flatMap(_.split(" "))    val wordCounts = words.groupBy("value").count()    val query = wordCounts.writeStream      .outputMode("update")      .format("console")      .start()    query.awaitTermination()

 

 这个时候一定要保持kafka和生产者是开启的

 我在生产者这边输入几个单词

 

 回到spark-shell界面可以看到统计结果

 

 

 

 

 

我们先把mysqld的test数据库的webCount的表的内容清除

 

打开idea,我们编写两个程序

 

package com.spark.testimport org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.streaming.ProcessingTime/**  * Created by Administrator on 2017/10/16.  */object StructuredStreamingKafka {  case class Weblog(datatime:String,                    userid:String,                    searchname:String,                    retorder:String,                    cliorder:String,                    cliurl:String)  def main(args: Array[String]): Unit = {    val spark  = SparkSession.builder()      .master("local[2]")      .appName("streaming").getOrCreate()    val df = spark      .readStream      .format("kafka")      .option("kafka.bootstrap.servers", "bigdata-pro01.kfk.com:9092")      .option("subscribe", "weblogs")      .load()    import spark.implicits._    val lines = df.selectExpr("CAST(value AS STRING)").as[String]    val weblog = lines.map(_.split(","))      .map(x => Weblog(x(0), x(1), x(2),x(3),x(4),x(5)))    val titleCount = weblog      .groupBy("searchname").count().toDF("titleName","count")    val url ="jdbc:mysql://bigdata-pro01.kfk.com:3306/test"    val username="root"    val password="root"    val writer = new JDBCSink(url,username,password)    val query = titleCount.writeStream      .foreach(writer)      .outputMode("update")        //.format("console")      .trigger(ProcessingTime("5 seconds"))      .start()    query.awaitTermination()  }}

 

 

package com.spark.testimport java.sql._import java.sql.{Connection, DriverManager}import org.apache.spark.sql.{ForeachWriter, Row}/**  * Created by Administrator on 2017/10/17.  */class JDBCSink(url:String, username:String,password:String) extends ForeachWriter[Row]{  var statement : Statement =_  var resultSet : ResultSet =_  var connection : Connection=_  override def open(partitionId: Long, version: Long): Boolean = {    Class.forName("com.mysql.jdbc.Driver")    //  connection = new MySqlPool(url,username,password).getJdbcConn();    connection = DriverManager.getConnection(url,username,password);      statement = connection.createStatement()      return true  }  override def process(value: Row): Unit = {    val titleName = value.getAs[String]("titleName").replaceAll("[\\[\\]]","")    val count = value.getAs[Long]("count");    val querySql = "select 1 from webCount " +      "where titleName = '"+titleName+"'"    val updateSql = "update webCount set " +      "count = "+count+" where titleName = '"+titleName+"'"    val insertSql = "insert into webCount(titleName,count)" +      "values('"+titleName+"',"+count+")"    try{      var resultSet = statement.executeQuery(querySql)      if(resultSet.next()){        statement.executeUpdate(updateSql)      }else{        statement.execute(insertSql)      }    }catch {      case ex: SQLException => {        println("SQLException")      }      case ex: Exception => {        println("Exception")      }      case ex: RuntimeException => {        println("RuntimeException")      }      case ex: Throwable => {        println("Throwable")      }    }  }  override def close(errorOrNull: Throwable): Unit = {//    if(resultSet.wasNull()){//      resultSet.close()//    }    if(statement==null){      statement.close()    }    if(connection==null){      connection.close()    }  }}

 

在pom.xml文件里添加这个依赖包

 

mysql
mysql-connector-java
5.1.27

 

 

我在这里说一下这个依赖包版本的选择上最好要跟你集群里面的依赖包版本一样,不然可能会报错的,可以参考hive里的Lib路径下的版本

 

 

 

 

 保持集群的dfs,hbase,yarn,zookeeper,都是启动的状态

 

 

 

 启动我们节点1和节点2的flume,在启动之前我们先修改一下flume的配置,因为我们把jdk版本和kafka版本后面更换了,所以我们要修改配置文件(3个节点的都改)

 

 

 启动节点1的flume

 

 启动节点1的kafka

bin/kafka-server-start.sh config/server.properties

 

 

启动节点2的flume

 

在节点2上把数据启动起来,实时产生数据

 

 回到idea我们把程序运行一下

 

 

注意了,现在程序是没有报错的,因为我前期工作做得不是太好,给idea分配的内存小了,所以跑得很慢

 

 

 

回到mysql里面查看webCount表,已经有数据进来了

 

 

 

 

 

 

 

我们把配置文件修改如下

 

 

[client]socket=/var/lib/mysql/mysql.sockdefault-character-set=utf8[mysqld]character-set-server=utf8datadir=/var/lib/mysqlsocket=/var/lib/mysql/mysql.sockuser=mysql# Disabling symbolic-links is recommended to prevent assorted security riskssymbolic-links=0[mysql]default-character-set=utf8[mysqld_safe]log-error=/var/log/mysqld.logpid-file=/var/run/mysqld/mysqld.pid

 

 

 把表删除了

 

 

 重新创建表

create table webCount( titleName varchar(255) CHARACTER SET utf8 DEFAULT NULL, count int(11) DEFAULT NULL )ENGINE=lnnoDB DEFAULT CHARSET=utf8;

 

 

重新在运行一次程序

 

 

可以看到没有中文乱码了。

 

同时我们通过可视化工具连接mysql查看

 

 

转载于:https://www.cnblogs.com/braveym/p/8620696.html

你可能感兴趣的文章
CentOS下安装pip
查看>>
缩略图生成脚本timthumb用法
查看>>
C#综合笔记
查看>>
类库中的Controller未被加载到项目中
查看>>
Java序列化与反序列化(实践)
查看>>
生产计划问题与投资问题
查看>>
《Python自动化运维之路》 业务服务监控(二)
查看>>
Cheatsheet: 2018 03.01 ~ 2018 03.31
查看>>
每日一库:microAjax.js
查看>>
Do not pour out HDU - 5954 数学积分
查看>>
IntelliJ Idea 常用快捷键列表及技巧大全
查看>>
自己开发的“万能数据库查询分析器”终于有了较大的成果
查看>>
和菜鸟一起深入学习国嵌实验之进程间通信
查看>>
JavaScript跨域总结与解决办法
查看>>
CSU1007: 矩形着色
查看>>
CentOS 6.5 X64 U盘启动盘制作
查看>>
java function
查看>>
SpringBoot 访问jsp文件报错Path with "WEB-INF" or "META-INF": [WEB-INF/jsp/welcome.jsp]的解决办法...
查看>>
CSS常用命名
查看>>
嘻嘻嘻,想不到吧
查看>>