Java和scala实现 Spark RDD转换成DataFrame的两种方法小结

一:准备数据源

在项目下新建一个student.txt文件,里面的内容为:

1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18 

二:实现

Java版:

1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下:

package com.cxd.sql;
import java.io.Serializable;
@SuppressWarnings("serial")
public class Student implements Serializable {
 String sid;
 String sname;
 int sage;
 public String getSid() {
  return sid;
 }
 public void setSid(String sid) {
  this.sid = sid;
 }
 public String getSname() {
  return sname;
 }
 public void setSname(String sname) {
  this.sname = sname;
 }
 public int getSage() {
  return sage;
 }
 public void setSage(int sage) {
  this.sage = sage;
 }
 @Override
 public String toString() {
  return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
 }

}
		

2.转换,具体代码如下

package com.cxd.sql;
import java.util.ArrayList;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class TxtToParquetDemo {
 public static void main(String[] args) {

  SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
  SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
  reflectTransform(spark);//Java反射
  dynamicTransform(spark);//动态转换
 }

 /**
  * 通过Java反射转换
  * @param spark
  */
 private static void reflectTransform(SparkSession spark)
 {
  JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

  JavaRDD<Student> rowRDD = source.map(line -> {
   String parts[] = line.split(",");
   Student stu = new Student();
   stu.setSid(parts[0]);
   stu.setSname(parts[1]);
   stu.setSage(Integer.valueOf(parts[2]));
   return stu;
  });

  Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
  df.select("sid", "sname", "sage").
  coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
 }
 /**
  * 动态转换
  * @param spark
  */
 private static void dynamicTransform(SparkSession spark)
 {
  JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

  JavaRDD<Row> rowRDD = source.map( line -> {
   String[] parts = line.split(",");
   String sid = parts[0];
   String sname = parts[1];
   int sage = Integer.parseInt(parts[2]);

   return RowFactory.create(
     sid,
     sname,
     sage
     );
  });

  ArrayList<StructField> fields = new ArrayList<StructField>();
  StructField field = null;
  field = DataTypes.createStructField("sid", DataTypes.StringType, true);
  fields.add(field);
  field = DataTypes.createStructField("sname", DataTypes.StringType, true);
  fields.add(field);
  field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
  fields.add(field);

  StructType schema = DataTypes.createStructType(fields);

  Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
  df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");

 }

}

scala版本:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
object RDD2Dataset {

 case class Student(id:Int,name:String,age:Int)
 def main(args:Array[String])
 {

 val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
 import spark.implicits._
 reflectCreate(spark)
 dynamicCreate(spark)
 }

