Spark SQL 2.4.8 操作 Dataframe的两种方式

目录
  • 一、测试数据
  • 二、创建DataFrame
    • 方式一:DSL方式操作
    • 方式二:SQL方式操作

一、测试数据

7369,SMITH,CLERK,7902,1980/12/17,800,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,20
7839,KING,PRESIDENT,1981/11/17,5000,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,20
7900,JAMES,CLERK,7698,1981/12/3,9500,30
7902,FORD,ANALYST,7566,1981/12/3,3000,20
7934,MILLER,CLERK,7782,1982/1/23,1300,10

二、创建DataFrame

方式一:DSL方式操作

  • 实例化SparkContext和SparkSession对象
  • 利用StructType类型构建schema,用于定义数据的结构信息
  • 通过SparkContext对象读取文件,生成RDD
  • 将RDD[String]转换成RDD[Row]
  • 通过SparkSession对象创建dataframe
  • 完整代码如下:
package com.scala.demo.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}

object Demo01 {
  def main(args: Array[String]): Unit = {
    // 1.创建SparkContext和SparkSession对象
    val sc = new SparkContext(new SparkConf().setAppName("Demo01").setMaster("local[2]"))
    val sparkSession = SparkSession.builder().getOrCreate()

    // 2. 使用StructType来定义Schema
    val mySchema = StructType(List(
      StructField("empno", DataTypes.IntegerType, false),
      StructField("ename", DataTypes.StringType, false),
      StructField("job", DataTypes.StringType, false),
      StructField("mgr", DataTypes.StringType, false),
      StructField("hiredate", DataTypes.StringType, false),
      StructField("sal", DataTypes.IntegerType, false),
      StructField("comm", DataTypes.StringType, false),
      StructField("deptno", DataTypes.IntegerType, false)
    ))
    // 3. 读取数据
    val empRDD = sc.textFile("file:///D:\\TestDatas\\emp.csv")

    // 4. 将其映射成ROW对象
    val rowRDD = empRDD.map(line => {
      val strings = line.split(",")
      Row(strings(0).toInt, strings(1), strings(2), strings(3), strings(4), strings(5).toInt,strings(6), strings(7).toInt)
    })

    // 5. 创建DataFrame
    val dataFrame = sparkSession.createDataFrame(rowRDD, mySchema)

    // 6. 展示内容 DSL
	dataFrame.groupBy("deptno").sum("sal").as("result").sort("sum(sal)").show()
  }
}

结果如下:

方式二:SQL方式操作

  • 实例化SparkContext和SparkSession对象
  • 创建case class Emp样例类,用于定义数据的结构信息
  • 通过SparkContext对象读取文件,生成RDD[String]
  • 将RDD[String]转换成RDD[Emp]
  • 引入spark隐式转换函数(必须引入)
  • 将RDD[Emp]转换成DataFrame
  • 将DataFrame注册成一张视图或者临时表
  • 通过调用SparkSession对象的sql函数,编写sql语句
  • 停止资源
  • 具体代码如下:
package com.scala.demo.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}

// 0. 数据分析
// 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
// 1. 定义Emp样例类
case class Emp(empNo:Int,empName:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptNo:Int)

object Demo02 {
  def main(args: Array[String]): Unit = {
    // 2. 读取数据将其映射成Row对象
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Demo02"))
    val mapRdd = sc.textFile("file:///D:\\TestDatas\\emp.csv")
      .map(_.split(","))

    val rowRDD:RDD[Emp] = mapRdd.map(line => Emp(line(0).toInt, line(1), line(2), line(3), line(4), line(5).toInt, line(6), line(7).toInt))

    // 3。创建dataframe
    val spark = SparkSession.builder().getOrCreate()
    // 引入spark隐式转换函数
    import spark.implicits._
    // 将RDD转成Dataframe
    val dataFrame = rowRDD.toDF

    // 4.2 sql语句操作
    // 1、将dataframe注册成一张临时表
    dataFrame.createOrReplaceTempView("emp")
    // 2. 编写sql语句进行操作
    spark.sql("select deptNo,sum(sal) as total from emp group by deptNo order by total desc").show()

    // 关闭资源
    spark.stop()
    sc.stop()
  }
}

结果如下:

到此这篇关于Spark SQL 2.4.8 操作 Dataframe的两种方式的文章就介绍到这了,更多相关Spark SQL 操作 Dataframe内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

时间: 2021-10-12

DataFrame:通过SparkSql将scala类转为DataFrame的方法

