spark编程 mysql得不到数据
这里说明一点:本文提到的解决SparkinsertIntoJDBC找不到Mysql驱动的方法是针对单机模式(也就是local模式)。在集群环境下,下面的方法是不行的。这是因为在分布式环境下,加载mysql驱动包存在一个Bug,1.3及以前的版本--jars分发的jar在executor端是通过Spark自身特化的classloader加载的。而JDBCdrivermanager使用的则是系统默认的classloader,因此无法识别。可行的方法之一是在所有executor节点上预先装好JDBCdriver并放入默认的classpath。
不过Spark1.4应该已经fix了这个问题,即--jars分发的jar也会纳入YARN的classloader范畴。
今天在使用Spark中DataFrame往Mysql中插入RDD,但是一直报出以下的异常次信息:
[itelbog@iteblog~]$bin/spark-submit--masterlocal[2]
--jarslib/mysql-connector-java-5.1.35.jar
--classspark.sparkToJDBC./spark-test_2.10-1.0.jar
sparkassemblyhasbeenbuiltwithHive,includingDatanucleusjarsonclasspath
Exceptioninthread"main"java.sql.SQLException:Nosuitabledriverfoundfor
jdbc::3306/spark?user=root&password=123&useUnicode=
true&characterEncoding=utf8&autoReconnect=true
atjava.sql.DriverManager.getConnection(DriverManager.java:602)
atjava.sql.DriverManager.getConnection(DriverManager.java:207)
atorg.apache.spark.sql.DataFrame.createJDBCTable(DataFrame.scala:1189)
atspark.SparkToJDBC$.toMysqlFromJavaBean(SparkToJDBC.scala:20)
atspark.SparkToJDBC$.main(SparkToJDBC.scala:47)
atspark.SparkToJDBC.main(SparkToJDBC.scala)
atsun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethod)
atsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
atsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
atjava.lang.reflect.Method.invoke(Method.java:597)
atorg.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$
$runMain(SparkSubmit.scala:569)
atorg.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
atorg.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
atorg.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
atorg.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
感觉很奇怪,我在启动作业的时候加了Mysql驱动啊在,怎么会出现这种异常呢??经过查找,发现在--jars参数里面加入Mysql是没有用的。通过查找,发现提交的作业可以通过加入--driver-class-path参数来设置driver的classpath,试了一下果然没有出现错误!
[itelbog@iteblog~]$bin/spark-submit--masterlocal[2]
--driver-class-pathlib/mysql-connector-java-5.1.35.jar
--classspark.SparkToJDBC./spark-test_2.10-1.0.jar
其实,我们还可以在spark安装包的conf/spark-env.sh通过配置SPARK_CLASSPATH来设置driver的环境变量,如下:
(这里需要注意的是,在Spark1.3版本中,在Spark配置中按如下进行配置时,运行程序时会提示该配置方法在Spark1.0之后的版本已经过时,建议使用另外两个方法;其中一个就是上面讲到的方法。另外一个就是在配置文件中配置spark.executor.extraClassPath,具体配置格式会在试验之后进行补充)
exportSPARK_CLASSPATH=$SPARK_CLASSPATH:/iteblog/com/mysql-connector-java-5.1.35.jar
这样也可以解决上面出现的异常。但是,我们不能同时在conf/spark-env.sh里面配置SPARK_CLASSPATH和提交作业加上--driver-class-path参数,否则会出现以下异常:
查看源代码打印帮助
[itelbog@iteblog~]$bin/spark-submit--masterlocal[2]
--driver-class-pathlib/mysql-connector-java-5.1.35.jar
--classspark.SparkToJDBC./spark-test_2.10-1.0.jar
SparkassemblyhasbeenbuiltwithHive,includingDatanucleusjarsonclasspath
Exceptioninthread"main"org.apache.spark.SparkException:
Foundbothspark.driver.extraClassPathandSPARK_CLASSPATH.Useonlytheformer.
atorg.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply
$7.apply(SparkConf.scala:339)
atorg.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply
$7.apply(SparkConf.scala:337)
atscala.collection.immutable.List.foreach(List.scala:318)
atorg.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:337)
atorg.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:325)
atscala.Option.foreach(Option.scala:236)
atorg.apache.spark.SparkConf.validateSettings(SparkConf.scala:325)
atorg.apache.spark.SparkContext.(SparkContext.scala:197)
atspark.SparkToJDBC$.main(SparkToJDBC.scala:41)
atspark.SparkToJDBC.main(SparkToJDBC.scala)
atsun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethod)
atsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
atsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
atjava.lang.reflect.Method.invoke(Method.java:597)
atorg.apache.spark.deploy.SparkSubmit$.org$apache$spark$
deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
atorg.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
atorg.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
atorg.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
atorg.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Spark编程题
现有100W+条数据存储在hdfs中的userinfo文件夹中的多个文件中,数据格式如下:张三|男|23|未婚|北京|海淀李四|女|25|已婚|河北|石家庄1.数据中所有人的平均年龄2.数据中所有男性未婚的人数和女性未婚人数3.数据中20-30已婚数量前3的省份package spark08import org.apache.spark.rdd.RDDimport org.apache.spark.util.LongAccumulatorimport org.apache.spark.{SparkConf, SparkContext}*张三|男|23|未婚|北京|海淀*李四|女|25|已婚|河北|石家庄* 1.数据中所有人的平均年龄* 2.数据中所有男性未婚的人数和女性未婚人数* 3.数据中20-30已婚数量前3的省份* 4.未婚比例(未婚人数/该城市总人数)最高的前3个城市object UserInfo {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val sc = new SparkContext(conf)//读取原始文件val strFile: RDD[String] = sc.textFile("D:\\data\\data\\userinfo")val srcRdd: RDD[(String, String, Int, String, String, String)] = strFile.map(t => {val strings: Array[String] = t.split("\\|")val name: String = strings(0)val gender = strings(1)val age = strings(2).toIntval isMarry: String = strings(3)val province = strings(4)val city = strings(5)(name, gender, age, isMarry, province, city)srcRdd.cache()//1.数据中所有人的平均年龄 李四|女|25|已婚|河北|石家庄val ageAccumulator: LongAccumulator = sc.longAccumulator //使用累加器统计总人数val ageCount: Int = srcRdd.map(t => {ageAccumulator.add(1)t._3}).reduce(_ + _)val ageNumber = ageAccumulator.valueval avgAge = ageCount.toLong/(ageNumber*1.0)println(s"所有人的平均年龄为${avgAge}")//2.数据中所有男性未婚的人数和女性未婚人数val genderAndMarryRDD: RDD[(String, Iterable[(String, String)])] = srcRdd.map(t => {(t._1, t._3) //性别,婚否}).filter(_._2.equals("未婚")).groupBy(_._1) //按性别分组val res2RDD: RDD[(String, Int)] = genderAndMarryRDD.mapValues(t=>t.size)res2RDD.collect().foreach(println)//数据中20-30已婚数量前3的省份 李四|女|25|已婚|河北|石家庄val res3: Array[(Int, String)] = srcRdd.filter(t => {t._3 >= 20 && t._3 <= 30 && t._4.equals("已婚")})//删选出满足20-30已婚的数据,按省份分组,求v的size即是20-30已婚数量.groupBy(_._5).mapValues(_.size)//k,v互换取前3.map(t => (t._2, t._1)).top(3)res3.foreach(println)//(城市,(未婚人数,已婚人数))//未婚比例(未婚人数/该城市总人数)最高的前3个城市 李四|女|25|已婚|河北|石家庄