 /**
	 * 通过Java反射转换
	 * @param spark
	 */
 private def reflectCreate(spark:SparkSession):Unit={
 import spark.implicits._
 val stuRDD=spark.sparkContext.textFile("student2.txt")
 //toDF()为隐式转换
 val stuDf=stuRDD.map(_.split(",")).map(parts⇒Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF()
 //stuDf.select("id","name","age").write.text("result") //对写入文件指定列名
 stuDf.printSchema()
 stuDf.createOrReplaceTempView("student")
 val nameDf=spark.sql("select name from student where age<20")
 //nameDf.write.text("result") //将查询结果写入一个文件
 nameDf.show()
 }

 /**
	 * 动态转换
	 * @param spark
	 */
 private def dynamicCreate(spark:SparkSession):Unit={
 val stuRDD=spark.sparkContext.textFile("student.txt")
 import spark.implicits._
 val schemaString="id,name,age"
 val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
 val schema=StructType(fields)
 val rowRDD=stuRDD.map(_.split(",")).map(parts⇒Row(parts(0),parts(1),parts(2)))
 val stuDf=spark.createDataFrame(rowRDD, schema)
  stuDf.printSchema()
 val tmpView=stuDf.createOrReplaceTempView("student")
 val nameDf=spark.sql("select name from student where age<20")
 //nameDf.write.text("result") //将查询结果写入一个文件
 nameDf.show()
 }
}

注:

1.上面代码全都已经测试通过,测试的环境为spark2.1.0,jdk1.8。

2.此代码不适用于spark2.0以前的版本。

以上这篇Java和scala实现 Spark RDD转换成DataFrame的两种方法小结就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

时间: 2018-06-07

深入学习java中的Groovy 和 Scala 类

前言 Java 传承的是平台,而不是语言.有超过 200 种语言可以在 JVM 上运行,它们之中不可避免地会有一种语言最终将取代 Java 语言,成为编写 JVM 程序的最佳方式.本系列将探讨三种下一代 JVM 语言:Groovy.Scala 和 Clojure,比较并对比新的功能和范例,让 Java 开发人员对自己近期的未来发展有大体的认识. Java 语言的开发人员精通 C++ 和其他语言,包括多继承(multiple inheritance),使得类可以继承自任意数量的父类.多继承带来的一

Java8与Scala中的Lambda表达式深入讲解

前言 最近几年Lambda表达式风靡于编程界.很多现代编程语言都把它作为函数式编程的基本组成部分.基于JVM的编程语言如Scala.Groovy及Clojure把它作为关键部分集成在语言中.而如今,(最终)Java 8也加入了这个有趣的行列. Java8 终于要支持Lambda表达式!自2009年以来Lambda表达式已经在Lambda项目中被支持.在那时候,Lambda表达式仍被称为Java闭包.在我们进入一些代码示例以前,先来解释下为什么Lambda表达式在Java程序员中广受欢迎. 1.为

浅析Java和Scala中的Future

随着CPU的核数的增加,异步编程模型在并发领域中的得到了越来越多的应用,由于Scala是一门函数式语言,天然的支持异步编程模型,今天主要来看一下Java和Scala中的Futrue,带你走入异步编程的大门. Future 很多同学可能会有疑问,Futrue跟异步编程有什么关系?从Future的表面意思是未来,一个Future对象可以看出一个将来得到的结果,这就和异步执行的概念很像,你只管自己去执行,只要将最终的结果传达给我就行,线程不必一直暂停等待结果,可以在具体异步任务执行的时候去执行其他操作

浅析Java ClassName.this中类名.this关键字的理解

一.this关键字主要有三个应用: (1)this调用本类中的属性,也就是类中的成员变量: (2)this调用本类中的其他方法: (3)this调用本类中的其他构造方法,调用时要放在构造方法的首行. 关键字this用于指代当前的对象.因此,类内部可以使用this作为前缀引用实例成员: this()代表了调用另一个构造函数,至于调用哪个构造函数根据参数表确定.this()调用只能出现在构造函数的第一行. 当在内部类中使用关键字this,指的就是内部类的对象, 为了访问外层类对象,就可以使用外层类名

浅析Java设计模式编程中的单例模式和简单工厂模式

单例模式 动机 有时候只有一个类的实例是很重要的.比如,一个系统应该只有一个窗口管理实例. 单例模式是最简单设计模式:类负责实例化自己,确保只有一个实例,并且提供一个访问这个实例的入口. 目的 1. 确保只有一个实例被创建. 2. 提供访问这个实例的入口. 使用final确保被创建一次,private的构造函数确保不被实例化.public的getInstance方法确保外部能够访问.下面是饿汉模式: public class Singleton { private static final Si

老生常谈java中的Future模式

jdk1.7.0_79 本文实际上是对上文<简单谈谈ThreadPoolExecutor线程池之submit方法>的一个延续或者一个补充.在上文中提到的submit方法里出现了FutureTask,这不得不停止脚步将方向转向Java的Future模式. Future是并发编程中的一种设计模式,对于多线程来说,线程A需要等待线程B的结果,它没必要一直等待B,可以先拿到一个未来的Future,等B有了结果后再取真实的结果. ExecutorService executor = Executors.

深入浅析java web log4j 配置及在web项目中配置Log4j的技巧

在上篇文章给大家介绍了Java log4j详细教程,本文给大家介绍java web log4j配置及web项目中配置log4j的技巧.具体详情请看下文吧. 首先给大家提供log4j.jar下载:http://logging.apache.org/log4j/1.2/download.html 一.java web项目使用log4j 1.在web.xml文件中添加 <!-- 配置log4j --> <context-param> <param-name>webAppRoo

浅析java中stringBuilder的用法

String对象是不可改变的.每次使用 System.String类中的方法之一时,都要在内存中创建一个新的字符串对象,这就需要为该新对象分配新的空间.在需要对字符串执行重复修改的情况下,与创建新的 String对象相关的系统开销可能会非常昂贵.如果要修改字符串而不创建新的对象,则可以使用System.Text.StringBuilder类.例如,当在一个循环中将许多字符串连接在一起时,使用 StringBuilder类可以提升性能. 通过用一个重载的构造函数方法初始化变量,可以创建 Strin

浅析java中String类型中“==”与“equal”的区别

一.前言 1.1.首先很多人都知道,String中用"=="比较的是地址,用equals比较的是内容,很多人对此用的是记忆法,通过记忆来加强此的引用,但是其真正的原理其实并不难,当我们真正明白其为什么的时候,用起来也会更加灵活,更加有底气(形容得不太好,朋友别见怪): 二相关知识的准备 类型常量池 运行时常量池 字符串常量池 我们今天讨论的主题是当然是字符串常量池: 为什么在这要把另外两个常量池拿出说一下呢,首先小生我在网上或者cnds上看到很多人在争论字符串常量池是存在与方法区还是堆

深入浅析Java中的volatile

内存可见性 volatile是Java提供的一种轻量级的同步机制,在并发编程中,它也扮演着比较重要的角色.同synchronized相比(synchronized通常称为重量级锁),volatile更轻量级,相比使用synchronized时引起的线程上下文切换所带来的庞大开销,倘若能恰当的合理的使用volatile,自然是美事一桩. 为了能比较清晰彻底的理解volatile,我们一步一步来分析.首先来看看如下代码 public class TestVolatile { boolean stat

浅析Java中Runnable和Thread的区别

线程的起动并不是简单的调用了你的RUN方法,而是由一个线程调度器来分别调用你的所有线程的RUN方法, 我们普通的RUN方法如果没有执行完是不会返回的,也就是会一直执行下去,这样RUN方法下面的方法就不可能会执行了,可是线程里的RUN方法却不一样,它只有一定的CPU时间,执行过后就给别的线程了,这样反复的把CPU的时间切来切去,因为切换的速度很快,所以我们就感觉是很多线程在同时运行一样. 你简单的调用run方法是没有这样效果的,所以你必须调用Thread类的start方法来启动你的线程.所以你启动

浅析Java中对象的创建与对象的数据类型转换

Java:对象创建和初始化过程 1.Java中的数据类型     Java中有3个数据类型:基本数据类型(在Java中,boolean.byte.short.int.long.char.float.double这八种是基本数据类型).引用类型和null类型.其中,引用类型包括类类型(含数组).接口类型.     下列语句声明了一些变量: int k ; A a; //a是A数据类型的对象变量名. B b1,b2,-,b10000;// 假定B是抽象类或接口. String s; 注意:从数据类型