如下所示: import java.text.DecimalFormat import com.alibaba.fastjson.JSON import com.donews.data.AppConfig import com.typesafe.config.ConfigFactory import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{Row, SaveMode, Da

浅谈DataFrame和SparkSql取值误区

1.DataFrame返回的不是对象. 2.DataFrame查出来的数据返回的是一个dataframe数据集. 3.DataFrame只有遇见Action的算子才能执行 4.SparkSql查出来的数据返回的是一个dataframe数据集. 原始数据 scala> val parquetDF = sqlContext.read.parquet("hdfs://hadoop14:9000/yuhui/parquet/part-r-00004.gz.parquet") df: or

SparkSQL使用IDEA快速入门DataFrame与DataSet的完美教程

目录 1.使用IDEA开发Spark SQL 1.1创建DataFrame/DataSet 1.1.1指定列名添加Schema 1.1.2StructType指定Schema 1.1.3反射推断Schema 1.使用IDEA开发Spark SQL 1.1创建DataFrame/DataSet 1.指定列名添加Schema 2.通过StrucType指定Schema 3.编写样例类,利用反射机制推断Schema 1.1.1指定列名添加Schema //导包 import org.apache.sp

spark rdd转dataframe 写入mysql的实例讲解

dataframe是在spark1.3.0中推出的新的api,这让spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,据说计算性能更还快了两倍.spark在离线批处理或者实时计算中都可以将rdd转成dataframe进而通过简单的sql命令对数据进行操作,对于熟悉sql的人来说在转换和过滤过程很方便,甚至可以有更高层次的应用,比如在实时这一块,传入kafka的topic名称和sql语句,后台读取自己配置好的内容字段反射成一个class并利用出入的sql对实时数据进行

pyspark.sql.DataFrame与pandas.DataFrame之间的相互转换实例

代码如下,步骤流程在代码注释中可见: # -*- coding: utf-8 -*- import pandas as pd from pyspark.sql import SparkSession from pyspark.sql import SQLContext from pyspark import SparkContext #初始化数据 #初始化pandas DataFrame df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], index=['row1

java 日期各种格式之间的相互转换实例代码

java 日期各种格式之间的相互转换实例代码 java日期各种格式之间的相互转换,直接调用静态方法 实例代码: java日期各种格式之间的相互转换,直接调用静态方法 package com.hxhk.cc.util; import java.text.SimpleDateFormat; import java.util.Date; import com.lowagie.text.pdf.codec.postscript.ParseException; public class DateUtil

pytorch numpy list类型之间的相互转换实例

如下所示: import torch from torch.autograd import Variable import numpy as np ''' pytorch中Variable与torch.Tensor类型的相互转换 ''' # 1.torch.Tensor转换成Variablea=torch.randn((5,3)) b=Variable(a) print('a',a.type(),a.shape) print('b',type(b),b.shape) # 2.Variable转换

spark: RDD与DataFrame之间的相互转换方法

DataFrame是一个组织成命名列的数据集.它在概念上等同于关系数据库中的表或R/Python中的数据框架,但其经过了优化.DataFrames可以从各种各样的源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD. DataFrame API 可以被Scala,Java,Python和R调用. 在Scala和Java中,DataFrame由Rows的数据集表示. 在Scala API中,DataFrame只是一个类型别名Dataset[Row].而在Java API中,用户需要

Python实现从SQL型数据库读写dataframe型数据的方法【基于pandas】

本文实例讲述了Python实现从SQL型数据库读写dataframe型数据的方法.分享给大家供大家参考,具体如下: Python的pandas包对表格化的数据处理能力很强,而SQL数据库的数据就是以表格的形式储存,因此经常将sql数据库里的数据直接读取为dataframe,分析操作以后再将dataframe存到sql数据库中.而pandas中的read_sql和to_sql函数就可以很方便得从sql数据库中读写数据. read_sql 参见pandas.read_sql的文档,read_sql主

Python pandas DataFrame操作的实现代码

1. 从字典创建Dataframe >>> import pandas as pd >>> dict1 = {'col1':[1,2,5,7],'col2':['a','b','c','d']} >>> df = pd.DataFrame(dict1) >>> df col1 col2 0 1 a 1 2 b 2 5 c 3 7 d 2. 从列表创建Dataframe (先把列表转化为字典,再把字典转化为DataFrame) >

python中pandas.DataFrame的简单操作方法(创建、索引、增添与删除)

前言 最近在网上搜了许多关于pandas.DataFrame的操作说明,都是一些基础的操作,但是这些操作组合起来还是比较费时间去正确操作DataFrame,花了我挺长时间去调整BUG的.我在这里做一些总结,方便你我他.感兴趣的朋友们一起来看看吧. 一.创建DataFrame的简单操作: 1.根据字典创造: In [1]: import pandas as pd In [3]: aa={'one':[1,2,3],'two':[2,3,4],'three':[3,4,5]} In [4]: bb=

python pandas dataframe 行列选择,切片操作方法

SQL中的select是根据列的名称来选取:Pandas则更为灵活,不但可根据列名称选取,还可以根据列所在的position(数字,在第几行第几列,注意pandas行列的position是从0开始)选取.相关函数如下: 1)loc,基于列label,可选取特定行(根据行index): 2)iloc,基于行/列的position: 3)at,根据指定行index及列label,快速定位DataFrame的元素: 4)iat,与at类似,不同的是根据position来定位的: 5)ix,为loc与i

将pandas.dataframe的数据写入到文件中的方法

导入实验常用的python包.如图2所示. [import pandas as pd]pandas用来做数据处理.[import numpy as np]numpy用来做高维度矩阵运算.[import matplotlib.pyplot as plt]matplotlib用来做数据可视化. pandas数据写入到csv文件中: [names = ['Bob','Jessica','Mary','John','Mel']]创建一个names列表[ births = [968,155,77,578,

pandas.DataFrame的pivot()和unstack()实现行转列

示例:有如下表需要进行行转列: 代码如下: # -*- coding:utf-8 -*- import pandas as pd import MySQLdb from warnings import filterwarnings # 由于create table if not exists总会抛出warning,因此使用filterwarnings消除 filterwarnings('ignore', category = MySQLdb.Warning) from sqlalchemy im