接下来做的操作是:(这个操作,将程序打成jar包到集群中运行)
(1)编写spark程序在线上的hive中创建表并导入数据
(2)查询hive中的数据
(3)将查询结果保存到MySQL中
代码:
创新互联专注于隆回网站建设服务及定制,我们拥有丰富的企业做网站经验。 热诚为您提供隆回营销型网站建设,隆回网站制作、隆回网页设计、隆回网站官网定制、小程序制作服务,打造隆回网络公司原创品牌,更为您提供隆回网站排名全网营销落地服务。
object SparkSqlTest {
def main(args: Array[String]): Unit = {
//屏蔽多余的日志
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.project-spark").setLevel(Level.WARN)
//构建编程入口
val conf: SparkConf = new SparkConf()
conf.setAppName("SparkSqlTest")
val spark: SparkSession = SparkSession.builder().config(conf)
.enableHiveSupport() //这句话表示支持hive
.getOrCreate()
//创建sqlcontext对象
val sqlContext: SQLContext = spark.sqlContext
//创建sparkContext
val sc: SparkContext = spark.sparkContext
//创建数据库
var sql=
"""
|create database if not exists `test`
""".stripMargin
spark.sql(sql)
//使用当前创建的数据库
sql=
"""
|use `test`
""".stripMargin
spark.sql(sql)
//创建hive表
sql=
"""
|create table if not exists `test`.`teacher_basic`(
|name string,
|age int,
|married boolean,
|children int
|) row format delimited
|fields terminated by ','
""".stripMargin
spark.sql(sql)
//加载数据
sql=
"""
|load data local inpath 'file:///home/hadoop/teacher_info.txt'
|into table `test`.`teacher_basic`
""".stripMargin
spark.sql(sql)
//执行查询操作
sql=
"""
|select * from `test`.`teacher_basic`
""".stripMargin
val hiveDF=spark.sql(sql)
val url="jdbc:mysql://localhost:3306/test"
val table_name="teacher_basic"
val pro=new Properties()
pro.put("password","123456")
pro.put("user","root")
hiveDF.write.mode(SaveMode.Append).jdbc(url,table_name,pro)
}
}
打jar包到集群中运行:https://blog.51cto.com/14048416/2337760
作业提交shell:
spark-submit \
--class com.zy.sql.SparkSqlTest \
--master yarn \
--deploy-mode cluster \
--driver-memory 512M \
--executor-memory 512M \
--total-executor-cores 1 \
file:////home/hadoop/SparkSqlTest-1.0-SNAPSHOT.jar \
然后满怀期待的等待着success,不幸的是,当程序运行到一半的时候异常终止了:
我查看了一下打印的日志:
我上网查了好多资料,都说是hive的版本过高,what? I‘not why!!
然后想了想,我在集群中,使用spark的程序,去在hive表中进行操作,那么是不是spark需要和hive整合一下啊,然后我又上网查了spark如何整合hive,总的来说就是将hive的元数据库共享出去,让spark可以访问。
具体操作:
①在hive的hive-site.xml加入:
hive.metastore.uris
thrift://hadoop01:9083 #在哪里启动这个进程
②在相应的节点上启动在hive-site.xml中配置的进程
nohup hive --service metastore 1>/home/hadoop/logs/hive_thriftserver.log 2>&1 &
ps:这里需要注意一下,nohup是后台启动的,而且所有的信息都是定向输出的,这条命令使用之后,一定要检查一下这个命令是否真的执行成功了:
使用:jsp查一下是否有相应的进程启动,如果没有表示启动失败,肯定是 /home/hadoop/logs这个父目录没有创建,然后创建这个目录之后,在启动,在检查是否启动成功!!!!!!!
③将hive-site.xml复制到$SPARK_HOME/conf下(注意是每一个节点都要复制)
④测试是否成功:spark-sql,如果正确进入并且可以访问hive的表,表示spark整合hive成功!!!
之后我有将原来的程序,重新跑了一次,结果 没有报错,程序运行成功!!!
我不敢相信,我又查看了一下MySQL的表:
确认 程序成功!!!!!!